00:00
好,那接下来呢,我们完成我们flink CDC任务的一个部署,好吧,那接下来就要写这个代码,我们要做的事情呢,就是同步这么所有的表发送到这个主题当中啊,那也他可能想了不对啊,之前你说的这个我们分为了。全量同步跟增量同步两种,斯库呢做全量同步,哎,其中一部分表呢,已经倒过去了,那增量部分呢,应该也是一部分啊,怎么又变成所有的表的呢?别着急啊,因为弗雷格CDC呢,读的是这个b logg。啊对吧,那他呢读blog blog里边并没有把这个表啊,说我按文件去分,那读进来就是全量的,那也就是说你可以在代码当中做一个过滤啊可以,但是我之前提到了未来我们要考虑到实时。那实时当中大家想会用SCO做每日全量吗?
01:00
并不会吧,也就无论你这个表大还是小,是不是全部都要做增量了,对吧?所以我们先导到卡夫卡里边的,如果后续有需要,我们可以在这边做过滤,如果你不过滤,都放在HDFS,那也无所谓,只不过说这一部分表好像在H里多存了一个备份每天的新增及变化的,那到时候你可以选择不用呢。对吧?啊,你在生态环线当中一定要注意,我们宁愿多倒一些啊,不要少倒了,因为多了未来你可以不用少了,你未来想补回来就比较麻烦了,对吧?啊,所以尽量的多一些好,那这是我们的目标,呃,那接下来呢,构建我们的环境啊,那首先呢,在这边创建我们的项目。打开idea啊,下一步。这个呢,我们就快速操作一下,因为大家呢,肯定代码肯定没有问题了,呃,那我们是em-G末好吧。下一步。
02:00
按照。好,那这边呢,下午搞定。接下来呢,未来我们不止有一个代码要写啊,所以呢,我先把外层的这个我给它删掉。在这里边儿,我们去新建一个模块。叫G杠弗杠CDC啊,这是我们当才这个程序好吧。我们用子模块啊,未来呢,这一个项目里边就包含我们所有的内容了啊,我把外层的src删掉了,因为防止未来代码写错位置了,好吧,呃,那接下来这块呢,就搞定在里边创建我们的包啊呃,一个呢APP,一个YouTube,一个工具包,一个是启动任务类啊那这个比较简单,直接到这来。创建八。呃,那我就直接叫。Com点爱硅谷点APP了啊。好点I硅谷点,然后呢,有一个YouTube包啊,两个包咱们就搞定啊好那继续。
03:09
报完了之后呢,我们要修改我们的抛文件,把这个配置信息呢,全部拿过来。跟大家来带大家一起看一下我们的泡门店添加的这些个依赖啊,首先呢,前面定义了一些是我们的版本信息啊,后面呢,直接用引用的方式,这样一改比较方便啊呃,那弗Li javali stream javali客户端啊哈,客户端,因为有可能我们要写这个check point存到IDFS对吧?呃,那接下来。买的CDC对吧,然后呢,这个是我们买CDC所要移动用到的,然后连接卡夫卡的连接器,因为我们要将数据写到卡夫卡发杰森,我们自己封装为杰森,最后呢,打印日志的那种啊,下面呢还有一个打包插件。
04:00
啊,帮助我们打包的一个插件,对吧,就没有太多的什么复杂的内容啊,都是常规的一些架包啊,那这个呢,我们就都关掉。好,那接下来我们继续往后走啊,在resource目录底下创建这个配置信息。Log啊,把这个拿过来啊,未来打印日志呢,我们让它只打印这个。错误日志,如果说我们测试的当中发现它没有报错,但是呢也没有结果,我们可以把它改为info来查看一下是不是有地方,哎,出现了警告。对吧,啊,把级别调低一点,我们可以看到更多的一个日志啊。好,接下来呢,是卡夫卡的一个工具类,我们给他拿过来,那既然是工具类,我们放在这个U包下啊,直接搂过来。带着大家看一下这个代码呢,这种代码就不带这去写了啊呃,那这里面呢,我们就获取了一个生产者。
05:08
对吧,获取一个生产者,在生产者当中呢,我们有一个集群信息,就是我们之前搭建的一个集群,在这当中呢,我们创建了一个producer,然后呢,返回这个producer,在这个当中呢,就是这样的一个东西,没有什么特别复杂的一个内容,好吧,然后自己可以传一个主题啊,可以传一个主题进来就完事了,其他的就没有啥了啊。这个主题呢,从外面传进来的啊,也就未来呢,我们要用到的时候,其实就是之前我们看到叫topic。DB啊,这就直接利用的API啊,弗Li啊,去创建一个工具类啊,这个就搞定了,接下来时间戳的一个处理工具类,因为Li CDC读取过来的数据呢,它可能会有时间戳的问题,这个到时候我们再具体再聊啊。现在呢,我们直接把这个拿过来,因为我们现在并没有看到弗林CDC给我们打印数据是什么样子啊,这里边呢,就是说我们可以把这个。
06:05
字符传,也就是说2022这样的,诶幺幺杠幺六对吧,这样的数据呢,12:12:12等等,这样数据呢,可以把它转化为时间戳,也可以把时间戳的转化为年月日十分秒啊就做这个事情有一个工具类啊好。这个没有什么太多可说的,那接下来。查询HDFS的一个,呃,UR这个呢,到时候我们再说好吧。等会我们再说flink CDC采集数据啊,我们先把这个主程序给他拿过来。主程序,然后接下来呢,我们把这个内容给他拿过来带着大家看一下啊,这个之前我们就提到,如果说你要想了解整个弗雷克CC那一样的。在B站上面也有我讲的这个课程,好吧,那这个包呢,我这次取的应该不太一样,对吧,所以我把这个去掉,去掉之后呢,我们重新去导一下。
07:06
你等一下啊。呃,那这边呢,有。这个。还有。这个topic d对吧,好,那在这边我们就搞定了啊好,那为了简单测试呢,嗯,因为是这样子的,在我们将数据呢,要发送到这个卡夫卡。对吧?啊,但是呢,我们卡夫卡是在103104105,它并没有公网的IP,所以我要在本地测试啊,还测不了它只能打包运行,呃,那我想代码要在本地测呢,我想就想看一下这个数据是否正常,对不对啊,那也可以啊,咱们这样我在这边呢,再来一个。嗯,或者说呢,既然是这样子的东西啊,我在写在test里边吧,这样更符合规范一点啊,看见硅谷点APP包下有一个testlink CTC。
08:03
好,那我们把这里边儿有一些东西呢,给它截取过来啊,这个呢,我们就直接写上慢方法。然后呢,把这里边儿东西搂过来啊,做一些删减好吧。呃,那我们直接到。这个位置。啊,那我们先做一个简单的,给大家做一个简单介绍,然后再去看我们这个复杂的内容,好吧。好,那这边呢,我们做点事情。呃,我去加上一点东西。就是它啊,我直接呢在这边。啊,等会给大家解释。这个是在干什么事儿啊?
09:03
好把这个呢,我就可以。删掉了啊,那这个东西呢,我们等。啊,赋值点。Build这就OK了啊,那我们就得到了一个S,这个S呢,我们就可以直接做一个打印。啊,等会儿呢,我会给大家去说这里边儿我们具体做的哪些事情啊。呃,来开到这边。把这个什么拆开换的这个东西呢,我也得给他干掉。啊,简单一点,这个东西呢,就不要了。没有用对吧,他会发现地址呢,没有用了啊这些东西都没有用。我们就最简单化,等会儿呢给大家做一个测试。这个就是第二。这个就是第三步啊,第四步打印并启动啊,我们先把这个代码呢做一个简化,然后再看复杂的。好,那代码呢,完成了,我们来看一下这个代码其实比较简单啊,首先第一步构建我们弗林格的执行环境,那嗯,你要学习到这儿,你肯定得有弗link基础了,如果没有弗link基础,其实这个代码我带着写也没有用啊,如果有基础的话,那就也不用带着写了啊,所以就是没必要了。呃,那这边呢,我们创建这个执行环境,接下来呢,就是构建flink CDC my so的啊,那我们看一下。
10:27
这边呢是买S,这是API里边的。是CDC包下的啊,直接调用点builder,然后呢,最后去调用这个builder。啊build,因为刚才他分开写了,所以我们还是分开写啊呃,那么这边呢,有一个name哈杜101,那我们得改一下啊,两种情况,第一,如果你Windows上配了哈多的101与你这边。集群的什么?官网IP的映射,如果你配置过这个映射,那么你可以写哈,101,如果没有配置过,那你就写这个。
11:05
公网的IP地址好吧,啊,那端口3306,然后数据库呢是G。Table啊,然后是table list table list那这地方呢,不写就待约所有的表,但是这个东西你得有啊,那那说我直接把这个呃注释掉不就好了吗?反正都没有空的,那不行啊,必须要写上,写上里边不写啊呃,然后呢,用户名密码启动模式啊,刚才那边呢,就判断启动模式对吧,因为我们是第一次启动还是后续启动啊,是不一样的,对吧?因为第一次呢,你要做全量啊,因为实时呢,它也要这个全量数据嘛,好,那这个启动模式当中呢,就有这几种,稍微给大家解释一下,第一个叫em initialion em initialion呢,它是先做一个。对于监控的数据库,第一次启动的时候先做一个。初始化的快照对吧?呃,然后接下来呢,去读取最新的。
12:02
Blog对吧,读取最新的blog啊,也就是说先做一次全量,然后接下来开始消费新的,然后呢,还有early list,它从最早的blo开始消费啊,Latest只读最新的,还有指定offset,指定位置,自己指定位置消费,也可以根据指定时间戳去读取买so的blo,好吧,最后呢,叫第1SO,因为blog blog是belo,它是一个二进制的,那我们要对它进行一个反序列化,那我们用的呢是。Li CC官方给我们提供的一个反序列化的一个工具啊,那我们就直接转化为啊,那之后呢,我们直接拿到这个去from,哎,读取到这个数据,然后呢,做一个打印简单吧,代码呢,其实没有什么东西啊,因为我把刚才那个复杂代码简化了,我需要跟大家说明白这个事儿,我先把这个带大家要测试一下。啊,那这个呢,我们先。
我来说两句