在分析TC各模块之前,首先再回顾下seata的整个执行流程:
为什么TC是seata核心呢?因为TC这个角色就好像上帝一样,协调控制TM、RM协同工作,TC一旦不好使,那么RM和TM就会出现问题,那必定会乱的一塌糊涂。
那么一个优秀的事务协调者应该具备哪些能力呢?
TC整体设计如上,各模块说明如下:
Discovery模块就是服务发现模块,TC启动后需要将自己的信息注册到服务中心,这样才能暴露给其他使用者,Discovery接口定义如下:
public interface RegistryService<T> {
void register(InetSocketAddress address) throws Exception;
void unregister(InetSocketAddress address) throws Exception;
void subscribe(String cluster, T listener) throws Exception;
void unsubscribe(String cluster, T listener) throws Exception;
List<InetSocketAddress> lookup(String key) throws Exception;
void close() throws Exception;
}
上述方法看定义就能知道其作用,因此不在赘述。Discovery模块有多个实现类,如下图所示:
我们知道,服务注册到服务中心,一般需要与服务中心进行心跳保活,否则服务中心会将该服务信息给清除,一般服务中心的client jar包会集成对应的心跳能力。但是针对redis来说,该如何注册呢,下面就以redis作为示例来分析服务注册流程,对应类 RedisRegistryServiceImpl。
从源码来看,seata使用redis注册是使用的是hash字典结构,那么它怎么心跳的呢?准确来说,seata注册redis是没有心跳的,只使用到了redis channel作为通知机制来保证tc实例变化时的通知上下线能力。注意 redis channel只有在更改时的通知能力,因此tm/rm在启动时需要先从redis获取数据之后,然后再设置channel监听,seata对应逻辑在方法io.seata.discovery.registry.redis.RedisRegistryServiceImpl#subscribe
中,严格来讲,在获取数据和设置channel监听之间,如果数据发生了变更,是存在更新丢失问题的,不过这种问题触发概率极地可以忽略,并且后续有更新时还可以再次获取得到新的数据。
@Override
public void register(InetSocketAddress address) {
NetUtil.validAddress(address);
String serverAddr = NetUtil.toStringAddress(address);
try (Jedis jedis = jedisPool.getResource()) {
jedis.hset(getRedisRegistryKey(), serverAddr, ManagementFactory.getRuntimeMXBean().getName());
jedis.publish(getRedisRegistryKey(), serverAddr + "-" + RedisListener.REGISTER);
}
}
@Override
public void unregister(InetSocketAddress address) {
NetUtil.validAddress(address);
String serverAddr = NetUtil.toStringAddress(address);
try (Jedis jedis = jedisPool.getResource()) {
jedis.hdel(getRedisRegistryKey(), serverAddr);
jedis.publish(getRedisRegistryKey(), serverAddr + "-" + RedisListener.UN_REGISTER);
}
}
由于没有心跳能力,那就就需要在tc进程关闭(JVM关闭)时执行unregister逻辑,因此会添加对应的ShutdownHook
钩子函数,其能保证在程序正常退出、System.out、Ctrl+C中断结束、系统关闭、OOM宕机、kill pid进程时被执行到,但是如果是执行kill -9 pid
这种,是没有执行到钩子函数的。register是在Netty启动后进行注册的,对应的逻辑在方法io.seata.core.rpc.netty.NettyServerBootstrap#start
中,这里不在赘述。
配置模块是seata的基础模块,比如netty线程配置、session配置等,这些配置seata基本上都有默认配置。seata有一个关于配置的类Configuration:
public interface Configuration {
int getInt(String dataId, int defaultValue);
String getConfig(String dataId, long timeoutMills);
boolean putConfig(String dataId, String content);
boolean removeConfig(String dataId, long timeoutMills);
boolean removeConfig(String dataId);
void addConfigListener(String dataId, ConfigurationChangeListener listener);
void removeConfigListener(String dataId, ConfigurationChangeListener listener);
}
目前seata适配了多种配置中心,如下:
在Seata中需要配置registry.conf,来配置config.type :
config {
# file、nacos 、apollo、zk、consul、etcd3
type = "file"
nacos {
serverAddr = "127.0.0.1:8848"
namespace = ""
group = "SEATA_GROUP"
username = ""
password = ""
dataId = "seataServer.properties"
}
consul {
serverAddr = "127.0.0.1:8500"
}
apollo {
appId = "seata-server"
apolloMeta = "http://192.168.1.204:8801"
namespace = "application"
apolloAccesskeySecret = ""
}
zk {
serverAddr = "127.0.0.1:2181"
sessionTimeout = 6000
connectTimeout = 2000
username = ""
password = ""
}
etcd3 {
serverAddr = "http://localhost:2379"
}
file {
name = "file.conf"
}
}
Store模块为seata的存储模块,主要存储session数据,存储类为TransactionStoreManager
,其主要提供了读写session接口:
public interface TransactionStoreManager {
boolean writeSession(LogOperation logOperation, SessionStorable session);
GlobalSession readSession(String xid);
GlobalSession readSession(String xid, boolean withBranchSessions);
List<GlobalSession> readSession(SessionCondition sessionCondition);
void shutdown();
enum LogOperation {
/**
* Global add log operation.
*/
GLOBAL_ADD((byte)1),
/**
* Global update log operation.
*/
GLOBAL_UPDATE((byte)2),
/**
* Global remove log operation.
*/
GLOBAL_REMOVE((byte)3),
/**
* Branch add log operation.
*/
BRANCH_ADD((byte)4),
/**
* Branch update log operation.
*/
BRANCH_UPDATE((byte)5),
/**
* Branch remove log operation.
*/
BRANCH_REMOVE((byte)6);
private byte code;
}
}
存储模块目前支持file/db/redis存储,如果需要保证TC可用性建议将数据存储到DB中。
写入file文件流程如下:
public boolean writeSession(LogOperation logOperation, SessionStorable session) {
writeSessionLock.lock();
long curFileTrxNum;
try {
if (!writeDataFile(new TransactionWriteStore(session, logOperation).encode())) {
return false;
}
lastModifiedTime = System.currentTimeMillis();
curFileTrxNum = FILE_TRX_NUM.incrementAndGet();
if (curFileTrxNum % PER_FILE_BLOCK_SIZE == 0
&& (System.currentTimeMillis() - trxStartTimeMills) > MAX_TRX_TIMEOUT_MILLS) {
return saveHistory();
}
} catch (Exception exx) {
return false;
} finally {
writeSessionLock.unlock();
}
flushDisk(curFileTrxNum, currFileChannel);
return true;
}
注意这里的刷盘模式默认为异步模式,为了数据安全性的话可以设置为同步刷盘,避免系统断电写入pagecache中未刷盘的数据丢失。
大家知道数据库实现隔离级别主要是通过锁来实现的,同样的在分布式事务框架Seata中要实现隔离级别也需要通过锁。一般在数据库中数据库的隔离级别一共有四种:读未提交,读已提交,可重复读,串行化。在Seata中可以保证隔离级别是读已提交,但是提供了达到读已提交隔离的手段。
Lock模块也就是Seata实现隔离级别的核心模块。在Lock模块中提供了一个接口用于管理我们的锁:
public interface Locker {
boolean acquireLock(List<RowLock> rowLock) ;
boolean releaseLock(List<RowLock> rowLock);
boolean releaseLock(String xid, Long branchId);
boolean releaseLock(String xid, List<Long> branchIds);
boolean isLockable(List<RowLock> rowLock);
void cleanAllLocks();
}
其中有三个方法:
public class FileLocker extends AbstractLocker {
private static final int BUCKET_PER_TABLE = 128;
private static final ConcurrentMap<String/* resourceId */, ConcurrentMap<String/* tableName */,
ConcurrentMap<Integer/* bucketId */, BucketLockMap>>>
LOCK_MAP = new ConcurrentHashMap<>();
在本地锁的实现中有两个常量需要关注:
可以看见实际上的加锁在bucketLockMap这个map中,这里具体的加锁方法比较简单就不作详细阐述,主要是逐步的找到bucketLockMap,然后将当前trascationId塞进去,如果这个主键当前有TranscationId,那么比较是否是自己,如果不是则加锁失败。
seata RPC通信基层基于netty来保证高性能,采用默认的配置netty线程池模型处理流程如下:
如果采用默认的基本配置那么会有一个Acceptor线程用于处理客户端的链接,会有cpu*2数量的NIO-Thread,在这个线程中不会做业务太重的事情,只会做一些速度比较快的事情,比如编解码,心跳事件,和TM注册。一些比较费时间的业务操作将会交给业务线程池,默认情况下业务线程池配置为最小线程为100,最大为500。
seata心跳是通过netty的IdleStateHandler来完成的,在Sever端对于写没有设置最大空闲时间,对于读设置了最大空闲时间,默认为15s(客户端默认写空闲为5s,发送ping消息),如果超过15s则会将链接断开,关闭资源。
在TC server侧的netty处理流程中,接收到数据首先进行解码,按照seata定义的固定协议格式进行,会将数据解码成 RpcMessage 消息,代码如下:
public class RpcMessage {
private int id;
private byte messageType;
private byte codec;
private byte compressor;
private Map<String, String> headMap = new HashMap<>();
private Object body;
后续seata的各种处理器的处理流程都是基于 RpcMessage 消息来的。
Coordinator Core的实现为DefaultCoordinator,其实TC中重要的协调类,负责分布式事务生命周期中的各种操作管理工作,其初始化代码如下:
// main方法中
DefaultCoordinator coordinator = new DefaultCoordinator(nettyRemotingServer);
coordinator.init();
nettyRemotingServer.setHandler(coordinator);
// 初始化方法
public void init() {
retryRollbacking.scheduleAtFixedRate(() -> {
// xxx
}, 0, ROLLBACKING_RETRY_PERIOD, TimeUnit.MILLISECONDS);
retryCommitting.scheduleAtFixedRate(() -> {
// xxx
}, 0, COMMITTING_RETRY_PERIOD, TimeUnit.MILLISECONDS);
asyncCommitting.scheduleAtFixedRate(() -> {
// xxx
}, 0, ASYNC_COMMITTING_RETRY_PERIOD, TimeUnit.MILLISECONDS);
timeoutCheck.scheduleAtFixedRate(() -> {
// xxx
}, 0, TIMEOUT_RETRY_PERIOD, TimeUnit.MILLISECONDS);
undoLogDelete.scheduleAtFixedRate(() -> {
// xxx
}, UNDO_LOG_DELAY_DELETE_PERIOD, UNDO_LOG_DELETE_PERIOD, TimeUnit.MILLISECONDS);
}
其内部会初始化几种线程池来驱动分布式事务操作的进行,比如undoLog清理、commit/rollback重试等等。关于Coordinator这块业务逻辑较多,后续会专门写这块内容,本文就不在赘述了。
参考资料: