Loading [MathJax]/jax/input/TeX/config.js
前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >HBase BulkLoad 原理及批量写入数据实战

HBase BulkLoad 原理及批量写入数据实战

作者头像
大数据真好玩
发布于 2022-12-05 01:11:30
发布于 2022-12-05 01:11:30
2.4K00
代码可运行
举报
文章被收录于专栏:暴走大数据暴走大数据
运行总次数:0
代码可运行

目录

一、HBase BulkLoad介绍

  1. 前言
  2. 为什么要用bulkload方式导入?
  3. bulkload的实现原理

二、HBase BulkLoad批量写入数据实战

  1. 开发生成HFile文件的代码
  2. 打成jar包提交到集群中运行
  3. 观察HDFS上输出的结果
  4. 加载HFile文件到hbase表中
  5. 总结

一、HBase BulkLoad介绍

1. 前言

之前我们介绍了HBASE的存储机制,HBASE存储数据其底层使用的是HDFS来作为存储介质,HBASE的每一张表对应的HDFS目录上的一个文件夹,文件夹名是以HBASE表的名字来命名(如果没有使用命名空间,那么默认是在default目录下)。在表文件夹下存放着若干个region命名的文件夹,而region文件夹中的每个列族也是用文件夹进行存储的,每个列族中存储的就是实际的数据,以HFile的形式存在。

路径格式: /hbase/data/default/<tbl_name>/<region_id>//<hfile_id>

2. 为什么要用bulkload方式导入?

在进行数据传输中,批量加载数据到HBase集群有多种方式,比如通过HBase API进行批量写入数据、使用Sqoop工具批量导数到HBase集群、使用MapReduce批量导入等。这些方式,在导入数据的过程中,如果数据量过大,可能耗时会比较严重或者占用HBase集群资源较多(如磁盘IO、HBase Handler数等)。

3. bulkload的实现原理

按照HBase存储数据按照HFile格式存储在HDFS的原理,使用MapReduce直接生成HFile格式的数据文件,然后再通过RegionServer将HFile数据文件移动到相应的Region上去。

  • HBase数据正常写流程
  • bulkload方式的处理示意图
  • bulkload的好处
  1. 导入过程不占用Region资源
  2. 能快速导入海量的数据
  3. 节省内存

二、HBase BulkLoad批量写入数据实战

需求

通过bulkload的方式,将我们放在HDFS上面的这个路径/hbase/input/user.txt的数据文件,转换成HFile格式,然后load到myuser2这张Hbase表里面去。

1. 开发生成HFile文件的代码

自定义map类

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
package xsluo.hbase;
 
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
 
//四个泛型中后两个,分别对应rowkey及put
public class BulkLoadMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String[] split = value.toString().split("\t");
 
        //封装输出的rowkey类型
        ImmutableBytesWritable immutableBytesWritable = new ImmutableBytesWritable(split[0].getBytes());
        //构建Put对象
        Put put = new Put(split[0].getBytes());
        put.addColumn("f1".getBytes(), "name".getBytes(), split[1].getBytes());
        put.addColumn("f1".getBytes(), "age".getBytes(), split[2].getBytes());
 
        context.write(immutableBytesWritable, put);
    }
}

程序main

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
package xsluo.hbase;
 
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
 
public class HBaseBulkLoad extends Configured implements Tool {
    public static void main(String[] args) throws Exception {
        Configuration configuration = HBaseConfiguration.create();
        //设置ZK集群
        configuration.set("hbase.zookeeper.quorum", "node01:2181,node02:2181,node03:2181");
 
        int run = ToolRunner.run(configuration, new HBaseBulkLoad(), args);
        System.exit(run);
    }
 
