00:00
好,那么接下来呢,咱们该写的代码就是根据广播流处理主流数据。对吧,好。呃,那这个东西呢,我们先写它的一个API,因为它这个连接流,所以它的一个调用的方法呢,其实没有太多connected STEM电,你看啊,它只有process可以调用。看见没?啊,其他的叫什么,获取执行环境诶,获取第一个流,获取第二个流,诶又把它拆开了,看见没?然后呢,获取第一个类型,获取第二个类型。数据里边的类型,那这个有什么意义啊,对吧,所以对于他来说呢,其实只有一个什么process啊,只不过是这个process呢,你可以调用这个叫broadcast process function,诶那可以是kid broadcast process function,也就是说你是一个什么呃。监控流去关联一个广播流,诶,那你就可以调用它。Key的,因为这里边呢,可以写这个监控状态,对吧,因为你process方式里边可以定义状态嘛,对吧,它不同啊呃,还有。
01:06
这下面两个是重复的,只不过说在这里面可以定义这个类型。对吧,可以定义这个type information啊,一般来说呢,我们就很少这样去定义了,对吧?啊,那我们呢,并没有做。监控没有做keep前面的流对吧,所以呢,我们调用的就是它,诶这里边要一个broadcast process方啊,那我们就又一个叫broadcast process方啊。层object string object先就object,就这样啊,我们先不管这里边呢,大家看啊,就有两个方法要我们去实现,那第一个。一个呢,叫broadcast element。叫处理广播数据元素对吧,第二个呢,叫处理元素,这个其实就是我们对应的主流,它这有两个方法对吧,那我们要做的事情呢,就是说把这个里边的数据取出来。
02:01
对吧,啊该。转化为招聘就转化为招聘啊,该怎么着就怎么着。对吧,做这个事情啊呃,然后呢,把它写到。广播状态里边,那这边呢,去广播状态里边读出来,然后呢,数据来了之后,我们拿着表明去比对,看查不查得到,如果查不到这条数据就不要查得到,那就是要的数据,对吧,就这意思好,而且呢,你对比一下这个地方context。它呢就正常的context对吧,那这边呢,叫read only啊,因为我们要用那个广播流,它它肯定往广播状态里面去写,对不对,那大家都知道广播状态虽然在我们弗林当中呢,就没有特别深入的去介绍,但是4SPARK里边经常提一个东西叫广播状态,它是只读的。对吧,所以呢,你看人家这个context上下文环境,人家就叫written only啊,其实我告诉你取出来的那个状态呢,也是一个只读的状态,你不能改,你改不了,你要改呢,你就在这儿,你在这儿写数据对吧?啊,删数据啊,写数据啊,都在这儿啊,而不是你这个地方要去改。
03:07
懂吧,是这个意思啊。好,那么这是。我们看到这俩方法啊,但是其实我们这里边要做的事情还是蛮复杂的啊,那我们分析一下啊,你自己想一下,这边我们要做什么事,这边要做什么事。对吧,其实还蛮复杂的,所以呢,这个函数呢,等会儿我不写到这儿,我给它写出去,咱不有一个专门的方式嘛,因为如果这边特别长就不好看了,整个类对吧?啊,那我们来捋一下这里边儿我们要做的事情,这边呢我也有一个对应的。PPT啊,我有一个对应的PPT。在这对吧,然后我们来看一下啊。好两个方法对吧,啊嗯,那等会呢,我们先写它,因为他是负责写状态,他是负责读的,那我们先写后读嘛,对吧?啊处理这个方法啊,好,那在我们广播流。
04:09
这个里边我们要。做什么事呢,对吧,还记得当时我们的状态写的是什么类型。Key是表明用的是string类型。而value是这一行数据,用的是table process,咱们自己写的一个什么?Java病。对吧,咱们自己写的一个招聘啊呃,但是呢。我们读过来的数据,它是从这个flink CDC读过来的,读blog过来的,我们用的是杰森。官方所提供的这个什么杰森第civilization的一个STEM,也就是说它转换成了杰森的一个字符串。对吧,它转化成了杰森的字符串,OK,好,那我们第一件事情呢,就很简单,那我们就要把这个字符串呢,转化为。
05:07
Java病对象对吧?啊,第一步获取并解析数据,方便主流操作,这是我们做的,那因用Java病呢,后面确实好操作一点,嗯,比杰森肯定更好用,对吧,杰森还要写key啊,就get get,然后里边自己写K,那我们是招聘的话,我直接get AA get BB对吧?啊更好操作第二件事情。这干什么呢?校验并见表。当时我们在这个。配置信息当中是不是加了很多的内容,什么主键字段,还有建表的扩展字段,那是不是就在这个位置进行建表啊,因为这个位置是你第一次接触到这个配置信息的。假如说你新增了一个。配置信息,那一定在这边先被处理。对吧,所以这边呢,咱们要到Phoenix里边去见表。OK吧,要去见表,好,那第三件事情就简单了,因为最重要的事情嘛,把这个数据要写到。
06:05
状态里边对吧,啊,所以第三件事情要写入状态广播出去对吧?啊,就是这个事好,那对于我们主流这边。主流这边啊,也要做几件事情,那第一件事情。你对应的看应该做什么,那肯定是读取状态对吧,那你呢,负责写状态,我一切都要依赖于你这个数据,所以我第一步肯定是获取这个状态当中的信息。配置信息对吧,把它拿过来,拿过来之后呢,我们叫做过滤数据了吧,对吧,因为我们数据呢,可能有很多的这个实时表,而我们现在只要配置信息当中有的这个维度表啊,这是一种过滤,其实还有一种过滤。啊,还有个顾虑我们得说一下啊,呃,大家还记得我们在。配置信息当中加了一个。
07:00
S columns。干什么用的?是不是见表的字段呢?对吧,啊,那其实大家我不知道有没有印象啊,嗯,天天接触应该还好,我们trademark那张表应该有三个字段吧,一个ID,一个TM name。还有一个logo URL。对吧,还有一个logo UR,好,呃,那其实像logo URL这种东西呢,对于我们是没有用的,所以呢,我们可以在sing columns里边只写上ID和什么和TM name。对吧,我们只写TM name。那我们logo URL上面相当于建表里边就没有这个字段,那合理啊,因为我们不需要这个字段,但是你主流数据里边是不是一定有logo URL这个字段对吧?那这样的话,你主流数据三个字段,我表呢,两个字段对应关系就对应不上了,对吧?所以呢,这边过滤有两个第一。
08:05
整行数据的过滤,诶把这个实时表给它过滤掉。对吧,好,那还有呢,嗯,就算这个数据你确定找到了,它对应的配置信息有了,诶有可能呢,对这个字段还要做过滤,一个是行过滤,一个是列过滤,能懂吗。能不能明白,能明白给我扣个一啊,我这样解释了一下。能明白吧,啊所以第二件事情呢,这得写过滤啊过滤啊,这边是叫过滤姿态,其实呢,也得有这个行过滤对吧。好,那第三件事情呢,你过滤出来了,字段也有了,都过滤掉了,对吧,这个全部都整理好了,你可以写出去了哟,对不对啊,可以写出来了,那但是呢,补充一个think table字段,这个字段是不是。
09:09
Phoenix表明啊,因为这个数据输出以后,我们就准备往Phoenix去写了吧,所以呢,得有这个表明。得有表明,所以我们要补一下这个表明。对吧,啊,咱们要补一下这个表明这个呢,其实还好啊,那就搞定了,咱们整个的这个就解决了。对吧,啊,这是呃,分别有三件事啊好,那我呢把这个扣个图出来啊,等会呢,我们就围绕这个来写。啊,这个主要是给大家看的,我呢肯定都记住了啊,这个东西对吧,就是我放在这儿,我我这个先先这个啊,这个主流的暂时写不上,我先放在我另外一个屏幕上啊,我们先把这个放在这儿。好吧,因为接下来呢,我们要写这个代码啊,写这个代码好。
10:00
嗯,这个呢。
我来说两句