Kudu没有提供标准SQL操作,支持Nosql样式的API,这里使用Java 操作Kudu ,包括创建表、插入数据、修改删除数据、删除表等操作,值得注意的是,Java api直接操作Kudu在开发中不是常用的方式,常用方式是Spark操作Kudu、Kudu与Impala整合写SQL操作Kudu。这里为了后续学习,需要在Kudu中创建一些表。
Java操作Kudu需要在创建好的Maven项目中导入kudu-client依赖,此外我们这里使用的是CDH版本的kudu依赖包,maven默认不支持CHD相关依赖,需要在maven中导入Cloudera仓库支持CDH依赖包:
<!-- maven 不支持CDH 相关依赖,需要添加Cloudera 仓库,可以使maven下载CDH相关依赖 -->
<repositories>
<repository>
<id>cloudera</id>
<url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
</repository>
</repositories>
<!-- 添加kudu-client依赖 -->
<dependency>
<groupId>org.apache.kudu</groupId>
<artifactId>kudu-client</artifactId>
<version>1.10.0-cdh6.3.2</version>
</dependency>
创建表有如下几个步骤:
代码如下:
/**
* 1. 创建KuduClient对象,连接Kudu集群。
*/
KuduClient kuduClient = new KuduClient.KuduClientBuilder("cm1:7051,cm2:7051").build();
/**
* 2. 准备创建表的Schema信息。
*/
ArrayList<ColumnSchema> schemaList = new ArrayList<>();
//添加列,key是指定当前列是否是主键列
schemaList.add(new ColumnSchema.ColumnSchemaBuilder("id",Type.INT32).key(true).build());
schemaList.add(new ColumnSchema.ColumnSchemaBuilder("name",Type.STRING).build());
schemaList.add(new ColumnSchema.ColumnSchemaBuilder("age",Type.INT32).build());
schemaList.add(new ColumnSchema.ColumnSchemaBuilder("score",Type.DOUBLE).build());
Schema schema = new Schema(schemaList);
/**
* 3. 指定创建表的分区属性。
*/
CreateTableOptions options = new CreateTableOptions();
//指定按照id进行hash分区到3个分区,默认id.hashcode % 3 ,决定数据进入哪个tablet
options.addHashPartitions(Arrays.asList("id"), 3);
/**
* 4. 使用KuduClient对象,创建表。
*/
//参数:表名,表的schema信息,表的分区属性
kuduClient.createTable("personInfo",schema , options);
/**
* 5. 关闭KuduClient对象。
*/
kuduClient.close();
执行完成以上命令可以登录Kudu查看到对应的表。
向Kudu表中插入数据经过以下步骤:
代码如下:
/**
* 1.创建KuduClient对象,连接Kudu集群。
*/
KuduClient kuduClient = new KuduClient.KuduClientBuilder("cm1:7051,cm2:7051").build();
/**
* 2.获取插入数据的表。
*/
KuduTable personInfo = kuduClient.openTable("personInfo");
/**
* 3.准备插入数据对象Insert, 准备插入数据PartialRow对象。
*/
//创建插入操作
Insert insert = personInfo.newInsert();
//创建插入的数据对象PartialRow
PartialRow row = insert.getRow();
row.addInt("id", 1);
row.addString("name","zhangsan" );
row.addInt("age",18 );
row.addDouble("score",100.0 );
/**
* 4.开启session会话,应用插入操作插入数据。
*/
//创建kuduSession会话
KuduSession kuduSession = kuduClient.newSession();
//设置kudu刷新插入数据策略
kuduSession.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_SYNC);
//插入数据
kuduSession.apply(insert);
/**
* 5.关闭KuduClient对象。
*/
kuduSession.close();
kuduClient.close();
经过以上操作,数据插入成功,但是无法在kudu webui中查看到对应的数据,需要使用API查询。
此外,JAVA API 提供了三种向 kudu 插入数据的刷新策略,分别为:
AUTO_FLUSH_SYNC(默认),意思是调用 KuduSession.apply()方法后,客户端会在当数据刷新到服务器后再返回,这种情况就不能批量插入数据,调用 KuduSession.flush()方法不会起任何作用,因为此时缓冲区数据已经被刷新到了服务器。
AUTO_FLUSH_BACKGROUND,意思是调用KuduSession.apply()方法后,客户端会立即返回,但是写入将在后台发送,可能与来自同一会话的其他写入一起进行批处理。
MANUAL_FLUSH,意思是调用KuduSession.apply()方法后,会返回的非常快,但是写操作不会发送,直到用户使用flush()函数,如果缓冲区超过了配置的空间限制,KuduSession.apply()函数会返回一个错误。
查询数据需要以下几个步骤:
代码如下:
/**
* 1.创建KuduClient对象,连接Kudu集群。
*/
KuduClient kuduClient = new KuduClient.KuduClientBuilder("cm1:7051,cm2:7051").build();
/**
* 2.获取查询数据的表。
*/
KuduTable personInfo = kuduClient.openTable("personInfo");
/**
* 3.开启scan扫描器,获取查询数据。
*/
//设置查询的表
KuduScanner scanner = kuduClient.newScannerBuilder(personInfo)
//设置查询的列
.setProjectedColumnNames(Arrays.asList("id", "name", "age", "score"))
.build();
//scanner 中是多个tablet
while(scanner.hasMoreRows()){
//获取一个tablet 数据
RowResultIterator rowResults = scanner.nextRows();
while(rowResults.hasNext()){
RowResult next = rowResults.next();
int id = next.getInt("id");
String name = next.getString("name");
int age = next.getInt("age");
double score = next.getDouble("score");
System.out.println("id = "+id+",name = "+name+",age = "+age+",score = "+score);
}
}
/**
* 4.关闭KuduClient对象。
*/
kuduClient.close();
经过以上查询,可以查询出插入的数据。
Kudu中修改数据与Kudu中插入数据很类似,需要更新一张表的某些数据,需要经过以下步骤:
代码如下:
/**
* 1.创建KuduClient对象,连接Kudu集群。
*/
KuduClient kuduClient = new KuduClient.KuduClientBuilder("cm1:7051,cm2:7051").build();
/**
* 2.获取更新数据的表。
*/
KuduTable personInfo = kuduClient.openTable("personInfo");
/**
* 3.准备更新数据对象Update, 准备更新数据PartialRow对象。
*/
Update update = personInfo.newUpdate();
PartialRow row = update.getRow();
row.addInt("id", 1);//主键列
//需要更新什么列就加上什么列,不能修改主键列
row.addString("name","lisi" );
/**
* 4.开启session会话,应用更新操作,更新数据。
*/
KuduSession kuduSession = kuduClient.newSession();
kuduSession.apply(update);
/**
* 5.关闭KuduClient对象。
*/
kuduSession.close();
kuduClient.close();
经过以上代码执行,可以查询表中数据,对应的列已经修改。
此外,更新数据除了有newUpdate()方法,还有newUpsert()方法,两者都可以对表中存在的主键进行更新操作,两者区别为如果更新的主键在表中不存在,newUpdate()操作没有任何变化,也不会报错,newUpsert()方法会将此条数据插入到表中,但是插入的这条数据所有列都应设置值,否则也将不会有任何操作。
Kudu中删除数据与Kudu中插入数据很类似,需要删除一张表的某些数据,需要经过以下步骤:
代码如下:
/**
* 1.创建KuduClient对象,连接Kudu集群。
*/
KuduClient kuduClient = new KuduClient.KuduClientBuilder("cm1:7051,cm2:7051").build();
/**
* 2.获取删除数据的表。
*/
KuduTable personInfo = kuduClient.openTable("personInfo");
/**
* 3.准备删除数据对象Delete, 准备删除数据PartialRow对象。
*/
Delete delete = personInfo.newDelete();
PartialRow row = delete.getRow();
//删除数据,只需要指定主键即可
row.addInt("id", 1);
/**
* 4.开启session会话,应用删除操作,删除数据。
*/
KuduSession kuduSession = kuduClient.newSession();
kuduSession.apply(delete);
/**
* 5.关闭KuduClient对象。
*/
kuduSession.close();
kuduClient.close();
删除表需要经过如下步骤:
代码如下:
/**
* 1.创建KuduClient对象,连接Kudu集群。
*/
KuduClient kuduClient = new KuduClient.KuduClientBuilder("cm1:7051,cm2:7051").build();
/**
* 2.删除表。
*/
if(kuduClient.tableExists("personInfo")){
kuduClient.deleteTable("personInfo");
}
/**
* 3.关闭KuduClient对象。
*/
kuduClient.close();
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。