    @Override
    public int run(String[] strings) throws Exception {
        Configuration conf = super.getConf();
        Job job = Job.getInstance(conf);
        job.setJarByClass(HBaseBulkLoad.class);
 
        FileInputFormat.addInputPath(job, new Path("hdfs://node01:8020/hbase/input"));
        job.setMapperClass(BulkLoadMapper.class);
        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapOutputValueClass(Put.class);
 
        //获取数据库连接
        Connection connection = ConnectionFactory.createConnection(conf);
        Table table = connection.getTable(TableName.valueOf("myuser2"));
 
        //使MR可以向myuser2表中,增量增加数据
        HFileOutputFormat2.configureIncrementalLoad(job, table, connection.getRegionLocator(TableName.valueOf("myuser2")));
        //数据写回到HDFS,写成HFile -> 所以指定输出格式为HFileOutputFormat2
        job.setOutputFormatClass(HFileOutputFormat2.class);
        HFileOutputFormat2.setOutputPath(job, new Path("hdfs://node01:8020/hbase/out_hfile"));
 
        //开始执行
        boolean b = job.waitForCompletion(true);
 
        return b?0:1;
    }
}
2. 打成jar包提交到集群中运行
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
hadoop jar hbase_demo-1.0-SNAPSHOT.jar xsluo.hbase.HBaseBulkLoad
3. 观察HDFS上输出的结果
4. 加载HFile文件到hbase表中

方式一:代码加载

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
package xsluo.hbase;
 
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
 
public class LoadData {
    public static void main(String[] args) throws Exception {
        Configuration configuration = HBaseConfiguration.create();
        configuration.set("hbase.zookeeper.quorum", "node01,node02,node03");
        //获取数据库连接
        Connection connection =  ConnectionFactory.createConnection(configuration);
        //获取表的管理器对象
        Admin admin = connection.getAdmin();
        //获取table对象
        TableName tableName = TableName.valueOf("myuser2");
        Table table = connection.getTable(tableName);
        //构建LoadIncrementalHFiles加载HFile文件
        LoadIncrementalHFiles load = new LoadIncrementalHFiles(configuration);
        load.doBulkLoad(new Path("hdfs://node01:8020/hbase/out_hfile"), admin,table,connection.getRegionLocator(tableName));
    }
}

方式二:命令加载

