首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >大数据项目实训之Hive环境集成

大数据项目实训之Hive环境集成

作者头像
张哥编程
发布2024-12-07 16:29:17
发布2024-12-07 16:29:17
21600
代码可运行
举报
文章被收录于专栏:云计算linux云计算linux
运行总次数:0
代码可运行

Hive环境集成

Hive环境配置

Hive是数据仓库中最常用的一个组件, 但是第一代的Hive的执行引擎是MapReduce,运行起来比较慢, 后面Hive的执行引擎用的比较多的有Tez,Spark

Hive on Spark 核心组件是Hive, 只是把运行的执行引擎替换为了Spark内存计算框架, 提高的程序运行的效率

其中Hive主要负责数据的存储以及SQL语句的解析

Spark on Hive 核心组件是Spark, 只是把Spark的的数据存储使用Hive以及元数据管理使用Hive, Spark负责SQL的解析并且进行计算

在这里我们采用Hive-on-Spark的设计架构

安装Hive环境

使用编译好的源码软件
代码语言:javascript
代码运行次数:0
运行
复制
# 上传安装文件
apache-hive-3.1.2-bin.tar.gz
# 解压到指定目录
tar -zxvf  apache-hive-3.1.2-bin.tar.gz -C /bigdata/server/
# 创建软链接
ln -s apache-hive-3.1.2-bin  hive
配置环境变量
代码语言:javascript
代码运行次数:0
运行
复制
# vim /etc/profile.d/custom_env.sh
## hive
export HIVE_HOME=/bigdata/server/hive
export PATH=$PATH:$HIVE_HOME/bin

同步环境变量 xsync /etc/profile.d/custom_env.sh

加载环境变量 source /etc/profile

修改配置文件

在/bigdata/server/hive下的conf目录下创建

创建配置文件 hive-site.xml

代码语言:javascript
代码运行次数:0
运行
复制
<configuration>
    <-- 元数据存储的数据库配置 -->
    <property>
        <name>javax.jdo.option.ConnectionURL</name>
        <value>jdbc:mysql://biz01:3306/hive?useSSL=false</value>
    </property>

    <property>
        <name>javax.jdo.option.ConnectionDriverName</name>
        <value>com.mysql.jdbc.Driver</value>
    </property>

    <property>
        <name>javax.jdo.option.ConnectionUserName</name>
        <value>root</value>
    </property>

    <property>
        <name>javax.jdo.option.ConnectionPassword</name>
        <value>123456</value>
    </property>
    <-- 数据文件存储的目录配置 -->
    <property>
        <name>hive.metastore.warehouse.dir</name>
        <value>/user/hive/warehouse</value>
    </property>
    <-- 去掉metastore的校验 -->
    <property>
        <name>hive.metastore.schema.verification</name>
        <value>false</value>
    </property>

    <-- 设置thrift的访问端口 hiveserver2 -->
    <property>
        <name>hive.server2.thrift.port</name>
        <value>10000</value>
    </property>
    <-- 设置hiveserver2绑定的主机 -->
    <property>
        <name>hive.server2.thrift.bind.host</name>
        <value>hadoop03</value>
    </property>
   <-- 禁用权限认证 -->
    <property>
        <name>hive.metastore.event.db.notification.api.auth</name>
        <value>false</value>
    </property>
    <-- hive客户端配置, 显示表头信息 -->
    <property>
        <name>hive.cli.print.header</name>
        <value>true</value>
    </property>
     <-- hive客户端配置, 显示当前数据库 -->
    <property>
        <name>hive.cli.print.current.db</name>
        <value>true</value>
    </property>
</configuration>
初始化元数据信息
代码语言:javascript
代码运行次数:0
运行
复制
schematool -initSchema -dbType mysql -verbose
代码语言:javascript
代码运行次数:0
运行
复制
-- 解决元数据中文乱码
# 设置注释中文乱码的问题 在MySQL的元数据库设置
alter table COLUMNS_V2 modify column COMMENT varchar(256) character set utf8;
alter table TABLE_PARAMS modify column PARAM_VALUE varchar(4000) character set utf8;
alter table PARTITION_PARAMS modify column PARAM_VALUE varchar(4000) character set utf8;
alter table PARTITION_KEYS modify column PKEY_COMMENT varchar(4000) character set utf8;
alter table INDEX_PARAMS modify column PARAM_VALUE varchar(4000) character set utf8;
测试环境

