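Databus is a low-latency, reliable, transactional, consistency-preserving change data capture system, open-sourced by LinkedIn in 2013. It mines the database's logs to pull changes out of the database reliably and in real time. Applications receive those changes through a custom client and run their own business logic on them.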
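Goal:
We plan to build binlog-based sync pipelines (DB to DB and DB to Elasticsearch) to decouple data at the business level. What follows covers the installation and testing steps from our initial investigation.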
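Below is a lightly anonymized sketch; comments pointing out problems are welcome.
The following are the errors we ran into while installing and debugging.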
Could not resolve all files for configuration ':databus2-relay:databus2-event-producer-mock:compileClasspath'.
> Could not find ojdbc6.jar (com.oracle:ojdbc6:11.2.0.2.0).
Searched in the following locations:
file:/Users/wenba/Desktop/tools/databus/databus/sandbox-repo/com/oracle/ojdbc6/11.2.0.2.0/ojdbc6-11.2.0.2.0.jar
Fix: download ojdbc6-11.2.0.2.0.jar and put it in the /Users/wenba/Desktop/tools/databus/databus/sandbox-repo/com/oracle/ojdbc6/11.2.0.2.0/ directory, the location Gradle searched above.
Cannot bind to URL rmi://localhost:1099 ServiceUnavailableException
Fix: in the initializeContainerJmx() method of com.linkedin.databus2.core.container.netty.ServerContainer, add this line:
LocateRegistry.createRegistry(_containerStaticConfig.getJmx().getRmiRegistryPort());
// requires: import java.rmi.registry.LocateRegistry;
protected void initializeContainerJmx() {
    if (_containerStaticConfig.getJmx().isRmiEnabled()) {
        try {
            JMXServiceURL jmxServiceUrl = new JMXServiceURL(
                "service:jmx:rmi://" + _containerStaticConfig.getJmx().getJmxServiceHost() + ":"
                + _containerStaticConfig.getJmx().getJmxServicePort() + "/jndi/rmi://"
                + _containerStaticConfig.getJmx().getRmiRegistryHost() + ":"
                + _containerStaticConfig.getJmx().getRmiRegistryPort() + "/jmxrmi"
                + _containerStaticConfig.getJmx().getJmxServicePort());
            // Added line: start an RMI registry on the configured port so the JMX connector
            // has a registry to bind to (otherwise: "Cannot bind to URL rmi://localhost:1099").
            LocateRegistry.createRegistry(_containerStaticConfig.getJmx().getRmiRegistryPort());
            _jmxConnServer = JMXConnectorServerFactory.newJMXConnectorServer(jmxServiceUrl, null, getMbeanServer());
        } catch (Exception e) {
            LOG.warn("Unable to instantiate JMX server", e);
        }
    }
}
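The bind error occurs when nothing is listening on the RMI registry port, and the patch above fixes it by having the container start its own registry. The sketch below shows the same idea with only the JDK, under some assumptions: the class and method names are hypothetical, the JNDI name is the conventional /jmxrmi (the Databus URL above appends the service port to it), and ensureRegistry falls back to an existing registry when createRegistry fails because the port is already taken, a guard the patch does not have.

```java
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.rmi.RemoteException;
import java.rmi.registry.LocateRegistry;
import java.rmi.registry.Registry;
import java.rmi.server.ExportException;
import javax.management.MBeanServer;
import javax.management.remote.JMXConnectorServer;
import javax.management.remote.JMXConnectorServerFactory;
import javax.management.remote.JMXServiceURL;

// Hypothetical helper, not Databus code: JMX over RMI with an in-process registry.
final class JmxRmiSketch {
    // createRegistry throws ExportException when the port is already taken (for example
    // by a registry created earlier in this JVM); then use a stub for the existing one.
    static Registry ensureRegistry(int port) throws RemoteException {
        try {
            return LocateRegistry.createRegistry(port);
        } catch (ExportException alreadyTaken) {
            return LocateRegistry.getRegistry(port);
        }
    }

    // Connector exported on servicePort, registered as "jmxrmi" in the registry on registryPort.
    static JMXServiceURL serviceUrl(String host, int servicePort, int registryPort) throws IOException {
        return new JMXServiceURL("service:jmx:rmi://" + host + ":" + servicePort
                + "/jndi/rmi://" + host + ":" + registryPort + "/jmxrmi");
    }

    static JMXConnectorServer start(String host, int servicePort, int registryPort) throws IOException {
        ensureRegistry(registryPort); // the registry must exist before start() binds "jmxrmi"
        MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
        JMXConnectorServer server = JMXConnectorServerFactory.newJMXConnectorServer(
                serviceUrl(host, servicePort, registryPort), null, mbs);
        server.start();
        return server;
    }
}
```

The other workaround that appears later in this post is to switch RMI off entirely with databus.relay.container.jmx.rmiEnabled=false.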
com.linkedin.databus.core.DatabusRuntimeException: com.linkedin.databus2.core.DatabusException: pk is assigned to key but fieldList is id,firstName,lastName,birthDate,deleted,
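Fix: the database name and table structure in the configuration must match the actual database. Here the primary key named in the schema (pk=key) did not appear in the field list reported in the error (id,firstName,...).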
sources-person.json
{
  "name" : "person",
  "id" : 1,
  "uri" : "mysql://root%2Fwangyu123@10.1.58.111:3306/1001/mysql-bin",
  "slowSourceQueryThreshold" : 2000,
  "sources" :
  [
    {
      "id" : 40,
      "name" : "com.linkedin.events.example.person.Person",
      "uri": "test.person",
      "partitionFunction" : "constant:1"
    }
  ]
}
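The top-level uri packs the MySQL connection settings into one string, while the per-source uri (test.person) is database.table. As far as we can tell from the Databus examples, the layout is mysql://USER%2FPASSWORD@HOST:PORT/SERVER_ID/BINLOG_PREFIX: %2F is a URL-encoded "/", 1001 is presumably the server id the relay presents when it connects as a replication client, and mysql-bin is the binlog file prefix. The sketch below splits the string under that assumption; MysqlRelayUri is a hypothetical helper, not part of Databus.

```java
import java.net.URI;
import java.net.URISyntaxException;

// Hypothetical helper, not part of Databus: splits a relay "uri" value under the
// assumed layout mysql://USER%2FPASSWORD@HOST:PORT/SERVER_ID/BINLOG_PREFIX.
final class MysqlRelayUri {
    final String user;
    final String password;
    final String host;
    final int port;
    final int serverId;
    final String binlogPrefix;

    private MysqlRelayUri(String user, String password, String host,
                          int port, int serverId, String binlogPrefix) {
        this.user = user;
        this.password = password;
        this.host = host;
        this.port = port;
        this.serverId = serverId;
        this.binlogPrefix = binlogPrefix;
    }

    static MysqlRelayUri parse(String uri) throws URISyntaxException {
        URI u = new URI(uri);
        if (!"mysql".equals(u.getScheme())) {
            throw new IllegalArgumentException("expected a mysql:// URI: " + uri);
        }
        // getUserInfo() decodes %2F back to '/', so "root%2Fsecret" becomes "root/secret";
        // the first '/' separates the user name from the password.
        String userInfo = u.getUserInfo();
        int slash = userInfo == null ? -1 : userInfo.indexOf('/');
        if (slash < 0) {
            throw new IllegalArgumentException("expected USER%2FPASSWORD before '@'");
        }
        // The path "/1001/mysql-bin" splits into ["", "1001", "mysql-bin"].
        String[] parts = u.getPath().split("/");
        if (parts.length != 3) {
            throw new IllegalArgumentException("expected /SERVER_ID/BINLOG_PREFIX, got " + u.getPath());
        }
        return new MysqlRelayUri(userInfo.substring(0, slash), userInfo.substring(slash + 1),
                u.getHost(), u.getPort(), Integer.parseInt(parts[1]), parts[2]);
    }
}
```

Applied to the uri above, this yields user root, host 10.1.58.111, port 3306, server id 1001 and binlog prefix mysql-bin.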
Schema (field definitions), com.linkedin.events.example.person.Person.1.avsc:
{
  "name" : "Person_V1",
  "doc" : "Auto-generated Avro schema for sy$person. Generated at Dec 04, 2012 05:07:05 PM PST",
  "type" : "record",
  "meta" : "dbFieldName=sy$person;pk=key;",
  "namespace" : "com.linkedin.events.example.test",
  "fields" : [ {
    "name" : "key",
    "type" : [ "long", "null" ],
    "meta" : "dbFieldName=KEY;dbFieldPosition=0;" // primary key ID
  }, {
    "name" : "firstName",
    "type" : [ "string", "null" ],
    "meta" : "dbFieldName=FIRST_NAME;dbFieldPosition=1;"
  }, {
    "name" : "lastName",
    "type" : [ "string", "null" ],
    "meta" : "dbFieldName=LAST_NAME;dbFieldPosition=2;"
  }, {
    "name" : "birthDate",
    "type" : [ "long", "null" ],
    "meta" : "dbFieldName=BIRTH_DATE;dbFieldPosition=3;"
  }, {
    "name" : "deleted",
    "type" : [ "string", "null" ],
    "meta" : "dbFieldName=DELETED;dbFieldPosition=4;"
  } ]
}
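The "pk is assigned to key but fieldList is ..." error says that the pk= entry in the schema-level meta names a field that is not in the field list. A pre-flight check like the sketch below can catch this before starting the relay. PkCheck is a hypothetical helper that mimics the message; it is not Databus's own validation code, and it treats pk as a single field name.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical pre-flight check, not Databus code.
final class PkCheck {
    // Parses a meta string such as "dbFieldName=sy$person;pk=key;" into key/value pairs.
    static Map<String, String> parseMeta(String meta) {
        Map<String, String> out = new HashMap<>();
        for (String part : meta.split(";")) {
            int eq = part.indexOf('=');
            if (eq > 0) {
                out.put(part.substring(0, eq).trim(), part.substring(eq + 1).trim());
            }
        }
        return out;
    }

    // Returns null when pk names one of the schema's fields, otherwise an error message.
    static String check(String meta, List<String> fieldNames) {
        String pk = parseMeta(meta).get("pk");
        if (pk == null) {
            return "no pk= entry in meta";
        }
        if (fieldNames.contains(pk)) {
            return null;
        }
        return "pk is assigned to " + pk + " but fieldList is " + String.join(",", fieldNames);
    }
}
```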
databus2-example/databus2-example-client-pkg/conf/client_person.properties
# set the ports
databus.client.container.httpPort=9111
databus.relay.container.httpPort=11125
databus.relay.container.jmx.rmiEnabled=false
.....
databus2-example/databus2-example-client-pkg/conf/client_user.properties
# set the ports
databus.client.container.httpPort=9112
databus.relay.container.httpPort=11126
databus.relay.container.jmx.rmiEnabled=false
databus.relay.eventBuffer.allocationPolicy=DIRECT_MEMORY
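The two files differ mainly in their ports (9111 vs 9112 for the client container), since two clients running on one host cannot listen on the same HTTP port. The sketch below checks a set of properties files for a repeated value of one key; PortClashCheck is a hypothetical helper, and which keys must be unique on your hosts is an assumption you should adjust.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.HashSet;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical helper: finds values of `key` that appear in more than one properties file.
final class PortClashCheck {
    static Set<String> duplicateValues(Map<String, String> propsTextByFile, String key) throws IOException {
        Set<String> seen = new HashSet<>();
        Set<String> dups = new TreeSet<>();
        for (String text : propsTextByFile.values()) {
            Properties p = new Properties();
            p.load(new StringReader(text)); // '#' lines are treated as comments
            String v = p.getProperty(key);
            if (v != null && !seen.add(v.trim())) {
                dups.add(v.trim());
            }
        }
        return dups;
    }
}
```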
For the rest, create new files modeled on the person example. For the specific error, see: https://github.com/linkedin/databus/issues/26
In build.gradle:
subprojects {
  apply from: rootProject.file("subprojects.gradle")
  apply plugin:'java'
  dependencies {
    //runtime externalDependency.log4j
    // Force easymock to version 3.1. One of the espresso dependencies changes it to 2.4
    // and v2.4 does not support mocking of classes, causing our espresso unit tests
    // to break.
    //runtime(externalDependency.easymock) {
    //  force = true
    //}
    //compile(externalDependency.easymock) {
    //  force = true
    //}
    compile ("mysql:mysql-connector-java:5.1.24")
  }
}
task wrapper(type: Wrapper) {
  gradleVersion = '1.8'
}
tasks.withType(JavaCompile) { options.encoding = "UTF-8" }
/Users/wenba/Desktop/tools/databus/databus/databus2-example/databus2-example-client/src/main/java/com/linkedin/databus/client/example/PersonConsumer.java
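// decodedEvent is the Avro GenericRecord decoded from the event by the DbusEventDecoder
// read the fields
Utf8 firstName = (Utf8) decodedEvent.get("firstName");
Utf8 lastName = (Utf8) decodedEvent.get("lastName");
Long birthDate = (Long) decodedEvent.get("birthDate");
Utf8 deleted = (Utf8) decodedEvent.get("deleted");
// get the source ID (the business identifier, 40 for person in sources-person.json)
LOG.info("id :" + event.getSourceId());
// get the primary key
LOG.info(decodedEvent.get("key"));
// get the operation type: DbusOpcode.UPSERT or DbusOpcode.DELETE
LOG.info(event.getOpcode()); // UPSERT / DELETE
// if this is an insert or update
if (DbusOpcode.UPSERT.equals(event.getOpcode())) {
    // handle the insert/update here
}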
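Every field in the schema is a union with "null", so any of these values can be null, and calling toString() on a decoded value directly throws NullPointerException when the column is NULL. The sketch below reads the person fields null-safely. FieldReader is a hypothetical helper; it takes any name-to-value lookup, so in a real consumer one could pass decodedEvent::get, while the test here uses a plain Map and a StringBuilder standing in for Avro's Utf8.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical helper for null-safe field access on a decoded record.
final class FieldReader {
    // Avro string fields decode to org.apache.avro.util.Utf8, not java.lang.String,
    // so convert via toString(), but only when the value is present.
    static String asString(Object value) {
        return value == null ? null : value.toString();
    }

    static Long asLong(Object value) {
        return value == null ? null : ((Number) value).longValue();
    }

    // `get` is any name -> value lookup, e.g. GenericRecord::get in a real consumer.
    static Map<String, Object> readPerson(Function<String, Object> get) {
        Map<String, Object> row = new HashMap<>(); // HashMap permits null values
        row.put("key", asLong(get.apply("key")));
        row.put("firstName", asString(get.apply("firstName")));
        row.put("lastName", asString(get.apply("lastName")));
        row.put("birthDate", asLong(get.apply("birthDate")));
        row.put("deleted", asString(get.apply("deleted")));
        return row;
    }
}
```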
Step 1:
Under databus2-example/databus2-example-relay-pkg/conf/, create a source definition (a sources-*.json file modeled on sources-person.json).
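For a new table, the source file can copy the structure of sources-person.json and change only the names, ids and URIs. The sketch below renders that structure; SourcesJson is a hypothetical helper, and the sample values in the test (relay id 2, source id 41, the User class name, test.user) are illustrative assumptions chosen to differ from the person example, not values Databus requires.

```java
// Hypothetical generator mirroring the structure of sources-person.json.
final class SourcesJson {
    // Minimal JSON string quoting: escapes backslashes and double quotes only.
    private static String q(String s) {
        return "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
    }

    static String render(String name, int id, String mysqlUri,
                         int sourceId, String sourceName, String dbTable) {
        return "{\n"
                + "  \"name\" : " + q(name) + ",\n"
                + "  \"id\" : " + id + ",\n"
                + "  \"uri\" : " + q(mysqlUri) + ",\n"
                + "  \"slowSourceQueryThreshold\" : 2000,\n"
                + "  \"sources\" : [\n"
                + "    {\n"
                + "      \"id\" : " + sourceId + ",\n"
                + "      \"name\" : " + q(sourceName) + ",\n"
                + "      \"uri\" : " + q(dbTable) + ",\n"
                + "      \"partitionFunction\" : \"constant:1\"\n"
                + "    }\n"
                + "  ]\n"
                + "}\n";
    }
}
```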
Step 2:
Under databus2-example/databus2-example-relay-pkg/schemas_registry, create the table's schema (field definition) file, and add the new file name to index.schemas_registry.
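Registering a schema is just a file-naming and bookkeeping step. The sketch below assumes two conventions inferred from the person example: schema files are named SOURCE_NAME.VERSION.avsc (as in com.linkedin.events.example.person.Person.1.avsc), and index.schemas_registry lists one schema file name per line. SchemaRegistryIndex is a hypothetical helper.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper; assumes one schema file name per line in the index.
final class SchemaRegistryIndex {
    static String schemaFileName(String sourceName, int version) {
        return sourceName + "." + version + ".avsc";
    }

    // Appends fileName to index.schemas_registry unless it is already listed.
    // Returns true if the index file was changed.
    static boolean register(Path registryDir, String fileName) throws IOException {
        Path index = registryDir.resolve("index.schemas_registry");
        List<String> lines = Files.exists(index)
                ? new ArrayList<>(Files.readAllLines(index, StandardCharsets.UTF_8))
                : new ArrayList<>();
        for (String line : lines) {
            if (line.trim().equals(fileName)) {
                return false;
            }
        }
        lines.add(fileName);
        Files.write(index, lines, StandardCharsets.UTF_8);
        return true;
    }
}
```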
Step 3:
Under databus2-example/databus2-example-relay-pkg/script, create a startup script, and change every source_name it references to the new package name you added.
Step 4 (code):
Under databus2-example/databus2-example-relay/src/main/java/com/linkedin/databus/relay/example/, create a new startup class, changing the package name and the config JSON to the newly created ones.
Step 5:
Under databus2-example/databus2-example-client-pkg/conf/, create a new properties file and change the settings that must differ, such as the ports.
Step 6:
Under databus2-example/databus2-example-client-pkg/script/, create another startup script (start-user-client.sh) that uses the new source_name.
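Step 7 (code):
Under databus2-example/databus2-example-client/src/main/java/com/linkedin/databus/client/example/, create a client listener class: change its package name, set the listening port to the port configured on the relay (data source) side, and register the consumer class.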
Step 8 (code):
Under databus2-example/databus2-example-client/src/main/java/com/linkedin/databus/client/example/, create a consumer class. The main logic goes in processEvent; the event data and its source are wrapped in DbusEvent and DbusEventDecoder objects.
Step 9 (code):
Create the concrete consumer class that processes the data.
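For the DB-to-DB and DB-to-Elasticsearch pipelines mentioned at the start, the concrete consumer mostly routes each change by its opcode. The sketch below shows that routing with hypothetical stand-ins so it compiles without Databus: Op mirrors DbusOpcode.UPSERT / DbusOpcode.DELETE, Sink represents the target (another database table, an Elasticsearch index, and so on), and MapSink is an in-memory sink for testing. Returning false stands for whatever error result your processEvent reports.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical routing layer for a consumer's processEvent; not Databus API.
final class ChangeApplier {
    // Local stand-in for DbusOpcode so the sketch is self-contained.
    enum Op { UPSERT, DELETE }

    // Hypothetical target of the pipeline.
    interface Sink {
        void upsert(long key, Map<String, Object> row);
        void delete(long key);
    }

    private final Sink sink;

    ChangeApplier(Sink sink) {
        this.sink = sink;
    }

    // Applies one change; false means the event could not be applied.
    boolean apply(Op op, Long key, Map<String, Object> row) {
        if (key == null) {
            return false; // a change cannot be routed without its primary key
        }
        switch (op) {
            case UPSERT:
                sink.upsert(key, row);
                return true;
            case DELETE:
                sink.delete(key);
                return true;
            default:
                return false;
        }
    }

    // In-memory sink, keyed by primary key.
    static final class MapSink implements Sink {
        final Map<Long, Map<String, Object>> rows = new TreeMap<>();

        public void upsert(long key, Map<String, Object> row) {
            rows.put(key, row);
        }

        public void delete(long key) {
            rows.remove(key);
        }
    }
}
```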
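Build:
Databus is built with Gradle, so a Gradle environment is required (the wrapper task in build.gradle pins Gradle 1.8). Once Gradle is installed, run the following in the databus root directory: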
gradle -Dopen_source=true assemble
This completes the build; on success, a folder named build is created in the databus root directory.
Start the Relay:
cd build/databus2-example-relay-pkg/distributions
Extract the archive:
tar -zxvf databus2-example-relay-pkg.tar.gz
Run the startup script:
For the person table:
./bin/start-example-relay.sh person -Y ./conf/sources-person.json
For the user table:
./bin/start-user-relay.sh user -Y ./conf/sources-user.json
Check which sources the relay serves by running: curl -s http://localhost:11115/sources
Start the Client:
cd build/databus2-example-client-pkg/distributions
Extract the archive:
tar -zxvf databus2-example-client-pkg.tar.gz
Run the startup script:
./bin/start-example-client.sh person
./bin/start-user-client.sh user
Check the relay's statistics for connected clients by running: curl http://localhost:11115/relayStats/outbound/http/clients
Testing:
Once the Relay and Client have started successfully, change capture on the person table is already running. Check the logs:
relay.log under databus2-example-relay-pkg/distributions/logs
client.log under databus2-example-client-pkg/distributions/logs
I hope this helps anyone who gets to use Databus at work avoid some of these pitfalls.
References: https://blog.csdn.net/feng12345zi/article/details/80843554 https://www.jianshu.com/p/9df54eb1ec35