先将hbase的jar包添加到hadoop的classpath路径下

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
export HBASE_HOME=/xsluo/install/hbase-1.2.0-cdh5.14.2/
export HADOOP_HOME=/xsluo/install/hadoop-2.6.0-cdh5.14.2/
export HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase mapredcp`
  • 运行命令
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
yarn jar /xsluo/install/hbase-1.2.0-cdh5.14.2/lib/hbase-server-1.2.0-cdh5.14.2.jar   completebulkload /hbase/out_hfile myuser2

加载完后,原文件会被移动到hdfs中此表对应的region的列族的目录下

5. 总结

本文为了演示实战效果,将生成HFile文件和使用BulkLoad方式导入HFile到HBase集群的步骤进行了分解,实际情况中,可以将这两个步骤合并为一个,实现自动化生成与HFile自动导入。如果在执行的过程中出现RpcRetryingCaller的异常,可以到对应RegionServer节点查看日志信息,这里面记录了出现这种异常的详细原因。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2022-10-12,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 大数据真好玩 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
2021年大数据HBase(十五):HBase的Bulk Load批量加载操作
        很多时候,我们需要将外部的数据导入到HBase集群中,例如:将一些历史的数据导入到HBase做备份。我们之前已经学习了HBase的Java API,通过put方式可以将数据写入到HBase中,我们也学习过通过MapReduce编写代码将HDFS中的数据导入到HBase。但这些方式都是基于HBase的原生API方式进行操作的。这些方式有一个共同点,就是需要与HBase连接,然后进行操作。HBase服务器要维护、管理这些连接,以及接受来自客户端的操作,会给HBase的存储、计算、网络资源造成较大消耗。此时,在需要将海量数据写入到HBase时,通过Bulk load(大容量加载)的方式,会变得更高效。可以这么说,进行大量数据操作,Bulk load是必不可少的。
Lansonli
2021/10/11
2.1K0
Spark读写HBase之使用Spark自带的API以及使用Bulk Load将大量数据导入HBase
以上两个算子分别是基于Hadoop新版API和hadoop旧版API实现的,大部分代码都一样,需要注意的是新版API使用中Job类,旧版API使用JobConf类,另外导包的时候新版的相关jar包在org.apache.hadoop.mapreduce下,而旧版的相关jar包在org.apache.hadoop.mapred下
CoderJed
2018/09/13
3.4K0
HBase 写优化之 BulkLoad 实现数据快速入库
1、为何要 BulkLoad 导入?传统的 HTableOutputFormat 写 HBase 有什么问题? 我们先看下 HBase 的写流程: 通常 MapReduce 在写HBase时使
用户1177713
2018/02/24
3.1K0
HBase 写优化之 BulkLoad 实现数据快速入库
ImportTsv-HBase数据导入工具
HBase官方提供了基于Mapreduce的批量数据导入工具:Bulk load和ImportTsv。关于Bulk load大家可以看下我另一篇博文。
幽鸿
2020/04/01
1.1K0
ImportTsv-HBase数据导入工具
HBase Bulkload 实践探讨
HBase 是一个面向列,schemaless,高吞吐,高可靠可水平扩展的 NoSQL 数据库,用户可以通过 HBase client 提供的 put get 等 api 实现在数据的实时读写。在过去的几年里,HBase 有了长足的发展,它在越来越多的公司里扮演者越来越重要的角色。同样的,在有赞 HBase 承担了在线存储的职责,服务了有赞用户,商品详情,订单详情等核心业务。HBase 擅长于海量数据的实时读取,但软件世界没有银弹,原生 HBase 没有二级索引,复杂查询场景支持的不好。同时因为 split,磁盘,网络抖动,Java GC 等多方面的因素会影响其 RT 表现,所以通常我们在使用HBase的同时也会使用其他的存储中间件,比如 ES,Reids,Mysql 等等。避免 HBase 成为信息孤岛,我们需要数据导入导出的工具在这些中间件之间做数据迁移,而最常用的莫过于阿里开源的 DataX。Datax从 其他数据源迁移数据到 HBase 实际上是走的 HBase 原生 api 接口,在少量数据的情况下没有问题,但当我们需要从 Hive 里,或者其他异构存储里批量导入几亿,几十亿的数据,那么用 DataX 这里就显得不那么适合,因为走原生接口为了避免影响生产集群的稳定性一定要做好限流,那么海量数据的迁移就很很慢,同时数据的持续写入会因为 flush,compaction 等机制占用较多的系统资源。为了解决批量导入的场景,Bulkload 应运而生。
有赞coder
2020/08/24
1.7K3
HBase Bulkload 实践探讨
HBase应用(一):数据批量导入说明
前两种方式:需要频繁的与数据所存储的 RegionServer 通信,一次性导入大量数据时,可能占用大量 Regionserver 资源,影响存储在该 Regionserver 上其他表的查询。
create17
2019/06/15
4.2K0
用户画像 | 标签数据存储之HBase真实应用
上一篇文章已经为大家介绍了 MySQL 在用户画像的标签数据存储中的具体应用场景,本篇我们来谈谈 HBase 的使用!
大数据梦想家
2021/10/22
2.5K0
史上最全 | HBase 知识体系吐血总结
HBase 是 BigTable 的开源 Java 版本。是建立在 HDFS 之上,提供高可靠性、高性能、列存储、可伸缩、实时读写 NoSql 的数据库系统。
五分钟学大数据
2021/11/23
5.2K0
史上最全 | HBase 知识体系吐血总结
Mapreduce和HBase新版本整合之WordCount计数案例
先计数单词数量存到hdfs文件上,这个是以前的就做过的 package com.my.myhnase.mapreduce; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; i
汤高
2018/01/11
1.2K0
使用Spark通过BulkLoad快速导入数据到HBase
在项目中有需求需要将Hive表中的数据存储在HBase中。使用Spark访问Hive表,将读表数据导入到HBase中,写入HBase有两种方式:一种是通过HBase的API接口批量的将数据写入HBase,另一种是通过BulkLoad的方式生成HFile文件然后加载到HBase中,两种方式相比之下第二种效率会更高。本篇文章Fayson主要介绍如何使用Spark读取Hive表数据通过BulkLoad的方式快速的将数据导入到HBase。
Fayson
2018/08/17
4.5K0
使用Spark通过BulkLoad快速导入数据到HBase
Hbase入门篇03---Java API使用,HBase高可用配置和架构设计
因为缴费明细的数据记录非常庞大,该公司的信息部门决定使用HBase来存储这些数据。并且,他们希望能够通过Java程序来访问这些数据。
大忽悠爱学习
2023/05/23
9160
Hbase入门篇03---Java API使用,HBase高可用配置和架构设计
对给定的数据利用MapReduce编程实现数据的清洗和预处理
数据集: 链接:https://pan.baidu.com/s/1rnUJn5ld45HpLhzbwYIM1A 提取码:7bsd
Maynor
2021/12/20
8120
Hbase Bulkload 原理|面试必备
下面假设我们有一个 CSV 文件,是存储用户购买记录的。它一共有三列, order_id,consumer,product。我们需要将这个文件导入到Hbase里,其中 order_id 作为Hbase 的 row key。
Spark学习技巧
2021/03/05
2.5K0
通过Spark生成HFile,并以BulkLoad方式将数据导入到HBase
在实际生产环境中,将计算和存储进行分离,是我们提高集群吞吐量、确保集群规模水平可扩展的主要方法之一,并且通过集群的扩容、性能的优化,确保在数据大幅增长时,存储不能称为系统的瓶颈。
大数据学习与分享
2020/08/10
2.6K0
HBase的读写路径详解与性能调优指南
HBase作为分布式数据库,在大规模数据存储与处理方面展现了强大的能力,特别适用于在线分析处理、时间序列数据处理等场景。由于其基础是Hadoop HDFS的分布式存储架构,因此HBase在提供海量数据存储能力的同时,具备了高吞吐量和水平扩展的特点。HBase提供了强大的存储和读写性能,但为了在实际的生产环境中充分发挥其效能,深入了解HBase的读写路径,并通过性能调优来优化整体数据处理过程是十分必要的。
数字扫地僧
2024/09/05
2490
HBase笔记
HBaseAdmin(Admin):管理表(创建,删除)     HTableDescriptor:表描述器,用于创建表     HColumnDescriptor:列描述器(构建列族)
CBeann
2023/12/25
1760
HBase笔记
HBase快速入门系列(7) | 官方HBase-MapReduce与自定义
1. 官方HBase-MapReduce 1.查看HBase的MapReduce任务的执行 [bigdata@hadoop002 hbase]$ bin/hbase mapredcp 上图标记处为
不温卜火
2020/10/28
8600
HBase快速入门系列(7) | 官方HBase-MapReduce与自定义
快速学习-HBaseAPI操作
通过HBase的相关JavaAPI,我们可以实现伴随HBase操作的MapReduce过程,比如使用MapReduce将数据从本地文件系统导入到HBase的表中,比如我们从HBase中读取一些原始数据后使用MapReduce做数据分析。
cwl_java
2020/02/21
4890
Hadoop基础教程-第10章 HBase:Hadoop数据库(10.6 HBase API)
第10章 HBase:Hadoop数据库 10.6 HBase API (新特性) 本节所有代码可以从https://github.com/ihadron/hbase.git下载。 10.6.1 HB
程裕强
2018/01/02
2.4K0
Hadoop基础教程-第10章 HBase:Hadoop数据库(10.6 HBase API)
HBase整合MapReduce之建立HBase索引
HBase索引主要用于提高Hbase中表数据的访问速度,有效的避免了全表扫描,HBase中的表根据行健被分成了多个Regions,通常一个region的一行都会包含较多的数据,如果以列值作为查询条件,就只能从第一行数据开始往下找,直到找到相关数据为止,这很低效。相反,如果将经常被查询的列作为行健、行健作为列重新构造一张表,即可实现根据列值快速定位相关数据所在的行,这就是索引。显然索引表仅需要包含一个列,所以索引表的大小和原表比起来要小得多,如图4-14给出了索引表与原表之间的关系。从图可以看出,由于索引表的
汤高
2018/01/11
1.1K0
HBase整合MapReduce之建立HBase索引
推荐阅读
相关推荐
2021年大数据HBase(十五):HBase的Bulk Load批量加载操作
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
本文部分代码块支持一键运行,欢迎体验
本文部分代码块支持一键运行,欢迎体验