前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >电力行业数据湖技术方案Flink、Hudi、Hive on Spark案例全攻略记录及Hive查询MOR rt表异常修复

电力行业数据湖技术方案Flink、Hudi、Hive on Spark案例全攻略记录及Hive查询MOR rt表异常修复

作者头像
用户9421738
发布2024-09-12 19:27:33
970
发布2024-09-12 19:27:33
举报
文章被收录于专栏:大数据从业者

前言

本文主要记录电力行业客户的数据湖技术方案实践案例,方案概括为基于FlinkSQL+Hudi流式入湖、同步表元数据到Hive,基于Hive catalog统一元数据管理,然后基于Hive on Spark离线分析计算。该方案主要考虑与已有Hive数据仓库、数据解析、报表应用等结合。欢迎关注微信:大数据从业者

组件版本信息

Hadoop

3.1.1

Hive

3.1.3

Spark

3.3.2

Flink

1.17.2

Hudi

0.14.1

Spark编译部署

代码语言:javascript
复制
wget https://github.com/apache/spark/archive/refs/tags/v3.3.2.tar.gz

tar -xvf v3.3.2.tar.gz

cd spark-3.3.2/

修改pom.xml中maven.version为自己环境已部署的版本

代码语言:javascript
复制
./dev/make-distribution.sh -tgz -Phive -Phive-thriftserver -Pyarn

涉及修改源码,可以只编译指定模块,如下:

代码语言:javascript
复制
./build/mvn -pl :spark-streaming_2.12 clean package

./build/mvn -Phive-thriftserver -DskipTests clean package

之前文章已经记录Spark整合Hadoop3与Hive3,本文不再重复赘述!

Hive编译部署

代码语言:javascript
复制
wget  https://github.com/apache/hive/archive/refs/tags/rel/release-3.1.3.tar.gz

tar -xvf release-3.1.3.tar.gz

cd hive-rel-release-3.1.3/

mvn clean package -DskipTests -Pdist -Dmaven.test.skip=true -T 1C
代码语言:javascript
复制
cp packaging/target/apache-hive-3.1.3-bin.tar.gz /home/myHadoopCluster/

cd  /home/myHadoopCluster/

tar -xvf apache-hive-3.1.3-bin.tar.gz

cd apache-hive-3.1.3-bin/conf

hive-env.sh内容

代码语言:javascript
复制
vim hive-env.sh

if [ "$SERVICE" = "metastore" ]; then

  export HADOOP_HEAPSIZE=8096 # Setting for HiveMetastore

  export HADOOP_OPTS="$HADOOP_OPTS -Xloggc:/var/log/hive313/hivemetastore-gc-%t.log -XX:+UseG1GC -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintGCCause -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10 -XX:GCLogFileSize=10M -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/var/log/hive313/hms_heapdump.hprof -Dhive.log.dir=/var/log/hive313 -Dhive.log.file=hivemetastore.log"    

fi
       
if [ "$SERVICE" = "hiveserver2" ]; then

  export HADOOP_HEAPSIZE=8096 # Setting for HiveServer2 and Client

  export HADOOP_OPTS="$HADOOP_OPTS -Xloggc:/var/log/hive313/hiveserver2-gc-%t.log -XX:+UseG1GC -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintGCCause -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10 -XX:GCLogFileSize=10M -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/var/log/hive313/hs2_heapdump.hprof -Dhive.log.dir=/var/log/hive313 -Dhive.log.file=hiveserver2.log"

fi

HADOOP_HOME=/usr/hdp/3.1.5.0-152/hadoop

export HIVE_HOME=/home/myHadoopCluster/apache-hive-3.1.3-bin

export HIVE_CONF_DIR=/home/myHadoopCluster/apache-hive-3.1.3-bin/conf

export METASTORE_PORT=19083          

hive-site.xml内容

代码语言:javascript
复制
vim hive-site.xml
<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>

