本文主要研究一下rocketmq的SlaveSynchronize
rocketmq-all-4.6.0-source-release/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
public class BrokerController {
//......
private void handleSlaveSynchronize(BrokerRole role) {
if (role == BrokerRole.SLAVE) {
if (null != slaveSyncFuture) {
slaveSyncFuture.cancel(false);
}
this.slaveSynchronize.setMasterAddr(null);
slaveSyncFuture = this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
BrokerController.this.slaveSynchronize.syncAll();
}
catch (Throwable e) {
log.error("ScheduledTask SlaveSynchronize syncAll error.", e);
}
}
}, 1000 * 3, 1000 * 10, TimeUnit.MILLISECONDS);
} else {
//handle the slave synchronise
if (null != slaveSyncFuture) {
slaveSyncFuture.cancel(false);
}
this.slaveSynchronize.setMasterAddr(null);
}
}
//......
}
rocketmq-all-4.6.0-source-release/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java
public class SlaveSynchronize {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private final BrokerController brokerController;
private volatile String masterAddr = null;
public SlaveSynchronize(BrokerController brokerController) {
this.brokerController = brokerController;
}
public String getMasterAddr() {
return masterAddr;
}
public void setMasterAddr(String masterAddr) {
this.masterAddr = masterAddr;
}
public void syncAll() {
this.syncTopicConfig();
this.syncConsumerOffset();
this.syncDelayOffset();
this.syncSubscriptionGroupConfig();
}
//......
}
rocketmq-all-4.6.0-source-release/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java
public class SlaveSynchronize {
//......
private void syncTopicConfig() {
String masterAddrBak = this.masterAddr;
if (masterAddrBak != null && !masterAddrBak.equals(brokerController.getBrokerAddr())) {
try {
TopicConfigSerializeWrapper topicWrapper =
this.brokerController.getBrokerOuterAPI().getAllTopicConfig(masterAddrBak);
if (!this.brokerController.getTopicConfigManager().getDataVersion()
.equals(topicWrapper.getDataVersion())) {
this.brokerController.getTopicConfigManager().getDataVersion()
.assignNewOne(topicWrapper.getDataVersion());
this.brokerController.getTopicConfigManager().getTopicConfigTable().clear();
this.brokerController.getTopicConfigManager().getTopicConfigTable()
.putAll(topicWrapper.getTopicConfigTable());
this.brokerController.getTopicConfigManager().persist();
log.info("Update slave topic config from master, {}", masterAddrBak);
}
} catch (Exception e) {
log.error("SyncTopicConfig Exception, {}", masterAddrBak, e);
}
}
}
//......
}
rocketmq-all-4.6.0-source-release/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java
public class SlaveSynchronize {
//......
private void syncConsumerOffset() {
String masterAddrBak = this.masterAddr;
if (masterAddrBak != null && !masterAddrBak.equals(brokerController.getBrokerAddr())) {
try {
ConsumerOffsetSerializeWrapper offsetWrapper =
this.brokerController.getBrokerOuterAPI().getAllConsumerOffset(masterAddrBak);
this.brokerController.getConsumerOffsetManager().getOffsetTable()
.putAll(offsetWrapper.getOffsetTable());
this.brokerController.getConsumerOffsetManager().persist();
log.info("Update slave consumer offset from master, {}", masterAddrBak);
} catch (Exception e) {
log.error("SyncConsumerOffset Exception, {}", masterAddrBak, e);
}
}
}
//......
}
rocketmq-all-4.6.0-source-release/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java
public class SlaveSynchronize {
//......
private void syncDelayOffset() {
String masterAddrBak = this.masterAddr;
if (masterAddrBak != null && !masterAddrBak.equals(brokerController.getBrokerAddr())) {
try {
String delayOffset =
this.brokerController.getBrokerOuterAPI().getAllDelayOffset(masterAddrBak);
if (delayOffset != null) {
String fileName =
StorePathConfigHelper.getDelayOffsetStorePath(this.brokerController
.getMessageStoreConfig().getStorePathRootDir());
try {
MixAll.string2File(delayOffset, fileName);
} catch (IOException e) {
log.error("Persist file Exception, {}", fileName, e);
}
}
log.info("Update slave delay offset from master, {}", masterAddrBak);
} catch (Exception e) {
log.error("SyncDelayOffset Exception, {}", masterAddrBak, e);
}
}
}
//......
}
rocketmq-all-4.6.0-source-release/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java
public class SlaveSynchronize {
//......
private void syncSubscriptionGroupConfig() {
String masterAddrBak = this.masterAddr;
if (masterAddrBak != null && !masterAddrBak.equals(brokerController.getBrokerAddr())) {
try {
SubscriptionGroupWrapper subscriptionWrapper =
this.brokerController.getBrokerOuterAPI()
.getAllSubscriptionGroupConfig(masterAddrBak);
if (!this.brokerController.getSubscriptionGroupManager().getDataVersion()
.equals(subscriptionWrapper.getDataVersion())) {
SubscriptionGroupManager subscriptionGroupManager =
this.brokerController.getSubscriptionGroupManager();
subscriptionGroupManager.getDataVersion().assignNewOne(
subscriptionWrapper.getDataVersion());
subscriptionGroupManager.getSubscriptionGroupTable().clear();
subscriptionGroupManager.getSubscriptionGroupTable().putAll(
subscriptionWrapper.getSubscriptionGroupTable());
subscriptionGroupManager.persist();
log.info("Update slave Subscription Group from master, {}", masterAddrBak);
}
} catch (Exception e) {
log.error("SyncSubscriptionGroup Exception, {}", masterAddrBak, e);
}
}
}
//......
}
BrokerController有个handleSlaveSynchronize方法,在role为BrokerRole.SLAVE的时候,会注册一个定时任务,每隔10秒钟执行一次BrokerController.this.slaveSynchronize.syncAll();SlaveSynchronize的syncAll方法分别调用了syncTopicConfig、syncConsumerOffset、syncDelayOffset、syncSubscriptionGroupConfig方法
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。