前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >【Flink实战】Flink 商品销量统计-实战Bahir Connetor实战存储 数据到Redis6.X

【Flink实战】Flink 商品销量统计-实战Bahir Connetor实战存储 数据到Redis6.X

作者头像
大数据小禅
发布2023-09-14 08:08:56
1880
发布2023-09-14 08:08:56
举报
文章被收录于专栏:YO大数据YO大数据

Flink怎么操作Redis

Flink怎么操作redis?

  • 方式一:自定义sink
  • 方式二:使用connector

Redis Sink 核心是RedisMapper 是一个接口,使用时要编写自己的redis操作类实现这个接口中的三个方法

  • getCommandDescription 选择对应的数据结构和key名称配置
  • getKeyFromData 获取key
  • getValueFromData 获取value

使用

  • 添加依赖
代码语言:javascript
复制
<dependency>
    <groupId>org.apache.bahir</groupId>
    <artifactId>flink-connector-redis_2.11</artifactId>
    <version>1.0</version>
</dependency>

编码

代码语言:javascript
复制
public class MyRedisSink implements RedisMapper<Tuple2<String, Integer>> {
    @Override
    public RedisCommandDescription getCommandDescription() {
        return new RedisCommandDescription(RedisCommand.HSET, "VIDEO_ORDER_COUNTER");
    }

    @Override
    public String getKeyFromData(Tuple2<String, Integer> value) {
        return value.f0;
    }

    @Override
    public String getValueFromData(Tuple2<String, Integer> value) {
        return value.f1.toString();
    }
}

Flink 商品销量统计-转换-分组-聚合-存储自定义的Redis Sink实战

Redis环境说明 redis6

使用docker部署redis6.x 看个人主页docker相关文章

代码语言:javascript
复制
docker run -d  -p 6379:6379 redis

编码实战

数据源

代码语言:javascript
复制
public class VideoOrderSource extends RichParallelSourceFunction<VideoOrder> {


    private volatile Boolean flag = true;

    private Random random = new Random();

    private static List<String> list = new ArrayList<>();
    static {
        list.add("spring boot2.x课程");
        list.add("微服务SpringCloud课程");
        list.add("RabbitMQ消息队列");
        list.add("Kafka课程");
        list.add("小滴课堂面试专题第一季");
        list.add("Flink流式技术课程");
        list.add("工业级微服务项目大课训练营");
        list.add("Linux课程");
    }


    /**
     * run 方法调用前 用于初始化连接
     * @param parameters
     * @throws Exception
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        System.out.println("-----open-----");
    }

    /**
     * 用于清理之前
     * @throws Exception
     */
    @Override
    public void close() throws Exception {
        System.out.println("-----close-----");
    }


    /**
     * 产生数据的逻辑
     * @param ctx
     * @throws Exception
     */
    @Override
    public void run(SourceContext<VideoOrder> ctx) throws Exception {

        while (flag){
            Thread.sleep(1000);
            String id = UUID.randomUUID().toString();
            int userId = random.nextInt(10);
            int money = random.nextInt(100);
            int videoNum = random.nextInt(list.size());
            String title = list.get(videoNum);
            VideoOrder videoOrder = new VideoOrder(id,title,money,userId,new Date());

            ctx.collect(videoOrder);
        }


    }

    /**
     * 控制任务取消
     */
    @Override
    public void cancel() {

        flag = false;
    }
}

保存的格式与存取的方法

代码语言:javascript
复制
public class VideoOrderCounterSink implements RedisMapper<Tuple2<String, Integer>> {


    /***
     * 选择需要用到的命令,和key名称
     * @return
     */
    @Override
    public RedisCommandDescription getCommandDescription() {
        return new RedisCommandDescription(RedisCommand.HSET, "VIDEO_ORDER_COUNTER");
    }

    /**
     * 获取对应的key或者filed
     *
     * @param data
     * @return
     */
    @Override
    public String getKeyFromData(Tuple2<String, Integer> data) {

        System.out.println("getKeyFromData=" + data.f0);
        return data.f0;
    }

    /**
     * 获取对应的值
     *
     * @param data
     * @return
     */
    @Override
    public String getValueFromData(Tuple2<String, Integer> data) {
        System.out.println("getValueFromData=" + data.f1.toString());
        return data.f1.toString();
    }
}

落地

代码语言:javascript
复制
public class Flink07RedisSinkApp {

    /**
     * source
     * transformation
     * sink
     *
     * @param args
     */
    public static void main(String[] args) throws Exception {

        //构建执行任务环境以及任务的启动的入口, 存储全局相关的参数
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        env.setParallelism(1);

        //数据源 source
//        DataStream<VideoOrder> ds = env.fromElements(
//                new VideoOrder("21312","java",32,5,new Date()),
//                new VideoOrder("314","java",32,5,new Date()),
//                new VideoOrder("542","springboot",32,5,new Date()),
//                new VideoOrder("42","redis",32,5,new Date()),
//                new VideoOrder("4252","java",32,5,new Date()),
//                new VideoOrder("42","springboot",32,5,new Date()),
//                new VideoOrder("554232","flink",32,5,new Date()),
//                new VideoOrder("23323","java",32,5,new Date())
//        );
        DataStream<VideoOrder> ds = env.addSource(new VideoOrderSource());



        //transformation
       DataStream<Tuple2<String,Integer>> mapDS =  ds.map(new MapFunction<VideoOrder, Tuple2<String,Integer>>() {
            @Override
            public Tuple2<String, Integer> map(VideoOrder value) throws Exception {
                return new Tuple2<>(value.getTitle(),1);
            }
        });



//        DataStream<Tuple2<String,Integer>> mapDS = ds.flatMap(new FlatMapFunction<VideoOrder, Tuple2<String,Integer>>() {
//            @Override
//            public void flatMap(VideoOrder value, Collector<Tuple2<String, Integer>> out) throws Exception {
//                out.collect(new Tuple2<>(value.getTitle(),1));
//            }
//        });


       //分组
        KeyedStream<Tuple2<String,Integer>,String> keyByDS = mapDS.keyBy(new KeySelector<Tuple2<String,Integer>, String>() {
            @Override
            public String getKey(Tuple2<String, Integer> value) throws Exception {
                return value.f0;
            }
        });

        //统计每组有多少个
        DataStream<Tuple2<String,Integer>> sumDS =  keyByDS.sum(1);

        //控制台打印
        sumDS.print();

        //单机redis
        FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost("127.0.0.1").setPort(6379).build();

        sumDS.addSink(new RedisSink<>(conf,new VideoOrderCounterSink()));


        //DataStream需要调用execute,可以取个名称
        env.execute("custom redis sink job");
    }

}
在这里插入图片描述
在这里插入图片描述
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Flink怎么操作Redis
  • Flink 商品销量统计-转换-分组-聚合-存储自定义的Redis Sink实战
相关产品与服务
对象存储
对象存储(Cloud Object Storage,COS)是由腾讯云推出的无目录层次结构、无数据格式限制,可容纳海量数据且支持 HTTP/HTTPS 协议访问的分布式存储服务。腾讯云 COS 的存储桶空间无容量上限,无需分区管理,适用于 CDN 数据分发、数据万象处理或大数据计算与分析的数据湖等多种场景。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档