<configuration>
  <!--zookeeper start-->
  <property>
    <name>hive.server2.support.dynamic.service.discovery</name>
    <value>true</value>
  </property>
  <property>
    <name>hive.zookeeper.quorum</name>
    <value>felixzh:2181</value>
  </property>
  <property>
    <name>hive.server2.zookeeper.namespace</name>
    <value>hiveserver313</value>
  </property>
  <!--zookeeper end-->

  <!--metastore start-->
  <property>
    <name>hive.metastore.uris</name>
    <value>thrift://felixzh:19083</value>
  </property>
  <property>
    <name>hive.metastore.warehouse.dir</name>
    <value>/user/hive313/warehouse</value>
  </property>
   <property>
    <name>hive.metastore.failure.retries</name>
    <value>10</value>
  </property>
  <property>
    <name>hive.metastore.connect.retries</name>
    <value>10</value>
  </property>
<!--metastore end-->

 <!--hiveserver start-->
  <property>
    <name>hive.server2.thrift.bind.host</name>
    <value>felixzh</value>
  </property>
  <property>
      <name>hive.server2.thrift.port</name>
      <value>10010</value>
  </property>
  <property>
    <name>hive.server2.webui.port</name>
    <value>10012</value>
  </property>
 <!--hiveserver end-->

 <!--postgresql start-->
  <property>
    <name>javax.jdo.option.ConnectionDriverName</name>
    <value>org.postgresql.Driver</value>
  </property>
  <property>
    <name>javax.jdo.option.ConnectionURL</name>
    <value>jdbc:postgresql://felixzh:5432/hive313?createDatabaseIfNotExist=true</value>
  </property>
  <property>
    <name>javax.jdo.option.ConnectionUserName</name>
    <value>postgres</value>
  </property>
  <property>
    <name>javax.jdo.option.ConnectionPassword</name>
    <value>123456</value>
  </property>
 <!--postgresql end-->

 <!--spark start-->
  <property>
    <name>spark.home</name>
    <value>/home/myHadoopCluster/spark-3.3.2-bin-3.3.2</value>
  </property>
 <!--spark end-->
</configuration>

hive-log4内容

代码语言:javascript
复制
cp hive-log4j2.properties.template hive-log4j2.properties          

创建Hive元数据库

代码语言:javascript
复制
psql -h felixzh -d postgres -U postgres -p 5432

postgres=# create database hive313;

CREATE DATABASE          

初始化Hive元数据库

代码语言:javascript
复制
./schematool -dbType postgres –initSchema

确保Hive与Hadoop使用相同版本guava

代码语言:javascript
复制
rm –rf /home/myHadoopCluster/apache-hive-3.1.3-bin/lib/guava*jar

cp guava-28.0-jre.jar /home/myHadoopCluster/apache-hive-3.1.3-bin/lib/

否则异常如下:

代码语言:javascript
复制
java.lang.NoSuchMethodError: com.google.common.base.Preconditions.checkArgument(ZLjava/lang/String;Ljava/lang/Object;)V             

确保Hive与Spark使用相同版本libthrift

代码语言:javascript
复制
rm –rf /home/myHadoopCluster/apache-hive-3.1.3-bin/lib/libthrift*jar

cp libthrift-0.12.0.jar /home/myHadoopCluster/apache-hive-3.1.3-bin/lib/

否则异常如下:

代码语言:javascript
复制
org.apache.thrift.transport.TTransportException: null

        at org.apache.thrift.transport.TIOStreamTransport.read(TIOStreamTransport.java:132) ~[hive-exec-3.1.3.jar:3.1.3]          

启动服务

代码语言:javascript
复制
nohup ./hive --service metastore &

nohup ./hive --service hiveserver2 &

Beeline登录验证

方法1:HA

代码语言:javascript
复制
./beeline -u 'jdbc:hive2://felixzh:2181/;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=hiveserver313' -n hive

方法2:非HA

代码语言:javascript
复制
./beeline -u 'jdbc:hive2://felixzh:10010' -n hive    

Hive on Spark功能验证

代码语言:javascript
复制
set hive.execution.engine=spark;

create table test(name string);

insert into test values('felixzh');

select * from test;

Flink编译部署

代码语言:javascript
复制
wget https://github.com/apache/flink/archive/refs/tags/release-1.17.2.tar.gz