启动hive客户端

代码语言:javascript
代码运行次数:0
运行
复制
bin/hive

创建一张测试表

代码语言:javascript
代码运行次数:0
运行
复制
create table student(id int, name string);

通过insert插入测试数据

代码语言:javascript
代码运行次数:0
运行
复制
insert into student values(100,'wolf');

Spark环境配置

上传安装文件

spark-3.0.0-bin-without-hadoop.tgz

解压软件
代码语言:javascript
代码运行次数:0
运行
复制
tar -zxvf spark-3.0.0-bin-without-hadoop.tgz -C /bigdata/server/
# 
mv spark-3.0.0-bin-without-hadoop  spark
修改配置文件

默认是spark-env.sh.template

代码语言:javascript
代码运行次数:0
运行
复制
vim conf/spark-env.sh

# spark-on 配置
export HADOOP_CONF_DIR=/bigdata/server/hadoop/etc/hadoop
export YARN_CONF_DIR=/bigdata/server/hadoop/etc/hadoop
# spark的classpath依赖配置
export SPARK_DIST_CLASSPATH=$(/bigdata/server/hadoop/bin/hadoop classpath)
配置历史服务器
代码语言:javascript
代码运行次数:0
运行
复制
#直接在conf下创建
vim  spark-defaults.conf

# 默认提交到yarn集群运行
spark.master=yarn 
spark.eventLog.enabled=true
spark.eventLog.dir=hdfs://hadoop01:8020/spark/log
spark.executor.memory=1g
spark.driver.memory=1g

在hdfs上创建历史日志存放目录

代码语言:javascript
代码运行次数:0
运行
复制
hdfs dfs -mkdir -p /spark/log
配置环境变量
代码语言:javascript
代码运行次数:0
运行
复制
# vim /etc/profile.d/custom_env.sh
# spark
export SPARK_HOME=/bigdata/server/spark
export PATH=$PATH:$SPARK_HOME/bin

同步环境变量 xsync /etc/profile.d/custom_env.sh

加载环境变量 source /etc/profile

测试运行
代码语言:javascript
代码运行次数:0
运行
复制
# 在spark目录下,提交示例程序
spark-submit \
  --class org.apache.spark.examples.SparkPi \
  --master yarn \
  examples/jars/spark-examples_2.12-3.0.0.jar \
  10
代码语言:javascript
代码运行次数:0
运行
复制
#还在spark目录下执行
>bin/spark-shell
然后出来scala>执行下面语句
sc.textFile("hdfs://hadoop01/input/").flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).collect()

Hive on Spark

上传spark的依赖到hdfs

spark下的jars下的所有jars

