前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Flink集成iceberg数据湖之合并小文件

Flink集成iceberg数据湖之合并小文件

作者头像
大数据技术与应用实战
发布2020-11-25 15:20:51
4.4K0
发布2020-11-25 15:20:51
举报
文章被收录于专栏:大数据技术与应用实战
  • 背景
  • 使用
    • 流式数据入湖
    • 开启压缩程序
    • 快照过期
    • 删除无用文件
    • 数据查询
  • 遇到的坑
    • 最大并发度问题
    • 文件被重复压缩
    • 扫描任务读取文件问题
    • 不读取大文件
    • 优化生成CombinedScanTask
  • 后续问题
    • 数据管理
    • 迁移问题
    • presto查询性能优化
  • 总结

背景

在传统的实时数仓中,由于列式存储相对行式存储有较高的查询性能,我们一般采用orc,parquet数据格式,但是这种列式格式无法追加,流式数据又不能等候太长时间,等到文件够了一个hdfs block块大小再写入,所以不可避免的产生了一个令人头大的问题,即小文件问题,由于使用小文件会增加namenode的压力,并且影响查询性能,所以我们在使用流式数据入库的时候一般会对小文件进行合并处理。

但是传统的流式数据入库的过程中对小文件进行合并会产生很多问题,比如流式数据不断的往hive表进行写入,如果同时有一个合并程序进行小文件的合并,那么这时候对同一份数据进行读写。会不会产生问题。如何保证事务,出错了怎么回滚呢,这些都是很棘手的问题。

我们的流任务以flink为主,查询引擎是presto,所以调研以后,我决定引入iceberg来解决小文件合并的问题。

使用

流式数据入湖

我们主要的数据来源是kafka,flink的任务主要就是消费kafka的数据,然后处理以后发送到iceberg,任务主要是以sql为主,也有部分jar包的任务,提交的方式主要是使用zeppelin来提交,使用zeppelin提交sql任务是使用的其自带的功能,提交jar包是我自己写了一个插件。

开启压缩程序

目前社区提供了一个spark版本的合并小文件的Action,我们的环境以flink为主,所以我参考spark版本把这个压缩程序改了一个flink版本,并经过测试,进行了多处bug修改和优化。目前社区新发布的1.10版本中没有带这个功能,我自己基于master分支打了一个jar,并且里面包含了flink 版本压缩小文件的程序,以及所有的优化,需要的朋友,可以 到这下载一下,https://github.com/zhangjun0x01/bigdata-examples/blob/master/iceberg/libs/iceberg-flink-runtime-0.9.1.jar 。社区版本我觉得应该会在下一个版本发布。

这个压缩程序是单独启动的一个shell任务,逻辑就是先把iceberg表进行一次压缩。然后sleep五分钟。然后再启动压缩,是一个死循环任务。

之所以没有采取定时任务,是因为如果五分钟一个定时任务来压缩,那么如果五分钟之内没有压缩完成,或者压缩程序出现异常,导致本次压缩没完成的时候,下一个定时任务又起来了,就会把上次没有压缩完的数据一起压缩,这样就导致任务量就增大了,以后的任务压缩的文件越积累越多

代码语言:javascript
复制
Table table = ..............
RewriteDataFilesActionResult result =
	Actions.forTable(env, table)
	       .rewriteDataFiles()
	       .maxParallelism(10)
	       .targetSizeInBytes(128*1024*1024)
		//.filter(Expressions.equal("day", day))
		   .execute();
		   

快照过期

目前我们的应用场景只需要查询当前数据就可以了,不需要查询历史数据,所以我只保留了最新的快照。在每次压缩程序之后,做了处理,使当前快照时间以前的快照过期。程序会自动删除以前的过期数据文件.

代码语言:javascript
复制
	Snapshot snapshot = table.currentSnapshot();
			if (snapshot != null){
				long time = snapshot.timestampMillis();
				table.expireSnapshots()
				     .expireOlderThan(time)
				     .commit();
			}

删除无用文件

我发现使程序的快照过期的代码并没有删除metadata里面的metadata.json结尾的json文件,所以在这之外,我单独启动了一个spark 任务来删除这些文件,(后续有时间可以改造成flink版本的)。

这个程序默认会删除三天之前的数据,我觉得对我来说可能不需要,我设置了删除一个小时之前的旧数据,但是有一点要强调,就是这个不能像快照过期一样,删除当前快照以前的数据,因为目前有入湖的流式数据,和压缩程序在同时操作一个表,如果该程序在删除无用文件的同时,其他两个程序很有可能正在读取或者写入,这样会导致删除了一些元数据文件,其他两个程序报错。