tar -xvf release-1.17.2.tar.gz

cd flink-release-1.17.2/

mvn clean package -DskipTests -Dfast -Dhadoop.version=3.1.1 -Dhive.version=3.1.3 -Pscala-2.12 -T 1C
代码语言:javascript
复制
cp -r flink-dist/target/flink-1.17.2-bin/flink-1.17.2/ /home/myHadoopCluster/    

cp flink-connectors/flink-sql-connector-hive-3.1.3/target/flink-sql-connector-hive-3.1.3_2.12-1.17.2.jar /home/myHadoopCluster/flink-1.17.2/lib/

ln -s /usr/hdp/3.1.5.0-152/hadoop-mapreduce/hadoop-mapreduce-client-core-3.1.1.3.1.5.0-152.jar /home/myHadoopCluster/flink-1.17.2/lib/hadoop-mapreduce-client-core.jar          

声明环境变量

代码语言:javascript
复制
vi bin/config.sh

export HADOOP_CLASSPATH=/usr/hdp/3.1.5.0-152/hadoop-hdfs/*:/usr/hdp/3.1.5.0-152/hadoop-hdfs/lib/*:/usr/hdp/3.1.5.0-152/hadoop-yarn/*:/usr/hdp/3.1.5.0-152/hadoop-yarn/lib/*:/usr/hdp/3.1.5.0-152/hadoop/*:/usr/hdp/3.1.5.0-152/hadoop-mapreduce/*

export HADOOP_CONF_DIR=/etc/hadoop/conf/
代码语言:javascript
复制
vi conf/flink-conf.yaml

state.backend.type: rocksdb

state.checkpoints.dir: hdfs:///flink/flink-ck

state.backend.incremental: true              

整合Hive Catalog

代码语言:javascript
复制
vi  InitHiveCatalog

CREATE CATALOG HiveCatalog

WITH (

  'type' = 'hive',

  'hive-conf-dir' = '/home/myHadoopCluster/apache-hive-3.1.3-bin/conf/'

);

use catalog HiveCatalog   

启动FlinkSQL Client

代码语言:javascript
复制
./bin/sql-client.sh -i InitHiveCatalog

show tables

可以看到上文通过Hive beeline创建的测试表。

Hudi编译部署

代码语言:javascript
复制
wget https://github.com/apache/hudi/archive/refs/tags/release-0.14.1.tar.gz

tar -xvf release-0.14.1.tar.gz

cd hudi-release-0.14.1

mvn clean package -DskipTests -Dfast -Dspark3.3 -Dscaka-2.12 -Dflink1.17 -Pflink-bundle-shade-hive3 -Drat.skip=true -Dcheckstyle.skip         
代码语言:javascript
复制
cp packaging/hudi-flink-bundle/target/hudi-flink1.17-bundle-0.14.1.jar /home/myHadoopCluster/flink-1.17.2/lib/

cp packaging/hudi-hadoop-mr-bundle/target/hudi-hadoop-mr-bundle-0.14.1.jar /home/myHadoopCluster/apache-hive-3.1.3-bin/lib/    

cp packaging/hudi-hive-sync-bundle/target/hudi-hive-sync-bundle-0.14.1.jar /home/myHadoopCluster/apache-hive-3.1.3-bin/lib/          

Flink流写Hudi

启动Flink yarn-session集群

代码语言:javascript
复制
./bin/yarn-session.sh -jm 8G -tm 16G –d

./bin/sql-client.sh -i InitHiveCatalog

set execution.target=yarn-session;

set yarn.application.id=<上述yarn-session applicationId>;

set execution.checkpointing.inerval=30000;          

数据源表

代码语言:javascript
复制
create table if not exists datagen1

(id int, data string, ts timestamp(3), partitionId int)

with(

  'connector' = 'datagen',

  'number-of-rows' = '5000000',

  'rows-per-second' = '10000',

  'fields.partitionId.min' = '1',    

  'fields.partitionId.max' = '2'

);        

数据目的表

代码语言:javascript
复制
create table if not exists mor1

(id int primary key not enforced, data varchar(20), ts timestamp(3), `partition` int) partitioned by(`partition`)

with(

'connector' = 'hudi',

'path' = 'hdfs:///flink/mor1',

'table.type' = 'MERGE_ON_READ',

'write.operation' = 'upsert',

'hive_sync.enable' = 'true',

'hive_sync.metastore.uris' = 'thrift://felixzh:19083',

'hive_sync.table' = 'mor1_hive',

'compaction.schedule.enabled' = 'true',

'compaction.async.enabled' = 'true'

);          

提交作业

代码语言:javascript
复制
insert into mor1 select id,data,ts,partitionId from datagen1;

Hive查询Hudi

代码语言:javascript
复制
1.更新hudi jar到Hive/lib,需要重启Hive服务。

2.如果使用Hive on mr请自行将hudi相关jar(hudi-hadoop-mr-bundle-0.14.1.jar、hudi-hive-sync-bundle-0.14.1.jar)更新到mapred-site.xml配置参数mapreduce.application.framework.path所指定的hdfs目录,比如/hdp/apps/3.1.5.0-152/mapreduce/mapreduce.tar.gz

3.如果使用Hive n spark自行将hudi相关jar(hudi-spark3.3-bundle_2.12-0.14.1.jar)更新到spark/jars;如果指定了spark.yarn.archive或者spark.yarn.jars,需要同步更新。另外,需要将hive-cli-3.1.3.jar、hive-exec-3.1.3.jar需要放入spark/jars,删除原本的hive*jar          

Hive on MR相关参数如下:

代码语言:javascript
复制
set hive.execution.engine=mr;

set hive.input.format=org.apache.hudi.hadoop.hive.HoodieCombineHiveInputFormat;

set hive.vectorized.execution.enabled=false;

set mapreduce.map.memory.mb=8096;

set mapreduce.map.java.opts=-Xmx8096m;

set mapreduce.reduce.memory.mb=8096;

set maprdeuce.reduce.java.opts=-Xmx8096m;          

Hive on MR效果验证

Hive on Spark相关参数如下:

代码语言:javascript
复制
set hive.execution.engine=spark;

set hive.input.format=org.apache.hudi.hadoop.hive.HoodieCombineHiveInputFormat;

set hive.vectorized.execution.enabled=false;

set spark.task.maxFailures=1;

set spark.executor.instances=10;

set spark.executor.memory=2G;

set spark.executor.cores=1;

set spark.default.parallelism=3;              

Hive on Spark效果验证

遇到的问题

代码语言:javascript
复制
java.lang.ClassCastException: org.apache.hudi.hadoop.hive.HoodieCombineRealtimeFileSplit cannot be cast to org.apache.hadoop.hive.shims.HadoopShimsSecure$InputSplitShim

这个属于原生Bug,我已经修复并提交到社区,目前已经合入主线。

代码语言:javascript
复制
https://issues.apache.org/jira/browse/HUDI-8104    
代码语言:javascript
复制
IllegalArgumentException: HoodieRealtimeRecordReader can only work on RealtimeSplit and not with hdfs://felixzh1:8020/flink/mor1/

这个需要指定hive.input.format参数

代码语言:javascript
复制
set hive.input.format=org.apache.hudi.hadoop.hive.HoodieCombineHiveInputFormat;
代码语言:javascript
复制
ClassCastException: org.apache.hadoop.io.ArrayWritable cannot be cast to org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch

这个需要关闭向量化

代码语言:javascript
复制
set hive.vectorized.execution.enabled=false;
本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2024-09-07,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 大数据从业者 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 前言
  • Hive编译部署
  • Flink编译部署
  • Hudi编译部署
  • Flink流写Hudi
  • Hive查询Hudi
相关产品与服务
腾讯云 BI
腾讯云 BI(Business Intelligence,BI)提供从数据源接入、数据建模到数据可视化分析全流程的BI能力,帮助经营者快速获取决策数据依据。系统采用敏捷自助式设计,使用者仅需通过简单拖拽即可完成原本复杂的报表开发过程,并支持报表的分享、推送等企业协作场景。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档