前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Flink集成数据湖之实时数据写入iceberg

Flink集成数据湖之实时数据写入iceberg

作者头像
大数据技术与应用实战
修改2021-02-07 19:13:12
6.2K5
修改2021-02-07 19:13:12
举报
文章被收录于专栏:大数据技术与应用实战

背景

随着大数据处理结果的实时性要求越来越高,越来越多的大数据处理从离线转到了实时,其中以flink为主的实时计算在大数据处理中占有重要地位。

Flink消费kafka等实时数据流。然后实时写入hive,在大数据处理方面有着广泛的应用。此外由于列式存储格式如parquet或者orc在查询性能方面有着显著的提高,所以大家都会优先选择列式存储作为我们的存储格式。

传统的这种架构看似不错,但是还是有很多没有解决的问题:

  • 实时写入造成大量小文件,需要单独的程序来进行合并
  • 实时的写入,读取,还有合并小文件在同时进行,那么如何保证事务,读取数据的时候不会出现脏读。
  • Hdfs的数据一般是一次写入。多次读写,但是如果因为程序出错导致数据错了,确实要修改某一条数据改怎么办
  • 消费kafka的数据落地到hive,有一天kafka的数据多了几个字段,如何同步到hive?必须删了重建吗?
  • 订单等业务数据一般存储在传统数据库,如mysql等。如何实时同步这些cdc数据到hive仓库呢,包括ddl和dml

如果你有上面的需求,那么你可以考虑一下数据湖了,目前开源的数据湖技术主要有以下几个:delta、hudi、iceberg,但是侧重点有所不同,我上面说的问题也不完全都能实现,但是这些都是数据湖要做的东西,随着社区的不断发展,这些功能都会有的。

但是目前世面上这些数据湖技术都与spark紧密绑定。而我们目前实时计算主要以flink为主,而且我个人觉得未来实时计算也将以flink为主,所以我选择了iceberg为我们的数据湖,虽然他有一些功能不是很完善,但是有着良好的抽象,并且不强制绑定spark,所以对于iceberg没有的功能,我们可以自己给补全,再回馈给社区,一起成长。

iceberg简介

其实对于iceberg,官方的定义是一种表格式。

Apache Iceberg is an open table format for huge analytic datasets. Iceberg adds tables to Presto and Spark that use a high-performance format that works just like a SQL table.

我们可以简单理解为他是基于计算层(flink , spark)和存储层(orc,parqurt)的一个中间层,我们在hive建立一个iceberg格式的表。用flink或者spark写入iceberg,然后再通过其他方式来读取这个表,比如spark,flink,presto等。

当然数据湖的概念远不止这点,我们今天就先简单的这么理解,后续写一篇文章专门介绍一下iceberg。

flink实时写入

准备sql client环境

目前官方的测试版本是基于scala 2.12版本的flink。所以我们也用和官方同步的版本来测试下,下载下面的两个jar放到flink的lib下面,然后启动一下flink集群,standalone模式。

  • 下载flink :flink-1.11.2-bin-scala_2.12.tgz
  • 下载 iceberg-flink-runtime-xxx.jar
  • 下载flink 集成hive的connector,flink-sql-connector-hive-2.3.6_2.12-1.11.2.jar
  • 目前官方的hive测试版本是 2.3.7,其他的版本可能有不兼容

注意要配置flink的checkpoint,因为目前flink提交iceberg的信息是在每次checkpoint的时候提交的。在sql client配置checkpoint的方法如下:

在flink-conf.yaml添加如下配置

代码语言:javascript
复制
execution.checkpointing.interval: 10s   # checkpoint间隔时间
execution.checkpointing.tolerable-failed-checkpoints: 10  # checkpoint 失败容忍次数

创建catalog

目前系统提供的catalog有hivecatalog和hadoopcatalog以及自定义catlog

代码语言:javascript
复制
CREATE CATALOG iceberg WITH (
  'type'='iceberg',
  'catalog-type'='hive',
  'uri'='thrift://localhost:9083',
  'clients'='5',
  'property-version'='1',
  'warehouse'='hdfs://nn:8020/warehouse/path'
);

执行完之后,显示如下:

代码语言:javascript
复制
Flink SQL> show catalogs;
default_catalog
iceberg

