HiveSQL ->AST(抽象语法树) -> QB(查询块) ->OperatorTree(操作树)->优化后的操作树->mapreduce任务树->优化后的mapreduce任务树
Hive 和数据库除了拥有类似的查询语言,再无类似之处。
1)数据存储位置
Hive 存储在 HDFS 。数据库将数据保存在块设备或者本地文件系统中。
2)数据更新
Hive中不建议对数据的改写。而数据库中的数据通常是需要经常进行修改的,
3)执行延迟
Hive 执行延迟较高。数据库的执行延迟较低。当然,这个是有条件的,即数据规模较小,当数据规模大到超过数据库的处理能力的时候,Hive的并行计算显然能体现出优势。
4)数据规模
Hive支持很大规模的数据计算;数据库可以支持的数据规模较小。
1. 内部表数据由 Hive 自身管理,外部表数据由 HDFS 管理;
2. 内部表数据存储的位置是 hive.metastore.warehouse.dir(默认:/user/hive/warehouse),外部表数据的存储位置由自己制定(如果没有LOCATION,Hive 将在 HDFS 上的/user/hive/warehouse 文件夹下以外部表的表名创建一个文件夹,并将属于这个表的数据存放在这里);
3. 删除内部表会直接删除元数据(metadata)及存储数据;删除外部表仅仅会删除元数据,HDFS 上的文件并不会被删除。
在生产环境下,创建内部表和外部表取决于数据处理和存储的需求。以下是一些常见的情况和最佳实践:
创建内部表:当数据需要经过多次处理和转换后才能被存储时,通常会先创建内部表。内部表将数据存储在Hive的默认文件格式(如ORC、Parquet)中,这些格式通常比其他文件格式(如CSV、JSON)更高效,并且支持更高级的查询和分析操作。
创建外部表:当数据已经在HDFS上以某种格式存储,并且需要将其暴露给其他系统(如Spark、Presto)使用时,通常会创建外部表。外部表不会在Hive的默认文件格式下存储数据,而是在HDFS上直接引用存储的数据文件。这样,数据文件的格式和存储位置可以被其他系统共享和使用,而不需要复制数据。
需要注意的是,创建内部表和外部表的选择也会受到存储成本、数据访问速度和数据安全性等因素的影响。在做出选择之前,需要综合考虑这些因素,并选择最适合特定需求的方法。
1)Order By:全局排序,只有一个Reducer;
2)Sort By:分区内有序;
3)Distrbute By:类似MR中Partition,进行分区,结合sort by使用。
4) Cluster By:当Distribute by和Sorts by字段相同时,可以使用Cluster by方式。Cluster by除了具有Distribute by的功能外还兼具Sort by的功能。但是排序只能是升序排序,不能指定排序规则为ASC或者DESC。
在生产环境中Order By用的比较少,容易导致OOM。在生产环境中Sort By + Distrbute By用的多。
1)date_add、date_sub函数(加减日期)
2)next_day函数(周指标相关)
3)date_format函数(根据格式整理日期)
4)last_day函数(求当月最后一天日期)
5)CONCAT、CONCAT_WS、COLLECT_SET
6)EXPLODE
7)collect_set函数
8)get_json_object解析json函数
9)NVL(表达式1,表达式2)如果表达式1为空值,NVL返回值为表达式2的值,否则返回表达式1的值。
10)split将字符串转化为数组,即:split(‘a,b,c,d’ , ‘,’) ==> [“a”,“b”,“c”,“d”]。
11)coalesce(T v1, T v2, …) 返回参数中的第一个非空值;如果所有值都为 NULL,那么返回NULL。
12)collect_list列出该字段所有的值,不去重 select collect_list(id) from table。
在 Hive 中,用户可以编写自定义函数(User-Defined Functions,UDFs)来实现自己的业务逻辑。根据计算方式的不同,Hive 中的自定义函数主要分为三类:UDF、UDAF 和 UDTF。
UDF(User-Defined Function)是用户自定义的一种标量函数,输入一行数据,输出一个值。 在 HiveQL 查询语句中,可以使用 UDF 函数处理查询中的表达式。例如,可以使用自定义函数将输入的字符串转换为大写或小写,或者对输入的数值进行四舍五入等操作。
UDAF(User-Defined Aggregation Function)是用户自定义的一种聚合函数,输入多行数据,输出一个值。 与 UDF 不同,UDAF 通常需要在多行数据上进行聚合操作,因此其输入参数中包含多行数据。在 HiveQL 查询语句中,可以使用 UDAF 函数对查询结果进行聚合操作。例如,可以使用自定义函数计算平均值或求和等聚合操作。
UDTF(User-Defined Table-Generating Function)是用户自定义的一种表生成函数,输入一行数据,输出多行数据。 与 UDF 和 UDAF 不同,UDTF 生成的不是一个单独的值,而是一个表,因此其输出参数需要包含多个行数据。在 HiveQL 查询语句中,可以使用 UDTF 函数生成新表,以便后续的查询操作。
总之,UDF、UDAF 和 UDTF 是 Hive 中非常重要的自定义函数类型,可以根据具体业务需求编写适合自己的自定义函数,并在 HiveQL 查询语句中灵活地使用它们。
1)写一个类继承(org.apache.hadoop.hive.ql.)UDF类;
2)覆盖方法evaluate();
3)打JAR包;
4)通过hive命令将JAR添加到Hive的类路径:
hive> add jar /home/ubuntu/ToDate.jar;
5)注册函数:
hive> create temporary function xxx as ‘XXX’;
6)使用函数;
7)[可选] drop临时函数;
在 Hive 中,UDF/UDTF 用于自定义函数,可以让用户在 Hive 中扩展 SQL 功能。使用自定义函数可以方便地在 SQL 中实现各种自定义逻辑,从而满足更为复杂的数据处理需求。
以下是一些常见的使用场景:
① 处理复杂的字符串、时间、数据类型转换等功能。
② 实现自定义聚合函数。
③ 处理复杂的逻辑和运算,例如解析 XML、JSON 等格式的数据,或者实现自定义算法。
④ 将 Hive 与其他系统整合,例如通过调用外部系统接口实现数据的查询和转换。
⑤ 引入第三方jar包时,也需要。
总之,通过自定义 UDF/UDTF,可以让用户更加灵活地处理数据,实现更加复杂的数据处理需求。
(1)RANK() 排序相同时会重复,总数不会变
(2)DENSE_RANK() 排序相同时会重复,总数会减少
(3)ROW_NUMBER() 会根据顺序计算
在 Hive 中,OVER() 是用于定义窗口函数的语法,可以对窗口内的数据进行聚合、排序等操作。OVER() 可以出现在聚合函数(如 SUM、COUNT、AVG、MIN、MAX 等)的后面。
OVER() 语法如下:
OVER (
[ PARTITION BY partition_expression1, partition_expression2, ... ]
[ ORDER BY sort_expression1 [ASC|DESC], sort_expression2 [ASC|DESC], ... ]
[ ROWS | RANGE frame_specification ]
)
其中:
– PARTITION BY 子句用于指定窗口分区,即按照哪些列进行分组。
– ORDER BY 子句用于指定窗口排序规则,即按照哪些列进行排序,以及是升序还是降序排列。
– ROWS 和 RANGE 子句用于指定窗口的行或范围大小,从而定义窗口大小。
OVER() 语法的作用是让聚合函数对窗口内的数据进行操作,而不是对整个数据集进行操作。
在Hive的OVER()函数中,常见的参数还有下面这些:
(1)CURRENT ROW:当前行
(2)n PRECEDING:往前n行数据
(3) n FOLLOWING:往后n行数据
(4)UNBOUNDED:起点,UNBOUNDED PRECEDING 表示从前面的起点, UNBOUNDED FOLLOWING表示到后面的终点
(5) LAG(col,n):往前第n行数据
(6)LEAD(col,n):往后第n行数据
(7) NTILE(n):把有序分区中的行分发到指定数据的组中,各个组有编号,编号从1开始,对于每一行,NTILE返回此行所属的组的编号。注意:n必须为int类型。
案例:计算每个销售员上月和这个月的销售额:
SELECT
seller,
date,
sales,
LAG(sales, 1) OVER(PARTITION BY seller ORDER BY month) AS last_month_sales,
sales - LAG(sales, 1) OVER(PARTITION BY seller ORDER BY month) AS sales_growth
FROM sales_table;
如果不指定MapJoin或者不符合MapJoin的条件,那么Hive解析器会将Join操作转换成Common Join,即:在Reduce阶段完成join。容易发生数据倾斜。可以用MapJoin把小表全部加载到内存在map端进行join,避免reducer处理。
1)开启MapJoin参数设置:
(1)设置自动选择Mapjoin set hive.auto.convert.join = true; 默认为true (2)大表小表的阀值设置(默认25M一下认为是小表): set hive.mapjoin.smalltable.filesize=25000000;
2)MapJoin工作机制
首先是Task A,它是一个Local Task(在客户端本地执行的Task),负责扫描小表b的数据,将其转换成一个HashTable的数据结构,并写入本地的文件中,之后将该文件加载到DistributeCache中。
接下来是Task B,该任务是一个没有Reduce的MR,启动MapTasks扫描大表a,在Map阶段,根据a的每一条记录去和DistributeCache中b表对应的HashTable关联,并直接输出结果。
由于MapJoin没有Reduce,所以由Map直接输出结果文件,有多少个Map Task,就有多少个结果文件。
列处理:在SELECT中,只拿需要的列,如果有,尽量使用分区过滤,少用SELECT *。
行处理:在分区剪裁中,当使用外关联时,如果将副表的过滤条件写在Where后面,那么就会先全表关联,之后再过滤。
针对 hive 中表的存储格式通常有 orc 和 parquet,压缩格式一般使用 snappy。
相比与 textfile 格式表,orc 占有更少的存储。因为 hive 底层使用 MR 计算架构,数据流是 hdfs 到磁盘再到 hdfs,而且会有很多次,所以使用 orc 数据格式和 snappy 压缩策略可以降低 IO 读写,还能降低网络传输量,这样在一定程度上可以节省存储,还能提升 hql 任务执行效率;
create table … partitoned by XX …
mapred.min.split.size: 指的是数据的最小分割单元大小;min的默认值是1MB
mapred.max.split.size: 指的是数据的最大分割单元大小;max的默认值是256MB
通过调整max可以起到调整map数的作用,减小max可以增加map数,增大max可以减少map数。
需要提醒的是,直接调整mapred.map.tasks这个参数是没有效果的。
Reduce个数并不是越多越好
(1)过多的启动和初始化Reduce也会消耗时间和资源;
(2)另外,有多少个Reduce,就会有多少个输出文件,如果生成了很多个小文件,那么如果这些小文件作为下一个任务的输入,则也会出现小文件过多的问题;
在设置Reduce个数的时候也需要考虑这两个原则:
处理大数据量利用合适的Reduce数;
使单个Reduce任务处理数据量大小要合适;
(1)动态分区插入数据,产生大量的小文件,从而导致map数量剧增;
(2)reduce数量越多,小文件也越多(reduce的个数和输出文件是对应的);
(3)数据源本身就包含大量的小文件。
(1)在Map执行前合并小文件,减少Map数:CombineHiveInputFormat具有对小文件进行合并的功能(系统默认的格式)。HiveInputFormat没有对小文件合并功能。
set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat
; – 默认值为每个 Map 最大输入大小(这个值决定了合并后文件的数量)
# 一个节点上 split 的至少大小(这个值决定了多个 DataNode 上的文件是否需要合并)
set mapred.max.split.size=256000000; -- 256M
# 一个交换机下 split 的至少大小(这个值决定了多个交换机上的文件是否需要合并)
set mapred.min.split.size.per.node=100000000; -- 100M
set mapred.min.split.size.per.rack=100000000; -- 100M
(2)输出合并小文件
# 默认true,在map-only任务结束时合并小文件
SET hive.merge.mapfiles = true;
# 默认false,在map-reduce任务结束时合并小文件
SET hive.merge.mapredfiles = true;
# 默认256M
SET hive.merge.size.per.task = 268435456;
#当输出文件的平均大小小于16m该值时,启动一个独立的map-reduce任务进行文件merge
SET hive.merge.smallfiles.avgsize = 16777216;
(3)开启JVM重用
set mapreduce.job.jvm.numtasks=10
(4)使用 hive 自带的 concatenate 命令,自动合并小文件使用方法:
#对于非分区表
alter table A concatenate;
#对于分区表
alter table B partition(day=20201224) concatenate;
(5)减少 Reduce 的数量
reduce 的个数决定了输出的文件的个数,所以可以调整 reduce 的个数控制hive表的文件数量。
hive 中的分区函数 distribute by 正好是控制 MR 中 partition 分区的,可以通过设置 reduce 的数量,结合分区函数让数据均衡的进入每个 reduce 即可
:
#设置reduce 的数量有两种方式,第一种是直接设置reduce 个数
set mapreduce.job.reduces=10;
#第二种是设置每个reduce 的大小,Hive 会根据数据总大小猜测确定一个reduce 个数
set hive.exec.reducers.bytes.per.reducer=5120000000; -- 默认是1G,设置为5G
执行以下语句,将数据均衡的分配到reduce中
set mapreduce.job.reduces=10;
--如设置 reduce 数量为 10,使用 rand(), 随机生成一个数
--x % 10,这样数据就会随机进入 reduce 中,防止出现有的文件过大或过小
insert overwrite table A partition(dt)
select
*
from
B
distribute by rand() % 10;
(6)使用 hadoop 的 archive 将小文件归档
Hadoop Archive 简称 HAR,是一个高效地将小文件放入 HDFS 块中的文件存档工具,它能够将多个小文件打包成一个 HAR 文件,这样在减少 namenode 内存使用的同时,仍然允许对文件进行透明的访问。
#用来控制归档是否可用
set hive.archive.enabled=true;
#通知Hive 在创建归档时是否可以设置父目录
set hive.archive.har.parentdir.settable=true;
#控制需要归档文件的大小
set har.partfile.size=1099511627776;
#使用以下命令进行归档:
ALTER TABLE A ARCHIVE PARTITION(dt='2021-05-07', hr='12');
#对已归档的分区恢复为原文件:
ALTER TABLE A UNARCHIVE PARTITION(dt='2021-05-07', hr='12');
注意:归档的分区可以查看不能 insert overwrite,必须先 unarchive
set hive.map.aggr=true;
设置map端输出、中间结果压缩。(不完全是解决数据倾斜的问题,但是减少了IO读写和网络传输,能提高很多效率)
set hive.exec.compress.intermediate=true --启用中间数据压缩
set mapreduce.map.output.compress=true --启用最终数据压缩
set mapreduce.map.outout.compress.codec=…; --设置压缩方式
Fetch抓取是指,Hive中对某些情况的查询可以不必使用MapReduce计算。
例如:SELECT * FROM employees;在这种情况下,Hive可以简单地读取employee对应的存储目录下的文件,然后输出查询结果到控制台。
在hive-default.xml.template文件中hive.fetch.task.conversion默认是more,老版本hive默认是minimal,该属性修改为more以后,在全局查找、字段查找、limit查找等都不走mapreduce。
<property>
<name>hive.fetch.task.conversion</name>
<value>more</value>
<description>
Expects one of [none, minimal, more].
Some select queries can be converted to single FETCH task minimizing latency.
Currently the query should be single sourced not having any subquery and should not have
any aggregations or distincts (which incurs RS), lateral views and joins.
0. none : disable hive.fetch.task.conversion
1. minimal : SELECT STAR, FILTER on partition columns, LIMIT only
2. more : SELECT, FILTER, LIMIT only (support TABLESAMPLE and virtual columns)
</description>
</property>
大多数的Hadoop Job是需要Hadoop提供的完整的可扩展性来处理大数据集的。不过,有时Hive的输入数据量是非常小的。在这种情况下,为查询触发执行任务时消耗可能会比实际job的执行时间要多的多。对于大多数这种情况,Hive可以通过本地模式在单台机器上处理所有的任务。对于小数据集,执行时间可以明显被缩短。
用户可以通过设置hive.exec.mode.local.auto的值为true,来让Hive在适当的时候自动启动这个优化。
set hive.exec.mode.local.auto=true; //开启本地mr
//设置local mr的最大输入数据量,当输入数据量小于这个值时采用local mr的方式,默认为134217728,即128M
set hive.exec.mode.local.auto.inputbytes.max=50000000;
//设置local mr的最大输入文件个数,当输入文件个数小于这个值时采用local mr的方式,默认为4
set hive.exec.mode.local.auto.input.files.max=10;
默认情况下,Map阶段同一Key数据分发给一个reduce,当一个key数据过大时就倾斜了。并不是所有的聚合操作都需要在Reduce端完成,很多聚合操作都可以先在Map端进行部分聚合,最后在Reduce端得出最终结果。
开启Map端聚合参数设置
(1)是否在Map端进行聚合,默认为True
hive.map.aggr = true
(2)在Map端进行聚合操作的条目数目
hive.groupby.mapaggr.checkinterval = 100000
(3)有数据倾斜的时候进行负载均衡(默认是false)
hive.groupby.skewindata = true
由于COUNT DISTINCT操作需要用一个Reduce Task来完成,这一个Reduce需要处理的数据量太大,就会导致整个Job很难完成,一般COUNT DISTINCT使用先GROUP BY再COUNT的方式替换Count(Distinct)
关系型数据库中,对分区表Insert数据时候,数据库自动会根据分区字段的值,将数据插入到相应的分区中,Hive中也提供了类似的机制,即动态分区(Dynamic Partition),只不过,使用Hive的动态分区,需要进行相应的配置。
(1)开启动态分区功能(默认true,开启)
hive.exec.dynamic.partition=true
(2)设置为非严格模式(动态分区的模式,默认strict,表示必须指定至少一个分区为静态分区,nonstrict模式表示允许所有的分区字段都可以使用动态分区。)
hive.exec.dynamic.partition.mode=nonstrict
(3)在所有执行MR的节点上,最大一共可以创建多少个动态分区。
hive.exec.max.dynamic.partitions=1000
(4)在每个执行MR的节点上,最大可以创建多少个动态分区。该参数需要根据实际的数据来设定。比如:源数据中包含了一年的数据,即day字段有365个值,那么该参数就需要设置成大于365,如果使用默认值100,则会报错。
hive.exec.max.dynamic.partitions.pernode=100
(5)整个MR Job中,最大可以创建多少个HDFS文件。
hive.exec.max.created.files=100000
(6)当有空分区生成时,是否抛出异常。一般不需要设置。
hive.error.on.empty.partition=false
Hive会将一个查询转化成一个或者多个阶段。这样的阶段可以是MapReduce阶段、抽样阶段、合并阶段、limit阶段。或者Hive执行过程中可能需要的其他阶段。默认情况下,Hive一次只会执行一个阶段。不过,某个特定的job可能包含众多的阶段,而这些阶段可能并非完全互相依赖的,也就是说有些阶段是可以并行执行的,这样可能使得整个job的执行时间缩短。不过,如果有更多的阶段可以并行执行,那么job可能就越快完成。
通过设置参数hive.exec.parallel值为true,就可以开启并发执行。不过,在共享集群中,需要注意下,如果job中并行阶段增多,那么集群利用率就会增加。
set hive.exec.parallel=true; //打开任务并行执行
set hive.exec.parallel.thread.number=16; //同一个sql允许最大并行度,默认为8。
当然,得是在系统资源比较空闲的时候才有优势,否则,没资源,并行也起不来。
在分布式集群环境下,因为程序Bug(包括Hadoop本身的bug),负载不均衡或者资源分布不均等原因,会造成同一个作业的多个任务之间运行速度不一致,有些任务的运行速度可能明显慢于其他任务(比如一个作业的某个任务进度只有50%,而其他所有任务已经运行完毕),则这些任务会拖慢作业的整体执行进度。为了避免这种情况发生,Hadoop采用了推测执行(Speculative Execution)机制,它根据一定的法则推测出“拖后腿”的任务,并为这样的任务启动一个备份任务,让该任务与原始任务同时处理同一份数据,并最终选用最先成功运行完成任务的计算结果作为最终结果。
注意:对于 left join 或者 right join 来说,不会对关联的字段自动去除 null值,对于 inner join 来说,会对关联的字段自动去除 null 值。
情形:比如用户表中user_id字段为int,log表中user_id字段string类型。当按照user_id进行两个表的Join操作时。
解决方式:
把数字类型转换成字符串类型
select * from users a
left outer join logs b
on a.usr_id = cast(b.user_id as string)
bug记录:https://www.jianshu.com/p/2181e00d74dc
在生产环境经常会用大量空值数据进入到一个reduce中去,导致数据倾斜。
解决办法:
自定义分区,将为空的key转变为字符串加随机数或纯随机数,将因空值而造成倾斜的数据分不到多个Reducer。对于异常值如果不需要的话,最好是提前在where条件里过滤掉,这样可以使计算量大大减少
(1) 按照id分组计算count值
-> 单个Key
-> 多个Key
(2) 单个Key
加随机数,双重聚合
配置参数,双重聚合 set hive.groupby.skewindata = true;
过滤出这个Key单独处理
(3) 多个Key
增加Reducer个数,一定程度上解决问题
自定义分区器
加随机数,双重聚合
配置参数,双重聚合 set hive.groupby.skewindata = true;
hive.groupby.skewindata=true
hive.map.aggr=true
该参数设定位true时,生成的查询计划会有两个MR Job。第一个MR Job中,Map的输出结果集合会随机分布到Reduce中,每个Reduce做部分聚合操作,并输出结果,这样处理的结果是相同的Group By Key有可能被分发到不同的Reduce中,从而达到负载均衡的目的;第二个MR Job再根据预处理的数据结果按照Group By Key 分布到 Reduce 中(这个过程可以保证相同的 Group By Key 被分布到同一个Reduce中),最后完成最终的聚合操作。
(1) 大表JOIN小表 mapJoin 避免了Reducer
(2) 大表JOIN大表 A表加随机数 B表扩容 聚合
A concat(name,'_',随机数[1,2])
B
concat(name,'_',1)
union all
concat(name,'_',2)
在map执行前合并小文件,减少map数:CombineHiveInputFormat具有对小文件进行合并的功能(系统默认的格式)。HiveInputFormat没有对小文件合并功能。
set hive.input.format= org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
当input的文件都很大,任务逻辑复杂,map执行非常慢的时候,可以考虑增加Map数,来使得每个map处理的数据量减少,从而提高任务的执行效率。
增加map的方法为:根据computeSliteSize(Math.max(minSize,Math.min(maxSize,blocksize)))=blocksize=128M公式,调整maxSize最大值。让maxSize最大值低于blocksize就可以增加map的个数。
hive 默认的字段分隔符为ascii码的控制符\001(^A),建表的时候用fields terminated by ‘\001’。注意:如果采用\t或者\001等为分隔符,需要要求前端埋点和javaEE后台传递过来的数据必须不能出现该分隔符,通过代码规范约束。一旦传输过来的数据含有分隔符,需要在前一级数据中转义或者替换(ETL)。
可以设置参数(导入HDFS同样有效):
--hive-drop-import-delims 导入到hive时删除 \n, \r, \001
--hive-delims-replacement 导入到hive时用自定义的字符替换掉 \n, \r, \001
字段包含分隔符存在的问题:
添加参数的效果:
Hive中的体现:
Tez可以将多个有依赖的作业转换为一个作业,这样只需写一次HDFS,且中间节点较少,从而大大提升作业的计算性能。
Mr/tez/spark区别:
Mr引擎:多job串联,基于磁盘,落盘的地方比较多。虽然慢,但一定能跑出结果。一般处理,周、月、年指标。
Spark引擎:虽然在Shuffle过程中也落盘,但是并不是所有算子都需要Shuffle,尤其是多算子过程,中间过程不落盘 DAG有向无环图。 兼顾了可靠性和效率。一般处理天指标。
Tez引擎:完全基于内存。 注意:如果数据量特别大,慎重使用。容易OOM。一般用于快速出结果,数据量比较小的场景。
元数据备份(重点,如数据损坏,可能整个集群无法运行,至少要保证每日零点之后备份到其它服务器两个复本)
MySQL的utf8编码最多存储3个字节,当数据中存在表情号、特色符号时会占用超过3个字节数的字节,那么会出现错误 Incorrect string value: ‘\xF0\x9F\x91\x91\xE5\xB0…’
解决办法:将utf8修改为utf8mb4
首先修改库的基字符集和数据库排序规则
再使用 SHOW VARIABLES LIKE ‘%char%’; 命令查看参数
确保这几个参数的value值为utf8mb4 如果不是使用set命令修改
如:set character_set_server = utf8mb4;
1)union会将联合的结果集去重,效率较union all差
2)union all不会对结果集去重,所以效率高
null在hive底层默认是用'\N'来存储的,可以通过alter table test SET SERDEPROPERTIES('serialization.null.format' = 'a');来修改。
Hive支持三种不同的元存储服务器,分别为:内嵌式元存储服务器、本地元存储服务器、远程元存储服务器
,每种存储方式使用不同的配置参数。
内嵌式元存储主要用于单元测试,在该模式下每次只有一个进程可以连接到元存储,Derby是内嵌式元存储的默认数据库。
在本地模式下,每个Hive客户端都会打开到数据存储的连接并在该连接上请求SQL查询。
在远程模式下,所有的Hive客户端都将打开一个到元数据服务器的连接,该服务器依次查询元数据,元数据服务器和客户端之间使用Thrift协议通信。
默认格式,存储方式为行存储,数据不做压缩,磁盘开销大,数据解析开销大。
可结合Gzip、Bzip2使用(系统自动检查,执行查询时自动解压),但使用这种方式,压缩后的文件不支持split,Hive不会对数据进行切分,从而无法对数据进行并行操作。并且在反序列化过程中,必须逐个字符判断是不是分隔符和行结束符,因此反序列化开销会比SequenceFile高几十倍。
SequenceFile是Hadoop API提供的一种二进制文件支持,,存储方式为行存储,其具有使用方便、可分割、可压缩的特点。
SequenceFile支持三种压缩选择:NONE,RECORD,BLOCK。Record压缩率低,一般建议使用BLOCK压缩。优势是文件和hadoop api中的MapFile是相互兼容的
存储方式:数据按行分块,每块按列存储。结合了行存储和列存储的优点
存储方式:数据按行分块 每块按照列存储。 压缩快 快速列存取。 效率比rcfile高,是rcfile的改良版本。
总结:相比TEXTFILE和SEQUENCEFILE,RCFILE由于列式存储方式,数据加载时性能消耗较大,但是具有较好的压缩比和查询响应。
数据仓库的特点是一次写入、多次读取,因此,整体来看,RCFILE相比其余两种格式具有较明显的优势。
存储文件压缩比: ORC > Parquet > TextFile
实际开发中数据格式一般为ORC和Parquet ,压缩格式一般为Snappy。
Hive中的桶表是一种数据分区的方式,将相似的数据行分配到相同的桶中,然后将每个桶存储为一个单独的文件。桶表可以提高查询性能,因为它们允许Hive更容易地对数据进行切片和切块,从而提高查询的速度。
桶表在创建时需要指定桶的数量和桶表的列。桶的数量必须是正整数,通常是2的幂,以便Hive可以将数据行分配到适当的桶中。桶表的列用于指定桶的分配方式。
下面是创建一个桶表的示例:
CREATE TABLE my_bucketed_table (
col1 INT,
col2 STRING,
col3 DOUBLE
)
CLUSTERED BY (col1) INTO 4 BUCKETS;
在上面的示例中,我们创建了一个桶表my_bucketed_table,其中包含三列col1,col2和col3。表被分为4个桶,并按列col1进行分区。
可以使用INSERT INTO语句将数据插入到桶表中。在插入数据时,Hive会根据指定的列对数据进行哈希,然后将其分配到适当的桶中。下面是一个插入数据的示例:
INSERT INTO TABLE my_bucketed_table
SELECT col1, col2, col3
FROM my_table;
在上面的示例中,我们将my_table中的数据插入到my_bucketed_table中。由于my_bucketed_table是一个桶表,Hive会将数据行哈希并将其分配到适当的桶中。
在查询桶表时,Hive可以利用桶的分配方式来加速查询。例如,如果查询只涉及表中的一个桶,则Hive只需要扫描一个文件,从而提高查询性能。下面是一个查询桶表的示例:
SELECT *
FROM my_bucketed_table
WHERE col1 = 100;
在上面的示例中,我们查询了my_bucketed_table中col1等于100的所有行。由于表被分为桶,并按列col1进行分区,因此Hive只需要扫描包含值为100的桶,而不需要扫描整个表,从而提高了查询性能。
User表 t1
name sex age name age message names
张三 1 22 张三 1,22 1,22 张三|王五
李四 2 24 --> 李四 2,24 --> 2,24 李四
王五 1 22 王五 1,22
HQL:
select
t1.message,
concat_ws("|",collect_set(t1.name)) names -- collect_set去重 collect——list不去重
from(
select name,concat(sex,",",age) message from User
) t1
group by t1.message;
Movie_Info表:
movie | category | |
---|---|---|
《战狼2》 | 战争,动作 | —> 《战狼2》 战争 |
《lie to me》 | 悬疑,心理 | —> 《lie to me》 悬疑 |
《lie to me》 心理 |
HQL:
select
movie,
category_name
from
Movie_Info
lateral view explode(split(category,",")) tmp as category_name;