代码语言:javascript
代码运行次数:0
运行
复制
# 这里需要上传spark纯净的jar包目录(不含hadoop的jar包)
hdfs dfs -put  spark/jars/*  /spark/jars/
修改hive的配置文件
代码语言:javascript
代码运行次数:0
运行
复制
vim conf/hive-site.xml	
<!--Spark依赖位置(注意:端口号8020必须和namenode的端口号一致)-->
    <property>
        <name>spark.yarn.jars</name>
        <value>hdfs://hadoop01:8020/spark/jars/*</value>
    </property>
    <!--Hive执行引擎-->
    <property>
        <name>hive.execution.engine</name>
        <value>spark</value>
    </property>
    <!--Hive和Spark连接超时时间-->
    <property>
        <name>hive.spark.client.connect.timeout</name>
        <value>10000ms</value>
    </property>
启动测试

启动hive客户端

代码语言:javascript
代码运行次数:0
运行
复制
bin/hive

创建一张测试表

代码语言:javascript
代码运行次数:0
运行
复制
create table student(id int, name string);

通过insert插入测试数据

代码语言:javascript
代码运行次数:0
运行
复制
insert into student values(100,'wolf');
image-20220829143810754.png
image-20220829143810754.png

使用开发工具Datagrip连接Hive

代码语言:javascript
代码运行次数:0
运行
复制
# 使用工具连接Hive
mkdir /bigdata/server/hive/logs
nohup bin/hiveserver2 > /bigdata/server/hive/logs/hive.log  2>&1 &
#hive下的
bin/beeline
#执行之
beeline>!connect jdbc:hive2://localhost:10000
输入用户名和密码

#使用idea链接后
show databases;
select  * from student;
image-20221009205030858.png
image-20221009205030858.png

数据仓库建设

ODS源数据层

  • 建表语句
代码语言:javascript
代码运行次数:0
运行
复制
-- 创建电信用户行为日志表
drop table if exists ods_behavior_log;
create external table ods_behavior_log
(
    line string
)
partitioned by (dt string)
location '/behavior/ods/ods_behavior_log';
  • 加载数据
代码语言:javascript
代码运行次数:0
运行
复制
load data inpath '/behavior/origin/log/2022-08-08' into table ods_behavior_log partition (dt = '2022-08-08');
  • 导入数据脚本 在bigdata下新建scripts vim创建:origin_to_ods_init_behavior_log.sh chmod x+x origin_to_ods_init_behavior_log.sh
代码语言:javascript
代码运行次数:0
运行
复制
#!/bin/bash
if [ $# -ne 2 ]; then
	echo "useage origin_to_ods_init_behavior_log.sh start_date end_date"
	exit
fi
EXPORT_START_DATE=$1
EXPORT_END_DATE=$2
i=$EXPORT_START_DATE
while [[ $i < `date -d "+1 day $EXPORT_END_DATE" +%Y-%m-%d` ]]
do
SQL="load data inpath '/sb/origin/log/start/dt=$i' into table wolf.ods_start_log partition(dt='$i');"
hive -e "$SQL"
i=`date -d "+1 day $i" +%Y-%m-%d`
done

创建udf函数

  • 定义udf函数
代码语言:javascript
代码运行次数:0
运行
复制
public class UrlHandlerUdf  extends GenericUDF {
    @Override
    public ObjectInspector initialize(ObjectInspector[] objectInspectors) throws UDFArgumentException {
        // 参数长度判断
        if(objectInspectors.length != 1){
            throw new UDFArgumentLengthException("传入的数据参数的长度不正确!");
        }
        // 判断输入参数的类型
        if(!objectInspectors[0].getCategory().equals(ObjectInspector.Category.PRIMITIVE)){
            throw new UDFArgumentTypeException(0,"输入的参数类型不正确!!!");
        }
        return PrimitiveObjectInspectorFactory.javaStringObjectInspector;
    }

    @Override
    public Object evaluate(DeferredObject[] deferredObjects) throws HiveException {
        if(deferredObjects[0].get() == null){
            return "" ;
        }
        String data = deferredObjects[0].get().toString();
        int index = data.indexOf("?");
        if(index > 0 ){
            data = data.substring(0,index);
        }
        if (data.startsWith("https://")){
            data=data.replaceFirst("https://","http://");
        }
        return new Text(data.getBytes(StandardCharsets.UTF_8));
    }
}
代码语言:javascript
代码运行次数:0
运行
复制
public class Ip2Loc extends GenericUDF {
    public List<Map<String,String>> mapList = new ArrayList<>();
    static {
        String host = "192.168.113.145";
        int port = 6379;
        Jedis jedis = new Jedis(host, port);
        Set<String> areas = jedis.smembers("areas");
        for (String area : areas) {
            JSONObject jsonObject = JSON.parseObject(area);
            Map<String,String> map = new HashMap<>();
            map.put("city",jsonObject.getString("city"));
            map.put("province",jsonObject.getString("province"));
            mapList.add(map);
        }
        // 把map数据写入到文件
    }
    public Ip2Loc 
    // 初始化参数判断
    @Override
    public ObjectInspector initialize(ObjectInspector[] objectInspectors) throws UDFArgumentException {
        // 参数长度判断
        if(objectInspectors.length != 1){
            throw new UDFArgumentLengthException("传入的数据参数的长度不正确!");
        }
        // 判断输入参数的类型
        if(!objectInspectors[0].getCategory().equals(ObjectInspector.Category.PRIMITIVE)){
            throw new UDFArgumentTypeException(0,"输入的参数类型不正确!!!");
        }
        return PrimitiveObjectInspectorFactory.javaStringObjectInspector;
    }

    // 返回一个字符串 广东省|广州市
    @Override
    public Object evaluate(DeferredObject[] deferredObjects) throws HiveException {

        if(deferredObjects[0].get() == null){
            return "" ;
        }
        int index = (int) (Math.random() * mapList.size());
        Text new_str = new Text((mapList.get(index).get("city")+"_"+(mapList.get(index).get("province"))).getBytes(StandardCharsets.UTF_8));
        return new_str;
    }

    @Override
    public String getDisplayString(String[] strings) {
        return "";
    }

}
  • 打包文件hive_udf_custom-1.0.0.jar 并且上传到指定目录(spark/jars)

  • 上传对应的依赖到hive的安装目录的lib/ jedis-3.3.0.jar commons-pool2-2.6.2.jar fastjson2-2.0.1.jar hive_udf_custom-1.0.0.jar
  • 注册全局函数
代码语言:javascript
代码运行次数:0
运行
复制
create function get_city_by_ip
    as 'cn.wolfcode.udf.Ip2Loc' using jar 'hdfs://hadoop01:8020//spark/jars/hive_udf_custom-1.0.0.jar';
代码语言:javascript
代码运行次数:0
运行
复制
create function url_trans_udf
    as 'cn.wolfcode.udf.UrlHandlerUdf' using jar 'hdfs://hadoop01:8020//spark/jars/hive_udf_custom-1.0.0.jar';
  • 测试函数
代码语言:javascript
代码运行次数:0
运行
复制
select  get_city_by_ip("192.168.113.1")
select url_trans_udf("http://www.baidu.com?name=kw")

DWD明细数据层

  • 创建dwd表数据
代码语言:javascript
代码运行次数:0
运行
复制
-- 创建dwd明细表数据
-- 获取到城市, 省份
-- 获取到url的资源路径  去掉查询参数
-- 定义表
DROP TABLE IF EXISTS dwd_behavior_log;

CREATE EXTERNAL TABLE dwd_behavior_log
(
    `client_ip`   STRING COMMENT '客户端IP',
    `device_type` STRING COMMENT '设备类型',
    `type`        STRING COMMENT '上网类型 4G 5G WiFi',
    `device`      STRING COMMENT '设备ID',
    `url`         STRING COMMENT '访问的资源路径',
    `city`        STRING COMMENT '城市',
    `ts`          bigint comment "时间戳"
) COMMENT '页面启动日志表'
    PARTITIONED BY (`dt` STRING)
    STORED AS ORC
    LOCATION '/behavior/dwd/dwd_behavior_log'
    TBLPROPERTIES ("orc.compress" = "snappy");
select * from dwd_behavior_log;
  • 加载数据
代码语言:javascript
代码运行次数:0
运行
复制
-- 设置支持动态分区处理
set hive.exec.dynamic.partition.mode=nonstrict;
insert overwrite table dwd_behavior_log partition (dt)
select get_json_object(line, '$.client_ip'),
       get_json_object(line, '$.device_type'),
       get_json_object(line, '$.type'),
       get_json_object(line, '$.device'),
       url_trans_udf(get_json_object(line, '$.url')),
       split(get_city_by_ip(get_json_object(line, '$.client_ip')),"_")[0],
       get_json_object(line, '$.time'),
       dt
from ods_behavior_log;

DWS宽表汇总数据

  • 建表语句
代码语言:javascript
代码运行次数:0
运行
复制
-- 定义表
DROP TABLE IF EXISTS dws_behavior_log;
CREATE EXTERNAL TABLE dws_behavior_log
(
    `client_ip`   STRING COMMENT '客户端IP',
    `device_type` STRING COMMENT '设备类型',
    `type`        STRING COMMENT '上网类型 4G 5G WiFi',
    `device`      STRING COMMENT '设备ID',
    `url`         STRING COMMENT '访问的资源路径',
    `city`        STRING COMMENT '城市',
    `ts`          bigint comment "时间戳"
) COMMENT '页面启动日志表'
    PARTITIONED BY (`dt` STRING)
    STORED AS ORC
    LOCATION '/behavior/dws/dws_behavior_log'
    TBLPROPERTIES ("orc.compress" = "snappy");
  • 加载数据
代码语言:javascript
代码运行次数:0
运行
复制
--装载数据
insert overwrite table dws_behavior_log partition (dt)
select client_ip, device_type, type, device, url, city, ts, dt
from dwd_behavior_log;

创建维度表

时间维度表
  • 建表语句
代码语言:javascript
代码运行次数:0
运行
复制
-- 创建时间维度
drop table if exists behavior.dim_date;
create external table dim_date
(
    date_id string comment  "日期",
    week_id string comment  "周",
    week_day string  comment  "星期",
    day string comment "一个月的第几天",
    month string comment "第几个月",
    quarter string comment "第几个季度",
    year string comment "年度",
    is_workday string comment "是否是工作日",
    holiday_id string comment "国家法定假日"
)
row format delimited fields terminated by '\t'
lines terminated by '\n'
location '/behavior/dim/dim_date';

加载数据

直接把数据文件dim_date_2022 上传到/behavior/dim/dim_date即可

查询数据

代码语言:javascript
代码运行次数:0
运行
复制
select * from dim_date
地区维度表

建表语句

代码语言:javascript
代码运行次数:0
运行
复制
-- 创建地区维度表
drop table if exists dim_area;
create external table dim_area
(
    city string comment "城市",
    province string comment "省份",
    area string comment "地区"
)
row format delimited fields terminated by '\t'
lines terminated by '\n'
location '/behavior/dim/dim_area';

加载数据

直接把数据文件dim_area 上传到/behavior/dim/dim_area即可

ADS层统计数据

用户城市分布

  • 创建表语句
代码语言:javascript
代码运行次数:0
运行
复制
drop table if exists ads_user_city;
create external table ads_user_city
(
    city    string comment "城市",
    province STRING comment "省份",
    area     STRING comment "区域",
    dt      string comment "日期",
    count   bigint comment "访问数量"
) COMMENT '用户城市分布'
    ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
    LOCATION '/behavior2/ads/ads_user_city';
  • 插入统计数据
代码语言:javascript
代码运行次数:0
运行
复制
insert into ads_user_city
select  t1.city,t2.province,t2.area,t1.dt,count(1)
from dws_behavior_log t1 join dim_area  t2 on t1.city=t2.city
group by t1.city, t2.province,t2.area, t1.dt;

每个网站的上网的模式

  • 建表语句
代码语言:javascript
代码运行次数:0
运行
复制
drop table if exists ads_visit_type;
create external table ads_visit_type
(
    url string comment "访问地址",
    type string comment  "访问模式 4G 5G Wifi",
    dt STRING comment "日期",
    month String comment "月度",
    quarter String comment "季度",
    count bigint comment "统计数量"
)comment "网站访问的上网模式分布"
    ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
    location "/behavior2/ads/ads_visit_type";
  • 插入数据
代码语言:javascript
代码运行次数:0
运行
复制
insert overwrite table  ads_visit_type
select t1.url, t1.type,t1.dt, t2.month, t2.quarter, count(1)
from dws_behavior_log t1 join dim_date t2 on t1.dt=t2.date_id
group by t1.url, t1.type, t1.dt, t2.month, t2.quarter;

每个网站的上网类型

  • 建表语句
代码语言:javascript
代码运行次数:0
运行
复制
drop table if exists ads_visit_mode;
create external table ads_visit_mode
(
    url         string comment "访问地址",
    device_type string comment "上网模式 移动 pc",
    dt          string comment "上网日期",
    count       bigint comment "统计数量"
) comment "网站的上网模式分布"
    ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
    location "/behavior2/ads/ads_visit_mode";
  • 插入数据
代码语言:javascript
代码运行次数:0
运行
复制
insert into table ads_visit_mode
select url, device_type,dt, count(1)
from dws_behavior_log
group by url, device_type,dt;
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2024-05-23,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Hive环境集成
    • Hive环境配置
    • 安装Hive环境
      • 使用编译好的源码软件
      • 配置环境变量
      • 修改配置文件
      • 初始化元数据信息
      • 测试环境
    • Spark环境配置
      • 上传安装文件
      • 解压软件
      • 修改配置文件
      • 配置历史服务器
      • 配置环境变量
      • 测试运行
    • Hive on Spark
      • 上传spark的依赖到hdfs
      • 修改hive的配置文件
      • 启动测试
  • 使用开发工具Datagrip连接Hive
  • 数据仓库建设
    • ODS源数据层
    • 创建udf函数
    • DWD明细数据层
    • DWS宽表汇总数据
    • 创建维度表
      • 时间维度表
      • 地区维度表
    • ADS层统计数据
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档