前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >【十九】初学Kafka并实战整合SpringCloudStream进行使用

【十九】初学Kafka并实战整合SpringCloudStream进行使用

作者头像
小z666
发布2024-06-21 17:51:43
690
发布2024-06-21 17:51:43
举报
文章被收录于专栏:javajava

一、下载安装Kafka

要进行kafka的学习,首先肯定得安装kafka了。安装地址如下:

Apache Kafka

很慢,可以去找百度云资源。

1、下载Scala版本的,可以直接使用。

watermark,type_d3F5LXplbmhlaQ,shadow_50,text_Q1NETiBA5bCPeuKZgA==,size_20,color_FFFFFF,t_70,g_se,x_16
watermark,type_d3F5LXplbmhlaQ,shadow_50,text_Q1NETiBA5bCPeuKZgA==,size_20,color_FFFFFF,t_70,g_se,x_16

,然后点击链接进行下载。

2、解压后得到如下:

watermark,type_d3F5LXplbmhlaQ,shadow_50,text_Q1NETiBA5bCPeuKZgA==,size_20,color_FFFFFF,t_70,g_se,x_16
watermark,type_d3F5LXplbmhlaQ,shadow_50,text_Q1NETiBA5bCPeuKZgA==,size_20,color_FFFFFF,t_70,g_se,x_16

3、进入kafka安装目录:

watermark,type_d3F5LXplbmhlaQ,shadow_50,text_Q1NETiBA5bCPeuKZgA==,size_20,color_FFFFFF,t_70,g_se,x_16
watermark,type_d3F5LXplbmhlaQ,shadow_50,text_Q1NETiBA5bCPeuKZgA==,size_20,color_FFFFFF,t_70,g_se,x_16

4、执行命令启动zookeeper和kafka,需要先启动zookeeper,再启动kafka(kafka安装后,默认带了一个zookeeper) ,都有一套默认配置,此处就不修改配置了,直接使用默认配置启动,命令如下:

watermark,type_d3F5LXplbmhlaQ,shadow_50,text_Q1NETiBA5bCPeuKZgA==,size_20,color_FFFFFF,t_70,g_se,x_16
watermark,type_d3F5LXplbmhlaQ,shadow_50,text_Q1NETiBA5bCPeuKZgA==,size_20,color_FFFFFF,t_70,g_se,x_16
watermark,type_d3F5LXplbmhlaQ,shadow_50,text_Q1NETiBA5bCPeuKZgA==,size_20,color_FFFFFF,t_70,g_se,x_16
watermark,type_d3F5LXplbmhlaQ,shadow_50,text_Q1NETiBA5bCPeuKZgA==,size_20,color_FFFFFF,t_70,g_se,x_16

启动zookeeper: bin\windows\zookeeper-server-start.bat config\zookeeper.properties 启动kafka: bin\windows\kafka-server-start.bat config\server.properties 注意:是在bin目录上级执行的。此处是在windows上的启动命令,linux的不一样,后面有机会再学习。

watermark,type_d3F5LXplbmhlaQ,shadow_50,text_Q1NETiBA5bCPeuKZgA==,size_20,color_FFFFFF,t_70,g_se,x_16
watermark,type_d3F5LXplbmhlaQ,shadow_50,text_Q1NETiBA5bCPeuKZgA==,size_20,color_FFFFFF,t_70,g_se,x_16
watermark,type_d3F5LXplbmhlaQ,shadow_50,text_Q1NETiBA5bCPeuKZgA==,size_20,color_FFFFFF,t_70,g_se,x_16
watermark,type_d3F5LXplbmhlaQ,shadow_50,text_Q1NETiBA5bCPeuKZgA==,size_20,color_FFFFFF,t_70,g_se,x_16

表示两者已经启动成功了,下面可以开启构建项目了。

注意:zookeeper默认占用端口号:2181,kafka占用9092,注意跑之前看看端口是否被占用,否则出现端口冲突问题。

二、构建父子工程

为了测试消费者服务和生产者服务,所以父子工程就长这样:

watermark,type_d3F5LXplbmhlaQ,shadow_50,text_Q1NETiBA5bCPeuKZgA==,size_20,color_FFFFFF,t_70,g_se,x_16
watermark,type_d3F5LXplbmhlaQ,shadow_50,text_Q1NETiBA5bCPeuKZgA==,size_20,color_FFFFFF,t_70,g_se,x_16

