学了netty框架以及看了一下一小部分的netty框架的源码,听说dubbo是基于netty框架的一个优秀的落地实现,所以看了一小部分dubbo的源码,感觉学习netty总要有一个方式证明自己曾经学过,所以写下这一篇小笔记,写给自己看。
https://github.com/cbeann/NettyRpcDemooo
PERSISTENT | 持久化节点 |
---|---|
PERSISTENT_SEQUENTIAL | 持久化排序节点 |
EPHEMERAL | 临时节点 |
EPHEMERAL_SEQUENTIAL | 临时排序节点 |
https://blog.csdn.net/qq_37171353/category_9328166.html
其它参考百度。。。
其它参考百度。。。
自定义redis-spring-boot-starter_CBeann的博客-CSDN博客_redis starter
其它参考百度。。。
下面只贴了部分代码和写这个小demo的想法,全部代码在 https://github.com/cbeann/NettyRpcDemooo 中
1)zk做为注册中心
2)服务提供者和消费者支持集群和轮询调用
3)服务提供者上线和下线服务消费者可以感知
4)整合自定义starter
5)没有使用序列化,使用json字符串进行接口调用
注册中心ZK的IP
注册中心ZK的端口
服务提供者的名称
服务提供者的端口(NettyServer的端口,不是SpringBoot的端口)
注册中心ZK的IP
注册中心ZK的端口
服务消费者的名称
如果要满足集群版本的服务提供者存储,zk的存储设计也应该好好的想一想,有一点可以明确的创建的节点有临时节点,这样服务消费者才能通过某种机制监听到服务提供者上下线并进行业务逻辑操作。
本文中的存储结构如图所示
服务提供者的思路其实比服务消费者简单多了,其实只需要解决把ioc容器放入到自定义的SimpleChannelInboundHandler中,然后读取json字符串获取class、方法和参数,然后在ioc容器中获取类并反射调用方法返回结果即可。
在配置文件中拿到服务提供者NettyServer中ServerBootstrap的端口,然后启动ServerBootstrap,并向zk暴露自己的服务(添加临时节点信息),服务提供者就完成了。
public NettyServer(int port) {
this.port = port;
}
public class NettyServer implements ApplicationContextAware{
private ApplicationContext applicationContext;
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = applicationContext;
}
}
/*
socketChannel.pipeline().addLast(new RcpServerHandler(applicationContext));
*/
/** 启动Netty服务器 */
@PostConstruct
public void postConstruct() {
// 向外暴露端口,zk添加服务信息
doExport();
// 异步开启netty服务器
new Thread(
() -> {
this.bind();//ServerBootstrap.bind(ip)
})
.start();
}
部分代码如下所示:
package com.rpc.server;
import com.rpc.properties.RpcProperties;
import com.rpc.properties.RpcServerProperties;
import com.rpc.zk.ZKServer;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.CharsetUtil;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import javax.annotation.PostConstruct;
import java.net.InetAddress;
/**
* @author chaird
* @create 2021-02-07 2:32
*/
public class NettyServer implements ApplicationContextAware {
private Integer port;
private ApplicationContext applicationContext;
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
ServerBootstrap b = new ServerBootstrap();
public NettyServer(int port) {
this.port = port;
}
/** * 开启NettyServer的方法 */
public void bind() {
try {
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
.childHandler(
new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
// TimeClientHandler是自己定义的方法
socketChannel.pipeline().addLast(new StringDecoder(CharsetUtil.UTF_8));
socketChannel.pipeline().addLast(new StringEncoder(CharsetUtil.UTF_8));
socketChannel.pipeline().addLast(new RcpServerHandler(applicationContext));
}
});
System.out.println("服务端 Netty服务器启动成功:" + port);
// 绑定端口
ChannelFuture f = b.bind(port).sync();
// 等待服务端监听端口关闭
f.channel().closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
} finally {
// 优雅关闭,释放线程池资源
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
/** 启动Netty服务器 */
@PostConstruct
public void postConstruct() {
// 向外暴露端口
doExport();
// 异步开启netty服务器
new Thread(
() -> {
this.bind();
})
.start();
}
/** 服务暴露(其实就是把服务信息保存到Zookeeper上) */
private void doExport() {
ZKServer zkServer = applicationContext.getBean(ZKServer.class);
RpcServerProperties rpcServerProperties = applicationContext.getBean(RpcServerProperties.class);
RpcProperties rpcProperties = applicationContext.getBean(RpcProperties.class);
// providerGroupDir = /rpc/provider/myProviderName
String providerGroupDir = rpcProperties.getPath() + rpcProperties.getProviderPath();
providerGroupDir = providerGroupDir + "/" + rpcServerProperties.getProviderName();
try {
// 创建服务名目录(用于集群)
zkServer.createPathPermanent(providerGroupDir, "");
} catch (Exception e) {
e.printStackTrace();
}
try {
String providerAddress = InetAddress.getLocalHost().getHostAddress();
String providerInstance = providerAddress + ":" + rpcServerProperties.getProviderPort();
// key(path) = /rpc/provider/myProviderName/127.0.0.1:8080 value:127.0.0.1:8080
zkServer.createPathTemp(providerGroupDir + "/" + providerInstance, providerInstance);
} catch (Exception e) {
e.printStackTrace();
}
}
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = applicationContext;
}
}
服务消费者则从构思遇到的难点进行拆解,服务消费者比服务提供者难多了!!!
客户端比如果你看过部分Netty的源码,那你比较容易下面的设计思路。没看过也没关系,反正我写的也是个demo,也很容易理解。
//Netty连接服务提供者的自定义客户端
public class NettyClient {
public NettyClient(String ip, Integer port) {
this.ip = ip;
this.port = port;
}
private String ip;
private Integer port;
EventLoopGroup group = new NioEventLoopGroup();
Bootstrap b = new Bootstrap();
//...
}
//服务提供者组(集群)
public class NettyClientGroup {
/** 下一个下标 */
private AtomicInteger index = new AtomicInteger(0);
/** 服务提供者名称 */
private String providerName;
/** 服务提供者列表 */
List<NettyClient> providerList = new ArrayList<>();
/** key:服务提供者ip:端口 value:NettyClient */
Map<String, NettyClient> providerMap = new HashMap<>();
}
//服务消费者启动器
public class NettyClientBootStarp implements ApplicationContextAware {
/**key:服务提供者组名称,服务提供者组*/
Map<String, NettyClientGroup> providers = new HashMap<>();
List<NettyClient> providerList = new ArrayList<>();
public NettyClientBootStarp() {}
}
//注册默认的watcher
zk = new ZooKeeper(url, 5000, watcher);
String listenProviderPath = path + providerPath;
// 给某节点添加watcher
zk.getChildren(listenProviderPath, true);
就是把某个上线或者下线的服务提供者从NettyClientBootStarp 中移除,但是此处我用了一个比较极端的操作,即重新链接所有的服务提供者,这样就可以添加或者删除掉变动的提供者
public class RpcServiceChangeWatcher implements Watcher, ApplicationContextAware {
private RpcProperties rpcProperties;
private NettyClientBootStarp nettyClientBootStarp;
@Override
public void process(WatchedEvent event) {
System.out.println(event);
// 实际业务
try {
nettyClientBootStarp.refreshProviders();
} catch (Exception e) {
e.printStackTrace();
}
// 重新监听
String providersPath = rpcProperties.getPath() + rpcProperties.getProviderPath();
try {
zkServer.getZk().getChildren(providersPath, true);
} catch (Exception e) {
e.printStackTrace();
}
}
}
如下所示,我在SpringBoot的客户端中的HelloController中放入StudentService(接口,非实现类),怎么就能发送数据呢????
@RestController
public class HelloController {
@RpcService("provider01") // 自定义注解,其中value为服务提供者名称,类似OpenFeign的使用
private StudentService studentService;
@GetMapping("/index/{id}")
public Object hello(@PathVariable Integer id) {
String res = studentService.getId(id);
return res;
}
答案:代理、反射、自定义注解
/**
* 该注解用于注入远程服务
*/
@Target(ElementType.FIELD) // 方法注解
@Retention(RetentionPolicy.RUNTIME) // 运行时注解
public @interface RpcService {
String value();//服务提供者名称
}
public class RcpServiceInjectBeanPostProcessor
implements InstantiationAwareBeanPostProcessor, ApplicationContextAware {
private ApplicationContext context;
@Override
public Object postProcessBeforeInitialization(Object bean, String beanName)
throws BeansException {
// 判断类里是否有@RpcService注解
Class<?> clazz = context.getType(beanName);
if (Objects.isNull(clazz)) {
return bean;
}
Field[] declaredFields = clazz.getDeclaredFields();
for (Field field : declaredFields) {
// 找出标记了InjectService注解的属性
RpcService injectService = field.getAnnotation(RpcService.class);
if (injectService == null) {
continue;
}
// 获取服务名称
String providerName = injectService.value();
// 获取接口Class
Class<?> fieldClass = field.getType();
// 获取nettyClient
NettyClientBootStarp nettyClientBootStarp = context.getBean(NettyClientBootStarp.class);
RpcFactoryProxy rpcFactoryProxy =
new RpcFactoryProxy(fieldClass, providerName, nettyClientBootStarp);
Object proxy = rpcFactoryProxy.getProxy();
Object object = bean;
field.setAccessible(true);
try {
// 请开始你的表演
field.set(object, proxy);
} catch (IllegalAccessException e) {
e.printStackTrace();
}
}
return bean;
}
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
return bean;
}
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.context = applicationContext;
}
public RcpServiceInjectBeanPostProcessor() {
System.out.println("-----RcpServiceInjectBeanPostProcessor-----------");
}
}
通过反射发送数据。
(1)invoke方法可以获取到调用的接口名称、方法名称和参数
(2)代理对象构造的时候传入了需要调用的服务名称和NettyClientBootStarp
(3)在invoke中把接口名称、方法名称和参数通过Netty发送给服务提供者是可以实现的
public class RpcFactoryProxy<T> implements InvocationHandler {
private Class<T> proxyInterface;
// 这里可以维护一个缓存,存这个接口的方法抽象的对象
private NettyClientBootStarp nettyClientBootStarp;
private String serviceName;
public RpcFactoryProxy(
Class<T> proxyInterface, String serviceName, NettyClientBootStarp nettyClient) {
this.serviceName = serviceName;
this.proxyInterface = proxyInterface;
this.nettyClientBootStarp = nettyClient;
}
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
System.out.println("invoke");
Map<String, NettyClientGroup> providers = nettyClientBootStarp.getProviders();
NettyClientGroup nettyClientGroup = providers.get(serviceName);
if (null == nettyClientGroup) {
RpcResponse response = RpcResponse.NO_SERVICE();
return response.getReturnValue();
}
NettyClient nettyClient = nettyClientGroup.next();
if (null == nettyClient) {
RpcResponse response = RpcResponse.NO_SERVICE();
return response.getReturnValue();
}
RpcRequest rpcRequest = new RpcRequest();
rpcRequest.setRequestId(UUID.randomUUID().toString().substring(0, 8));
// 设置服务名称
rpcRequest.setServiceName(serviceName);
// 设置是哪个类
rpcRequest.setClazzName(proxyInterface.getName());
// 设置哪个方法
rpcRequest.setMethodName(method.getName());
// 设置参数类型
Class<?>[] parameterTypes = method.getParameterTypes();
String[] parameterTypeString = Class2String.class2String(parameterTypes);
rpcRequest.setParameterTypeStrings(parameterTypeString);
// 设置参数
rpcRequest.setParameters(args);
// 发送消息
RpcResponse response = nettyClient.sendMessage(rpcRequest);
if (response == null) {
response = RpcResponse.TIME_OUT(rpcRequest.getRequestId());
}
return response.getReturnValue();
}
public T getProxy() {
return (T)
Proxy.newProxyInstance(proxyInterface.getClassLoader(), new Class[] {proxyInterface}, this);
}
}
首先自定义实现Future的实现类RpcFuture,注意这里面有一个CountDownLatch(1)
public class RpcFuture<T> implements Future<T> {
private T response;
/**
* 因为请求和响应是一一对应的,所以这里是1
*/
private CountDownLatch countDownLatch = new CountDownLatch(1);
/**
* 获取响应,直到有结果才返回
* @return
* @throws InterruptedException
* @throws ExecutionException
*/
@Override
public T get() throws InterruptedException, ExecutionException {
countDownLatch.await();
return response;
}
@Override
public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
if (countDownLatch.await(timeout,unit)){
return response;
}
return null;
}
public void setResponse(T response) {
this.response = response;
countDownLatch.countDown();
}
}
当发送数据的时候创建一个RcpFuture,然后把该RcpFuture存在一个地方FuturePool,此时调用RcpFuture.get方法是阻塞的(阻塞原因CountDownLatch ),如下所示
public RpcResponse sendMessage(RpcRequest msg) {
// 存起来
RpcFuture<RpcResponse> future = new RpcFuture<>();
FuturePool.put(msg.getRequestId(), future);
RpcResponse rpcResponse = null;
try {
String s = JSONUtil.toJsonStr(msg);
f.channel().writeAndFlush(s);
rpcResponse = future.get(2000, TimeUnit.MILLISECONDS);
} catch (Exception e) {
e.printStackTrace();
} finally {
FuturePool.remove(msg.getRequestId());
}
return rpcResponse;
}
那么在打印的地方通过key获取RpcFuture ,然后把结果通过RpcFuture.setResponse方法设计进去,如下所示,上面就能返回结果了。
public class RpcClientHandler extends SimpleChannelInboundHandler<String> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
String resp = msg;
System.out.println("resp:"+resp);
RpcResponse rpcResponse = JSONUtil.toBean(resp, RpcResponse.class);
RpcFuture future = FuturePool.get(rpcResponse.getRequestId());
future.setResponse(rpcResponse);
}
1)一个小案例的实现要整合多个知识点,并且写了这篇晦涩难懂的文章,还是花了点时间的。
2)dubbo中使用的FactoryBean实现的远程调用,可以看看人家的思路。
3)学习到了很多东西,比如我现在才知道zk的监听机制只监听一次。
4)需要用到的知识点太多了,自定义注解,BeanPostProcessor,反射,Future等,是个学习Netty不错的案例。
5)新的一年,希望大家步步高升,一年比一年好。
https://github.com/2YSP/rpc-spring-boot-starter#rpc-spring-boot-starter