如果不想每次启动sql client都重新执行ddl,可以在sql-client-defaults.yaml 里面皮遏制一下:

代码语言:javascript
复制
catalogs: # empty list
# A typical catalog definition looks like:
  - name: hive
    type: hive
    hive-conf-dir: /Users/user/work/hive/conf
    default-database: default
  - name: iceberg
    type: iceberg
    warehouse: hdfs://localhost/user/hive2/warehouse
    uri: thrift://localhost:9083
    catalog-type: hive

创建db

代码语言:javascript
复制
use catalog iceberg;
CREATE DATABASE iceberg_db;
USE iceberg_db;

创建table

代码语言:javascript
复制
CREATE TABLE iceberg.iceberg_db.iceberg_001 (
    id BIGINT COMMENT 'unique id',
    data STRING
) WITH ('connector'='iceberg','write.format.default'='ORC');

插入数据

我们依然创建一个datagen的connector。

代码语言:javascript
复制
CREATE TABLE sourceTable (
 userid int,
 f_random_str STRING
) WITH (
 'connector' = 'datagen',
 'rows-per-second'='100',
 'fields.userid.kind'='random',
 'fields.userid.min'='1',
 'fields.userid.max'='100',
'fields.f_random_str.length'='10'
)

这时候我们看到有两个表了

代码语言:javascript
复制
Flink SQL> show tables;
iceberg_001
sourcetable

然后执行insert into插入数据:

代码语言:javascript
复制
insert into iceberg.iceberg_db.iceberg_001 select * from iceberg.iceberg_db.sourceTable

查询

我们这里使用presto来查询

presto的配置iceberg.properties 如下:

代码语言:javascript
复制
connector.name=iceberg
hive.metastore.uri=thrift://localhost:9083

代码版本

代码语言:javascript
复制
public class Flink2Iceberg{

	public static void main(String[] args) throws Exception{
		StreamExecutionEnvironment env =
				StreamExecutionEnvironment.getExecutionEnvironment();
		env.setParallelism(1);
		env.enableCheckpointing(10000);
		StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
		tenv.executeSql("CREATE CATALOG iceberg WITH (\n" +
		                "  'type'='iceberg',\n" +
		                "  'catalog-type'='hive'," +
		                "  'hive-conf-dir'='/Users/user/work/hive/conf/'" +
		                ")");

		tenv.useCatalog("iceberg");
		tenv.executeSql("CREATE DATABASE iceberg_db");
		tenv.useDatabase("iceberg_db");

		tenv.executeSql("CREATE TABLE sourceTable (\n" +
		                " userid int,\n" +
		                " f_random_str STRING\n" +
		                ") WITH (\n" +
		                " 'connector' = 'datagen',\n" +
		                " 'rows-per-second'='100',\n" +
		                " 'fields.userid.kind'='random',\n" +
		                " 'fields.userid.min'='1',\n" +
		                " 'fields.userid.max'='100',\n" +
		                "'fields.f_random_str.length'='10'\n" +
		                ")");

		tenv.executeSql(
				"insert into iceberg.iceberg_db.iceberg_001 select * from iceberg.iceberg_db.sourceTable");
	}
}

具体见:

https://github.com/zhangjun0x01/bigdata-examples/blob/master/iceberg/src/main/java/com/Flink2Iceberg.java

总结

总结一下,iceberg的资料比较少,很多设计或者讨论等需要关注issues,然后再去撸源码,可能对于刚入门的小伙伴来说有点困难。后续我也会多分享一些关于iceberg的文章,欢迎大家关注我公众号【大数据技术与应用实战】。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2020-09-30,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 大数据技术与应用实战 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 背景
  • iceberg简介
  • flink实时写入
    • 准备sql client环境
      • 创建catalog
        • 创建db
          • 创建table
            • 插入数据
              • 查询
                • 代码版本
                • 总结
                相关产品与服务
                流计算 Oceanus
                流计算 Oceanus 是大数据产品生态体系的实时化分析利器,是基于 Apache Flink 构建的企业级实时大数据分析平台,具备一站开发、无缝连接、亚秒延时、低廉成本、安全稳定等特点。流计算 Oceanus 以实现企业数据价值最大化为目标,加速企业实时化数字化的建设进程。
                领券
                问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档