上次整合RabbitMQ时就建立了这个,此处直接拿来用了。

三、使用SpringCloudStream默认的信道实现消息传递

下面通过SpringCloudSteam实现Kafka,最基本的一个使用流程,差不多是下面这个样子(可能描述不准确):

watermark,type_d3F5LXplbmhlaQ,shadow_50,text_Q1NETiBA5bCPeuKZgA==,size_20,color_FFFFFF,t_70,g_se,x_16
watermark,type_d3F5LXplbmhlaQ,shadow_50,text_Q1NETiBA5bCPeuKZgA==,size_20,color_FFFFFF,t_70,g_se,x_16

如图可知,大概的一个流程就是(只学习了最基本的使用):

  • 构建消息生产者
  • 指定消息输入通道,并指定该通道指向的Topic
  • 构建消息消费者
  • 指定消息输出通道,并指定该通道订阅哪一个Topic
  • 构建消费者监听器,监听指定的输出通道,并获取消息进行消费

大概流程就是这样,下面开始具体操作。

本节实现功能:生成订单时,创建订单记录并通知买家(通过邮件和短信)。

先准备“构建材料”,在父模块引入所需jar包:

watermark,type_d3F5LXplbmhlaQ,shadow_50,text_Q1NETiBA5bCPeuKZgA==,size_20,color_FFFFFF,t_70,g_se,x_16
watermark,type_d3F5LXplbmhlaQ,shadow_50,text_Q1NETiBA5bCPeuKZgA==,size_20,color_FFFFFF,t_70,g_se,x_16
代码语言:javascript
复制
<dependency>
     <groupId>org.springframework.cloud</groupId>
     <artifactId>spring-cloud-starter-stream-kafka</artifactId>
     <version>3.0.3.RELEASE</version>       
</dependency>

增加这一个Jar包即可。

重点!!!:此处有个大坑,仍旧是版本问题,我目前的版本是这样的(这种我自己没问题):

watermark,type_d3F5LXplbmhlaQ,shadow_50,text_Q1NETiBA5bCPeuKZgA==,size_20,color_FFFFFF,t_70,g_se,x_16
watermark,type_d3F5LXplbmhlaQ,shadow_50,text_Q1NETiBA5bCPeuKZgA==,size_20,color_FFFFFF,t_70,g_se,x_16
watermark,type_d3F5LXplbmhlaQ,shadow_50,text_Q1NETiBA5bCPeuKZgA==,size_20,color_FFFFFF,t_70,g_se,x_16
watermark,type_d3F5LXplbmhlaQ,shadow_50,text_Q1NETiBA5bCPeuKZgA==,size_20,color_FFFFFF,t_70,g_se,x_16

2.3.*和3.0.*,试了很多版本组合都会运行时报错,不是找不到这个类就是找不到哪个类,后面再去研究一下版本匹配问题。

3.1 构建生产者服务

依赖导入完成就开始构建生产者服务。

watermark,type_d3F5LXplbmhlaQ,shadow_50,text_Q1NETiBA5bCPeuKZgA==,size_20,color_FFFFFF,t_70,g_se,x_16
watermark,type_d3F5LXplbmhlaQ,shadow_50,text_Q1NETiBA5bCPeuKZgA==,size_20,color_FFFFFF,t_70,g_se,x_16

3.1.1 修改配置文件

watermark,type_d3F5LXplbmhlaQ,shadow_50,text_Q1NETiBA5bCPeuKZgA==,size_20,color_FFFFFF,t_70,g_se,x_16
watermark,type_d3F5LXplbmhlaQ,shadow_50,text_Q1NETiBA5bCPeuKZgA==,size_20,color_FFFFFF,t_70,g_se,x_16

主要是绑定SpringCloudStream的输入信道以及指定kafka的服务器地址。

上图的output是Stream自带的消息输入信道,从最开始的流程图可以得知,需要新建topic和信道的绑定关系,上图的意思就是在output信道绑定上stream-demo这个topic,content-type是指发送的消息的格式,若想在消费端进行消息类型的转换,最好使用application/json类型。

注意:上图的output输入信道是stream自带的,还自带了一个输出信道input。

