This project's requirements run on a single-node Hadoop10 environment:
Windows 11 development environment:
From the temporary table cleaned in Requirement 1 we already have the session id and the event timestamps of each session; we now need to further split each single device's session into new, finer-grained sessions, as shown in the query result in the figure above.
ChatGPT: splitting user-behavior sessions into fine-grained time slices brings the following benefits:
In summary, splitting user-behavior sessions into fine-grained time slices provides more detailed and accurate behavior analysis and personalized service, giving more valuable data support for business decisions and user experience.
This case is a small breakdown test of this chapter's session-splitting requirement. The rule: if the gap between consecutive events in a session exceeds 4, a new session begins.
* sessionid    newsessionid
* abc abc-0
* abc abc-0
* abc abc-1
* abc abc-1
* abc abc-2
* qwe qwe-0
* qwe qwe-0
* qwe qwe-1
create table tmp.test1(
  sessionid string,
  ts int
);

insert into tmp.test1
values ('abc',2),('abc',3),('abc',8),('abc',10),('abc',18),
       ('qwe',2),('qwe',3),('qwe',9);

select * from tmp.test1;
-- enable Hive local mode
set hive.exec.mode.local.auto=true;
select sessionid, ts, concat(sessionid, '-', new_row) as newsessionid
from (
    select sessionid, ts, before_sid, diff, flag
          ,sum(flag) over (partition by sessionid order by ts) as new_row
    from (
        select sessionid, ts, before_sid, diff, if(diff > 4, 1, 0) as flag
        from (
            select sessionid, ts, before_sid, diff
            from (
                select sessionid, ts, before_sid, ts - before_sid as diff
                from (
                    select sessionid, ts
                          ,lag(ts, 1, ts) over (partition by sessionid order by ts) as before_sid
                    from tmp.test1
                ) t1
            ) t2
        ) t3
    ) t4
) t5;
To summarize the code above: we use the lag window function to compute the time gap between consecutive events within a session, turn that gap into a 0/1 flag according to the split granularity (here, gap > 4), and the flag marks whether the current row starts a new sub-session. The key is the sum window function: partitioned by session and ordered by timestamp, it produces a running total, so the running sum of the flags yields the sub-session index. To verify the idea, run the subqueries in the HQL above one level at a time.
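As a concrete check, here is how the intermediate columns evaluate for the 'abc' rows of the test data (worked out by hand from the inserted values):

ts    before_sid   diff   flag   new_row   newsessionid
2     2            0      0      0         abc-0
3     2            1      0      0         abc-0
8     3            5      1      1         abc-1
10    8            2      0      1         abc-1
18    10           8      1      2         abc-2

The first row's before_sid falls back to its own ts because lag has no preceding row, so its diff is 0 and it never starts a new sub-session.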
The demo above mirrors this chapter's DWD-layer requirement. Complete this test before starting the DWD-layer session split; the same idea can then be applied directly to the project requirement.
First, create the temporary table tmp.event_log_splited; later requirements will continue to build on it.
create table tmp.event_log_splited(
account string,
appid string,
appversion string,
carrier string,
deviceid string,
devicetype string,
eventid string,
ip string,
latitude double,
longitude double,
nettype string,
osname string,
osversion string,
properties Map<String,String>,
releasechannel string,
resolution string,
sessionid string,
`timestamp` bigint,
newsessionid string
)
partitioned by(dt string)
STORED as ORC
TBLPROPERTIES ('orc.compress'='SNAPPY');
Statements used for testing:
select * from tmp.event_log_splited where dt = '2023-06-22';
alter table tmp.event_log_splited drop partition(dt='2023-06-22');
The following Scala code implements the step from sessionid in tmp.event_log_washed to newsessionid in tmp.event_log_splited. The approach is exactly the one described in section 1.2. The utility class SparkUtils was already provided in Requirement 1 of this project.
package com.yh.ods_etl
import com.yh.utils.SparkUtils
object AppLogSessionSplit_02 {
def main(args: Array[String]): Unit = {
if (args.length == 0) {
// abort when no partition date is supplied
println("missing argument: dt (e.g. 2023-06-22)")
System.exit(1)
}
val spark = SparkUtils.getSparkSession("AppLogSessionSplit_02")
val dt = args(0)
spark.sql(
s"""
|
|insert overwrite table tmp.event_log_splited
|partition(dt='${dt}')
|select
| account
| ,appid
| ,appversion
| ,carrier
| ,deviceid
| ,devicetype
| ,eventid
| ,ip
| ,latitude
| ,longitude
| ,nettype
| ,osname
| ,osversion
| ,properties
| ,releasechannel
| ,resolution
| ,sessionid
| ,`timestamp`
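| -- running sum of c1 per sessionid (ordered by timestamp) yields the sub-session index 0, 1, 2, ...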
| ,concat(sessionid,'-',sum(c1) over(partition by sessionid order by `timestamp`)) newsessionid
|from(
| select
| account
| ,appid
| ,appversion
| ,carrier
| ,deviceid
| ,devicetype
| ,eventid
| ,ip
| ,latitude
| ,longitude
| ,nettype
| ,osname
| ,osversion
| ,properties
| ,releasechannel
| ,resolution
| ,sessionid
| ,`timestamp`
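| -- c1 = 1 when the gap to the previous event in the same sessionid exceeds 10 minutes (timestamps are in ms), else 0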
| ,if( (`timestamp`-lag(`timestamp`,1,`timestamp`) over(partition by sessionid order by `timestamp`))/1000/60 > 10 ,1,0) c1
| from tmp.event_log_washed where dt = '${dt}'
|)t2
|
|""".stripMargin)
spark.stop()
}
}
Screenshot of the successful local test run, verified in Hive:
Write a submit script, 03.DataSplit.sh:
#!/bin/bash
dt=$1
if [ -z "$dt" ]
then
  dt=$(date -d "1 days ago" +%Y-%m-%d)
fi
echo " execution date ---------- $dt --------------- "
spark-submit \
--master yarn \
--class com.yh.ods_etl.AppLogSessionSplit_02 \
--conf spark.yarn.jars=local:/opt/installs/spark3.2.0/jars/* \
--driver-memory 1G \
--driver-cores 2 \
--executor-memory 2G \
--num-executors 3 \
--executor-cores 2 \
--queue abc \
--jars /opt/app/spark-dw-jar-with-dependencies.jar \
/opt/app/spark-dw.jar $dt
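The script takes an optional date argument: pass a date to backfill that partition, for example bash 03.DataSplit.sh 2023-06-22; with no argument it defaults to yesterday's date.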
Compared with the two submit scripts from Requirement 1, this one only changes the fully qualified class name and the newly uploaded dependency jar; just check whether any new dependencies have been introduced. Screenshot of the script run:
The data processed in the previous requirements stores location only as latitude and longitude. We now need data down to the province/city/district level, so we call a map provider's API to resolve these fields.
Getting access to the API is straightforward: apply for a key on the Amap (高德) Open Platform; the official documentation describes the usage in detail.
2.3 JSON utility: the hutool dependency
Since Amap returns the location information as JSON, we use the hutool utility to parse it; add the dependency (with a matching version) to the pom file.
<dependency>
    <groupId>cn.hutool</groupId>
    <artifactId>hutool-all</artifactId>
    <version>5.8.20</version>
</dependency>
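Before wiring hutool into the Spark UDF, a minimal standalone sketch helps confirm the parsing calls we rely on (parseObj, getJSONObject, getStr). The sample JSON string and the object name HutoolParseDemo below are made up for illustration; only the field names mirror what we read from the regeo response:

import cn.hutool.json.{JSONObject, JSONUtil}

object HutoolParseDemo {
  def main(args: Array[String]): Unit = {
    // hand-written sample; only the fields we actually read are included
    val sample =
      """{"regeocode":{"addressComponent":{"province":"北京市","city":"[]","district":"朝阳区"}}}"""
    val root: JSONObject = JSONUtil.parseObj(sample)
    val addr: JSONObject = root.getJSONObject("regeocode").getJSONObject("addressComponent")
    println(addr.getStr("province"))               // 北京市
    // strip the "[]" that municipalities use for an empty city field
    println(addr.getStr("city").replace("[]", ""))
    println(addr.getStr("district"))               // 朝阳区
  }
}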
Below we create the dwd database and the dwd.event_log_detail table; data will be inserted from the temporary table of the previous section.
create database dwd;
create table dwd.event_log_detail(
account string,
appid string,
appversion string,
carrier string,
deviceid string,
devicetype string,
eventid string,
ip string,
latitude double,
longitude double,
nettype string,
osname string,
osversion string,
properties Map<String,String>,
releasechannel string,
resolution string,
sessionid string,
`timestamp` bigint,
newsessionid string,
province string,
city string,
district string
)
partitioned by(dt string)
STORED as ORC
TBLPROPERTIES ('orc.compress'='SNAPPY');
select count(*)
from tmp.event_log_splited where dt = '2023-06-22' and account = '毕导';

alter table dwd.event_log_detail drop partition(dt='2023-06-22');

select *
from dwd.event_log_detail;
Here we define a custom UDF that parses the fields returned by the API into table columns; each province/city/district resolution issues only a single API query. For now we test with a single user, named 毕导.
package com.yh.ods_etl
import cn.hutool.json.{JSONObject, JSONUtil}
import com.yh.utils.SparkUtils
import org.apache.commons.lang3.StringUtils
import org.apache.spark.sql.DataFrame
import scalaj.http.{Http, HttpRequest}
object AppLogToDWD_03 {
def main(args: Array[String]): Unit = {
if (args.length == 0) {
// abort when no partition date is supplied
println("missing argument: dt (e.g. 2023-06-22)")
System.exit(1)
}
val spark = SparkUtils.getSparkSession("AppLogToDWD_03")
spark.sparkContext.setLogLevel("WARN")
val dt = args(0)
// UDF: reverse-geocode (latitude, longitude) into "province,city,district" via the Amap regeo API
spark.udf.register("parse_city",(latitude:Double,longitude:Double) => {
val request: HttpRequest = Http("https://restapi.amap.com/v3/geocode/regeo")
.param("key", "1ee98f687f76d5428f279cd0dbc5ad85")
.param("location", longitude+","+latitude) // Amap expects "longitude,latitude"
val str: String = request.asString.body
print(str) // debug: dump the raw response
if(!StringUtils.isBlank(str)){
val jSONObject: JSONObject = JSONUtil.parseObj(str)
val jSONObject2: JSONObject = jSONObject.getJSONObject("regeocode").getJSONObject("addressComponent")
val province = jSONObject2.getStr("province")
// municipalities return city as an empty array, rendered as "[]"; strip it
val city = jSONObject2.getStr("city").replace("[]","")
val district = jSONObject2.getStr("district")
province+","+city+","+district
}else{
"null,null,null"
}
})
val df: DataFrame = spark.sql(
s"""
| select
| *
| ,split(parse_city(round(latitude,6),round(longitude,6)),',') c1
| from tmp.event_log_splited where dt = '${dt}' and account = '毕导'
|
|""".stripMargin)
df.cache()
df.createTempView("t2")
spark.sql(
s"""
|
|insert overwrite table dwd.event_log_detail
|partition(dt='${dt}')
|select
| account
| ,appid
| ,appversion
| ,carrier
| ,deviceid
| ,devicetype
| ,eventid
| ,ip
| ,latitude
| ,longitude
| ,nettype
| ,osname
| ,osversion
| ,properties
| ,releasechannel
| ,resolution
| ,sessionid
| ,`timestamp`
| ,newsessionid
| ,c1[0] prov
| ,c1[1] city
| ,c1[2] district
|from t2
|
|""".stripMargin)
spark.stop()
}
}
Local test run output:
Querying the dwd layer in Hive: