
Syncing PostgreSQL to Kafka with OGG Microservices (Full + Incremental)


Environment preparation

PostgreSQL environment

-- Create a dedicated Docker network
docker network create --subnet=172.72.6.0/24 pg-network

-- PG
docker rm -f lhrpg
docker run -d --name lhrpg -h lhrpg \
   -p 64320:5432 --net=pg-network --ip 172.72.6.34 \
   -e POSTGRES_PASSWORD=lhr \
   -e TZ=Asia/Shanghai \
   postgres:14.2



psql -U postgres -h 192.168.1.35 -p 64320


create database lhrdb;
\c lhrdb
create table t1(id int primary key);
create table t2(id int primary key);
create schema ogg;


-- Changing wal_level requires a database restart (pg_reload_conf() alone does not apply it)
alter system set wal_level='logical';
select pg_reload_conf();


docker restart lhrpg
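
After the restart, confirm that the parameter actually took effect (wal_level is a postmaster-level setting, so the reload alone is not enough). A quick check, using the host-mapped port from above:

psql -U postgres -h 192.168.1.35 -p 64320 -d lhrdb -c "show wal_level;"
-- expected output: logical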





sysbench /usr/share/sysbench/oltp_common.lua --db-driver=pgsql \
--pgsql-host=172.72.6.34 --pgsql-port=5432 \
--pgsql-user=postgres --pgsql-password=lhr --pgsql-db=lhrdb \
--table-size=100 --tables=10 --threads=10 \
--events=999999999 --time=60 prepare



psql -U postgres -h 192.168.1.35 -p 64320 -d lhrdb
lhrdb=# \dt
          List of relations
 Schema |   Name   | Type  |  Owner   
--------+----------+-------+----------
 public | sbtest1  | table | postgres
 public | sbtest10 | table | postgres
 public | sbtest2  | table | postgres
 public | sbtest3  | table | postgres
 public | sbtest4  | table | postgres
 public | sbtest5  | table | postgres
 public | sbtest6  | table | postgres
 public | sbtest7  | table | postgres
 public | sbtest8  | table | postgres
 public | sbtest9  | table | postgres
 public | t1       | table | postgres
 public | t2       | table | postgres
(12 rows)

Target Kafka environment

docker pull lhrbest/kafka:3.2.0


docker rm -f lhrkafka
docker run -itd --name lhrkafka -h lhrkafka \
  --net=pg-network --ip 172.72.6.44 \
  -p 9092:9092 -p 2181:2181 \
  -v /sys/fs/cgroup:/sys/fs/cgroup \
  --privileged=true lhrbest/kafka:3.2.0 \
  /usr/sbin/init

docker exec -it lhrkafka bash


-- Start the services (already started by default in this image)
/usr/local/kafka/bin/zookeeper-server-start.sh /usr/local/kafka/config/zookeeper.properties &
/usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties &

[root@lhrkafka /]# jps
QuorumPeerMain
Kafka
Jps
[root@lhrkafka /]# ps -ef | grep java
(output trimmed: one java process runs org.apache.zookeeper.server.quorum.QuorumPeerMain with /usr/local/kafka/config/zookeeper.properties, the other runs kafka.Kafka with /usr/local/kafka/config/server.properties)
[root@lhrkafka /]# netstat -tulnp | grep java
(output trimmed: both java processes show LISTEN sockets on 0.0.0.0)

Kafka listens on port 9092 by default, and ZooKeeper on port 2181.
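
To confirm both listeners explicitly, the same netstat output can be filtered down to the two ports:

netstat -tulnp | grep -E '2181|9092'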

Kafka log:

tailf /usr/local/kafka/logs/server.log

As a test, create a topic named test on the server and produce a few messages:

-- Producer
/usr/local/kafka/bin/kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic test
>hello
>world



-- On another machine, start a console consumer listening on the test topic; the data arrives
/usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092  --topic test --from-beginning
hello
world




-- List all topics on the current server
/usr/local/kafka/bin/kafka-topics.sh --list  --bootstrap-server  localhost:9092
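
Note that the console producer above relies on automatic topic creation being enabled on the broker; if it is disabled, the test topic can be created explicitly first (the partition and replication-factor values here are only illustrative):

/usr/local/kafka/bin/kafka-topics.sh --create --bootstrap-server localhost:9092 \
  --topic test --partitions 1 --replication-factor 1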

Source OGG for PG Microservices environment

-- OGG machine
docker pull lhrbest/ogg213mapg:v1.0

docker rm -f lhrogg213mapg
docker run -d --name lhrogg213mapg -h lhrogg213mapg \
  --net=pg-network --ip 172.72.6.100 \
  -p 9391:3389 -p 29000-29005:9000-9005 \
  -v /sys/fs/cgroup:/sys/fs/cgroup \
  --privileged=true lhrbest/ogg213mapg:v1.0 \
  /usr/sbin/init


docker exec -it lhrogg213mapg bash



-- OGGMA
cat > /ogg213c/ogg_ma/odbc.ini <<"EOF"
[ODBC Data Sources]
PGDSN=DataDirect 13 PostgreSQL Wire Protocol

[ODBC]
IANAAppCodePage=106
InstallDir=/ogg213c/ogg_ma

[PGDSN]
Driver=/ogg213c/ogg_ma/lib/GGpsql25.so
#Driver=/usr/lib64/psqlodbcw.so
Description=DataDirect 13 PostgreSQL Wire Protocol
Database=lhrdb
HostName=172.72.6.34
PortNumber=5432
LogonID=postgres
Password=lhr

EOF


su - pg
adminclient
CONNECT http://127.0.0.1:9000 deployment deploy213 as oggadmin password lhr

Access: http://192.168.1.35:29001 , username: oggadmin, password: lhr

Create credentials and add trandata
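
The original post performs these steps in the Administration Server web UI (the screenshots are not reproduced here). A roughly equivalent adminclient sequence, assuming the postgres/lhr account from the odbc.ini above, would look like the following; the exact credential-store syntax can vary between OGG releases, so treat it as a sketch:

-- from adminclient, connected to the deploy213 deployment
ALTER CREDENTIALSTORE ADD USER postgres PASSWORD lhr ALIAS PG1 DOMAIN OGGMA
DBLOGIN USERIDALIAS PG1 DOMAIN OGGMA
-- enable supplemental logging for the tables to be captured;
-- if the wildcard is not accepted in your version, run ADD TRANDATA per table
ADD TRANDATA public.*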

Target OGG for Big Data Microservices environment

docker pull lhrbest/ogg214mabigdata:v1.0

docker rm -f lhrogg214mabigdata
docker run -d --name lhrogg214mabigdata -h lhrogg214mabigdata \
  --net=pg-network --ip 172.72.6.101 \
  -p 9191:3389 -p 9000-9005:9000-9005 \
  -v /sys/fs/cgroup:/sys/fs/cgroup \
  --privileged=true lhrbest/ogg214mabigdata:v1.0 \
  /usr/sbin/init


docker exec -it lhrogg214mabigdata bash


-- Configure the Kafka handler parameters
vi /ogg214c/ogg_deploy/etc/conf/ogg/kafka.props
gg.handler.kafkahandler.schemaTopicName=LHR_OGG


vi  /ogg214c/ogg_deploy/etc/conf/ogg/custom_kafka_producer.properties
bootstrap.servers=172.72.6.44:9092
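
Only the schema topic name and the broker list are edited above; the rest of kafka.props ships with the image. For reference, a minimal Kafka handler configuration usually contains properties along these lines (everything except the two values edited above is an assumption about the image defaults):

gg.handlerlist=kafkahandler
gg.handler.kafkahandler.type=kafka
gg.handler.kafkahandler.kafkaProducerConfigFile=custom_kafka_producer.properties
# one topic per source table: the topic name is resolved from the table name
gg.handler.kafkahandler.topicMappingTemplate=${tableName}
gg.handler.kafkahandler.schemaTopicName=LHR_OGG
gg.handler.kafkahandler.format=json
gg.handler.kafkahandler.mode=op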

Access: http://192.168.1.35:9001 , username: oggadmin, password: lhr

Full synchronization

Note: during this phase the source must stop taking business traffic, so that no new data is generated.

Create the initial-load Extract on the source


EXTRACT ext0
SETENV(PGCLIENTENCODING = "UTF8")
SETENV(ODBCINI="/ogg213c/ogg_ma/odbc.ini")
SOURCEDB PGDSN USERIDALIAS PG1, DOMAIN OGGMA
EXTFILE ./dirdat/e0 ,  PURGE
TABLE public.*;
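
The original post creates this Extract through the Administration Server UI; a roughly equivalent adminclient sketch (SOURCEISTABLE marks it as an initial-load task) would be:

ADD EXTRACT ext0, SOURCEISTABLE
START EXTRACT ext0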

Check the Extract report; it confirms the rows have been written to the extract file, as follows:

Check from the OS:

[root@lhrogg213mapg /]# cd /ogg213c/ogg_deploy/var/lib/data/dirdat
[root@lhrogg213mapg dirdat]# ll
total 272
-rw-r----- 1 oracle oinstall 278395 Jul 25 09:42 e0000000
[root@lhrogg213mapg dirdat]# ll -h
total 272K
-rw-r----- 1 oracle oinstall 272K Jul 25 09:42 e0000000
[root@lhrogg213mapg dirdat]# 

Transfer the file to the target with scp

scp  /ogg213c/ogg_deploy/var/lib/data/dirdat/e0000000 root@172.72.6.101:/ogg214c/ogg_deploy/var/lib/data/dirdat/

-- On the target host
cd /ogg214c/ogg_deploy/var/lib/data/
chown -R oracle:oinstall ./dirdat/

Initial full load of the data into the target Kafka

REPLICAT rep0
targetdb libfile libggjava.so set property=/ogg214c/ogg_deploy/etc/conf/ogg/kafka.props
end runtime
map public.*, target public.*;
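
One way to register and run this initial-load Replicat from adminclient, reading the trail file that was copied over (a sketch of what is otherwise done in the web UI):

ADD REPLICAT rep0, EXTTRAIL ./dirdat/e0
START REPLICAT rep0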

Once it has processed everything, it stops on its own:

Checking the result of the full load

-- View all historical data
/usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092  --topic LHR_OGG --from-beginning



-- List all topics on the current server
/usr/local/kafka/bin/kafka-topics.sh --list  --bootstrap-server  localhost:9092


-- Topic details
/usr/local/kafka/bin/kafka-topics.sh --bootstrap-server  localhost:9092 --describe --topic LHR_OGG

Each table gets its own topic, as shown below:

[root@lhrkafka /]# /usr/local/kafka/bin/kafka-topics.sh --list  --bootstrap-server  localhost:9092
__consumer_offsets
test
[root@lhrkafka /]# /usr/local/kafka/bin/kafka-topics.sh --list  --bootstrap-server  localhost:9092
LHR_OGG
__consumer_offsets
sbtest1
sbtest10
sbtest2
sbtest3
sbtest4
sbtest5
sbtest6
sbtest7
sbtest8
sbtest9
[root@lhrkafka /]# /usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092  --topic sbtest1 --from-beginning | wc -l
^CProcessed a total of 100 messages

The full load is complete.

Incremental synchronization

Configure the replication slot

Before configuring real-time synchronization for PostgreSQL, a replication slot must be registered.

su - oracle
adminclient
CONNECT http://127.0.0.1:9001 deployment deploy213 as oggadmin password lhr
dblogin useridalias PG1 DOMAIN OGGMA
REGISTER EXTRACT  ext1

If the replication slot is not registered, the Extract fails with: OGG-25374 Oracle GoldenGate Capture for PostgreSQL, EXT1.prm: The replication slot 'ext1_eaa1c3d574a94c47' for group 'EXT1' does not exist in the database 'lhrdb'.

The process:

[root@lhrogg213mapg dirdat]# su  - oracle
Last login: Fri Dec  3 10:58:49 CST 2021 on pts/0
[oracle@lhrogg213mapg ~]$ adminclient 
Oracle GoldenGate Administration Client for PostgreSQL
Version 21.3.0.0.0 OGGCORE_21.3.0.0.0_PLATFORMS_210728.1047

Copyright (C) 1995, 2021, Oracle and/or its affiliates. All rights reserved.

Oracle Linux 7, x64, 64bit (optimized) on Aug  4 2021 19:52:45
Operating system character set identified as UTF-8.

OGG (not connected) > CONNECT http://127.0.0.1: deployment deploy213 as oggadmin password lhr             

OGG (http://127.0.0.1: deploy213) > dblogin useridalias PG1 DOMAIN OGGMA
Successfully logged into database.

OGG (http://127.0.0.1: deploy213 as PG1@lhrdb) > REGISTER EXTRACT  ext1
-07-25T02::Z  INFO    OGG-25355  Successfully created replication slot 'ext1_eaa1c3d574a94c47' for Extract group 'EXT1' in database 'lhrdb'.

OGG (http://127.0.0.1: deploy213 as PG1@lhrdb) > 
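
After registration, the slot can also be verified from the PostgreSQL side with the standard pg_replication_slots catalog:

lhrdb=# select slot_name, plugin, slot_type, active from pg_replication_slots;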

Extract configuration on the PG side

extract ext1
SETENV(PGCLIENTENCODING = "UTF8" )
SETENV(ODBCINI="/ogg213c/ogg_ma/odbc.ini" )
SOURCEDB PGDSN USERIDALIAS PG1, DOMAIN OGGMA
exttrail ./dirdat/e1
IGNOREREPLICATES
table public.*;
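
The matching adminclient steps to create and start this change-capture Extract would be roughly as follows (again a sketch of what is otherwise done in the web UI):

ADD EXTRACT ext1, TRANLOG, BEGIN NOW
ADD EXTTRAIL ./dirdat/e1, EXTRACT ext1
START EXTRACT ext1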

Configure the Distribution Service on the source

Log in at: http://192.168.1.35:29002

trail://172.72.6.100:9002/services/v2/sources?trail=./dirdat/e1
ogg://172.72.6.101:9003/services/v2/targets?trail=./dirdat/e1

At this point, the Big Data side automatically adds the Receiver Service path:

The trail file has been transferred to the target:

[root@lhrogg214mabigdata dirdat]# ll
total 276
-rwxrwxrwx 1 oracle oinstall 278395 Jul 25 09:51 e0000000
-rw-r----- 1 oracle oinstall   1534 Jul 25 10:31 e1000000000
[root@lhrogg214mabigdata dirdat]# pwd
/ogg214c/ogg_deploy/var/lib/data/dirdat

Replicat configuration on the Kafka side

There are many target options, including Warehouse, Cassandra, HBase, HDFS, JDBC, Kafka, and Kafka Connect.

REPLICAT rep1
targetdb libfile libggjava.so set property=/ogg214c/ogg_deploy/etc/conf/ogg/kafka.props
map public.*, target public.*;
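
The corresponding adminclient sketch to register and start this change-apply Replicat against the e1 trail:

ADD REPLICAT rep1, EXTTRAIL ./dirdat/e1
START REPLICAT rep1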

Incremental test

lhrdb=# insert into t1 values (1),(2);
INSERT 0 2
lhrdb=# delete from sbtest1 where id<=1;
DELETE 1
lhrdb=#

Source side:

Distribution:

Kafka side:

Consuming from the command line:

[root@lhrkafka /]# /usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092  --topic t1
public.t1I42022-07-25 10:34:36.05799942022-07-25 10:34:40.889000(00000000000000001820
public.t1I42022-07-25 10:34:36.05799942022-07-25 10:34:41.005000(00000000000000001922

As you can see, changes are synchronized incrementally.

Using Kafka Manager to view the Kafka data

Reference: https://www.xmmup.com/kafkatuxingguanligongjucmakkafka-manageranzhuangjishiyong.html

docker pull registry.cn-hangzhou.aliyuncs.com/lhrbest/kafkamanager_cmak:3.0.0.6

docker rm -f lhrkafkamanager
docker run -itd --name lhrkafkamanager -h lhrkafkamanager \
  --net=pg-network --ip 172.72.6.45 \
  -p 9100:9000  \
  -v /sys/fs/cgroup:/sys/fs/cgroup \
  --privileged=true registry.cn-hangzhou.aliyuncs.com/lhrbest/kafkamanager_cmak:3.0.0.6 \
  /usr/sbin/init

docker exec -it lhrkafkamanager bash

Web login address: http://192.168.1.35:9100/

Summary

1. When configuring the Distribution Service, pay attention to the location of dirdat.

2. Port 9002 is used for distribution and port 9003 for receiving.

3. If the Replicat process starts without errors but does not apply any data, check whether the owner (schema) in the parameter file is written incorrectly.