watermark,type_d3F5LXplbmhlaQ,shadow_50,text_Q1NETiBA5bCPeuKZgA==,size_20,color_FFFFFF,t_70,g_se,x_16
watermark,type_d3F5LXplbmhlaQ,shadow_50,text_Q1NETiBA5bCPeuKZgA==,size_20,color_FFFFFF,t_70,g_se,x_16
watermark,type_d3F5LXplbmhlaQ,shadow_50,text_Q1NETiBA5bCPeuKZgA==,size_20,color_FFFFFF,t_70,g_se,x_16
watermark,type_d3F5LXplbmhlaQ,shadow_50,text_Q1NETiBA5bCPeuKZgA==,size_20,color_FFFFFF,t_70,g_se,x_16

所以可以直接使用这两个通道。

3.1.2 新增发送消息服务类

新增一个发送消息的服务类,如下:

watermark,type_d3F5LXplbmhlaQ,shadow_50,text_Q1NETiBA5bCPeuKZgA==,size_20,color_FFFFFF,t_70,g_se,x_16
watermark,type_d3F5LXplbmhlaQ,shadow_50,text_Q1NETiBA5bCPeuKZgA==,size_20,color_FFFFFF,t_70,g_se,x_16

注释的代码不需要管,后面才用到。通过EnableBinding注解绑定Source类(自带的那个消息输入通道)。

调用source的output方法下的send方法发送一个Message类型的消息。

watermark,type_d3F5LXplbmhlaQ,shadow_50,text_Q1NETiBA5bCPeuKZgA==,size_20,color_FFFFFF,t_70,g_se,x_16
watermark,type_d3F5LXplbmhlaQ,shadow_50,text_Q1NETiBA5bCPeuKZgA==,size_20,color_FFFFFF,t_70,g_se,x_16

消息消费者服务接受的会是一个String类型的消息。

3.1.3 新增controller进行接口测试

新增一个controller,写一个订单生成接口,如下:

watermark,type_d3F5LXplbmhlaQ,shadow_50,text_Q1NETiBA5bCPeuKZgA==,size_20,color_FFFFFF,t_70,g_se,x_16
watermark,type_d3F5LXplbmhlaQ,shadow_50,text_Q1NETiBA5bCPeuKZgA==,size_20,color_FFFFFF,t_70,g_se,x_16

在此接口调用消息发送者服务写的发送消息方法,发送消息。

3.2 构建消费者服务
watermark,type_d3F5LXplbmhlaQ,shadow_50,text_Q1NETiBA5bCPeuKZgA==,size_20,color_FFFFFF,t_70,g_se,x_16
watermark,type_d3F5LXplbmhlaQ,shadow_50,text_Q1NETiBA5bCPeuKZgA==,size_20,color_FFFFFF,t_70,g_se,x_16

3.2.1 修改配置文件

watermark,type_d3F5LXplbmhlaQ,shadow_50,text_Q1NETiBA5bCPeuKZgA==,size_20,color_FFFFFF,t_70,g_se,x_16
watermark,type_d3F5LXplbmhlaQ,shadow_50,text_Q1NETiBA5bCPeuKZgA==,size_20,color_FFFFFF,t_70,g_se,x_16

此处绑定输出通道和Topic的关系,上图表示使用默认的input消息输出信道绑定demo这个Topic,监听input这个信道就可以获取demo中的消息了,下面创建监听器。

3.2.2 新建短信处理监听类

watermark,type_d3F5LXplbmhlaQ,shadow_50,text_Q1NETiBA5bCPeuKZgA==,size_20,color_FFFFFF,t_70,g_se,x_16
watermark,type_d3F5LXplbmhlaQ,shadow_50,text_Q1NETiBA5bCPeuKZgA==,size_20,color_FFFFFF,t_70,g_se,x_16

通过EnableBinding注解,将该类绑定默认的Sink类

该类的信道名称是input。

通过StreamListener注解,监听topic中获取到的消息,并进行处理消费。

3.2.3 新建邮件处理监听类

watermark,type_d3F5LXplbmhlaQ,shadow_50,text_Q1NETiBA5bCPeuKZgA==,size_20,color_FFFFFF,t_70,g_se,x_16
watermark,type_d3F5LXplbmhlaQ,shadow_50,text_Q1NETiBA5bCPeuKZgA==,size_20,color_FFFFFF,t_70,g_se,x_16

同上面的一样。

启动两个服务,进行演示,结果如下:

