Requirement architecture diagram:
Building a real-time wide table is essentially dimension rollup (退维). Rollup is a data-processing operation that merges fine-grained data into coarser-grained data. Raw data usually carries a great deal of detail and fine-grained information; sometimes we need to turn that into higher-level, more general summary information to support broader analysis and business needs, and that process is what rollup means here.
Concretely, rollup is implemented through aggregation, merging, grouping, and similar operations. Its goal is to degrade data from fine granularity to coarse granularity so that the data is easier to understand and useful information is easier to extract.
In short, rollup is an important step in data processing: it distills useful information out of complex fine-grained data, simplifies analysis, and at the same time saves storage and improves processing efficiency. Through rollup we can better understand the data, spot patterns and trends, and support a wider range of applications and decisions.
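As a minimal illustration in this project's own terms, the fine-grained event stream (the ods_event table defined later in this post) can be rolled up to one summary row per session with a plain Flink SQL aggregation; this sketch is illustrative only:

tenv.executeSql("select session_id,\n" +
        "       count(*)       as event_cnt,\n" +
        "       max(eventTime) as last_event_time\n" +
        "from ods_event\n" +
        "group by session_id").print();

With the concept in place, first create the Kafka topic ods_event and start a console producer to feed in test events: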
[root@hadoop10 ~]# kafka-topics.sh --zookeeper hadoop10:2181 --create --topic ods_event --partitions 1 --replication-factor 1
WARNING: Due to limitations in metric names, topics with a period ('.') or underscore ('_') could collide. To avoid issues it is best to use either, but not both.
Created topic "ods_event".
[root@hadoop10 kafka0.11]# bin/kafka-console-producer.sh --broker-list hadoop10:9092 --topic ods_event
>{"release_channel":"360应用市场","device_type":"mi6","session_id":"s01","lat":38.089969323508726,"lng":114.35731900345093,"username":"tiger","eventId":"add_cart","eventTime":1670583693000,"properties":{"url":"/content/article/2354.html?a=3","itemId":"item002"}}
package demo;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class CommonDimensionOdsDwd {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
        // Map the Kafka topic ods_event to a source table; proc is a
        // processing-time attribute, needed later for lookup joins.
        tenv.executeSql("create table ods_event(\n" +
                " release_channel string,\n" +
                " device_type string,\n" +
                " session_id string,\n" +
                " lat double,\n" +
                " lng double,\n" +
                " username string,\n" +
                " eventId string,\n" +
                " eventTime bigint,\n" +
                " properties Map<String,String>,\n" +
                " proc as proctime()\n" +
                ")with(\n" +
                " 'connector' = 'kafka',\n" +
                " 'topic' = 'ods_event',\n" +
                " 'properties.bootstrap.servers' = 'hadoop10:9092',\n" +
                " 'properties.group.id' = 'x1',\n" +
                " 'scan.startup.mode' = 'group-offsets',\n" +
                " 'format' = 'json'\n" +
                ")").print();
        tenv.executeSql("select * from ods_event").print();
    }
}
Two sample records to paste into the kafka console producer window for ods_event:
{"release_channel":"360应用市场","device_type":"mi6","session_id":"s01","lat":38.089969323508726,"lng":114.35731900345093,"username":"guoyachao","eventId":"add_cart","eventTime":1670583693000,"properties":{"url":"/content/article/2354.html?a=3","itemId":"item002"}}
{"release_channel":"360应用市场","device_type":"mi6","session_id":"s02","lat":38.089969323508726,"lng":114.35731900345093,"username":"arthas","eventId":"add_cart","eventTime":1670583694000,"properties":{"url":"/content/article/2354.html?a=3","itemId":"item002"}}
Once the producer has sent the messages and they show up in the IDEA console, we can move on to the next step.
Next we need to join three dimension tables (page info, user info, and geo info) with the event messages received from Kafka, and build a wide table that carries the event fields plus the fields from all of these dimensions. Start with the page-info dimension table in HBase:
hbase:002:0> create 'dim_page_info','f'
Created table dim_page_info
Took 1.2378 seconds
=> Hbase::Table - dim_page_info
[root@hadoop10 ~]# vim dim_page_info_loadhbase.sh
put 'dim_page_info' , '/mall/' , 'f:pt', '商品详情页'
put 'dim_page_info' , '/mall/' , 'f:sv', '商城服务'
put 'dim_page_info' , '/content/article/' , 'f:pt', '文章页'
put 'dim_page_info' , '/content/article/' , 'f:sv', '内容服务'
put 'dim_page_info' , '/mall/promotion/' , 'f:pt', '活动页'
put 'dim_page_info' , '/mall/promotion/' , 'f:sv', '商城服务'
put 'dim_page_info' , '/mall/search/' , 'f:pt', '搜索结果页'
put 'dim_page_info' , '/mall/search/' , 'f:sv', '搜索服务'
Run the script with the hbase shell:
[root@hadoop10 ~]# hbase shell /root/dim_page_info_loadhbase.sh
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/opt/installs/hbase2.4/lib/client-facing-thirdparty/slf4j-reload4j-1.7.33.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/installs/hadoop-3.1.4/share/hadoop/common/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Reload4jLoggerFactory]
hbase:001:0> put 'dim_page_info' , '/mall/' , 'f:pt', '商品详情页'
Took 0.7823 seconds
hbase:002:0> put 'dim_page_info' , '/mall/' , 'f:sv', '商城服务'
Took 0.0391 seconds
hbase:003:0> put 'dim_page_info' , '/content/article/' , 'f:pt', '文章页'
Took 0.0230 seconds
hbase:004:0> put 'dim_page_info' , '/content/article/' , 'f:sv', '内容服务'
Took 0.0120 seconds
hbase:005:0> put 'dim_page_info' , '/mall/promotion/' , 'f:pt', '活动页'
Took 0.0061 seconds
hbase:006:0> put 'dim_page_info' , '/mall/promotion/' , 'f:sv', '商城服务'
Took 0.0046 seconds
hbase:007:0> put 'dim_page_info' , '/mall/search/' , 'f:pt', '搜索结果页'
Took 0.0164 seconds
hbase:008:0> put 'dim_page_info' , '/mall/search/' , 'f:sv', '搜索服务'
Took 0.0060 seconds
hbase:009:0>
Check that the import completed:
hbase:001:0> scan 'dim_page_info',{LIMIT=>10,FORMATTER => 'toString'}
ROW COLUMN+CELL
/content/article/ column=f:pt, timestamp=2023-07-16T12:29:01.669, value=文章页
/content/article/ column=f:sv, timestamp=2023-07-16T12:29:01.720, value=内容服务
/mall/ column=f:pt, timestamp=2023-07-16T12:29:01.542, value=商品详情页
/mall/ column=f:sv, timestamp=2023-07-16T12:29:01.619, value=商城服务
/mall/promotion/ column=f:pt, timestamp=2023-07-16T12:29:01.751, value=活动页
/mall/promotion/ column=f:sv, timestamp=2023-07-16T12:29:01.770, value=商城服务
/mall/search/ column=f:pt, timestamp=2023-07-16T12:29:01.798, value=搜索结果页
/mall/search/ column=f:sv, timestamp=2023-07-16T12:29:01.829, value=搜索服务
4 row(s)
Took 0.0726 seconds
If a dimension table is small, you can collect the puts into a shell script like this and load them into HBase in one pass.
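Alternatively, for a programmatic load, here is a minimal sketch using the HBase 2.x Java client (the class name DimPageInfoLoader is made up for illustration, and only a few of the rows are shown):

package demo;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

import java.util.Arrays;

public class DimPageInfoLoader {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "hadoop10");
        conf.set("hbase.zookeeper.property.clientPort", "2181");
        try (Connection conn = ConnectionFactory.createConnection(conf);
             Table table = conn.getTable(TableName.valueOf("dim_page_info"))) {
            // rowkey = url prefix, f:pt = page type, f:sv = page service
            table.put(Arrays.asList(
                    put("/mall/", "pt", "商品详情页"),
                    put("/mall/", "sv", "商城服务"),
                    put("/content/article/", "pt", "文章页"),
                    put("/content/article/", "sv", "内容服务")));
        }
    }

    private static Put put(String rowKey, String qualifier, String value) {
        return new Put(Bytes.toBytes(rowKey))
                .addColumn(Bytes.toBytes("f"), Bytes.toBytes(qualifier), Bytes.toBytes(value));
    }
}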
This dimension table (dim_user_info) has already been created in HBase and is kept in sync with the MySQL business database in real time via Flink CDC. How this HBase table was built follows the previous requirement: http://t.csdn.cn/FVBO4.
tenv.executeSql(" CREATE TABLE dim_user_info (\n" +
        " username String,\n" +
        " f ROW<id INT,phone String,status INT,create_time timestamp(3),gender int,birthday date,city String,job String,source_type int>,\n" +
        " PRIMARY KEY (username) NOT ENFORCED\n" +
        ") WITH (\n" +
        " 'connector' = 'hbase-2.2',\n" +
        " 'table-name' = 'dim_user_info',\n" +
        " 'zookeeper.quorum' = 'hadoop10:2181',\n" +
        " 'lookup.cache.max-rows' = '1000',\n" + // max rows in the lookup cache
        " 'lookup.cache.ttl' = '1min'\n" +       // how long a cached row stays valid
        ")");
An explanation of this Flink SQL statement (courtesy of ChatGPT):
CREATE TABLE dim_user_info: creates a Flink table named dim_user_info.
username String: the first field, of type String, which maps to the HBase RowKey.
f ROW<id INT,phone String,status INT,create_time timestamp(3),gender int,birthday date,city String,job String,source_type int>: the second field, f, a composite (ROW) type. In the HBase table it corresponds to a column family containing multiple columns.
PRIMARY KEY (username) NOT ENFORCED: declares username as the primary key. NOT ENFORCED means Flink does not validate the constraint itself; it trusts the external system (here, the HBase RowKey) to guarantee uniqueness.
WITH (...): the connector options that tell Flink how to reach the HBase table:
'connector' = 'hbase-2.2': use the connector for HBase 2.2.
'table-name' = 'dim_user_info': the name of the HBase table to connect to, dim_user_info.
'zookeeper.quorum' = 'hadoop10:2181': the address of the ZooKeeper ensemble used by HBase.
'lookup.cache.max-rows' = '1000': cache at most 1000 looked-up rows inside Flink to speed up repeated lookups.
'lookup.cache.ttl' = '1min': a cached row expires 1 minute after it is cached and is then re-read from HBase.
In short, this statement creates a Flink table wired to the HBase table dim_user_info, with username as the key and the composite field f holding the column family's columns, so the HBase data can be queried and joined conveniently from Flink.
At this step we only need to run the code and print the table to the console, as shown in the screenshot below, to confirm that the data in HBase can be read; it will be needed when joining user info into the wide table. If no data comes back, troubleshoot following requirement one of this project.
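A throwaway check along those lines, including the nested ROW fields (not part of the final listing):

tenv.executeSql("select t.username, t.f.phone, t.f.city from dim_user_info t").print();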
This dimension table likewise has to be written to HBase in advance; the approach for parsing the geo information was already given in my Spark data-warehouse project and is not repeated here.
The verification code is the same as for the user-info dimension:
tenv.executeSql(" CREATE TABLE dim_geo_area (\n" +
        " geohash String,\n" +
        " f ROW<prov String,city String,area String>,\n" +
        " PRIMARY KEY (geohash) NOT ENFORCED\n" +
        ") WITH (\n" +
        " 'connector' = 'hbase-2.2',\n" +
        " 'table-name' = 'dim_geo_area',\n" +
        " 'zookeeper.quorum' = 'hadoop10:2181',\n" +
        " 'lookup.cache.max-rows' = '1000',\n" + // max rows in the lookup cache
        " 'lookup.cache.ttl' = '1min'\n" +       // how long a cached row stays valid
        ")");
The screenshot below shows the geo-info table data successfully read from HBase and printed:
tenv.executeSql("select t1.*,t2.f.phone,t2.f.status,t3.f.prov,t3.f.city,t3.f.area \n" +
"from ods_event t1 \n" +
"left join dim_user_info for system_time as of t1.proc t2 on t1.username = t2.username " +
"left join dim_geo_area for system_time as of t1.proc t3 on geohash(t1.lat,t1.lng) = t3.geohash ").print();
With the producer messages sent through Kafka, the events are joined against the user and geo dimension tables, and the output now carries the detailed user info and geo location for each event; the page-info table is not joined in yet at this point. The user info supports updates, which are synced to HBase via Flink CDC (the geo columns were too wide to fit in a single screenshot). The experiment behind this screenshot: the phone of user guoyachao in the MySQL business table was changed from 15516000447 to 13253161303, then the event message was sent through Kafka again, producing the new join result.
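For reference, the MySQL change behind that screenshot was a plain UPDATE; the business table name user_info below is an assumption, so substitute your actual table:

update user_info set phone = '13253161303' where username = 'guoyachao';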
The complete code for this requirement follows.
It covers the queries and joins over the three tables discussed above: page info, user info, and geo info. Parsing the geo location requires a user-defined function, which was provided in the Spark data-warehouse project.
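Since the UDF itself lives in the Spark project write-up, here is only a minimal sketch of what a Flink version could look like; it is a stand-in, not the original. The 5-character hash length is an assumption and must match the rowkey precision actually stored in dim_geo_area:

package demo;

import org.apache.flink.table.functions.ScalarFunction;

// Minimal geohash encoder, registered below under the name "geohash".
public class GeoHashUDF extends ScalarFunction {
    private static final String BASE32 = "0123456789bcdefghjkmnpqrstuvwxyz";

    public String eval(Double lat, Double lng) {
        if (lat == null || lng == null) return null;
        double[] latRange = {-90.0, 90.0};
        double[] lngRange = {-180.0, 180.0};
        StringBuilder hash = new StringBuilder();
        boolean evenBit = true; // geohash interleaves bits, longitude first
        int bit = 0, ch = 0;
        while (hash.length() < 5) { // 5 chars; must match dim_geo_area rowkeys
            double[] range = evenBit ? lngRange : latRange;
            double val = evenBit ? lng : lat;
            double mid = (range[0] + range[1]) / 2;
            if (val >= mid) { ch = (ch << 1) | 1; range[0] = mid; }
            else            { ch = ch << 1;       range[1] = mid; }
            evenBit = !evenBit;
            if (++bit == 5) { hash.append(BASE32.charAt(ch)); bit = 0; ch = 0; }
        }
        return hash.toString();
    }
}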
package demo;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class CommonDimensionOdsDwd {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
        /**
         * Sample event:
         * {"release_channel":"华为应用市场",
         *  "device_type":"mi8",
         *  "session_id":"s09",
         *  "lat":38.089969323508726,
         *  "lng":114.35731900345093,
         *  "username":"windy",
         *  "eventId":"add_cart",
         *  "eventTime":1670586083000,
         *  "properties":{"url":"/content/article/2354.html?a=3","itemId":"item003"}}
         *
         * create table ods_event(
         *   release_channel string,
         *   device_type string,
         *   session_id string,
         *   lat double,
         *   lng double,
         *   username string,
         *   eventId string,
         *   eventTime bigint,
         *   properties Map<String,String>
         * )with(
         *   'connector' = 'kafka',
         *   'topic' = 'ods_event',
         *   'properties.bootstrap.servers' = 'hadoop10:9092',
         *   'properties.group.id' = 'x1',
         *   'scan.startup.mode' = 'group-offsets',
         *   'format' = 'json'
         * )
         */
        // Kafka source table for the raw events; proc is the processing-time
        // attribute required by the lookup joins below.
        tenv.executeSql("create table ods_event(\n" +
                " release_channel string,\n" +
                " device_type string,\n" +
                " session_id string,\n" +
                " lat double,\n" +
                " lng double,\n" +
                " username string,\n" +
                " eventId string,\n" +
                " eventTime bigint,\n" +
                " properties Map<String,String>,\n" +
                " proc as proctime()\n" +
                ")with(\n" +
                " 'connector' = 'kafka',\n" +
                " 'topic' = 'ods_event',\n" +
                " 'properties.bootstrap.servers' = 'hadoop10:9092',\n" +
                " 'properties.group.id' = 'x1',\n" +
                " 'scan.startup.mode' = 'group-offsets',\n" +
                " 'format' = 'json'\n" +
                ")").print();
        //tenv.executeSql("select * from ods_event").print();

        /**
         * CREATE TABLE dim_user_info (
         *   username String,
         *   f ROW<id INT,phone String,status String,create_time String,gender String,birthday String,city String,job String,source_type String>
         *   PRIMARY KEY (username) NOT ENFORCED
         * ) WITH (
         *   'connector' = 'hbase-2.2',
         *   'table-name' = 'dim_user_info',
         *   'zookeeper.quorum' = 'hadoop10:2181'
         * )
         */
        // User-info dimension table backed by HBase (kept in sync via Flink CDC).
        tenv.executeSql(" CREATE TABLE dim_user_info (\n" +
                " username String,\n" +
                " f ROW<id INT,phone String,status INT,create_time timestamp(3),gender int,birthday date,city String,job String,source_type int>,\n" +
                " PRIMARY KEY (username) NOT ENFORCED\n" +
                ") WITH (\n" +
                " 'connector' = 'hbase-2.2',\n" +
                " 'table-name' = 'dim_user_info',\n" +
                " 'zookeeper.quorum' = 'hadoop10:2181',\n" +
                " 'lookup.cache.max-rows' = '1000',\n" + // max rows in the lookup cache
                " 'lookup.cache.ttl' = '1min'\n" +       // how long a cached row stays valid
                ")");
        // tenv.executeSql("select * from dim_user_info").print();

        // Geo dimension table backed by HBase; the rowkey is the geohash string.
        tenv.executeSql(" CREATE TABLE dim_geo_area (\n" +
                " geohash String,\n" +
                " f ROW<prov String,city String,area String>,\n" +
                " PRIMARY KEY (geohash) NOT ENFORCED\n" +
                ") WITH (\n" +
                " 'connector' = 'hbase-2.2',\n" +
                " 'table-name' = 'dim_geo_area',\n" +
                " 'zookeeper.quorum' = 'hadoop10:2181',\n" +
                " 'lookup.cache.max-rows' = '1000',\n" + // max rows in the lookup cache
                " 'lookup.cache.ttl' = '1min'\n" +       // how long a cached row stays valid
                ")");
        // tenv.executeSql("select * from dim_geo_area").print();

        /**
         * select t1.*,t2.f.phone,t2.f.status,t3.f.prov,t3.f.city,t3.f.area
         * from ods_event t1
         * left join dim_user_info for system_time as of t1.proc t2 on t1.username = t2.username
         * left join dim_geo_area for system_time as of t1.proc t3 on geohash(t1.lat,t1.lng) = t3.geohash
         */
        // tenv.executeSql("select t1.*,t2.f.phone,t2.f.status,t3.f.prov,t3.f.city,t3.f.area \n" +
        //         "from ods_event t1 \n" +
        //         "left join dim_user_info for system_time as of t1.proc t2 on t1.username = t2.username " +
        //         "left join dim_geo_area for system_time as of t1.proc t3 on geohash(t1.lat,t1.lng) = t3.geohash ").print();

        // Wide-table integration below:
        // Page-info dimension table; the rowkey is the URL prefix.
        tenv.executeSql(
                " create table page_hbase( " +
                "   url_prefix STRING, " +
                "   f ROW< " +
                "     sv STRING, " +
                "     pt STRING> " +
                " ) WITH( " +
                "   'connector' = 'hbase-2.2', " +
                "   'table-name' = 'dim_page_info', " +
                "   'zookeeper.quorum' = 'hadoop10:2181' " +
                " ) "
        );
        // Kafka sink table for the DWD wide table.
        tenv.executeSql(
                " CREATE TABLE dwd_kafka( "
                + "   user_id BIGINT, "
                + "   username string, "
                + "   session_id string, "
                + "   event_Id string, "
                + "   event_time bigint, "
                + "   lat double, "
                + "   lng double, "
                + "   release_channel string, "
                + "   device_type string, "
                + "   properties map<string,string>, "
                + "   register_phone STRING, "
                + "   user_status INT, "
                + "   register_time TIMESTAMP(3), "
                + "   register_gender INT, "
                + "   register_birthday DATE, "
                + "   register_city STRING, "
                + "   register_job STRING, "
                + "   register_source_type INT, "
                + "   gps_province STRING, "
                + "   gps_city STRING, "
                + "   gps_region STRING, "
                + "   page_type STRING, "
                + "   page_service STRING "
                + " ) WITH ( "
                + "   'connector' = 'kafka', "
                + "   'topic' = 'dwd_events', "
                + "   'properties.bootstrap.servers' = 'hadoop10:9092', "
                + "   'properties.group.id' = 'testGroup', "
                + "   'scan.startup.mode' = 'earliest-offset', "
                + "   'value.format'='json') "
        );
        // Register the geohash UDF used in the geo join condition.
        tenv.createTemporaryFunction("geohash", GeoHashUDF.class);
        // Lookup-join the event stream against all three dimension tables and
        // write the enriched rows to the dwd_events topic.
        tenv.executeSql("insert into dwd_kafka select " +
                "cast(t2.f.id as bigint) as user_id,t1.username,t1.session_id,t1.eventId as event_id,t1.eventTime,\n" +
                "t1.lat,t1.lng,t1.release_channel,t1.device_type,t1.properties,t2.f.phone as register_phone,\n" +
                "t2.f.status as user_status,t2.f.create_time as register_time,t2.f.gender as register_gender,\n" +
                "t2.f.birthday as register_birthday,t2.f.city as register_city,t2.f.job as register_job,\n" +
                "t2.f.source_type as register_source_type,t3.f.prov,\n" +
                "t3.f.city as gps_city,t3.f.area,t4.f.pt as page_type,t4.f.sv as page_service \n" +
                "from ods_event t1 \n" +
                "left join dim_user_info for system_time as of t1.proc t2 on t1.username = t2.username " +
                "left join dim_geo_area for system_time as of t1.proc t3 on geohash(t1.lat,t1.lng) = t3.geohash " +
                "left join page_hbase for system_time as of t1.proc t4 on regexp_extract(t1.properties['url'],'(.*/).*',1) = t4.url_prefix")
                .print();
    }
}
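One optional hardening step not shown in the listing: enabling checkpointing right after the environment is created gives the job consistent recovery points and lets the Kafka sink commit on a fixed schedule (the interval below is illustrative):

// a sketch; place immediately after StreamExecutionEnvironment is created
env.enableCheckpointing(60_000); // checkpoint every 60 s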
This is our final experiment: event messages enter through Kafka, are enriched by the HBase dimension-table lookups, and come back out through a Kafka consumer as DWD-layer wide-table detail records. Below are the complete Kafka commands and the resulting test data, consistent with the screenshots.
[root@hadoop10 ~]# kafka-topics.sh --zookeeper hadoop10:2181 --create --topic dwd_events --partitions 1 --replication-factor 1
WARNING: Due to limitations in metric names, topics with a period ('.') or underscore ('_') could collide. To avoid issues it is best to use either, but not both.
Created topic "dwd_events".
[root@hadoop10 ~]# cd /opt/installs/kafka0.11/
[root@hadoop10 kafka0.11]# bin/kafka-console-consumer.sh --bootstrap-server hadoop10:9092 --topic dwd_events
{"user_id":16,"username":"guoyachao","session_id":"s01","event_Id":"add_cart","event_time":1670583693000,"lat":38.089969323508726,"lng":114.35731900345093,"release_channel":"360应用市场","device_type":"mi6","properties":{"itemId":"item002","url":"/content/article/2354.html?a=3"},"register_phone":"13253161303","user_status":1,"register_time":"2023-07-10 12:34:56","register_gender":1,"register_birthday":"1990-01-01","register_city":"New York","register_job":"Engineer","register_source_type":1,"gps_province":"河北省","gps_city":"石家庄市","gps_region":"鹿泉区","page_type":"文章页","page_service":"内容服务"}
{"user_id":21,"username":"arthas","session_id":"s02","event_Id":"add_cart","event_time":1670583694000,"lat":38.089969323508726,"lng":114.35731900345093,"release_channel":"360应用市场","device_type":"mi6","properties":{"itemId":"item002","url":"/content/article/2354.html?a=3"},"register_phone":"15516000000","user_status":1,"register_time":"2023-07-06 15:30:20","register_gender":1,"register_birthday":"1998-03-25","register_city":"Berlin","register_job":"Writer","register_source_type":1,"gps_province":"河北省","gps_city":"石家庄市","gps_region":"鹿泉区","page_type":"文章页","page_service":"内容服务"}
{"user_id":10,"username":"user1","session_id":"s09","event_Id":"add_cart","event_time":1670585523000,"lat":39.28996932350873,"lng":112.35731900345093,"release_channel":"华为应用市场","device_type":"mi8","properties":{"itemId":"item003","url":"/content/article/2354.html?a=3"},"register_phone":"1234567890","user_status":1,"register_time":"2023-07-10 12:34:56","register_gender":1,"register_birthday":"1990-01-01","register_city":"New York","register_job":"Engineer","register_source_type":1,"gps_province":"山西省","gps_city":"朔州市","gps_region":"朔城区","page_type":"文章页","page_service":"内容服务"}