前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >大数据技术栈之-实时数仓构建

大数据技术栈之-实时数仓构建

作者头像
小四的技术之旅
发布2023-03-02 19:08:44
9860
发布2023-03-02 19:08:44
举报

前言

之前说了数据采集方案,数据库的数据,前端埋点数据,IOT数据经过一些中间件或者应用程序采集到Kafka后,分为了两条路线,一条是走离线,一条走实时,离线的会存储到HDFS,然后时候Hive构建离线数据仓库,实时的则进入flink做流式计算后再根据需求建模,然后写入到对应的数据库中提供使用,今天我们来说一下实时这条线路。

flink流式处理

flink是一个流批一体处理框架,不过我们一般都是用它来做流式处理,flink提供了丰富的connector,我们可以轻松地对接不同的数据源,如flink-doris-connector,flink-connector-kafka,flink-connector-jdbc,flink-connector-redis等,下面我们主要演示flink从kafka中获取数据,然后经过流式处理后,写入到doris中 ,当然,写入redis,mysql,es这些也是比较简单。

main方法

main方法就是flink的处理流程,主要分为几步,配置运行环境的一些选项,读取kfaka数据源,构建doris sink,进行计算,sink数据到doris。

代码语言:javascript
复制
/**
 * 功能说明:kafka -> flink -> doris
 * <p>
 * Original @Author: steakliu-刘牌, 2022-11-14  17:17
 * <p>
 * Copyright (C)2020-2022  steakliu All rights reserved.
 */
public class Kafka2Flink2DorisApp {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = MyExecutionEnvironment.getWebUIExecutionEnvironment();
    //1.设置环境
    ExecutionEnvironmentUtil.setExecutionEnv(env);
    //2.获取kafka数据源
    KafkaSource<String> kafkaSource = KafkaDataSource.getKafkaSource();
    //3.获取Doris builder
    DorisSink.Builder<String> builder = DorisBuilder.build();
    //4. 计算
    SingleOutputStreamOperator<String> operator = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafka-source")
      .flatMap(new FlatMapFunction<String, String>() {
        @Override
        public void flatMap(String s, Collector<String> collector) throws Exception {
            String json = s.replace("成都","chengdu");
          collector.collect(json);
        }
      });
    //5. sink到doris
    operator.sinkTo(builder.build()).setParallelism(8);
    env.execute("kafka-flink-doris");
  }
}

环境配置

要使用flink流式计算,我们首先要获取StreamExecutionEnvironment,并按需进行配置,如下配置了checkpoint保存点的时间间隔,设置了并发度等,还有许多配置项,我们可以按需配置。

代码语言:javascript
复制
/**
 * 功能说明: 环境配置
 * <p>
 * Original @Author: steakliu-刘牌, 2022-11-14  17:24
 * <p>
 * Copyright (C)2020-2022  steakliu All rights reserved.
 */
public class ExecutionEnvironmentUtil {
  public static void setExecutionEnv(StreamExecutionEnvironment env){
    //设置保存时间,3s保存一下,
    env.enableCheckpointing(10000);
    env.setParallelism(4);
    CheckpointConfig checkpointConfig = env.getCheckpointConfig();
    //checkpoit失败重试次数
    checkpointConfig.setTolerableCheckpointFailureNumber(3);
    //job取消时是否清除checkpoit数据,设置为不清除
    checkpointConfig.setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
    //chenckpoit最大并发数
    checkpointConfig.setMaxConcurrentCheckpoints(3);
    //保证只有一次,是基于2PC提交
    checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
    env.setRestartStrategy(RestartStrategies.fixedDelayRestart(5, Time.milliseconds(30000)));
    env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
  }
}

获取kafka数据源

通过kafka connector获取kafka数据源,从kafka的dorisUser主题获取数据,消费者组为userInfo,读取数据偏移量的策略是earliest,表示从最新的偏移量位置获取数据。

代码语言:javascript
复制
/**
 * 功能说明: 获取kafka数据源
 * <p>
 * Original @Author: steakliu-刘牌, 2022-11-14  17:20
 * <p>
 * Copyright (C)2020-2022  steakliu All rights reserved.
 */
