Hbase默认只支持对行键的索引,那么如果需要针对其它的列来进行查询,就只能全表扫描了。表如果较大的话,代价是不可接受的,所以要提出二级索引的方案。网上的实现方法很多,华为,360等公司都有自己的方案,其中华为的已经开源,但是貌似对源码改动较大,新手不容易接受,所以没有选择它们。而其它的像利用Phoenix,solr等外部框架构建索引对Hbase的学习并没有太大的帮助。综上所述,我使用了Hbase自带的Cprocessor(协处理器)来实现。
有关协处理器的讲解,Hbase官方文档是最好的,这里大体说一下它的作用与使用方法。
关键部分来了,既然Hbase并没有提供二级索引,那如何实现呢?先看下面这张图
我们的需求是找出满足cf1:col2=c22这条记录的cf1:col1的值,实现方法如图,首先根据cf1:col2=c22查找到该记录的行键,然后再通过行健找到对应的cf1:col1的值。其中第二步是很容易实现的,因为Hbase的行键是有索引的,那关键就是第一步,如何通过cf1:col2的值找到它对应的行键。很容易想到建立cf1:col2的映射关系,即将它们提取出来单独放在一张索引表中,原表的值作为索引表的行键,原表的行键作为索引表的值,这就是Hbase的倒排索引的思想。
思想有了,工具有了Coprocessor,就开始具体实现了。我们想实现的功能就是每在原表插入一条数据,就相应的在索引表中也插入一条数据。也就是在Put数据到原表之前/之后使用Coprocessor提供的prePut/postPut方法向索引表中插入你想要的数据!
我使用的环境
Hbase提供了JavaAPI以实现增删改查,网上很多教程,大家可以自己去找,或者从我的github中down也行,我们直接来看Coprocessor中的代码怎么写
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
/**
* Created by cwj on 17-10-26.
*
*/
public class IndexObserver extends BaseRegionObserver {
private static final byte[] TABLE_NAME = Bytes.toBytes("index_name_users");
private static final byte[] COLUMN_FAMILY = Bytes.toBytes("personalDet");
private static final byte[] COLUMN = Bytes.toBytes("name");
private Configuration configuration = HBaseConfiguration.create();
@Override
public void prePut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit, Durability durability)
throws IOException {
HTable indexTable = new HTable(configuration, TABLE_NAME);
List<Cell> cells = put.get(COLUMN_FAMILY, COLUMN);
Iterator<Cell> cellIterator = cells.iterator();
while (cellIterator.hasNext()) {
Cell cell = cellIterator.next();
Put indexPut = new Put(CellUtil.cloneValue(cell));
indexPut.add(COLUMN_FAMILY, COLUMN, CellUtil.cloneRow(cell));
indexTable.put(indexPut);
}
}
}
这里用的是Hbase官网在Coprocessor给的那个例子,表结构是这样的:
给personalDet:name列建立索引,代码本身很简单,大体说说吧,RegionObserver是基本接口,BaseRegionObserver是其实现类,一般继承这个类就行了,然后在prePut方法中向索引表中插入数据。可以看到prePut方法的入参有一个put对象,这个对象就是你在主表插入数据时的那个put对象,所以你可以通过这个对象拿到之前主表插入的数据,这样就可以实现自己的需求了。
之后将这个工程打成jar包(可以用IDEA自带的打包方式,或者maven-assembly-plugin插件也行),pom文件有这两个依赖就行了
<dependencies>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
<version>1.2.4</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>1.2.4</version>
</dependency>
</dependencies>
要使用Coprocessor,就需要先完成对其的装载。这可以静态实现(通过HBase配置文件),也可以动态完成(通过shell或Java API)。
按以下如下步骤可以静态装载自定义的Coprocessor。需要注意的是,如果一个Coprocessor是静态装载的,要卸载它就需要重启HBase。静态装载步骤如下:
下面演示了如何装载一个自定义Coprocessor(这里是在SumEndPoint.java中实现的),需要在每个RegionServer的hbase-site.xml中创建如下的记录:
<property>
<name>hbase.coprocessor.region.classes</name>
<value>org.cwj.hbase.coprocessor.observer.IndexObserver</value>
</property>
如果要装载多个类,类名需要以逗号分隔。HBase会使用默认的类加载器加载配置中的这些类,因此需要将相应的jar文件上传到HBase服务端的类路径下。
使用这种方式加载的Coprocessor将会作用在HBase所有表的全部Region上,因此这样加载的Coprocessor又被称为系统Coprocessor。在Coprocessor列表中第一个Coprocessor的优先级值为Coprocessor.Priority.SYSTEM,其后的每个Coprocessor的值将会按序加一(这意味着优先级会减降低,因为优先级是按整数的自然顺序降序排列的)。
当调用配置的Observer Coprocessor时,HBase将会按照优先级顺序依次调用它们的回调方法。
静态卸载的步骤如下:
动态装载Coprocessor的一个优势就是不需要重启HBase。不过动态装载的Coprocessor只是针对某个表有效。因此,动态装载的Coprocessor又被称为表级Coprocessor。
此外,动态装载Coprocessor是对表的一次schema级别的调整,因此在动态装载Coprocessor时,目标表需要离线(disable)。
动态装载Coprocessor有两种方式:通过HBase Shell和通过Java API。不管选择哪一种,都要先将打好的jar包上传到HDFS中
1.1 先将表disable
disable 'users'
1.2 使用类似如下命令装载
alter 'users', METHOD => 'table_att', 'Coprocessor'=>'hdfs://<namenode>:<port>/
user/<hadoop-user>/coprocessor.jar| org.cwj.hbase.Coprocessor.IndexObserver|1073741823|
arg1=1,arg2=2'
简单解释下这个命令。这条命令在一个表的table_att中添加了一个新的属性“Coprocessor”。使用的时候Coprocessor会尝试从这个表的table_attr中读取这个属性的信息。这个属性的值用管道符“|”分成了四部分:
1.3 enable这个表
enable 'users'
1.4 查看是否加载成功
describe 'users'
装载过程就是这样,卸载过程和装载大体一样的,也是先将表disable,卸载之后在重新enable 卸载方式如下:
hbase> alter 'users', METHOD => 'table_att_unset', NAME => 'coprocessor$1'
Hbase版本前后经历了很大的变化,JavaAPI也是,有些方法在这个版本过期了,下个版本可能又会拿回来,所以代码根据自己的版本来,我这里提供的代码在1.2.4下是可以用的
public class CoprocessorUtilTest {
private String tableName;
private String jarPath;
private Class className;
private Logger logger = LogManager.getLogger(CoprocessorUtilTest.class);
@Before
public void setUp() throws Exception {
tableName = "users";
jarPath = "hdfs://os-1:9000/HbaseTest.jar";
className = ObserverExample.class;
// className = SumEndPoint.class;
// className = IndexObserver.class;
}
@Test
public void loadCoprocessor() throws Exception {
logger.info("load coprocessor...");
TableName tName = TableName.valueOf(tableName);
Path path = new Path(jarPath);
Configuration configuration = HBaseConfiguration.create();
Connection connection = ConnectionFactory.createConnection(configuration);
Admin admin = connection.getAdmin();
admin.disableTable(tName);
HTableDescriptor hTableDescriptor = new HTableDescriptor(tName);
HColumnDescriptor columnFamily1 = new HColumnDescriptor("personalDet");
columnFamily1.setMaxVersions(3);
hTableDescriptor.addFamily(columnFamily1);
HColumnDescriptor columnFamily2 = new HColumnDescriptor("salaryDet");
columnFamily2.setMaxVersions(3);
hTableDescriptor.addFamily(columnFamily2);
hTableDescriptor.addCoprocessor(className.getCanonicalName(), path, Coprocessor.PRIORITY_USER, null);
admin.modifyTable(tName, hTableDescriptor);
admin.enableTable(tName);
logger.info("load coprocessor successful!");
}
@Test
public void unloadCoprocessor() throws Exception {
logger.info("unload coprocessor...");
TableName tName = TableName.valueOf(tableName);
Configuration configuration = HBaseConfiguration.create();
Connection connection = ConnectionFactory.createConnection(configuration);
Admin admin = connection.getAdmin();
admin.disableTable(tName);
HTableDescriptor hTableDescriptor = new HTableDescriptor(tName);
HColumnDescriptor columnFamily1 = new HColumnDescriptor("personalDet");
columnFamily1.setMaxVersions(3);
hTableDescriptor.addFamily(columnFamily1);
HColumnDescriptor columnFamily2 = new HColumnDescriptor("salaryDet");
columnFamily2.setMaxVersions(3);
hTableDescriptor.addFamily(columnFamily2);
hTableDescriptor.removeCoprocessor(className.getCanonicalName());
admin.modifyTable(tName, hTableDescriptor);
admin.enableTable(tName);
logger.info("unload coprocessor successful!");
}
}
好了,这里有几个注意的地方
conf = HBaseConfiguration.create();
conf.set("hbase.zookeeper.quorum", "xxx.xxx.x.xx");
conf.set("hbase.zookeeper.property.clientPort", "2181");
我的环境使用这种方式一直提示无法连接到Hbase,不知道什么原因,这里推荐第二种方式,就是将的服务器的Hbase的配置文件hbase-site.xml,core-site.xml复制到客户端的src目录下,这样在加载的时候,首先它会从本地的配置文件读取地址,这样就可以连接到你的远程Hbase了。
hTableDescriptor.addCoprocessor(className.getCanonicalName(), path, Coprocessor.PRIORITY_USER, null);
第一个入参一定是一个Class对象.getCanonicalName(),刚开始的String classname。。
这个问题本身很弱智,但是引发的后果还是很严重的,那就是加载之后,集群直接崩了,几个RegionServer全部dead了,重启之后也一样,10S之内,相继挂掉。发现这个错误 java.lang.RuntimeException: HRegionServer Aborted,各种搜索发现,默认当加载了错误的Coprocessor之后,会导致RegionServer挂掉。解决方法是修改hbase-site.xml文件
<property>
<name>hbase.coprocessor.abortonerror</name>
<value>false</value>
</property>
关于这个参数,后续还会对它进行说明,这里设为false是指,哪怕加载了错误的Coprocessor,集群也不会崩溃
好了,集群重新起来了,修改了代码,成功加载上去了,兴冲冲的插入一条数据试试,然而再次懵比,索引表中并没有插入相应的索引数据
<property>
<name>hbase.table.sanity.checks</name>
<value>false</value>
</property>
代码其实并不复杂,但是集群的调试最麻烦,没事就去翻翻log,然后根据错误找原因。今天就到此为止,之后我们再一起深入学习Hbase。