下载地址:https://rocketmq.apache.org/release-notes/
rocketmq-all-5.0.0-bin-release.zip
下载后上传到服务器;
解压命令# unzip rocketmq-all-5.0.0-bin-release.zip
RocketMQ默认配置是比较好的,这样可以直接应用于生产环境,所以如果机器内存较小,启动会因为内存不足失败,为了避免后面启动失败,选择先修改其内存大小,一般阿里云服务器是满足不了默认内存。
手动调整JVM的配置,单位从g改为m
-server -Xms256m -Xmx256m -Xmn128m
-server -Xms256m -Xmx256m -Xmn128m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m
解压目录执行# nohup ./bin/mqnamesrv -n 1.117.75.57(自己的ip):9876 &
添加namesrvAddr 和 brokerIP1:
解压目录执行# nohup ./bin/mqbroker -n 1.117.75.57:9876 -c ./conf/broker.conf &
jps
由于服务器内存可能比较小,建议先关闭其他应用,比如rabbitmq,docker的容器等;
还需要开启几个端口:9876,10909,10910,10911;
导出环境变量# export NAMESRV_ADDR=1.117.75.57:9876
发送消息# bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
导出环境变量# export NAMESRV_ADDR=1.117.75.57:9876
消费消息# bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
关闭nameserver# bin/mqshutdown namesrv
关闭broker# bin/mqshutdown broker
localhost:9696
整体可以分成4个角色,分别是:NameServer,Broker,Producer,Consumer:
<!-- rocket -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.0.2</version>
</dependency>
# rocketmq配置
rocketmq:
#rocketMQ服务的地址
name-server: 1.117.75.57:9876
# 生产者组
producer:
group: kh96-sendsms-group
@Autowired(required = false)
private RocketMQTemplate rocketMQTemplate;
/**
* @param : [phoneNo]
* @return : java.lang.String
* @author : huayu
* @date : 30/11/2022
* @description : 测试发送消息到用户中心,用户中心给手机号发信息
*/
@RequestMapping("/testRocketMQSendMsg")
public String testRocketMQSendMsg(@RequestParam String phoneNo) {
log.info("------ 使用RocketMQ,测试给手机:{},发送消息 -------", phoneNo);
//使用RocketMQ发送消息
rocketMQTemplate.convertAndSend("rocketmq-send-sms-kh96", phoneNo);
return "send msg success";
}
1.添加加rocketmq的依赖;
2.用户服务,监听发送短信的请求发送消息:
/**
* Created On : 30/11/2022.
* <p>
* Author : huayu
* <p>
* Description: 用户服务,监听发送短信的请求发送消息
*/
@Slf4j
@Component
@RocketMQMessageListener(consumerGroup = "rocketmq-user-sms-group", //组 随便写
topic = "rocketmq-send-sms-kh96" //消息队列,发送的时候指定的
)
public class SendSmsListener implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
log.info("***** 接收发送信息的请求,给手机:{},发送消息 ******", message);
}
}
/**
* @param : [phoneNo]
* @return : java.lang.String
* @author : huayu
* @date : 30/11/2022
* @description : 测试roketmq-发送可靠同步消息
*/
@RequestMapping("/testRocketMQSendMsgSync")
public String testRocketMQSendMsgSync(@RequestParam String syncMsg) {
log.info("------ 使用RocketMQ,发送可靠同步消息{} -------", syncMsg);
//使用RocketMQ发送消息,拿到同步结果
SendResult sendResult = rocketMQTemplate.syncSend("rocketmq-sync-msg-kh96", syncMsg);
log.info("------ 使用RocketMQ,发送可靠同步消息结果:{} -------", sendResult);
return "send sync msg success";
}
/**
* @param : [phoneNo]
* @return : java.lang.String
* @author : huayu
* @date : 30/11/2022
* @description : 测试roketmq-发送可靠异步消息
*/
@RequestMapping("/testRocketMQSendMsgAsync")
public String testRocketMQSendMsgAsync(@RequestParam String asyncMsg) {
log.info("------ 使用RocketMQ,发送可靠异步消息:{} -------", asyncMsg);
//使用RocketMQ发送消息
rocketMQTemplate.asyncSend("rocketmq-sync-msg-kh96",
asyncMsg,
new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
log.info("------ 可靠异步发送成功回调 ------");
}
@Override
public void onException(Throwable throwable) {
log.info("------ 可靠异步发送失败回调 ------");
}
});
return "send async msg success";
}
/**
* @param : [phoneNo]
* @return : java.lang.String
* @author : huayu
* @date : 30/11/2022
* @description : 测试roketmq-发送单向消息,只发不收结果
*/
@RequestMapping("/testRocketMQSendMsgOneWay")
public String testRocketMQSendMsgOneWay(@RequestParam String oneWayMsg) {
log.info("------ 使用RocketMQ,发送单向消息给:{} -------", oneWayMsg);
//使用RocketMQ发送消息
rocketMQTemplate.sendOneWay("rocketmq-oneWay-msg-kh96", oneWayMsg);
return "send oneWay msg success";
}
/**
* @param : [phoneNo]
* @return : java.lang.String
* @author : huayu
* @date : 30/11/2022
* @description : 测试roketmq-发送顺序消息
*/
@RequestMapping("/testRocketMQSendMsgOrderly")
public String testRocketMQSendMsgOrderly(@RequestParam String orderlyMsgs) {
log.info("------ 使用RocketMQ,发送顺序消息:{} -------", orderlyMsgs);
//使用RocketMQ发送顺序消息,必须要提供一个唯一的标识di,比如用户编号等
String userId = UUID.randomUUID().toString().replace("-", "");
//发送多条顺序消息,模拟iang消息分割成多个符号发送
Arrays.asList(orderlyMsgs.split("")).
forEach(orderlyMsg -> rocketMQTemplate.syncSendOrderly("rocketmq-orderly-msg-kh96", orderlyMsg, userId));
return "send orderly msg success";
}
/**
* Created On : 30/11/2022.
* <p>
* Author : huayu
* <p>
* Description: RocketMQ 监听顺序消息
*/
@Slf4j
@Component
@RocketMQMessageListener(consumerGroup = "rocketmq-orderly-sms-group",
topic = "rocketmq-orderly-msg-kh96",
consumeMode = ConsumeMode.ORDERLY)
public class RocketMQOrderlyMsgListener implements RocketMQListener<String> {
@Override
public void onMessage(String orderlyMsg) {
log.info("------接收顺序消息 :{} ------", orderlyMsg);
}
}
参考博客1:https://blog.csdn.net/Weixiaohuai/article/details/123733518
参考博客2:https://blog.csdn.net/qq_42877546/article/details/125404307
/**
* Created On : 30/11/2022.
* <p>
* Author : huayu
* <p>
* Description: 事务消息的业务 接口
*/
public interface RocketMQTxService {
/**
* @param : [kgcMallOrder]
* @return : void
* @author : huayu
* @date : 30/11/2022
* @description : 发送生成订单的半事务消息
*/
void sendCreateOrderHalfTx(KgcMallOrder kgcMallOrder);
/**
* @param : [txId, kgcMallOrder]
* @return : void
* @author : huayu
* @date : 30/11/2022
* @description : 执行本地生成订单的事务操作
*/
void executeCreateOrderLocalTx(String txId, KgcMallOrder kgcMallOrder);
}
/**
* Created On : 30/11/2022.
* <p>
* Author : huayu
* <p>
* Description: 事务消息的业务 处理类
*/
@Service
@Slf4j
public class RocketMQTxServiceImpl implements RocketMQTxService {
@Autowired(required = false)
private RocketMQTemplate rocketMQTemplate;
@Autowired
private KgcMallOrderRepository kgcMallOrderRepository;
@Autowired
private KgcMallTxlogrepisitory kgcMallTxlogrepisitory;
@Override
@Transactional
public void sendCreateOrderHalfTx(KgcMallOrder kgcMallOrder) {
log.info("###### 1. 开始发送生成订单的半事务消息 到 rocketmq服务端 ######");
//自定义事务编号
String txId = UUID.randomUUID().toString().substring(0, 8);
//发送半事务消息,返回发送结果
TransactionSendResult transactionSendResult =
rocketMQTemplate.sendMessageInTransaction("rocketmq-tx-msg-group", //组
"rocketmq-tx-msg-kh96", //队列
MessageBuilder.withPayload(kgcMallOrder).setHeader("txId", txId).build(), // 消息体
kgcMallOrder); //发送内容
log.info("###### 2. 开始发送生成订单的半事务消息rocketmq服务端成功,响应:{} ######", transactionSendResult);
}
@Override
@Transactional
public void executeCreateOrderLocalTx(String txId, KgcMallOrder kgcMallOrder) {
log.info("###### 3.1 本地开始执行生成订单的事务操作 ######");
//开始插入订单
kgcMallOrderRepository.save(kgcMallOrder);
log.info("###### 3.2 本地执行生成订单的事务操作 成功 ######");
// 模拟本地事务处理失败
// int a = 10 / 0;
log.info("###### 3.3开始生成用于事务回查的本地事务日志 ######");
//创建事务对象
KgcMallTxlog kgcMallTxlog = KgcMallTxlog.builder()
.id(txId)
.txDetail("本地事务日志")
.txTime(new Date())
.build();
//事务日志入库
kgcMallTxlogrepisitory.save(kgcMallTxlog);
log.info("###### 3.4 生成用于事务回查的本地事务日志成功 ######");
}
}
/**
* Created On : 30/11/2022.
* <p>
* Author : huayu
* <p>
* Description: 事务消息,本地执行事务监听,半事务消息发送成功后,此监听会收到本地事务处理的通知
*/
@Slf4j
@Component
@RocketMQTransactionListener(txProducerGroup = "rocketmq-tx-msg-group")
public class RocketMQExecuteLocalTxListener implements RocketMQLocalTransactionListener {
@Autowired
private RocketMQTxService rocketMQTxService;
@Autowired
private KgcMallTxlogrepisitory kgcMallTxlogrepisitory;
//执行本地事务
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
try {
//调用本地事务执行的业务处理接口
log.info("###### 3 半事务消息发送成功, 执行本地事务 ######");
rocketMQTxService.executeCreateOrderLocalTx((String) msg.getHeaders().get("txId"), (KgcMallOrder) arg);
//响应本地事务执行成功结果给服务端,服务端接收到此提交结果,会投递消息
log.info("###### 4 本地事务处理成功,响应事务处理结果给服务端 ######");
return RocketMQLocalTransactionState.COMMIT;
} catch (Exception e) {
log.error("###### 本地事务执行异常:{} ######", e.getMessage());
}
//响应本地事务执行失败结果给服务端,服务端接收到此回滚结果,不会投递消息(缓存,并定期删除)
log.info("###### 4 本地事务处理失败,响应事务处理结果给服务端 #######");
return RocketMQLocalTransactionState.ROLLBACK;
}
//回查本地事务
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
log.info("###### 5 未收到第4步本地事务处理结果,回查事务状态 ######");
//在网络闪断或者服务重启,没有及时通知服务断事务处理结果,进行会查操作
//如果回查本地事务执行成功(看事物日志是否存在,如果存在就是处理成功如果不存在就是处理失败),通知服务端投递消息,否则不能投递
log.info("###### 6 检查本地事务处理结果,响应事务处理结果给服务端 ######");
if (kgcMallTxlogrepisitory.findById((String) msg.getHeaders().get("txId")).orElse(null) == null) {
//本地事务入库失败,代表本地事务没有处理成功,步投递消息(缓存,并定期删除)
log.info("###### 7 检查本地事务处理结果失败 ######");
return RocketMQLocalTransactionState.ROLLBACK;
}
//本地事务入库成功,代表本地事务处理成功,投递消息
log.info("###### 7 检查本地事务处理结果成功 ######");
return RocketMQLocalTransactionState.COMMIT;
}
}
/**
* Created On : 30/11/2022.
* <p>
* Author : huayu
* <p>
* Description: 事务消息,消费监听,如果本地事务处理成功,会收到投递的消息,如果失败,收不到消息
*/
@Slf4j
@Component
@RocketMQMessageListener(consumerGroup = "rocket-tx-msg-consumer-group",
topic = "rocket-tx-msg-kh96"
)
public class RocketMQConsumerTxMsgListener implements RocketMQListener<Object> {
@Override
public void onMessage(Object message) {
log.info("###### 8 消费端,收到生成订单成功的事务消息:{} ###### ", message);
}
}
/**
* Created On : 30/11/2022.
* <p>
* Author : huayu
* <p>
* Description: RocketMQ 消息队列 测试消息入口
*/
@Slf4j
@RestController
public class RocketMQController {
@Autowired(required = false)
private RocketMQTemplate rocketMQTemplate;
@Autowired
private RocketMQTxService rocketMQTxService;
/**
* @param : [phoneNo]
* @return : java.lang.String
* @author : huayu
* @date : 30/11/2022
* @description : 测试发送消息到用户中心,用户中心给手机号发信息
*/
@RequestMapping("/testRocketMQSendMsg")
public String testRocketMQSendMsg(@RequestParam String phoneNo) {
log.info("------ 使用RocketMQ,测试给手机:{},发送消息 -------", phoneNo);
//使用RocketMQ发送消息
rocketMQTemplate.convertAndSend("rocketmq-send-sms-kh96", phoneNo);
return "send msg success";
}
/**
* @param : [phoneNo]
* @return : java.lang.String
* @author : huayu
* @date : 30/11/2022
* @description : 测试roketmq-发送可靠同步消息
*/
@RequestMapping("/testRocketMQSendMsgSync")
public String testRocketMQSendMsgSync(@RequestParam String syncMsg) {
log.info("------ 使用RocketMQ,发送可靠同步消息{} -------", syncMsg);
//使用RocketMQ发送消息,拿到同步结果
SendResult sendResult = rocketMQTemplate.syncSend("rocketmq-sync-msg-kh96", syncMsg);
log.info("------ 使用RocketMQ,发送可靠同步消息结果:{} -------", sendResult);
return "send sync msg success";
}
/**
* @param : [phoneNo]
* @return : java.lang.String
* @author : huayu
* @date : 30/11/2022
* @description : 测试roketmq-发送可靠异步消息
*/
@RequestMapping("/testRocketMQSendMsgAsync")
public String testRocketMQSendMsgAsync(@RequestParam String asyncMsg) {
log.info("------ 使用RocketMQ,发送可靠异步消息:{} -------", asyncMsg);
//使用RocketMQ发送消息
rocketMQTemplate.asyncSend("rocketmq-sync-msg-kh96",
asyncMsg,
new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
log.info("------ 可靠异步发送成功回调 ------");
}
@Override
public void onException(Throwable throwable) {
log.info("------ 可靠异步发送失败回调 ------");
}
});
return "send async msg success";
}
/**
* @param : [phoneNo]
* @return : java.lang.String
* @author : huayu
* @date : 30/11/2022
* @description : 测试roketmq-发送单向消息,只发不收结果
*/
@RequestMapping("/testRocketMQSendMsgOneWay")
public String testRocketMQSendMsgOneWay(@RequestParam String oneWayMsg) {
log.info("------ 使用RocketMQ,发送单向消息:{} -------", oneWayMsg);
//使用RocketMQ发送消息
rocketMQTemplate.sendOneWay("rocketmq-oneWay-msg-kh96", oneWayMsg);
return "send oneWay msg success";
}
/**
* @param : [phoneNo]
* @return : java.lang.String
* @author : huayu
* @date : 30/11/2022
* @description : 测试roketmq-发送顺序消息
*/
@RequestMapping("/testRocketMQSendMsgOrderly")
public String testRocketMQSendMsgOrderly(@RequestParam String orderlyMsgs) {
log.info("------ 使用RocketMQ,发送顺序消息:{} -------", orderlyMsgs);
//使用RocketMQ发送顺序消息,必须要提供一个唯一的标识di,比如用户编号等
String userId = UUID.randomUUID().toString().replace("-", "");
//发送多条顺序消息,模拟iang消息分割成多个符号发送
Arrays.asList(orderlyMsgs.split("")).
forEach(orderlyMsg -> rocketMQTemplate.syncSendOrderly("rocketmq-orderly-msg-kh96", orderlyMsg, userId));
return "send orderly msg success";
}
/**
* @param : [phoneNo]
* @return : java.lang.String
* @author : huayu
* @date : 30/11/2022
* @description : 测试roketmq-发送事务消息
*/
@RequestMapping("/testRocketMQSendMsgTx")
public String testRocketMQSendMsgTx(@RequestParam String txmsg) {
log.info("------ 使用RocketMQ,发送事务消息:{} -------", txmsg);
//使用RocketMQ发送事务消息,模拟生成一笔订单
KgcMallOrder kgcMallOrder = KgcMallOrder.builder()
.userId(2)
.userName("RocketMQ_tx")
.prodId(2)
.prodName(txmsg)
.totalPrice(96.0)
.build();
//发送事务消息
rocketMQTxService.sendCreateOrderHalfTx(kgcMallOrder);
return "send tx msg success";
}
}