@Slf4j
public class KafkaDataSource {
  public static KafkaSource<String> getKafkaSource() {
    return KafkaSource.<String>builder()
      .setBootstrapServers("localhost:9092")
      .setTopics("dorisUser")
      .setGroupId("userInfo")
      .setStartingOffsets(OffsetsInitializer.earliest())
      .setDeserializer(new KafkaRecordDeserializationSchema<String>() {
        @Override
        public void deserialize(ConsumerRecord<byte[], byte[]> consumerRecord, Collector<String> collector) throws IOException {
          String s = new String(consumerRecord.value());
          System.out.println("从kafka获取的数据 =======> "+s);
          collector.collect(s);
        }
        @Override
        public TypeInformation<String> getProducedType() {
          return TypeInformation.of(String.class);
        }
      })
      .build();
  }
}

Doris建表

使用的AGGREGATE模型,Doris有Aggregate,Unique,Duplicate三种数据模型,根据需求选择合适自己业务的模型。

代码语言:javascript
复制
CREATE TABLE `user`
(
  `user_id`         largeint(40) NOT NULL COMMENT "用户id",
  `date`            date NOT NULL COMMENT "数据灌入日期时间",
  `city`            varchar(20) NULL COMMENT "用户所在城市",
  `age`             smallint(6) NULL COMMENT "用户年龄",
  `sex`             tinyint(4) NULL COMMENT "用户性别",
  `last_visit_date` datetime REPLACE NULL DEFAULT "1970-01-01 00:00:00" COMMENT "用户最后一次访问时间",
  `cost`            bigint(20) SUM NULL DEFAULT "0" COMMENT "用户总消费",
  `max_dwell_time`  int(11) MAX NULL DEFAULT "0" COMMENT "用户最大停留时间",
  `min_dwell_time`  int(11) MIN NULL DEFAULT "99999" COMMENT "用户最小停留时间"
) ENGINE=OLAP
AGGREGATE KEY(`user_id`, `date`, `city`, `age`, `sex`)
COMMENT "OLAP"
DISTRIBUTED BY HASH(`user_id`) BUCKETS 1
PROPERTIES (
"replication_allocation" = "tag.location.default: 1",
"in_memory" = "false",
"storage_format" = "V2"
);  

构建doris sink

格式使用的是json,因为从kafka传过来的是json字符串,这里在构建doris sink的时候设置格式为json,doris sink会帮忙解析。

代码语言:javascript
复制
/**
 * 功能说明:doris sink
 * <p>
 * Original @Author: steakliu-刘牌, 2022-11-14  17:27
 * <p>
 * Copyright (C)2020-2022  steakliu All rights reserved.
 */
public class DorisBuilder {
  public static DorisSink.Builder<String> build() {
    DorisSink.Builder<String> builder = DorisSink.builder();
    final DorisReadOptions.Builder readOptionBuilder = DorisReadOptions.builder();
    Properties pro = new Properties();
    pro.setProperty("format", "json");
    pro.setProperty("read_json_by_line", "true");
    pro.setProperty("line_delimiter", "\n");

    DorisOptions.Builder dorisBuilder = DorisOptions.builder();
    dorisBuilder.setFenodes("localhost:8030")
      .setTableIdentifier("demo.user")
      .setUsername("root")
      .setPassword("");

    DorisExecutionOptions.Builder executionBuilder = DorisExecutionOptions.builder();
    executionBuilder
      .setStreamLoadProp(pro)
      .setLabelPrefix("a1");

    builder.setDorisReadOptions(readOptionBuilder.build())
      .setDorisExecutionOptions(executionBuilder.build())
      .setSerializer(new SimpleStringSerializer())
      .setDorisOptions(dorisBuilder.build());
    return builder;
  }
}

上面一个基本的流程就走完了,主要就是获取数据源,然后进行计算,最后写入到目标库,上面flink做计算案例中只是简单的使用了FloatMap算子,做了一个字符替换,flink提供了丰富的算子供我们使用,可以根据实际需求进行选择。

❝通过上面简单的案例,我们能对实时数仓的构建路线有一个简单的认知,当然,实际使用还需要考虑很多问题,踩很多坑,不过最重要的是心中要有一个蓝图,这样才有去选择和考虑其他方式或框架的基础,不然就是一把抓黑。

❝今天的分享就到这里,感谢你的观看,我们下期见。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2022-12-14,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 刘牌 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 前言
  • flink流式处理
    • main方法
      • 环境配置
        • 获取kafka数据源
          • Doris建表
            • 构建doris sink
            相关产品与服务
            云数据库 Redis
            腾讯云数据库 Redis(TencentDB for Redis)是腾讯云打造的兼容 Redis 协议的缓存和存储服务。丰富的数据结构能帮助您完成不同类型的业务场景开发。支持主从热备,提供自动容灾切换、数据备份、故障迁移、实例监控、在线扩容、数据回档等全套的数据库服务。
            领券
            问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档