watermark,type_d3F5LXplbmhlaQ,shadow_50,text_Q1NETiBA5bCPeuKZgA==,size_20,color_FFFFFF,t_70,g_se,x_16
watermark,type_d3F5LXplbmhlaQ,shadow_50,text_Q1NETiBA5bCPeuKZgA==,size_20,color_FFFFFF,t_70,g_se,x_16
watermark,type_d3F5LXplbmhlaQ,shadow_50,text_Q1NETiBA5bCPeuKZgA==,size_20,color_FFFFFF,t_70,g_se,x_16
watermark,type_d3F5LXplbmhlaQ,shadow_50,text_Q1NETiBA5bCPeuKZgA==,size_20,color_FFFFFF,t_70,g_se,x_16

消费者服务已经获取到了生产者服务通过Topic发送的消息。

四、使用自定义信道(和发送消息体)实现消息传递

上述代码实现的是通过stream默认的信道实现的,本节实现通过自定义信道实现,除了邮件和短信处理外,额外新增一个操作(通过新的信道)实现。上述代码可以发现,消费者服务接收到的消息是String类型的,若想发送和接受自定义的类型,本节也进行实现。

本节实现功能:生成订单时,创建订单记录并通知买家(通过邮件和短信),然后额外短信通知买家购买的东西是什么。

4.1 构建公共模块

4.1.1 构建自定义信道接口

由于本章节要实现自定义信道的功能,所有需要模仿stream自带的Sink接口和MySource接口写自己的自定义信道接口,再由于这部分代码会在消费者服务和生产者服务都有用到,直接将该两个接口写在common公共类里面,如下:

watermark,type_d3F5LXplbmhlaQ,shadow_50,text_Q1NETiBA5bCPeuKZgA==,size_20,color_FFFFFF,t_70,g_se,x_16
watermark,type_d3F5LXplbmhlaQ,shadow_50,text_Q1NETiBA5bCPeuKZgA==,size_20,color_FFFFFF,t_70,g_se,x_16
watermark,type_d3F5LXplbmhlaQ,shadow_50,text_Q1NETiBA5bCPeuKZgA==,size_20,color_FFFFFF,t_70,g_se,x_16
watermark,type_d3F5LXplbmhlaQ,shadow_50,text_Q1NETiBA5bCPeuKZgA==,size_20,color_FFFFFF,t_70,g_se,x_16
watermark,type_d3F5LXplbmhlaQ,shadow_50,text_Q1NETiBA5bCPeuKZgA==,size_20,color_FFFFFF,t_70,g_se,x_16
watermark,type_d3F5LXplbmhlaQ,shadow_50,text_Q1NETiBA5bCPeuKZgA==,size_20,color_FFFFFF,t_70,g_se,x_16

4.1.2 自定义消息体

此处特别讲一个自定义消息体,找了一些文章没有发现一个简单的方式实现自定义消息体,结果捣鼓捣鼓发现实现springboot提供的Message接口即可,如下:

watermark,type_d3F5LXplbmhlaQ,shadow_50,text_Q1NETiBA5bCPeuKZgA==,size_20,color_FFFFFF,t_70,g_se,x_16
watermark,type_d3F5LXplbmhlaQ,shadow_50,text_Q1NETiBA5bCPeuKZgA==,size_20,color_FFFFFF,t_70,g_se,x_16

因为发送消息的send方法的入参必须是一个Message类的泛型类。

watermark,type_d3F5LXplbmhlaQ,shadow_50,text_Q1NETiBA5bCPeuKZgA==,size_20,color_FFFFFF,t_70,g_se,x_16
watermark,type_d3F5LXplbmhlaQ,shadow_50,text_Q1NETiBA5bCPeuKZgA==,size_20,color_FFFFFF,t_70,g_se,x_16

如此这般,common就结束了,下面整改生产者和消费者服务。

4.2 构建生产者服务

4.2.1 修改配置文件

watermark,type_d3F5LXplbmhlaQ,shadow_50,text_Q1NETiBA5bCPeuKZgA==,size_20,color_FFFFFF,t_70,g_se,x_16
watermark,type_d3F5LXplbmhlaQ,shadow_50,text_Q1NETiBA5bCPeuKZgA==,size_20,color_FFFFFF,t_70,g_se,x_16

对比上面一小节,本小结在配置中新增了一个绑定,就是关于自定义的output1这个信道的绑定,翻翻上面可以看到自定义的传入信道接口类信道名称叫做input1,所以此处保持一致,然后将该信道的消息发送到demo1这个topic。

