前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >【Flink实战】用户统计:按照省份维度统计新老用户

【Flink实战】用户统计:按照省份维度统计新老用户

作者头像
大数据小禅
发布2023-09-14 08:11:28
2180
发布2023-09-14 08:11:28
举报
文章被收录于专栏:YO大数据YO大数据

数据源JSON格式数据

代码语言:javascript
复制
{
"deviceType":"iPhone 10",
"uid":"user_1",
"product":{
"name":"宝马",
"category":"车"
},
"os":"iOS",
"ip":"171.11.85.21",
"nu":1,
"channel":"华为商城",
"time":1735419335423,
"event":"browse",
"net":"WiFi",
"device":"4759947c-cd47-433c-ac8f-ae923a6d38b6",
"version":"V1.2.0"
}

统计分析

  • 关键字
    • 省份 :ip ==>省份,城市,运行商,经纬度…
    • 解决方案: 1)请求商业接口 ,高德百度 2)开源版 github.com ipparse 3) 从缓存中找 ,请求
    • https://www.free-api.com/doc/90 免费接口
  • IP测试类: 导入maven依赖http请求
代码语言:javascript
复制
Apache HttpComponents是Apache软件基金会的开源项目,它提供了一系列的高性能,高可用性的Java组件,用于实现HTTP协议,包括客户端,服务器,代理,缓存,身份验证,Cookie管理和HTTP协议处理。它的目标是提供一个完整的,高性能的HTTP解决方案,而不需要任何外部依赖。HttpComponents是一个基于Java的客户端/服务器HTTP协议实现,它提供了一个完整的,高性能的HTTP解决方案,而不需要任何外部依赖。

        <dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpclient</artifactId>
            <version>4.5.6</version>
        </dependency>

IP提取测试

代码语言:javascript
复制
public class IPRequest {
    public static void main(String[] args) {
        String ip="120.79.75.140";
        String province="-";
        String city="-";
        String url="https://apis.juhe.cn/ip/ipNew?ip="+ip+"&key="+ Key.key;
        CloseableHttpResponse response=null;
        System.out.println(url);
        CloseableHttpClient httpClient = HttpClients.createDefault();
        try {
            HttpGet httpGet = new HttpGet(url);
            response = httpClient.execute(httpGet);
            int code = response.getStatusLine().getStatusCode();
            if(code==200){
                HttpEntity entity = response.getEntity();
                String result = EntityUtils.toString(entity, "UTF-8");
                //{"resultcode":"200","reason":"查询成功","result":{"Country":"中国","Province":"广东省","City":"深圳市","Isp":"阿里云"},"error_code":0}
                JSONObject jsonData = JSON.parseObject(result);
                JSONObject data = jsonData.getJSONObject("result");
                province=data.getString("Province");
                city=data.getString("City");
                System.out.println(province+city);
            }
        }catch (Exception e){
            e.printStackTrace();
        }

    }
}

结果:
广东省深圳市
  • 总体代码 写入Redis复用上一篇文章的代码修改即可
代码语言:javascript
复制
public class ProvinceUserCntV1 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> stream = environment.readTextFile("data/access.json");
        environment.setParallelism(1); //设置并行度为1方便观察
        SingleOutputStreamOperator<Access> filter = stream.map(new MapFunction<String, Access>() {
            @Override
            public Access map(String s) throws Exception {
                // json 转 Access
                try {
                    return JSON.parseObject(s, Access.class);
                } catch (Exception e) {
                    e.printStackTrace();
                    return null;
                }
            }
            //这里是只要不为空的数据  x != null等于把上面的空的数据过滤掉
        }).filter(x -> x != null).filter(
                new FilterFunction<Access>() {
            @Override
            public boolean filter(Access access) throws Exception {
                //只过滤出来 event='startup'的数据
                return "startup".equals(access.event);
            }
        });
        //使用Rich额外定义一个类来实现map
        SingleOutputStreamOperator<Access> result = filter.map(new IPMapFunction());
        DataStreamSink<Tuple3<String, Integer, Integer>> user = result.map(new MapFunction<Access, Tuple3<String, Integer, Integer>>() {
            @Override
            public Tuple3<String, Integer, Integer> map(Access access) throws Exception {
                return Tuple3.of(access.province, access.nu, 1);
            }
        }).keyBy(new KeySelector<Tuple3<String, Integer, Integer>, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> getKey(Tuple3<String, Integer, Integer> value) throws Exception {
                return Tuple2.of(value.f0, value.f1);
            }
        }).sum(2).print("按照省份维度统计新老用户");


        environment.execute("OsUserCntAppV1");
    }
}
在这里插入图片描述
在这里插入图片描述
  • 关于flink异步IO的一篇文章 https://zhuanlan.zhihu.com/p/365232338
  • 上面的操作写入是同步的 IO 这样会存在问题,出现问题的时候后面的数据就阻塞了**(流处理吞吐量问题)**
  • 日志中是商品与相关信息是记录的ID,需要去连表查询数据库补全数据,考虑与外部数据连通的性能
  • 需要支持异步的数据库 只需要按照Flink给的模板实现一个RichAsync就可以了
  • 后面有单独的一篇文章完成Flink异步IO
在这里插入图片描述
在这里插入图片描述
思考:上面的代码还存在哪些问题?
  • 工作中:很大程度都是各种维度的统计分析
    • 离线数仓
    • 实时数仓
  • 较多的维度
    • 操作系统 + 新老用户
    • 新老用户
    • 省份 + 新老用户
    • 操作系统 + 省份 + 新老用户
    • 运营商 + 省份 + 新老用户
    • 运营商 + 新老用户
    • ==> KeyBy(…).sum(index)
  • 会遇到的统计问题
    • 每N(小时/分钟)统计一次
    • 每10分钟统计一次
    • 从xxxx==>xxxx事件段内的各种维度(…) 统计
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 统计分析
  • IP提取测试
    • 思考:上面的代码还存在哪些问题?
    相关产品与服务
    云数据库 Redis
    腾讯云数据库 Redis(TencentDB for Redis)是腾讯云打造的兼容 Redis 协议的缓存和存储服务。丰富的数据结构能帮助您完成不同类型的业务场景开发。支持主从热备,提供自动容灾切换、数据备份、故障迁移、实例监控、在线扩容、数据回档等全套的数据库服务。
    领券
    问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档