Hive is one of the most commonly used components in a data warehouse, but the first-generation Hive execution engine was MapReduce, which is slow to run; the execution engines most commonly used with Hive today are Tez and Spark.
Hive on Spark
The core component is Hive; only the execution engine is replaced with the Spark in-memory computing framework, which improves job execution efficiency.
Hive is mainly responsible for data storage and SQL parsing.
Spark on Hive
The core component is Spark; Hive is used only for data storage and metadata management, while Spark handles SQL parsing and computation.
Here we adopt the Hive on Spark architecture.
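To make the distinction concrete: with Hive on Spark (used below), SQL still enters through the Hive CLI or Beeline, Hive parses it, and the physical job is handed to Spark via the hive.execution.engine property configured later in hive-site.xml. A minimal sketch (demo_table is only illustrative):
# Hive on Spark: Hive parses the SQL, Spark runs the job on YARN
hive -e "set hive.execution.engine=spark; select count(*) from demo_table;"
# Spark on Hive would instead submit SQL through Spark itself (e.g. spark-sql / spark-shell),
# with Hive supplying only the metastore and warehouse storage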
# Upload the installation file
apache-hive-3.1.2-bin.tar.gz
# Extract it to the target directory
tar -zxvf apache-hive-3.1.2-bin.tar.gz -C /bigdata/server/
# Create a symbolic link
ln -s apache-hive-3.1.2-bin hive
# vim /etc/profile.d/custom_env.sh
## hive
export HIVE_HOME=/bigdata/server/hive
export PATH=$PATH:$HIVE_HOME/bin
Sync the environment variables: xsync /etc/profile.d/custom_env.sh
Load the environment variables: source /etc/profile
Create the configuration file hive-site.xml in the conf directory under /bigdata/server/hive:
<configuration>
<!-- Metastore database connection settings -->
<property>
<name>javax.jdo.option.ConnectionURL</name>
<value>jdbc:mysql://biz01:3306/hive?useSSL=false</value>
</property>
<property>
<name>javax.jdo.option.ConnectionDriverName</name>
<value>com.mysql.jdbc.Driver</value>
</property>
<property>
<name>javax.jdo.option.ConnectionUserName</name>
<value>root</value>
</property>
<property>
<name>javax.jdo.option.ConnectionPassword</name>
<value>123456</value>
</property>
<!-- Warehouse directory for data files -->
<property>
<name>hive.metastore.warehouse.dir</name>
<value>/user/hive/warehouse</value>
</property>
<!-- Disable metastore schema verification -->
<property>
<name>hive.metastore.schema.verification</name>
<value>false</value>
</property>
<!-- Thrift port for hiveserver2 -->
<property>
<name>hive.server2.thrift.port</name>
<value>10000</value>
</property>
<!-- Host that hiveserver2 binds to -->
<property>
<name>hive.server2.thrift.bind.host</name>
<value>hadoop03</value>
</property>
<!-- Disable authentication for the metastore notification API -->
<property>
<name>hive.metastore.event.db.notification.api.auth</name>
<value>false</value>
</property>
<!-- Hive CLI: print column headers -->
<property>
<name>hive.cli.print.header</name>
<value>true</value>
</property>
<!-- Hive CLI: show the current database -->
<property>
<name>hive.cli.print.current.db</name>
<value>true</value>
</property>
</configuration>
schematool -initSchema -dbType mysql -verbose
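If the initialization succeeds, the metastore tables should now exist in the hive database on MySQL; a quick sanity check (connection details taken from the hive-site.xml above):
# List the metastore tables created by schematool
mysql -h biz01 -uroot -p123456 -e "use hive; show tables;"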
-- Fix garbled Chinese characters in metadata comments
# Run the following against the MySQL metastore database
alter table COLUMNS_V2 modify column COMMENT varchar(256) character set utf8;
alter table TABLE_PARAMS modify column PARAM_VALUE varchar(4000) character set utf8;
alter table PARTITION_PARAMS modify column PARAM_VALUE varchar(4000) character set utf8;
alter table PARTITION_KEYS modify column PKEY_COMMENT varchar(4000) character set utf8;
alter table INDEX_PARAMS modify column PARAM_VALUE varchar(4000) character set utf8;
Start the Hive CLI
bin/hive
Create a test table
create table student(id int, name string);
Insert test data with INSERT
insert into student values(100,'wolf');
# Upload and extract the Spark installation file
spark-3.0.0-bin-without-hadoop.tgz
tar -zxvf spark-3.0.0-bin-without-hadoop.tgz -C /bigdata/server/
# Rename the directory
mv spark-3.0.0-bin-without-hadoop spark
The default template is spark-env.sh.template; copy or rename it to spark-env.sh, then edit it:
vim conf/spark-env.sh
# Spark on YARN configuration
export HADOOP_CONF_DIR=/bigdata/server/hadoop/etc/hadoop
export YARN_CONF_DIR=/bigdata/server/hadoop/etc/hadoop
# Spark classpath dependencies provided by Hadoop
export SPARK_DIST_CLASSPATH=$(/bigdata/server/hadoop/bin/hadoop classpath)
# Create this file directly under conf
vim spark-defaults.conf
# Submit to the YARN cluster by default
spark.master=yarn
spark.eventLog.enabled=true
spark.eventLog.dir=hdfs://hadoop01:8020/spark/log
spark.executor.memory=1g
spark.driver.memory=1g
Create the event log directory on HDFS
hdfs dfs -mkdir -p /spark/log
# vim /etc/profile.d/custom_env.sh
# spark
export SPARK_HOME=/bigdata/server/spark
export PATH=$PATH:$SPARK_HOME/bin
Sync the environment variables: xsync /etc/profile.d/custom_env.sh
Load the environment variables: source /etc/profile
# From the Spark directory, submit the example application
spark-submit \
--class org.apache.spark.examples.SparkPi \
--master yarn \
examples/jars/spark-examples_2.12-3.0.0.jar \
10
# Still in the Spark directory, start the Spark shell
bin/spark-shell
At the scala> prompt, run the following word count:
sc.textFile("hdfs://hadoop01/input/").flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).collect()
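Note that the word count above assumes a directory /input with some text files already exists on HDFS; if it does not, it can be prepared first (words.txt is just a placeholder file name):
hdfs dfs -mkdir -p /input
hdfs dfs -put words.txt /input/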
# Upload all of the pure Spark jars under spark/jars (without the Hadoop jars) to HDFS
hdfs dfs -mkdir -p /spark/jars
hdfs dfs -put spark/jars/* /spark/jars/
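A quick check that the jars actually landed on HDFS:
hdfs dfs -ls /spark/jars | head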
# Back in the Hive directory, add the Spark settings to hive-site.xml
vim conf/hive-site.xml
<!-- Location of the Spark jars (note: port 8020 must match the NameNode port) -->
<property>
<name>spark.yarn.jars</name>
<value>hdfs://hadoop01:8020/spark/jars/*</value>
</property>
<!-- Hive execution engine -->
<property>
<name>hive.execution.engine</name>
<value>spark</value>
</property>
<!-- Hive-to-Spark client connection timeout -->
<property>
<name>hive.spark.client.connect.timeout</name>
<value>10000ms</value>
</property>
Start the Hive CLI again to verify the Spark engine
bin/hive
Create a test table
create table student(id int, name string);
Insert test data with INSERT
insert into student values(100,'wolf');
# Connect to Hive with a client tool
mkdir /bigdata/server/hive/logs
nohup bin/hiveserver2 > /bigdata/server/hive/logs/hive.log 2>&1 &
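hiveserver2 can take a little while to come up; before connecting, it helps to confirm that the Thrift port from hive-site.xml is listening:
# Check that hiveserver2 is listening on port 10000
ss -tnl | grep 10000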
# From the Hive directory, start Beeline
bin/beeline
# At the beeline prompt, connect to hiveserver2
beeline>!connect jdbc:hive2://localhost:10000
Enter the username and password when prompted
# After connecting from IDEA (or another JDBC client)
show databases;
select * from student;
-- Create the ODS table for the telecom user behavior log
drop table if exists ods_behavior_log;
create external table ods_behavior_log
(
line string
)
partitioned by (dt string)
location '/behavior/ods/ods_behavior_log';
load data inpath '/behavior/origin/log/2022-08-08' into table ods_behavior_log partition (dt = '2022-08-08');
To backfill a range of dates, wrap the load statement in a small driver script (origin_to_ods_init_behavior_log.sh):
#!/bin/bash
if [ $# -ne 2 ]; then
echo "usage: origin_to_ods_init_behavior_log.sh start_date end_date"
exit 1
fi
EXPORT_START_DATE=$1
EXPORT_END_DATE=$2
i=$EXPORT_START_DATE
while [[ $i < `date -d "+1 day $EXPORT_END_DATE" +%Y-%m-%d` ]]
do
SQL="load data inpath '/sb/origin/log/start/dt=$i' into table wolf.ods_start_log partition(dt='$i');"
hive -e "$SQL"
i=`date -d "+1 day $i" +%Y-%m-%d`
done
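An example invocation of the script, backfilling one week of logs (the date range is only illustrative):
sh origin_to_ods_init_behavior_log.sh 2022-08-01 2022-08-07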
Custom UDF that strips the query string from a URL and normalizes https to http:
package cn.wolfcode.udf;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

public class UrlHandlerUdf extends GenericUDF {
    @Override
    public ObjectInspector initialize(ObjectInspector[] objectInspectors) throws UDFArgumentException {
        // Validate the argument count
        if (objectInspectors.length != 1) {
            throw new UDFArgumentLengthException("Exactly one argument is expected!");
        }
        // Validate the argument type
        if (!objectInspectors[0].getCategory().equals(ObjectInspector.Category.PRIMITIVE)) {
            throw new UDFArgumentTypeException(0, "The argument must be a primitive type!");
        }
        // The UDF returns a Java String
        return PrimitiveObjectInspectorFactory.javaStringObjectInspector;
    }

    @Override
    public Object evaluate(DeferredObject[] deferredObjects) throws HiveException {
        if (deferredObjects[0].get() == null) {
            return "";
        }
        String data = deferredObjects[0].get().toString();
        // Drop the query string
        int index = data.indexOf("?");
        if (index > 0) {
            data = data.substring(0, index);
        }
        // Normalize https to http
        if (data.startsWith("https://")) {
            data = data.replaceFirst("https://", "http://");
        }
        // Return a Java String, matching javaStringObjectInspector
        return data;
    }

    @Override
    public String getDisplayString(String[] strings) {
        return "url_trans_udf(" + String.join(", ", strings) + ")";
    }
}
Custom UDF that maps a client IP to a location string; the (city, province) lookup data is loaded once from Redis, and a random entry is returned as a mock lookup:
package cn.wolfcode.udf;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import redis.clients.jedis.Jedis;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class Ip2Loc extends GenericUDF {
    // (city, province) pairs loaded once per JVM from Redis
    public static List<Map<String, String>> mapList = new ArrayList<>();
    static {
        String host = "192.168.113.145";
        int port = 6379;
        Jedis jedis = new Jedis(host, port);
        Set<String> areas = jedis.smembers("areas");
        for (String area : areas) {
            JSONObject jsonObject = JSON.parseObject(area);
            Map<String, String> map = new HashMap<>();
            map.put("city", jsonObject.getString("city"));
            map.put("province", jsonObject.getString("province"));
            mapList.add(map);
        }
        jedis.close();
        // the lookup data could also be written out to a local file
    }

    @Override
    public ObjectInspector initialize(ObjectInspector[] objectInspectors) throws UDFArgumentException {
        // Validate the argument count
        if (objectInspectors.length != 1) {
            throw new UDFArgumentLengthException("Exactly one argument is expected!");
        }
        // Validate the argument type
        if (!objectInspectors[0].getCategory().equals(ObjectInspector.Category.PRIMITIVE)) {
            throw new UDFArgumentTypeException(0, "The argument must be a primitive type!");
        }
        return PrimitiveObjectInspectorFactory.javaStringObjectInspector;
    }

    // Returns a "city_province" string, e.g. 广州市_广东省
    @Override
    public Object evaluate(DeferredObject[] deferredObjects) throws HiveException {
        if (deferredObjects[0].get() == null) {
            return "";
        }
        // Pick a random entry as a mock IP-to-location lookup
        int index = (int) (Math.random() * mapList.size());
        return mapList.get(index).get("city") + "_" + mapList.get(index).get("province");
    }

    @Override
    public String getDisplayString(String[] strings) {
        return "get_city_by_ip(" + String.join(", ", strings) + ")";
    }
}
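Before the functions can be registered, the two UDF classes have to be packaged and the jar uploaded to the HDFS path referenced below; a sketch assuming the UDFs live in a Maven project whose artifact is hive_udf_custom-1.0.0.jar (the jar name used in the CREATE FUNCTION statements):
# Build the UDF jar and upload it to HDFS
mvn clean package
hdfs dfs -put target/hive_udf_custom-1.0.0.jar /spark/jars/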
create function get_city_by_ip
as 'cn.wolfcode.udf.Ip2Loc' using jar 'hdfs://hadoop01:8020/spark/jars/hive_udf_custom-1.0.0.jar';
create function url_trans_udf
as 'cn.wolfcode.udf.UrlHandlerUdf' using jar 'hdfs://hadoop01:8020/spark/jars/hive_udf_custom-1.0.0.jar';
select get_city_by_ip("192.168.113.1");
select url_trans_udf("http://www.baidu.com?name=kw");
-- Create the DWD detail table
-- extract the city and province
-- extract the resource path of the url (query parameters removed)
-- Table definition
DROP TABLE IF EXISTS dwd_behavior_log;
CREATE EXTERNAL TABLE dwd_behavior_log
(
    `client_ip`   STRING COMMENT 'client IP',
    `device_type` STRING COMMENT 'device type',
    `type`        STRING COMMENT 'network type: 4G, 5G, WiFi',
    `device`      STRING COMMENT 'device ID',
    `url`         STRING COMMENT 'requested resource path',
    `city`        STRING COMMENT 'city',
    `ts`          BIGINT COMMENT 'timestamp'
) COMMENT 'behavior log detail table'
PARTITIONED BY (`dt` STRING)
STORED AS ORC
LOCATION '/behavior/dwd/dwd_behavior_log'
TBLPROPERTIES ("orc.compress" = "snappy");
select * from dwd_behavior_log;
-- Enable non-strict dynamic partition mode
set hive.exec.dynamic.partition.mode=nonstrict;
insert overwrite table dwd_behavior_log partition (dt)
select get_json_object(line, '$.client_ip'),
get_json_object(line, '$.device_type'),
get_json_object(line, '$.type'),
get_json_object(line, '$.device'),
url_trans_udf(get_json_object(line, '$.url')),
split(get_city_by_ip(get_json_object(line, '$.client_ip')),"_")[0],
get_json_object(line, '$.time'),
dt
from ods_behavior_log;
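Once the insert finishes, the dynamically created partitions can be checked, for example:
# Verify the dynamic partitions of the DWD table
hive -e "show partitions dwd_behavior_log;"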
-- Table definition
DROP TABLE IF EXISTS dws_behavior_log;
CREATE EXTERNAL TABLE dws_behavior_log
(
    `client_ip`   STRING COMMENT 'client IP',
    `device_type` STRING COMMENT 'device type',
    `type`        STRING COMMENT 'network type: 4G, 5G, WiFi',
    `device`      STRING COMMENT 'device ID',
    `url`         STRING COMMENT 'requested resource path',
    `city`        STRING COMMENT 'city',
    `ts`          BIGINT COMMENT 'timestamp'
) COMMENT 'behavior log summary table'
PARTITIONED BY (`dt` STRING)
STORED AS ORC
LOCATION '/behavior/dws/dws_behavior_log'
TBLPROPERTIES ("orc.compress" = "snappy");
-- Load data
insert overwrite table dws_behavior_log partition (dt)
select client_ip, device_type, type, device, url, city, ts, dt
from dwd_behavior_log;
-- Create the date dimension table
drop table if exists behavior.dim_date;
create external table dim_date
(
    date_id string comment "date",
    week_id string comment "week of the year",
    week_day string comment "day of the week",
    day string comment "day of the month",
    month string comment "month",
    quarter string comment "quarter",
    year string comment "year",
    is_workday string comment "whether it is a workday",
    holiday_id string comment "national statutory holiday"
)
row format delimited fields terminated by '\t'
lines terminated by '\n'
location '/behavior/dim/dim_date';
Load the data: simply upload the data file dim_date_2022 to /behavior/dim/dim_date.
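For reference, the upload is a single HDFS command:
hdfs dfs -put dim_date_2022 /behavior/dim/dim_date/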
Query the data
select * from dim_date;
Table creation statement
-- Create the area dimension table
drop table if exists dim_area;
create external table dim_area
(
    city string comment "city",
    province string comment "province",
    area string comment "region"
)
row format delimited fields terminated by '\t'
lines terminated by '\n'
location '/behavior/dim/dim_area';
Load the data: simply upload the data file dim_area to /behavior/dim/dim_area.
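As with the date dimension, the upload is one HDFS command:
hdfs dfs -put dim_area /behavior/dim/dim_area/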
User distribution by city
drop table if exists ads_user_city;
create external table ads_user_city
(
    city string comment "city",
    province STRING comment "province",
    area STRING comment "region",
    dt string comment "date",
    count bigint comment "number of visits"
) COMMENT 'user distribution by city'
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
LOCATION '/behavior2/ads/ads_user_city';
insert into ads_user_city
select t1.city,t2.province,t2.area,t1.dt,count(1)
from dws_behavior_log t1 join dim_area t2 on t1.city=t2.city
group by t1.city, t2.province,t2.area, t1.dt;
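A quick preview of the resulting ADS table:
# Inspect the city-distribution result
hive -e "select * from ads_user_city limit 10;"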
Network access type per website
drop table if exists ads_visit_type;
create external table ads_visit_type
(
    url string comment "URL",
    type string comment "network type: 4G, 5G, WiFi",
    dt STRING comment "date",
    month String comment "month",
    quarter String comment "quarter",
    count bigint comment "count"
) comment "distribution of network access types per website"
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
location "/behavior2/ads/ads_visit_type";
insert overwrite table ads_visit_type
select t1.url, t1.type,t1.dt, t2.month, t2.quarter, count(1)
from dws_behavior_log t1 join dim_date t2 on t1.dt=t2.date_id
group by t1.url, t1.type, t1.dt, t2.month, t2.quarter;
Device type per website
drop table if exists ads_visit_mode;
create external table ads_visit_mode
(
    url string comment "URL",
    device_type string comment "device type: mobile or PC",
    dt string comment "date",
    count bigint comment "count"
) comment "distribution of device types per website"
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
location "/behavior2/ads/ads_visit_mode";
insert into table ads_visit_mode
select url, device_type,dt, count(1)
from dws_behavior_log
group by url, device_type,dt;