4.2.2 修改消息发送服务类

watermark,type_d3F5LXplbmhlaQ,shadow_50,text_Q1NETiBA5bCPeuKZgA==,size_20,color_FFFFFF,t_70,g_se,x_16
watermark,type_d3F5LXplbmhlaQ,shadow_50,text_Q1NETiBA5bCPeuKZgA==,size_20,color_FFFFFF,t_70,g_se,x_16

修改该类,EnableBinding注解的值改为绑定多个传入信道接口。

再调用自定义的传入信道接口的send方法发送消息。

此处的sendMsg方法的入参改为自定义的消息体。

4.3 构建消费者服务

4.3.1 修改配置文件

watermark,type_d3F5LXplbmhlaQ,shadow_50,text_Q1NETiBA5bCPeuKZgA==,size_20,color_FFFFFF,t_70,g_se,x_16
watermark,type_d3F5LXplbmhlaQ,shadow_50,text_Q1NETiBA5bCPeuKZgA==,size_20,color_FFFFFF,t_70,g_se,x_16

让自定义的名字叫input1的这个传出信道去绑定demo1这个topic,让他可以获取topic1里面的消息,后面再监听input1这个出口即可。

4.3.2 新增额外处理的监听器

监听input1这个出口。

watermark,type_d3F5LXplbmhlaQ,shadow_50,text_Q1NETiBA5bCPeuKZgA==,size_20,color_FFFFFF,t_70,g_se,x_16
watermark,type_d3F5LXplbmhlaQ,shadow_50,text_Q1NETiBA5bCPeuKZgA==,size_20,color_FFFFFF,t_70,g_se,x_16
watermark,type_d3F5LXplbmhlaQ,shadow_50,text_Q1NETiBA5bCPeuKZgA==,size_20,color_FFFFFF,t_70,g_se,x_16
watermark,type_d3F5LXplbmhlaQ,shadow_50,text_Q1NETiBA5bCPeuKZgA==,size_20,color_FFFFFF,t_70,g_se,x_16

对比前面两个监听器,此处使用EnableBinding注解绑定的是自定义的传入信道,然后再通过StreamListener注解,去监听这个传出信道进出消息消费,逻辑处理。

此处的方法接收到的数据可以通过json转换成自定义消息体的消息。(注意,消息生产者一定是要通过content-type: application/json 这种格式发送的消息才可以进行json转换)。

然后重启provider和consumer两个服务,记得要重新编译common模块,因为改了common模块的东西,然后另外两个模块又使用了它。调用测试接口进行效果演示,如下:

watermark,type_d3F5LXplbmhlaQ,shadow_50,text_Q1NETiBA5bCPeuKZgA==,size_20,color_FFFFFF,t_70,g_se,x_16
watermark,type_d3F5LXplbmhlaQ,shadow_50,text_Q1NETiBA5bCPeuKZgA==,size_20,color_FFFFFF,t_70,g_se,x_16
watermark,type_d3F5LXplbmhlaQ,shadow_50,text_Q1NETiBA5bCPeuKZgA==,size_20,color_FFFFFF,t_70,g_se,x_16
watermark,type_d3F5LXplbmhlaQ,shadow_50,text_Q1NETiBA5bCPeuKZgA==,size_20,color_FFFFFF,t_70,g_se,x_16

可以看到新增的监听器成功监听到了来自自定义信道的消息,并且接收到的消息也成功转成了自定义消息体。

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2024-03-10,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 二、构建父子工程
  • 三、使用SpringCloudStream默认的信道实现消息传递
    • 3.1 构建生产者服务
      • 3.2 构建消费者服务
      • 四、使用自定义信道(和发送消息体)实现消息传递
        • 4.1 构建公共模块
          • 4.2 构建生产者服务
            • 4.3 构建消费者服务
            相关产品与服务
            短信
            腾讯云短信(Short Message Service,SMS)可为广大企业级用户提供稳定可靠,安全合规的短信触达服务。用户可快速接入,调用 API / SDK 或者通过控制台即可发送,支持发送验证码、通知类短信和营销短信。国内验证短信秒级触达,99%到达率;国际/港澳台短信覆盖全球200+国家/地区,全球多服务站点,稳定可靠。
            领券
            问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档