在Seata中,服务端启动之前,会首先将相关的处理器进行初始化。在客户端也可以看到将相关处理器进行初始化的过程。而这个过程在其它的中间件中,依然受用。比如RocketMQ的服务端和客户端启动之前,也会将相关处理器进行初始化,注册处理器。
我们知道如果需要处理器,必须需要消息类型、处理器、线程池三个参数。因此我们先来了解服务端中的相关处理器信息。
registerProcessor(int messageType, RemotingProcessor processor, ExecutorService executor)
请求消息类型
响应消息类型
注册RM消息处理器的消息类型
注册TM消息处理器的消息类型
注册心跳消息处理器的消息类型
具体如下图所示:
请求消息处理器
响应消息处理器
rm消息处理器
tm消息处理器
心跳消息处理器
具体如下图所示:
ThreadPoolExecutor messageExecutor
其核心是注册到processorTable中,也即this.processorTable.put(requestCode, pair)。
由于服务请求是基于事件轮询实现的,因此在请求之后,会根据对应的请求code,请求的处理的核心方法pair.getFirst().process(ctx, rpcMessage)。在这个方法之后,会根据对应的请求code,路由到对应的处理器中,进行对应的业务逻辑处理。
同理,我们也可以基于上面的思路,梳理出客户端对应的处理器和对应的请求code。
主要是分支提交处理器、分支回滚处理器、undolog处理器、tc响应处理器、客户端心跳处理器。
分为三大类,rm分支相关的、消息响应相关的、心跳处理相关的。
通过上面的梳理可以看到服务端和客户端的交互,是通过相关的状态码,也即请求code来完成两者在Netty中交互的流转的。因此我们可以看到两个的交互在Netty的Pipeline中经过事件,流转到处理器,然后完成后续的操作。当然对于上面的数据量大的问题,seata专门使用了压缩处理工具来实现压缩,还对数据量大的数据在做删除时,做了分批处理等等。
参考资料:
https://github.com/apache/incubator-seata