Foreword: this article is an experience report from community user 武舞悟, who walks step by step through troubleshooting a Flink Kafka connector problem in Dinky, entirely inside IDEA.
Repository addresses:
https://github.com/DataLinkDC/dinky
https://gitee.com/DataLinkDC/Dinky
Everyone is welcome to give Dinky a little star~
1. Preparation
The IDEA version used in this article is as follows:
Download the code from https://gitee.com/DataLinkDC/Dinky.git, check out the 0.7.5 branch, and make no modifications. Basics such as the JDK and Maven are assumed to be configured already.
In the Maven panel on the right side of IDEA, check the Maven profiles dev, flink-1.17, jdk1.8, scala-2.12, and web, then run maven clean and maven install, and settle in for a long wait (build time depends on your hardware) until the build finishes.
| Item | Value |
|---|---|
| Hadoop version | hadoop-3.1.4 |
| Flink execution mode | Yarn Session |
| Flink version | flink-1.17.0 |
| Dinky version | dlink-release-0.7.5 |
| Kafka version | kafka_2.12-3.0.0 |
| Kafka coordination mode | ZooKeeper |
| MySQL version | 5.7.28 |
Setting up and starting the HDFS cluster, the YARN cluster, and the Dlink environment is skipped here and assumed to be done.
Create the dlink_075 user in MySQL and execute the dlink-doc/sql/dinky.sql file in the dlink_075 database.
Open the /dlink-admin/src/main/resources/application.yml file under the dlink root directory; at the very top of the file you can see:
spring:
  datasource:
    url: jdbc:mysql://${MYSQL_ADDR:127.0.0.1:3306}/${MYSQL_DATABASE:dlink}?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&useSSL=false&zeroDateTimeBehavior=convertToNull&serverTimezone=Asia/Shanghai&allowPublicKeyRetrieval=true
    username: ${MYSQL_USERNAME:dlink}
    password: ${MYSQL_PASSWORD:dlink}
    driver-class-name: com.mysql.cj.jdbc.Driver
The ${} placeholders contain variables (MYSQL_ADDR, MYSQL_DATABASE, MYSQL_USERNAME, MYSQL_PASSWORD) that can be supplied from outside; if a variable is not supplied, the value after the colon is used as the default.
Now set these yml parameters in IDEA: open the Run/Debug Configurations page (Run ---> Edit Configurations...):
In Environment variables, fill in the values as name=value pairs, separated by semicolons:
MYSQL_ADDR=192.168..;MYSQL_DATABASE=dlink_075;MYSQL_USERNAME=root;MYSQL_PASSWORD=
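The `${NAME:default}` fallback rule described above can be sketched as follows. This is a toy illustration of the behavior, not Spring's actual resolver:

```python
import os
import re

# Matches ${NAME} or ${NAME:default}; the default may itself contain colons,
# e.g. ${MYSQL_ADDR:127.0.0.1:3306}.
PLACEHOLDER = re.compile(r"\$\{([^:}]+)(?::([^}]*))?\}")

def resolve_placeholders(text, env=None):
    """Replace ${NAME:default} with env[NAME] if set, else the default."""
    env = os.environ if env is None else env
    def repl(match):
        name, default = match.group(1), match.group(2) or ""
        return env.get(name, default)
    return PLACEHOLDER.sub(repl, text)

url = "jdbc:mysql://${MYSQL_ADDR:127.0.0.1:3306}/${MYSQL_DATABASE:dlink}"
print(resolve_placeholders(url, {"MYSQL_DATABASE": "dlink_075"}))
# -> jdbc:mysql://127.0.0.1:3306/dlink_075
```

Here only MYSQL_DATABASE is supplied, so MYSQL_ADDR falls back to the default after the colon, exactly as the application.yml values do when the IDEA environment variables are left unset.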
2. Running a Flink SQL Job
Create the Session cluster
Execute the following command in the Flink root directory to request resources from the YARN cluster, open a YARN session, and start the Flink cluster:
./bin/yarn-session.sh -d -nm ww
Parameter description: -d runs the session in detached mode; -nm sets a custom name for the application shown on YARN.
In the Dinky SQL editor, enter the following FlinkSQL:
DROP TABLE IF EXISTS employees;
CREATE TABLE IF NOT EXISTS employees (
`emp_no` INT NOT NULL,
`birth_date` DATE,
`first_name` STRING,
`last_name` STRING,
`gender` STRING,
`hire_date` DATE,
proctime as PROCTIME(),
PRIMARY KEY (`emp_no`) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = '192.168.*.*',
'port' = '3306',
'username' = 'root',
'password' = '****',
'scan.incremental.snapshot.enabled' = 'true',
'debezium.snapshot.mode' = 'latest-offset',
'database-name' = 'nfp_ep',
'table-name' = 'employees_dinky'
);
DROP TABLE IF EXISTS dim_sex;
CREATE TABLE dim_sex (
sex STRING,
caption STRING,
PRIMARY KEY (sex) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://192.168.*.*:3306/employees',
'table-name' = 'dim_sex',
'username' = 'root',
'password' = '****'
);
select
*
from
employees
left join dim_sex FOR SYSTEM_TIME AS OF employees.proctime ON employees.gender = dim_sex.sex;
This is a most basic FlinkSQL job, and it runs correctly: when rows are added or modified in the source table, the latest changes show up in the result sheet below the editor, and clicking the "Get latest data" button displays the data:
Now enter the following in the editor:
DROP TABLE IF EXISTS employees_kafka;
CREATE TABLE IF NOT EXISTS employees_kafka (
`emp_no` INT NOT NULL,
`birth_date` DATE,
`first_name` STRING,
`last_name` STRING,
`gender` STRING,
`hire_date` DATE
) WITH (
'connector' = 'kafka',
'topic' = 'flink-cdc-kafka',
'properties.bootstrap.servers' = 'bd171:9092,bd172:9092,bd173:9092',
'properties.group.id' = 'flink-cdc-kafka-group',
'format' = 'json',
'scan.startup.mode' = 'latest-offset'
);
CREATE TABLE IF NOT EXISTS employees_sink (
`emp_no` INT NOT NULL,
`birth_date` DATE,
`first_name` STRING,
`last_name` STRING,
`gender` STRING,
`hire_date` DATE,
PRIMARY KEY (`emp_no`) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://192.168.*.*:3306/employees?serverTimezone=UTC&useUnicode=true&characterEncoding=utf-8&useSSL=false',
'table-name' = 'employees_kafka_sink',
'driver' = 'com.mysql.cj.jdbc.Driver',
'username' = 'root',
'password' = '****'
);
insert into
employees_sink
select
emp_no,
birth_date,
first_name,
last_name,
gender,
hire_date
from
employees_kafka;
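Note what the 'format' = 'json' option on employees_kafka expects: each Kafka record must be a JSON object whose top-level fields match the column names, with DATE columns given as 'yyyy-MM-dd' strings. A sample payload (the field values here are made up for illustration):

```python
import json

# A made-up record shaped for the employees_kafka table; the Flink JSON
# format maps top-level JSON fields to columns by name.
record = {
    "emp_no": 10001,
    "birth_date": "1990-05-17",
    "first_name": "San",
    "last_name": "Zhang",
    "gender": "M",
    "hire_date": "2015-03-02",
}
payload = json.dumps(record)
print(payload)
```

Messages of this shape published to the flink-cdc-kafka topic are what the source table reads once the job is running.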
Running it reports an error, as follows:
[dlink] 2023-11-30 21:36:27.751 ERROR 16072 --- [nio-8888-exec-9] com.dlink.utils.LogUtil: 2023-11-30T21:36:27.750: Exception in executing FlinkSQL:
insert into
employees_sink
select
emp_no,
birth_date,
first_name,
last_name,
gender,
hire_date
from
employees_kafka
Error message:
org.apache.flink.table.api.ValidationException: Unable to create a source for reading table 'default_catalog.default_database.employees_kafka'.
Table options are:
'connector'='kafka'
'format'='json'
'properties.bootstrap.servers'='bd171:9092,bd172:9092,bd173:9092'
'properties.group.id'='flink-cdc-kafka-group'
'scan.startup.mode'='latest-offset'
'topic'='flink-cdc-kafka'
at org.apache.flink.table.factories.FactoryUtil.createDynamicTableSource(FactoryUtil.java:167)
at org.apache.flink.table.factories.FactoryUtil.createDynamicTableSource(FactoryUtil.java:192)
......
at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61)
at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'json' that implements 'org.apache.flink.table.factories.DeserializationFormatFactory' in the classpath.
Available factory identifiers are:
raw
at org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:546)
at org.apache.flink.table.factories.FactoryUtil$TableFactoryHelper.discoverOptionalFormatFactory(FactoryUtil.java:1130)
at org.apache.flink.table.factories.FactoryUtil$TableFactoryHelper.discoverOptionalDecodingFormat(FactoryUtil.java:1046)
at org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicTableFactory.getValueDecodingFormat(KafkaDynamicTableFactory.java:330)
at org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicTableFactory.createDynamicTableSource(KafkaDynamicTableFactory.java:183)
at org.apache.flink.table.factories.FactoryUtil.createDynamicTableSource(FactoryUtil.java:164)
... 114 more
3. Solving the Problem
The exception above shows that the JSON-related dependency is missing, so add the following dependency to the pom.xml of dlink-admin:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>${flink.version}</version>
</dependency>
Then prepare to rebuild the entire Dinky project. You may ask why not build dlink-admin alone: because it fails, as follows:
[INFO] --- spotless-maven-plugin:2.27.1:check (default) @ dlink-admin ---
[INFO] ------------------------------------------------------------------------
[INFO] BUILD FAILURE
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 36.993 s
[INFO] Finished at: 2023-11-30T22:02:29+08:00
[INFO] ------------------------------------------------------------------------
[ERROR] Failed to execute goal com.diffplug.spotless:spotless-maven-plugin:2.27.1:check (default) on project dlink-admin: Execution default of goal com.diffplug.spotless:spotless-maven-plugin:2.27.1:check failed: Unable to locate file with path: style/spotless_dlink_formatter.xml: Could not find resource 'style/spotless_dlink_formatter.xml'. -> [Help 1]
[ERROR]
[ERROR] To see the full stack trace of the errors, re-run Maven with the -e switch.
[ERROR] Re-run Maven using the -X switch to enable full debug logging.
[ERROR]
[ERROR] For more information about the errors and possible solutions, please read the following articles:
[ERROR] [Help 1] http://cwiki.apache.org/confluence/display/MAVEN/PluginExecutionException
Before building the whole Dinky project, uncheck "web" in the Maven Profile list in the Maven panel on the right side of IDEA: rebuilding the static web project is unnecessary and far too time-consuming.
After rebuilding the whole Dinky project, start Dinky and run the FlinkSQL above again. There is another problem; IDEA reports the following error:
[dlink] 2023-11-30 22:22:52.386 INFO 5668 --- [ent-IO-thread-1] org.apache.flink.client.program.rest.RestClusterClient: Submitting job 'kafka到mysql单表employees_savepoint' (16258731657846a524dd565dcfbef607).
[dlink] 2023-11-30 22:22:54.522 INFO 5668 --- [ent-IO-thread-4] org.apache.flink.client.program.rest.RestClusterClient: Successfully submitted job 'kafka到mysql单表employees_savepoint' (16258731657846a524dd565dcfbef607) to 'http://bd171:18081'.
[dlink] 2023-11-30 22:22:54.698 ERROR 5668 --- [nio-8888-exec-8] com.dlink.utils.LogUtil: 2023-11-30T22:22:54.698: Exception in executing FlinkSQL:
insert into
employees_sink
select
emp_no,
birth_date,
first_name,
last_name,
gender,
hire_date
from
employees_kafka
Error message:
org.apache.flink.table.api.TableException: Failed to execute sql
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:938)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:883)
.....
at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61)
at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.flink.util.FlinkException: Failed to execute job 'kafka到mysql单表employees_savepoint'.
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:2212)
at org.apache.flink.table.planner.delegation.DefaultExecutor.executeAsync(DefaultExecutor.java:95)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:921)
... 94 more
Caused by: java.lang.RuntimeException: org.apache.flink.runtime.client.JobInitializationException: Could not start the JobMaster.
at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:321)
at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedFunction$2(FunctionUtils.java:75)
......
at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1703)
at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:172)
Caused by: org.apache.flink.runtime.client.JobInitializationException: Could not start the JobMaster.
at org.apache.flink.runtime.jobmaster.DefaultJobMasterServiceProcess.lambda$new$0(DefaultJobMasterServiceProcess.java:97)
......
at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.CompletionException: java.lang.RuntimeException: org.apache.flink.runtime.JobException: Cannot instantiate the coordinator for operator Source: employees_kafka[1] -> ConstraintEnforcer[2] -> Sink: employees_sink[2]
at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1592)
... 3 more
Caused by: java.lang.RuntimeException: org.apache.flink.runtime.JobException: Cannot instantiate the coordinator for operator Source: employees_kafka[1] -> ConstraintEnforcer[2] -> Sink: employees_sink[2]
at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:321)
at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedSupplier$4(FunctionUtils.java:114)
at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
... 3 more
Caused by: org.apache.flink.runtime.JobException: Cannot instantiate the coordinator for operator Source: employees_kafka[1] -> ConstraintEnforcer[2] -> Sink: employees_sink[2]
at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.initialize(ExecutionJobVertex.java:229)
......
at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedSupplier$4(FunctionUtils.java:112)
... 4 more
Caused by: java.lang.ClassCastException: cannot assign instance of org.apache.kafka.clients.consumer.OffsetResetStrategy to field org.apache.flink.connector.kafka.source.enumerator.initializer.ReaderHandledOffsetsInitializer.offsetResetStrategy of type org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.OffsetResetStrategy in instance of org.apache.flink.connector.kafka.source.enumerator.initializer.ReaderHandledOffsetsInitializer
at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2287)
......
at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.initialize(ExecutionJobVertex.java:223)
... 20 more
Caused by: java.lang.ClassCastException: cannot assign instance of org.apache.kafka.clients.consumer.OffsetResetStrategy to field org.apache.flink.connector.kafka.source.enumerator.initializer.ReaderHandledOffsetsInitializer.offsetResetStrategy of type org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.OffsetResetStrategy in instance of org.apache.flink.connector.kafka.source.enumerator.initializer.ReaderHandledOffsetsInitializer
The "org.apache.flink.kafka.shaded" prefix in this error comes from the maven-shade-plugin relocation configured in the pom.xml of flink-sql-connector-kafka:
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<executions>
<execution>
<id>shade-flink</id>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<artifactSet>
<includes>
<include>org.apache.flink:flink-connector-base</include>
<include>org.apache.flink:flink-connector-kafka</include>
<include>org.apache.kafka:*</include>
</includes>
</artifactSet>
<filters>
<filter>
<artifact>org.apache.kafka:*</artifact>
<excludes>
<exclude>kafka/kafka-version.properties</exclude>
<exclude>LICENSE</exclude>
<!-- Does not contain anything relevant.
Cites a binary dependency on jersey, but this is neither reflected in the
dependency graph, nor are any jersey files bundled. -->
<exclude>NOTICE</exclude>
<exclude>common/**</exclude>
</excludes>
</filter>
</filters>
<relocations>
<relocation>
<pattern>org.apache.kafka</pattern>
<shadedPattern>org.apache.flink.kafka.shaded.org.apache.kafka</shadedPattern>
</relocation>
</relocations>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
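The relocation in the shade-plugin configuration above amounts to a prefix rewrite of class names. A minimal sketch of the rule as a plain string transform (the real plugin rewrites bytecode at packaging time, this is only an illustration):

```python
PATTERN = "org.apache.kafka"
SHADED = "org.apache.flink.kafka.shaded.org.apache.kafka"

def relocate(class_name, pattern=PATTERN, shaded=SHADED):
    """Map a class name the way the <relocation> element does."""
    if class_name.startswith(pattern):
        return shaded + class_name[len(pattern):]
    return class_name

print(relocate("org.apache.kafka.clients.consumer.OffsetResetStrategy"))
# -> org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.OffsetResetStrategy
```

This is exactly the mismatch in the ClassCastException above: the client side serialized the object under the unshaded name, while the jar on the cluster side only knows the shaded one, and to the JVM these are two different classes.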
The package names of the kafka.clients classes are relocated (shaded) from "org.apache.kafka" to "org.apache.flink.kafka.shaded.org.apache.kafka". So why did the FlinkSQL job with Kafka as the source fail? Look back at the key points of the error reported in IDEA, which include:
Caused by: java.lang.RuntimeException: org.apache.flink.runtime.client.JobInitializationException: Could not start the JobMaster.
......
Caused by: org.apache.flink.runtime.client.JobInitializationException: Could not start the JobMaster.
......
at org.apache.flink.runtime.jobmaster.DefaultSlotPoolServiceSchedulerFactory.createScheduler(DefaultSlotPoolServiceSchedulerFactory.java:119)
at org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:371)
at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:348)
......
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:534)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:522)
at org.apache.flink.util.SerializedValue.deserializeValue(SerializedValue.java:67)
at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.create(OperatorCoordinatorHolder.java:471)
at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.createOperatorCoordinatorHolder(ExecutionJobVertex.java:286)
at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.initialize(ExecutionJobVertex.java:223)
... 20 more
Clearly, part of the error on the IDEA side is relayed from the JobMaster error on the remote Flink cluster. Now look at which Kafka-related jars the Dinky project in IDEA actually uses: File ---> Project Structure ---> Project Settings ---> Libraries:
When the compiled Dinky source project runs, it mainly uses flink-connector-kafka-1.17.1.jar and kafka-clients-3.0.2.jar. Next, look at which Kafka-related package Flink itself uses:
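To confirm which variant of a class a given jar actually ships, listing its entries is enough. A small helper (the jar filename below is just whichever jar you want to inspect):

```python
import zipfile

def find_classes(jar_path, needle):
    """Return the .class entries in a jar whose path contains `needle`."""
    with zipfile.ZipFile(jar_path) as jar:
        return [name for name in jar.namelist()
                if name.endswith(".class") and needle in name]

# e.g. find_classes("flink-sql-connector-kafka-1.17.0.jar", "OffsetResetStrategy")
# The fat flink-sql-connector-kafka jar lists entries under
# org/apache/flink/kafka/shaded/..., while the thin flink-connector-kafka
# jar does not bundle kafka-clients and pulls it in as a separate dependency.
```

Running this against the jars on each side makes the shaded-versus-unshaded mismatch visible immediately.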
It turns out Flink uses flink-sql-connector-kafka-1.17.0.jar. First attempt: replace Flink's Kafka jar with the flink-connector-kafka-1.17.1.jar used on the Dinky side, then restart the Flink cluster and run the earlier FlinkSQL again. On the IDEA side it runs normally, with no errors:
[dlink] 2023-11-30 23:31:13.209 INFO 5668 --- [adPool-Worker-5] com.dlink.api.FlinkAPI: Unable to connect to Flink JobManager: http://FINISHED
[dlink] 2023-11-30 23:31:13.216 WARN 5668 --- [adPool-Worker-5] com.alibaba.druid.pool.DruidAbstractDataSource: discard long time none received connection. , jdbcUrl : jdbc:mysql://192.168.1.198/dlink_075?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&useSSL=false&zeroDateTimeBehavior=convertToNull&serverTimezone=Asia/Shanghai&allowPublicKeyRetrieval=true, version : 1.2.8, lastPacketReceivedIdleMillis : 60278
[dlink] 2023-11-30 23:41:29.010 WARN 5668 --- [io-8888-exec-10] com.alibaba.druid.pool.DruidAbstractDataSource: discard long time none received connection. , jdbcUrl : jdbc:mysql://192.168.1.198/dlink_075?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&useSSL=false&zeroDateTimeBehavior=convertToNull&serverTimezone=Asia/Shanghai&allowPublicKeyRetrieval=true, version : 1.2.8, lastPacketReceivedIdleMillis : 615551
[dlink] 2023-11-30 23:42:58.238 WARN 5668 --- [io-8888-exec-10] com.alibaba.druid.pool.DruidAbstractDataSource: discard long time none received connection. , jdbcUrl : jdbc:mysql://192.168.1.198/dlink_075?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&useSSL=false&zeroDateTimeBehavior=convertToNull&serverTimezone=Asia/Shanghai&allowPublicKeyRetrieval=true, version : 1.2.8, lastPacketReceivedIdleMillis : 88776
[dlink] 2023-11-30 23:42:58.980 INFO 5668 --- [io-8888-exec-10] com.dlink.executor.Executor: Simple authentication mode
[dlink] 2023-11-30 23:42:58.996 INFO 5668 --- [io-8888-exec-10] com.dlink.executor.Executor: Simple authentication mode
[dlink] 2023-11-30 23:42:59.008 INFO 5668 --- [io-8888-exec-10] com.dlink.executor.Executor: Simple authentication mode
[dlink] 2023-11-30 23:42:59.017 INFO 5668 --- [io-8888-exec-10] com.dlink.executor.Executor: Simple authentication mode
[dlink] 2023-11-30 23:42:59.137 INFO 5668 --- [ent-IO-thread-1] org.apache.flink.client.program.rest.RestClusterClient: Submitting job 'kafka到mysql单表employees_savepoint' (2fa25e0cbab4e2ba11a6818fe2da2677).
[dlink] 2023-11-30 23:42:59.588 INFO 5668 --- [ent-IO-thread-4] org.apache.flink.client.program.rest.RestClusterClient: Successfully submitted job 'kafka到mysql单表employees_savepoint' (2fa25e0cbab4e2ba11a6818fe2da2677) to 'http://bd171:18081'.
[dlink] 2023-11-30 23:44:02.942 WARN 5668 --- [adPool-Worker-5] com.alibaba.druid.pool.DruidAbstractDataSource: discard long time none received connection. , jdbcUrl : jdbc:mysql://192.168.1.198/dlink_075?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&useSSL=false&zeroDateTimeBehavior=convertToNull&serverTimezone=Asia/Shanghai&allowPublicKeyRetrieval=true, version : 1.2.8, lastPacketReceivedIdleMillis : 60395
But there are still errors on the remote Flink cluster:
So instead, unify the Dinky and Flink sides on flink-sql-connector-kafka-1.17.0.jar. Replacing the jar on the Flink side is omitted here. In IDEA, search globally for xml files containing flink-connector-kafka and change the dependency to:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-sql-connector-kafka</artifactId>
<version>${flink.version}</version>
</dependency>
Then rebuild the Dinky project, start the Flink cluster, start Dinky, and run the earlier FlinkSQL; this time everything works. The logs on the Flink cluster side now look like this:
2023-11-30 23:55:34,027 WARN org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.ConsumerConfig [] - The configuration 'client.id.prefix' was supplied but isn't a known config.
2023-11-30 23:55:34,027 WARN org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.ConsumerConfig [] - The configuration 'partition.discovery.interval.ms' was supplied but isn't a known config.
2023-11-30 23:55:34,033 INFO org.apache.flink.kafka.shaded.org.apache.kafka.common.utils.AppInfoParser [] - Kafka version: 2.7.2
2023-11-30 23:55:34,034 INFO org.apache.flink.kafka.shaded.org.apache.kafka.common.utils.AppInfoParser [] - Kafka commitId: 37a1cc36bf4d76f3
2023-11-30 23:55:34,034 INFO org.apache.flink.kafka.shaded.org.apache.kafka.common.utils.AppInfoParser [] - Kafka startTimeMs: 1701335854027
2023-11-30 23:55:34,065 INFO org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Starting split fetcher 0
2023-11-30 23:55:34,083 INFO org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.KafkaConsumer [] - [Consumer clientId=flink-cdc-kafka-group-0, groupId=flink-cdc-kafka-group] Subscribed to partition(s): flink-cdc-kafka-0
2023-11-30 23:55:34,095 INFO org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.internals.SubscriptionState [] - [Consumer clientId=flink-cdc-kafka-group-0, groupId=flink-cdc-kafka-group] Seeking to LATEST offset of partition flink-cdc-kafka-0
2023-11-30 23:55:35,618 INFO org.apache.flink.kafka.shaded.org.apache.kafka.clients.Metadata [] - [Consumer clientId=flink-cdc-kafka-group-0, groupId=flink-cdc-kafka-group] Resetting the last seen epoch of partition flink-cdc-kafka-0 to 3 since the associated topicId changed from null to oQYrIKJBRe-oWt7Q0nZi7A
2023-11-30 23:55:35,625 INFO org.apache.flink.kafka.shaded.org.apache.kafka.clients.Metadata [] - [Consumer clientId=flink-cdc-kafka-group-0, groupId=flink-cdc-kafka-group] Cluster ID: _nGd57n0QxGTp130IKGwDQ
2023-11-30 23:55:35,669 INFO org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.internals.SubscriptionState [] - [Consumer clientId=flink-cdc-kafka-group-0, groupId=flink-cdc-kafka-group] Resetting offset for partition flink-cdc-kafka-0 to position FetchPosition{offset=1222, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[bd171:9092 (id: 171 rack: null)], epoch=3}}.
4. Conclusion
To sum up: using flink-sql-connector-kafka directly in both the Dinky and Flink environments avoids many dependency problems. Similarly, for dependencies such as mysql-cdc, it is also advisable to use the flink-sql-* fat jars, which likewise avoids many issues.
The above condenses the whole journey from discovering the problem to solving its core; in practice, it was nowhere near as easy to resolve as it reads here.
Keep pushing forward!