代码语言:javascript
复制
	long olderThanTimestamp = System.currentTimeMillis() - TimeUnit.HOURS.toMillis(1);

	TableIdentifier identifier = TableIdentifier.of("db_name","table_name");
	org.apache.iceberg.Table table = catalog.loadTable(identifier);
	List<String> list = Actions.forTable(spark, table)
	                           .removeOrphanFiles()
	                           .olderThan(olderThanTimestamp)
	                           .execute();
	list.stream().forEach(System.out::println);
			

数据查询

我们使用的查询工具是presto,presto查询hive改成查询iceberg很简单,只需要添加一个catalog就可以了。

我们使用的是prestosql 331版本,其他的版本我没有做过测试

遇到的坑

最大并发度问题

目前系统是将扫描任务合并成任务,默认的并发度是合成任务的个数,但是当某一个表的分区数据比较多的时候,那么这个压缩任务的并发度可能会非常大,比如好几百,所以这样就会占用非常大的资源,为了避免这种压缩任务占用过多的资源而影响线上正常的任务,我们给他提供一个最大的并行度的设置,如果没有超过这个并行度就用系统的默认策略,超过了之后,就使用我们提供的最大并行度的参数

文件被重复压缩

比如我们设置压缩的大小128M,这个分区下面如果我们有三个120M大小的文件,那么压缩的时候这三个120M文件会被读取。然后再重新生成三个和原来旧数据一样大小的新文件,我觉得这个是无用且浪费资源的,所以我们也进行了改造,这种文件就不需要压缩了。

扫描任务读取文件问题

这个我在测试的时候发现。采用任务的默认读取大小,也就是读取大小是128M,压缩出来的文件使用hdfs命令查看的时候,发现才十几兆,和实际的128M相差太远,通过debug源码发现,扫描任务在扫描文件的时候读取的128的大小是按照实际读取的数据大小读取的,也就是压缩之前的数据,而我们这个orc文件是经过压缩的。

这个只有orc文件格式会出现这个问题,这个是一个bug,已经修复。https://github.com/apache/iceberg/issues/1666

不读取大文件

比如我们设置的目标大小128M,但是如果有文件超过了128M,那么压缩的时候这种文件就不需要读取了,这块也做了优化。

优化生成CombinedScanTask

默认情况下,系统是依次遍历查询到的数据文件,然后累加,直到达到target file size,比如有如下大小的数据文件,20M, 20M, 20M, 70M, 100M,目标文件大小是128M,系统将会生成三个CombinedScanTask:(20M, 20M, 20M), (70M), (100M),很显然,如果生成两个将会更加合理,(20M, 20M, 70M), (20M, 100M),这个是一个优化点,我还没有来得及做。

https://github.com/apache/iceberg/issues/1667

后续问题

数据管理

压缩完一个分区的数据,我想看看当前快照下面有多少个文件,每个文件大小是多少,是否符合我的预期,但是目前系统没有一个合适的工具,如果直接看data目录的数据文件,不知道哪些文件属于当前快照,我们需要通过api来写代码做查看。

我觉得后续我们需要一个最好是可视化的工具来方便的管理和查询iceberg表。

迁移问题

我们在测试过可以使用iceberg以后,如何将以前的hive表迁移成iceberg表呢,新建一个iceberg table,然后写批任务导入?如果我们有非常多的hive表,或者有的表下面分区比较多,这个时候怎么弄呢?如果写批任务导入将是一个巨大的工程,我现在是自己写了一个工具类,不过我觉得应该把它做成一个管理工具,更方便用户使用。

presto查询性能优化

对于一些相对较大的hive表,迁移到iceberg表之后,使用presto查询的时候,我发现速度变慢了,理论上查询iceberg比hive少了一层list操作,应该会快一些,这个不知道是我配置的问题,还是presto和iceberg集成的问题,需要排查一下。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2020-11-23,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 大数据技术与应用实战 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 背景
  • 使用
    • 流式数据入湖
      • 开启压缩程序
        • 快照过期
          • 删除无用文件
            • 数据查询
            • 遇到的坑
              • 最大并发度问题
                • 文件被重复压缩
                  • 扫描任务读取文件问题
                    • 不读取大文件
                      • 优化生成CombinedScanTask
                      • 后续问题
                        • 数据管理
                          • 迁移问题
                            • presto查询性能优化
                            相关产品与服务
                            文件存储
                            文件存储(Cloud File Storage,CFS)为您提供安全可靠、可扩展的共享文件存储服务。文件存储可与腾讯云服务器、容器服务、批量计算等服务搭配使用,为多个计算节点提供容量和性能可弹性扩展的高性能共享存储。腾讯云文件存储的管理界面简单、易使用,可实现对现有应用的无缝集成;按实际用量付费,为您节约成本,简化 IT 运维工作。
                            领券
                            问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档