文 / Kenyon,资深软件架构师,15年软件开发和技术管理经验,从程序员做到企业技术高管,专注技术管理、架构设计、AI技术应用和落地。
由于公众号推流的原因,请在关注页右上角加星标,这样才能及时收到新文章的推送。
在上一篇文章中,我们基于架构设计原则设计了RPC框架的基础架构。今天,我们将进入实战阶段,实现RPC框架的核心功能,包括服务代理、序列化、网络通信等模块。在实现过程中,我们将重点展示如何将SOLID原则、高内聚低耦合、KISS等架构设计原则应用到实际代码中。
遵循开闭原则,我们设计了Serializer接口,并提供了JSON实现:
// 序列化接口,支持扩展不同的序列化方式
public interface Serializer {
/**
* 将对象序列化为字节数组
*
* @param obj 要序列化的对象
* @param <T> 对象类型
* @return 序列化后的字节数组
* @throws Exception 序列化异常
*/
<T> byte[] serialize(T obj) throws Exception;
/**
* 将字节数组反序列化为对象
*
* @param bytes 序列化后的字节数组
* @param clazz 对象类型
* @param <T> 对象类型
* @return 反序列化后的对象
* @throws Exception 反序列化异常
*/
<T> T deserialize(byte[] bytes, Class<T> clazz) throws Exception;
/**
* 序列化类型枚举
*/
enum Type {
//目前暂时只是支持JSON,后续可以在这里添加要支持的其他序列化方式
JSON(1);
private final int code;
Type(int code) {
this.code = code;
}
public int getCode() {
return code;
}
/**
* 根据code查找对应的序列化类型
*
* @param code 序列化类型码
* @return 序列化类型
*/
public static Type findByCode(int code) {
for (Type type : Type.values()) {
if (type.code == code) {
return type;
}
}
return JSON; // 默认使用JSON
}
}
}
// JSON序列化实现
public class JsonSerializer implements Serializer {
private static final ObjectMapper objectMapper = new ObjectMapper();
@Override
public <T> byte[] serialize(T obj) throws Exception {
if (obj == null) {
return new byte[0];
}
return objectMapper.writeValueAsBytes(obj);
}
@Override
public <T> T deserialize(byte[] bytes, Class<T> clazz) throws Exception {
if (bytes == null || bytes.length == 0) {
return null;
}
return objectMapper.readValue(bytes, clazz);
}
}基于单一职责原则,我们将网络传输模块拆分为客户端和服务端:
// 网络传输客户端接口
public interface TransportClient {
void connect(InetSocketAddress address);
byte[] send(byte[] data) throws Exception;
void close();
}
// 网络传输服务端接口
public interface TransportServer {
void start(int port, RequestHandler handler);
void stop();
int getPort();
}
// 请求处理器接口
public interface RequestHandler {
byte[] handle(byte[] request);
}
// 使用Netty实现的传输客户端
public class NettyTransportClient implements TransportClient {
private static final Logger logger = LoggerFactory.getLogger(NettyTransportClient.class);
private static final int DEFAULT_CONNECT_TIMEOUT = 5000;
private Channel channel;
private EventLoopGroup group;
private ResponseHandler responseHandler;
@Override
public void connect(InetSocketAddress address) {
group = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();
try {
bootstrap.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, DEFAULT_CONNECT_TIMEOUT)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
// 处理粘包问题
pipeline.addLast(new LengthFieldBasedFrameDecoder(65535, 0, 4, 0, 4));
pipeline.addLast(new LengthFieldPrepender(4));
// 字节数组编解码器
pipeline.addLast(new ByteArrayDecoder());
pipeline.addLast(new ByteArrayEncoder());
// 客户端处理器
NettyClientHandler clientHandler = new NettyClientHandler();
pipeline.addLast(clientHandler);
}
});
// 连接服务端
ChannelFuture future = bootstrap.connect(address).sync();
this.channel = future.channel();
// 初始化响应处理器
responseHandler = new ResponseHandler();
// 设置客户端处理器的响应处理器
((NettyClientHandler) channel.pipeline().last()).setResponseHandler(responseHandler);
} catch (Exception e) {
logger.error("Failed to connect to server: {}", address, e);
throw new RuntimeException("Failed to connect to server: " + address, e);
}
}
@Override
public byte[] send(byte[] data) throws Exception {
if (channel == null || !channel.isActive()) {
throw new IllegalStateException("Channel is not connected");
}
// 发送数据
channel.writeAndFlush(data).addListener((ChannelFutureListener) future -> {
if (!future.isSuccess()) {
Throwable cause = future.cause();
if (cause instanceof Exception) {
responseHandler.setException((Exception) cause);
} else {
responseHandler.setException(new RuntimeException(cause));
}
}
});
// 等待响应
return responseHandler.waitForResponse();
}
@Override
public void close() {
if (channel != null) {
channel.close();
}
if (group != null) {
group.shutdownGracefully();
}
}
/**
* 客户端处理器
*/
private static class NettyClientHandler extends SimpleChannelInboundHandler<byte[]> {
private static final Logger logger = LoggerFactory.getLogger(NettyClientHandler.class);
private ResponseHandler responseHandler;
@Override
protected void channelRead0(ChannelHandlerContext ctx, byte[] msg) throws Exception {
if (responseHandler != null) {
responseHandler.setResponse(msg);
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
logger.error("Exception in Netty client handler", cause);
if (responseHandler != null) {
if (cause instanceof Exception) {
responseHandler.setException((Exception) cause);
} else {
responseHandler.setException(new RuntimeException(cause));
}
}
ctx.close();
}
public void setResponseHandler(ResponseHandler responseHandler) {
this.responseHandler = responseHandler;
}
}
/**
* 响应处理器
*/
private static class ResponseHandler {
private final CountDownLatch latch = new CountDownLatch(1);
private byte[] response;
private Exception exception;
public byte[] waitForResponse() throws Exception {
if (!latch.await(30, TimeUnit.SECONDS)) {
throw new RuntimeException("Request timeout");
}
if (exception != null) {
throw exception;
}
return response;
}
public void setResponse(byte[] response) {
this.response = response;
latch.countDown();
}
public void setException(Exception exception) {
this.exception = exception;
latch.countDown();
}
}
}使用迪米特法则,代理模块只与必要的组件通信:
// RPC客户端核心类
public class ServiceProxy implements InvocationHandler {
private static final Logger logger = LoggerFactory.getLogger(ServiceProxy.class);
// 服务接口类
private final Class<?> serviceClass;
// 服务注册中心
private final RegistryCenter registryCenter;
// 负载均衡策略
private final LoadBalance loadBalance;
/**
* 构造函数
*
* @param serviceClass 服务接口类
* @param registryCenter 服务注册中心
*/
public ServiceProxy(Class<?> serviceClass, RegistryCenter registryCenter) {
this(serviceClass, registryCenter, new RandomLoadBalance());
}
/**
* 构造函数
*
* @param serviceClass 服务接口类
* @param registryCenter 服务注册中心
* @param loadBalance 负载均衡策略
*/
public ServiceProxy(Class<?> serviceClass, RegistryCenter registryCenter, LoadBalance loadBalance) {
this.serviceClass = serviceClass;
this.registryCenter = registryCenter;
this.loadBalance = loadBalance;
}
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
// 创建RPC请求的对象,把调用的服务接口类、方法名、参数类型、参数值等信息封装到请求对象中
RpcRequest request = new RpcRequest();
request.setRequestId(UUID.randomUUID().toString());
request.setServiceName(serviceClass.getName());
request.setMethodName(method.getName());
request.setParameterTypes(method.getParameterTypes());
request.setParameters(args);
logger.debug("Sending RPC request: {}, service: {}, method: {}",
request.getRequestId(), request.getServiceName(), request.getMethodName());
// 从注册中心获取服务地址列表
List<InetSocketAddress> addresses = registryCenter.discover(serviceClass.getName());
if (addresses == null || addresses.isEmpty()) {
throw new RuntimeException("No service available for: " + serviceClass.getName());
}
// 使用负载均衡策略选择服务地址
InetSocketAddress address = loadBalance.select(serviceClass.getName(), addresses);
if (address == null) {
throw new RuntimeException("No service address selected for: " + serviceClass.getName());
}
logger.debug("Selected service address: {}", address);
// 创建客户端并发送请求,这里暂时使用Netty作为网络传输组件
TransportClient client = new NettyTransportClient();
try {
// 创建序列化器,这里暂时使用JSON序列化,后续可以添加其他序列化方式,并且改成读取配置的方式来确定使用哪种序列化方式
Serializer serializer = new JsonSerializer();
// 连接到服务端
client.connect(address);
// 序列化请求
byte[] requestData = serializer.serialize(request);
// 发送请求并获取响应数据
byte[] responseData = client.send(requestData);
// 反序列化响应
RpcResponse response = serializer.deserialize(responseData, RpcResponse.class);
if (response.isSuccess()) {
return response.getResult();
} else {
throw new RuntimeException("RPC call failed: " + response.getError());
}
} finally {
client.close();
}
}
}在上面代码实现的过程中,我们分别应用了以下架构设计原则:
在实现RPC框架的过程中,我们也应用了多个通用设计原则:
因为篇幅的问题,在这篇文章就先写这么多,文章中我们实现了整个RPC框架里面最核心的组件,包括了序列化模块、网络传输模块和服务代理模块。在实现的过程中,我们重点展示了如何将架构设计原则应用到实际代码中,确保代码的可扩展性、可维护性和灵活性。
在下一篇文章中,我们将会完成这个RPC框架的剩余功能,像服务注册与发现、服务端核心的实现、客户端的负载均衡等模块,并编写相关的测试用例来进行完整的测试。同时,也会把项目的代码一起放上来给大家观摩和吐槽。
互动话题:在实现RPC框架的过程中,你认为哪个组件的设计最具挑战性?为什么?欢迎在评论区分享你的观点。
Kenyon,资深软件架构师,15年的软件开发和技术管理经验,从程序员做到企业技术高管。多年企业数字化转型和软件架构设计经验,善于帮助企业构建高质量、可维护的软件系统,目前专注技术管理、架构设计、AI技术应用和落地;全网统一名称"六边形架构",欢迎关注交流。
原创不易,转载请联系授权,如果觉得有帮助,请点赞、收藏、转发三连支持!
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。