00:00
啊,当然刚才我们写的代码当中有一点问题啊,刚才下课的时候有个同学跟我说了啊,就这个地方,你们上课发现问题了,可以及时说啊,这个地方没有写连接地址是不是啊,没有写少写个东西啊。parties.set party啊就是con config.per serve,那这个东西呢,地址我是不是可以统一啊,我不用从外面传进来对吧,因为对于当前这个项目而言,它都是一样的嘛,你换一个项目,你在这里边改一下就行了,对吧,我呢就写哈102王号9092这个漏了吧,啊估计也有其他同学发现的啊,看到这个问题,但是呢,嗯,没说啊,但是以后看到这个问题可以说一下啊。好,那这个当时写忘了写漏了对吧?呃,那接下来呢,我们写回来我们这个代码,那用flink CDC来读取配置信息表,创建这个配置流。呃,那么我们要导依赖了呀,对吧?啊,要导依赖了,OK,我看一下啊这边啊,直接在茯苓CC在这儿吧,在这儿啊,在这里边,因为没有把依赖扔出来啊,直接链接的一个文档啊,我们打开从这里边去拿一下也可以啊。
01:06
导入这个对应的依赖,那我们看一下导谁啊,Flink Java stream Java clients客户端都有了,那驱动对吧?Flink CDC的连接器。啊,然后呢,Table planner这几个,这三个对吧,就他们仨。要导进来吧啊,把他们三个导进来就行了,其他的无所谓嘛,因为这个都有,你想想看,这个是不是我们环境有,呃,最后这个fast杰森是不是也有啊,对吧,先拿过来啊。依赖对吧,我们就说了,你缺什么依赖呢,到后面去补,这来得及没关系对吧?呃,那么呢,我们把它放到这个,放弗Li一块啊,放到一块。好,那这边有了啊,这个呢,改个名字啊。用那个对吧,统一,因为我们现在做项目了,既然这样的话,我们有一个统一的,那就用统一的呗,对吧,弗Li版本啊R过来这边呢,是01:13点零是吧,还是用这个啊,但是其实一样的啊,但是呢,我们既然前面写了配置了,那就用这个配置呗,好那弗Li CC怎么读的啊,首先要构建一个S对吧叫。
02:20
My circle。诶,他没有依赖没有导进来吗?刷一下。因为写了没反应对吧。嗯,好,现在有了对吧,有了之后呢,我们再重新写一下啊。啊,咋回事啊,我看一下啊。依赖咋导进来不生效呢?我先把这个干掉,把这个多余的依赖干掉。呃,我检查了一下这个东西呢。有啊。
03:02
在这对吧。没用啊。就是正常的写我们的要构建一个source嘛,就是这个 my circlercs.build对吧,它不提示我大小写,而且后面的文字大写应该也无所谓啊。呃。有点有点有有点诡异,有点诡异啊,很奇怪啊,刚才一直不行对吧,啊,现在又好了。点build,最后呢,调用这个build,因为我检查一下这个依赖都是好的,对吧,我开始的依赖可能有问题啊,但是后面检查是好的。很奇怪啊,那无所谓的,不重要了啊好,那这个是build,这个build,那这里边要的参数我得写一下第一个。How's nameme对吧,主金明。端口号。对吧,嗯。用户名。
04:02
密码对吧,这个必要的,然后呢,带着list。Table。啊,然后呢,从哪开始start options对吧,以及。反的话器。对吧,我们要写的就是这些参数,好,那我们数据库呢,在哈杜102。端口号3306。用户名我呢是root,你就写你自己的,我密码我是123456啊对应的写你自己就行了,好我的数据库啊,这个东西写不要写错了好吧,Come。啊,我写我的,你写你的啊,你你要跟我一样,嗯,可以啊,你不一样,这个也无所谓对吧?好,那咱们的表明呢,一定要把这个库名带上啊,然后呢,表明叫table。Process啊,表明叫table process start options,那对于这张表而言,我们应该全要读,对吧?那就是start options.initial第一次嘛,全量读啊,好,那反学硫化器。
05:04
Jason divisionization的一个STEM啊,用它就好了,然后我们呢,就构建好了这个叫。MYS啊,接下来呢,读取数据in.from。S对吧,呃,这边呢买sce第二个叫world。No world,对吧?strange.no world mark第三个名字啊,My circle source,给它取个名字加V得到一个my circle.s对吧?它是读取blog产生的一个流。好,那这块就搞定了啊,这还比较简单对吧?啊,因为负6CC呢,咱们刚学啊,所以没什么太多可说的,接下来呢,要把这个处理成广播流。啊,咱们要把它处理成广播流,那很简单啊,你就拿着MYSODS点调用broadcast。对吧,啊,不是直接这样调用,这样调用CTRL加V,你得到了并不是一个广播流,它就是一个普通的数据流,对吧?好,那咱们这边呢,CTRLP它是传的参数。
06:10
这个东西。啊,你看啊。它可以传一个什么叫广播状态的描述器啊,或者说你看它也行啊,比如说我们这样啊啊ad对吧,你看着这里面呢,有一个状态描述器返回值呢,叫broadcas对吧,叫广播流,而你直接是括号这个广播,这个是我给大家扩展的那个什么叫重分区算子,大家还记得吗。第一天的考试题里边,下午我给大家讲的时候,重分区算子里边有一个broadcast给大家演示过,对吧,它只是一个重分区算子,如果我们要做成广播流,其实调用的是上面,诶要传一个。状态描述器。啊好,那我们看一下它这个状态描述器要什么类型啊,要一个map state。
07:03
啊,要一个map state好,呃,那这边呢,我们就另一个叫map state。Described对吧?呃,那我们想一下啊,既然是map类型,那么它就有K和value,那我们先这边呢,你在定义的时候,你是不是要给KV类型啊,先不用考虑这个什么类型,我们先想什么呢?先想这个K跟V我们要存什么。好不好对吧,想一想我们这个K应该存什么,Value应该存什么,然后我们再来考虑它类型应该用什么。
08:06
嗯。想一想啊,呃,我们在广播里对吧,他要用这个广播状态。对吧,广播状态,那这个广播状态它是一个map类型的,这个KV分别是什么呢。啊K应该是存个主流跟广播流的共有字段,对了,而且还得唯一。对不对,因为他是map类型,如果你不唯一那也不行,对吧,他两个人都有,因为一个负责往状态里边存,一个负责来写,这一点方宇说的不错啊,他他得是他俩共有的字段。而且得唯一。
09:01
那这个字段谁符合呢?你觉得哪个符合呀?哎,姚总说了,K呢,存S表明V存一行数据对不对呀,没毛病吧,你肯定要把这一行数据都要存进去,因为读完数据我们写的五个字段都是紧着这个字段来用的,你肯定要都要放进去啊,对吧?啊没毛病啊,说的好对吧?好,那既然这样的话,那我们K必然是string型了,那value呢,你可以用string杰森对吧?为了后续处理方便,我呢在这边定义一个Java。
10:08
好吧,定一个招聘啊,那其实都可以啊,你直接用spring用接对象啊,这些东西都可以,但是呢,都嗯不是很好处理,你知道吧,所以呢,下面呢,在这边我写一个。招聘啊,写个招聘可以吧。好,就用这个啊,叫table process。在B目录下来一个啊。OK吧,五个字段的一个照并,所以这边呢,一行数据我们就可以写照并的类型了,对吧,叫table process这个类型的啊好,那这里边呢,有三个参数,一个名字,两个类型嘛,名字呢,那就是map state啊,随便取个名字就行了啊,这边呢是stream。
11:02
点,然后呢是table process点。二加V,那我们就得到了一个状态描述器,对吧,把它。扔进来好,那这样的话呢,我们就得到了一个。管不了。对吧,这个代码大家都是写过的,其实啊,只不过说你要注意一下,就是这里边你要放什么东西来决定它类型是什么,对吧,就这个事情好,最后呢,连接两个流,那主流呢是接obj DS点连接connect对吧?啊连接广播流broadcast,对。看下位诶得到一个连接流。好了。对吧,那接下来呢,就加工这个数据,这个比较麻烦啊,今天呢,半个小时肯定写不完,所以呢,我就不写了啊,就就到这对吧,因为写不完写一半,这是一个方法对吧,一个处理方法,写一半我觉得这样不好啊,因为半个小时这个东西呢写不完啊,这里面很复杂啊。处理这个很复杂,因为你想还要见表,还要过滤,还要有状态,一个写状态,一个存状态,事儿还蛮多的,对吧,所以呢,先不着急啊。
我来说两句