作者:小傅哥 博客:https://bugstack.cn
❝沉淀、分享、成长,让自己和他人都能有所收获!😜 ❞
大家好,我是技术UP主小傅哥。
在我们日常开发中,有很多的同类共性功能组件,如;MQ 的有 Kafka、RabbitMQ,RPC 的有 GRpc、Dubbo。那如果我们想让服务可以平滑的从一套组件切换到另外一套,应该如何处理呢?🤔
这样的东西我也做过
在我工作的公司,近10年的发展中,Redis 的缓存服务组件陆续的变换了3、4款,目前有2套最终稳定共用的。那么我为此开发了一款缓存中间件,可以做到动态切换、读写控制、监控管理,可以非常方便的迁移和升级。
那么,在我们使用 MQ 的时候,如果在不改变系统工程代码的情况下,该怎样优雅的从一套MQ迁移到另外一套呢?今天小傅哥就带着大家来办这样一个事。
官网:https://spring.io/projects/spring-cloud-stream
Spring Cloud Stream 是一个用于构建与共享消息系统连接的高度可扩展的事件驱动微服务的框架。
该框架提供了一个灵活的编程模型,该模型建立在已建立且熟悉的 Spring 习语和最佳实践之上,包括对持久发布/订阅语义、消费者组和有状态分区的支持。
Spring Cloud Stream 支持对接的 MQ 包括:RabbitMQ、Kafka、RocketMQ、Azure Service Bus 等。
小傅哥这里搭建了一套测试 MQ 案例的六边形架构;
工程:https://github.com/fuzhengwei/xfg-dev-tech-springcloud-stream
本节的案例工程会需要用到 Kafka、RabbitMQ,所以需要安装这两套环境。
在做项目的案例前,我们可以先做下 SpringCloud Stream 对接 Kafka、RabbitMQ 的案例,有了这个基础在做整个工程的案例就更容易了。
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>Finchley.SR2</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<!-- kafka -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>
<!-- rabbit -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-rabbit</artifactId>
</dependency>
spring:
# rabbitmq:
# addresses: 192.168.1.108
# port: 5672
# username: admin
# password: admin
# listener:
# simple:
# prefetch: 10 # 每次投递n个消息,消费完在投递n个
kafka:
bootstrap-servers: 192.168.1.105:9092
producer:
# 发生错误后,消息重发的次数。
retries: 1
#当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算。
batch-size: 16384
# 设置生产者内存缓冲区的大小。
buffer-memory: 33554432
acks: 1
consumer:
# 自动提交的时间间隔 在spring boot 2.X 版本中这里采用的是值的类型为Duration 需要符合特定的格式,如1S,1M,2H,5D
auto-commit-interval: 1S
# 该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理:
# latest(默认值)在偏移量无效的情况下,消费者将从最新的记录开始读取数据(在消费者启动之后生成的记录)
# earliest :在偏移量无效的情况下,消费者将从起始位置读取分区的记录
auto-offset-reset: earliest
# 是否自动提交偏移量,默认值是true,为了避免出现重复数据和数据丢失,可以把它设置为false,然后手动提交偏移量
enable-auto-commit: false
cloud:
stream:
bindings:
output:
destination: ${mq.topic.user}
input:
destination: ${mq.topic.user}
myoutput:
destination: ${mq.topic.user02}
myinput:
destination: ${mq.topic.user02}
mq:
topic:
user: xfg-topic
user02: xfg-topic-02
public interface MyProcessor {
String INPUT = "myinput";
String OUTPUT = "myoutput";
@Input(INPUT)
SubscribableChannel input();
@Output(OUTPUT)
MessageChannel output();
}
Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest
public class StreamTest01 {
@Autowired
private MessageProducer producer;
@Test
public void test_publish() throws InterruptedException {
for (int i = 0; i < 2; i++) {
producer.send("mq 消息,哈喽哇!");
}
new CountDownLatch(1).await();
}
@Component
@EnableBinding(Source.class)
static class MessageProducer {
@Autowired
private Source source;
public void send(String message) {
source.output().send(MessageBuilder.withPayload(message).build());
}
}
@Component
@EnableBinding({Sink.class})
static class MessageConsumer {
@StreamListener(Sink.INPUT)
public void onMessage(String message) {
System.out.println("@测试 -> " + message);
}
}
}
@测试 -> mq 消息,哈喽哇!
@测试 -> mq 消息,哈喽哇!
@测试 -> mq 消息,哈喽哇!
@测试 -> mq 消息,哈喽哇!
@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest
public class StreamTest02 {
@Autowired
private MessageProducer producer;
@Test
public void test_publish() throws InterruptedException {
for (int i = 0; i < 2; i++) {
producer.send("mq 消息,哈喽哇!");
}
new CountDownLatch(1).await();
}
@Component
@EnableBinding(MyProcessor.class)
static class MessageProducer {
@Autowired
private MyProcessor source;
public void send(String message) {
source.output().send(MessageBuilder.withPayload(message).build());
}
}
@Component
@EnableBinding({MyProcessor.class})
static class MessageConsumer {
@StreamListener(MyProcessor.INPUT)
public void onMessage(String message) {
System.out.println("@测试 -> " + message);
}
}
}
@测试 -> mq 消息,哈喽哇!
@测试 -> mq 消息,哈喽哇!
@测试 -> mq 消息,哈喽哇!
@测试 -> mq 消息,哈喽哇!
public class UserMessageEvent extends BaseEvent<UserMessageEvent.UserMessage> {
@Value("${mq.topic.user}")
private String topic;
@Override
public EventMessage<UserMessage> buildEventMessage(UserMessage data) {
return EventMessage.<UserMessage>builder()
.id(RandomStringUtils.randomNumeric(11))
.timestamp(new Date())
.data(data)
.build();
}
@Override
public String topic() {
return topic;
}
/**
* 要推送的事件消息,聚合到当前类下。
*/
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public static class UserMessage {
private String userId;
private String userName;
private String userType;
}
}
@Slf4j
@Component
@EnableBinding(Source.class)
public class EventPublisher {
@Autowired
@Qualifier(Source.OUTPUT)
private MessageChannel messageChannel;
@Autowired
private Source source;
public void publish(String topic, BaseEvent.EventMessage<?> eventMessage) {
try {
String messageJson = JSON.toJSONString(eventMessage);
source.output().send(MessageBuilder.withPayload(messageJson).build());
log.info("发送MQ消息 topic:{} message:{}", topic, messageJson);
} catch (Exception e) {
log.error("发送MQ消息失败 topic:{} message:{}", topic, JSON.toJSONString(eventMessage), e);
throw e;
}
}
}
@Service
public class UserRepository extends UserMessageEvent implements IUserRepository {
@Resource
private EventPublisher publisher;
@Override
public void doSaveUser(UserEntity userEntity) {
// 推送消息
publisher.publish(this.topic(), this.buildEventMessage(UserMessageEvent.UserMessage.builder()
.userId(userEntity.getUserId())
.userName(userEntity.getUserName())
.userType(userEntity.getUserTypeVO().getDesc())
.build()));
}
}
@Slf4j
@Component
@EnableBinding({Sink.class})
public class MessageListener {
@StreamListener(Sink.INPUT)
public void onMessage(String message) {
log.info("接收消息:{}", message);
}
}
@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest
public class UserServiceTest {
@Resource
private IUserService userService;
@Test
public void test_register() throws InterruptedException {
while (true) {
UserEntity userEntity = new UserEntity();
userEntity.setUserId("10001");
userEntity.setUserName("小傅哥");
userEntity.setUserTypeVO(UserTypeVO.T8);
userService.register(userEntity);
Thread.sleep(1500);
}
}
}
24-12-01.13:33:53.003 [main ] INFO AppInfoParser - Kafka version : 1.0.2
24-12-01.13:33:53.003 [main ] INFO AppInfoParser - Kafka commitId : 2a121f7b1d402825
24-12-01.13:33:53.043 [main ] INFO EventPublisher - 发送MQ消息 topic:xfg-topic message:{"data":{"userId":"10001","userName":"小傅哥","userType":"架构师"},"id":"89743057693","timestamp":1733031232650}
24-12-01.13:33:54.549 [main ] INFO EventPublisher - 发送MQ消息 topic:xfg-topic message:{"data":{"userId":"10001","userName":"小傅哥","userType":"架构师"},"id":"80224746522","timestamp":1733031234546}
- END -
加入小傅哥的星球「码农会锁」,💐斩获大厂Offer!阅读500+份简历和评审,学习6个业务项目;MVC+DDD,双架构开发小型电商
、大营销(超级大课)
、OpenAI 大模型应用
、Lottery
、IM
、AI 问答助手
。7个组件项目;OpenAI 代码评审
、BCP 透视业务监控
、动态线程池
、支付SDK设计和开发
、API网关
、SpringBoot Starter
、IDEA Plugin 插件开发
。1套源码课程、1套基础教程、1到云服务器教程以及各类场景解决方案。
小傅哥有那么多课程内容,我加入后都可以学习吗?可以!
好啦,这就是小傅哥的技术列车🚌,嘎嘎实惠!🤔 几乎没有哪个大厂架构师,用这么一个普惠的价格手把手的教大家学习了。
星球「码农会锁」- 加入后从课程入口进入项目学习
星球全程手把手指导教学,遇到技术问题帮忙排查代码。已经有很多伙伴开始学起来了,还有大家交的作业笔记。有了的项目驱动学习,清晰的目标感,大家冲起来也有了更明确的方向!干干干!!!