前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Reactor 模型

Reactor 模型

原创
作者头像
ge3m0r
发布2024-05-20 22:47:36
980
发布2024-05-20 22:47:36

Reactor 模型

reactor 是一种管理网络 I/O 的模型,我们知道,内核对于网络 I/O 的管理方式是用的 select/poll epoll ,那么应用程序之间可能也需要一种管理 I/O 的方式,reactor 模型就此诞生。

应用程序
应用程序

客户端发送数据,内核接受数据返回给应用层,这就好比去会所都要有个老鸨接待,而应用程序也需要一个接待处,可以假想成在应用程序和内核之间加入了一层接待处。

代码实现

reactor 模型,我们既然要管理 I/O ,那我们要怎么管理,如何管理。

首先我们需要确认的是,我们做的是应用层的网络 I/O 管理,除此之外,我们也无法改变内核如何管理 I/O ,因此我们做这个模型,需要借助内核 epoll 的管理方式。

而我们直到 epoll 会将 socket 的描述符加入红黑树 epfd

代码语言:c
复制
int epfd = epoll_create(1);

然后产生消息的事件会生成就绪队列,我们首先需要一个维护红黑树关系的数据结构,另外需要对每个 socket 描述符进行封装。

数据结构定义

socket 描述符可以封装成一个事件,而红黑树负责管理事件。

代码语言:c
复制
#define BUFFER_LENGTH    4096
struct ntyevent{
	int fd;
	int events;                //文件符事件
	void *arg;
	int (*callback)(int fd, int events, void* arg);  //回调函数

	int status;                           //状态为 1 表示修改,0 为添加
	char buffer[BUFFER_LENGTH];   //缓存
	int length;
	long last_active;
};

而需要管理这些封装的事件就是反应堆模型。

代码语言:c
复制
struct ntyreactor{
	int epfd;                         //红黑树管理
	struct ntyevent *events //可以使用链表管理事件,也可以使用数组
};

对数据结构的操作

首先就是对 ntyevent 结构体的操作,首先就是对事件类似增删查改的函数。

结构体肯定需要初始化,而我们把这个函数表示为设置。

代码语言:c
复制
//事件设置
void nty_event_set(struct ntyevent *ev, int fd, NCALLBACK callback, void *arg)
{
     //对events赋值
     ev->fd = fd;
	 //回调函数
	 ev->callback = callback;
	 ev->events = 0;
	 ev->arg = arg;
	 //当前的事件时间
	 ev->last_active = time(NULL);

	 return ;
}

这个函数类似很多初始化函数,当然他也承担一部分改的功能。

然后就是一个增一个删的函数,为什么没有查函数,因为只要声明了结构体指针然后直接引用函数内容即可。因此对于结构体来说增删是最需要的。

代码语言:c
复制
//主体功能将事件添加到 epfd 的红黑树
int nty_event_add(int epfd, int events, struct ntyevent *ev)
{
    //创建 epoll_event
    struct epoll_event ep_ev = {0, {0}};
	//将事件赋值给 epoll_event 和 ev
	ep_ev.events = ev->events = events;

	int op;
	//状态为 1 op 设置为可更改
	if(ev->status == 1){
		op = EPOLL_CTL_MOD;
	}else{
	    op = EPOLL_CTL_ADD;
		ev->status = 1;
	}

    //epoll 事件的管理
	if(epoll_ctl(epfd, op, ev->fd, &ep_ev) < 0){
		printf("event add failed[fd = %d], events[%d]\n", ev->fd,events);
		return -1;
	}

	return 0;
}

而删除函数为:

代码语言:c
复制
int nty_event_del(int epfd, struct ntyevent *ev){
    struct epoll_event ep_ev = {0, {0}};

	if(ev->status != 1){  //状态不是修改
		return -1;
	}

	ep_ev.data.ptr = ev;  //将ev添加到 epoll 结构体中
	ev->status = 0;
	epoll_ctl(epfd, EPOLL_CTL_DEL, ev->fd, &ep_ev);

	return 0;
}

说完对与 ntyevent 结构体的管理,那么即使 reactor 结构体管理。

首先就是初始化结构体。

代码语言:c
复制
#define MAX_EPOLL_EVENTS 1024
int ntyreactor_init(struct ntyreactor *reactor) {

	if (reactor == NULL) return -1;
	memset(reactor, 0, sizeof(struct ntyreactor));

    //创建一个 epfd 
	reactor->epfd = epoll_create(1);
	if (reactor->epfd <= 0) {
		printf("create epfd in %s err %s\n", __func__, strerror(errno));
		return -2;
	}

    //创建事件数组  也可以使用链表进行管理
	reactor->events = (struct ntyevent*)malloc((MAX_EPOLL_EVENTS) * sizeof(struct ntyevent));
	if (reactor->events == NULL) {
		printf("create epfd in %s err %s\n", __func__, strerror(errno));
		close(reactor->epfd);
		return -3;
	}
}

初始化好 reactor 结构体,我们然后要确认,在网络 I/O 过程中,我们期望这个结构体能干什么事情。

在一般的网络模型,服务端就是生成 socket, bind 端口,然后listen,有客户端连接需要 accept,同时接受数据,发送数据。listen 是一个状态需要管理,而 accept 表示未接收数据的一个状态,这个状态都是需要管理的,而且状态只会改变,比方说从 listen 变成 accept,因此 reactor 管理跟普通的管理不太一样

创建 socket ,bind 和 listen基本不存在双方交互的一个情况,而另外四个方式都涉及与客户端的交互。

,那么首先就是监听事件添加

代码语言:c
复制
//添加监听事件
int ntyreactor_addlistener(struct ntyreactor *reactor, int sockfd, NCALLBACK *acceptor) {  

	if (reactor == NULL) return -1;
	if (reactor->events == NULL) return -1;

	nty_event_set(&reactor->events[sockfd], sockfd, acceptor, reactor); //设置事件
	nty_event_add(reactor->epfd, EPOLLIN, &reactor->events[sockfd]); //将事件添加到 epoll 红黑树  EPOLLIN 表示socket描述符为可读事件
 
	return 0;
}

然后即使监听事件的不同状态的转换

代码语言:c
复制
int ntyreactor_run(struct ntyreactor *reactor) {
	if (reactor == NULL) return -1;
	if (reactor->epfd < 0) return -1;
	if (reactor->events == NULL) return -1;
	
    //epoll事件
	struct epoll_event events[MAX_EPOLL_EVENTS+1];
	
	int checkpos = 0, i;

	while (1) {

		long now = time(NULL);  //当前时间
		for (i = 0;i < 100; i++, checkpos++) {  //每次检查 100个 socket 描述符
			if (checkpos == MAX_EPOLL_EVENTS) {  //当循环到时间是最大时间
				checkpos = 0;
			}

			if (reactor->events[checkpos].status != 1) { //表示状态不是添加,时间是修改
				continue;
			}

			long duration = now - reactor->events[checkpos].last_active;//时间上次活跃时间

			if (duration >= 60) {  //两个时间间隔超过 60s
				close(reactor->events[checkpos].fd);  //关闭 socket 描述符
				printf("[fd=%d] timeout\n", reactor->events[checkpos].fd);
				nty_event_del(reactor->epfd, &reactor->events[checkpos]); //将socket描述符删除
			}
		}


		int nready = epoll_wait(reactor->epfd, events, MAX_EPOLL_EVENTS, 1000);  //从红黑树中获取就绪数据数
		if (nready < 0) {  //没有就绪时间
			printf("epoll_wait error, exit\n");
			continue;
		}

		for (i = 0;i < nready;i ++) {

			struct ntyevent *ev = (struct ntyevent*)events[i].data.ptr; // epoll 的时间

			if ((events[i].events & EPOLLIN) && (ev->events & EPOLLIN)) { //事件为可读
				ev->callback(ev->fd, events[i].events, ev->arg);  //使用回调函数
			}
			if ((events[i].events & EPOLLOUT) && (ev->events & EPOLLOUT)) { //事件为可写
				ev->callback(ev->fd, events[i].events, ev->arg); //使用对应回调回到应用程序处理事件
			}
			
		}

	}
}

然后就是数据结构的销毁。

代码语言:c
复制
int ntyreactor_destory(struct ntyreactor *reactor) {

	close(reactor->epfd);
	free(reactor->events);

}

比较简单,就是关闭 epoll 的描述符,然后释放内存。

回调函数

回调函数使用内核将事件复制到应用程序,我们需要处理的逻辑,就网络 I/O 来说,从服务器来说,我们从监听开始,因此第一个函数就是 accept 函数。

代码语言:c
复制
typedef int NCALLBACK(int, int, void*);//基础函数模型
//找出所有的可读的 accept 事件,然后将事件下一个回调设置为 recv 然后将事件添加到红黑树中
int accept_cb(int cb, int events, void*arg)
{
    struct ntyreactor *reactor = (struct ntyreactor *)arg; //通过 arg 表示reactor 数据结构体
	if(reactor == NULL) return -1;

	struct sockaddr_in client_addr;   //初始化接收需要的数据结构
	socklen_t len = sizeof(client_addr);

	int clientfd;

	if((clientfd = accept(fd, (struct sockaddr*)&client_addr, &len)) == -1){  //接受函数
		if(errno != EAGAIN && errno != ENTER){  //错误处理
			printf("accept: %s\n", strerror(errno));
			return -1;
		}
	}

	int i = 0;
	do{
		for(i = 0; i < MAX_EPOLL_EVENTS; i++){
			if(reactor->events[i].status == 0){  找到 epoll 事件中第一个事件为 EPOLLIN 可读事件
				break;
			}
		}

		
        if (i == MAX_EPOLL_EVENTS) { //没有相关的事件
			printf("%s: max connect limit[%d]\n", __func__, MAX_EPOLL_EVENTS);
			break;
		}

		int flag = 0;
		if ((flag = fcntl(clientfd, F_SETFL, O_NONBLOCK)) < 0) {  //设置为非阻塞 I/O
			printf("%s: fcntl nonblocking failed, %d\n", __func__, MAX_EPOLL_EVENTS);
			break;
		}

		nty_event_set(&reactor->events[clientfd], clientfd, recv_cb, reactor);  //accept 之后要将事件下一个状态设置为recv
		nty_event_add(reactor->epfd, EPOLLIN, &reactor->events[clientfd]); //将事件添加到 epfd 中

	} while (0);

	printf("new connect [%s:%d][time:%ld], pos[%d]\n", 
		inet_ntoa(client_addr.sin_addr), ntohs(client_addr.sin_port), reactor->events[i].last_active, i);

	return 0;

}

accept 之后就是 recv 函数,recv 的回调函数为

代码语言:c
复制
int recv_cb(int fd, int events, void * args)
{
    struct ntyreactor *reactor = (struct ntyreactor *)arg;  //传入 reactor
	struct ntyevent *ev = reactor->events + fd; //传入 events 的位置,因为使用数组,链表就要进行遍历了

	int len = recv(fd, ev->buffer, BUFFER_LENGTH, 0);  //接受数据
	nty_event_del(reactor->epfd, ev);  //将事件从红黑树中删除

	if(len > 0){ //接受到数据
		ev->length = len;
		ev->buffer[len] = '\0';

		printf("C[%d]:%s\n", fd, ev->buffer); //打印接受数据
		nty_event_set(ev, fd, send_cb, reactor);  //设置数据为send 发送事件
		nty_event_add(reactor->epfd, EPOLLOUT, ev); //添加事件为可写事件
	}else if(len === 0){//没有数据
		close(ev->fd);
		printf("[fd=%d] pos[%d], closed\n", fd, ev->reactor->events);
	}else{//错误
		close(ev->fd);
		printf("recv[fd=%d] error[%d]:%s\n", fd, errno, strerror(errno));
	}

	return len;
}

最后就是发送的回调函数。

代码语言:c
复制
int send_cb(int fd, int events, void * args)  //发送回调函数
{
    struct ntyreactor *reactor = (struct ntyreactor *)arg;
	struct ntyevent *ev = reactor->events + fd;

	int len = send(fd, ev->buffer, ev->length, 0);  //发送数据
	if(len > 0){
		printf("send[fd=%d], [%d]%s\n", fd, len, ev->buffer);

	    nty_event_del(reactor->epfd, ev);  //socket描述符删除
	    nty_event_set(ev, fd, recv_cb, reactor); //因为不确定是否接受完毕,所以还要接收
	    nty_event_add(reactor->epfd, EPOLLOUT, ev);  //添加到epoll 描述符中
	}else{//接受完毕或者错误,直接删除数据
		close(ev->fd);
		nty_event_del(reactor->epfd, ev);
		printf("send[fd=%d] error %s\n", fd, strerror(errno));
	}

	return len;
}

从服务端最基础做起

上述就是利用 Linux 内核 epoll 对网络 I/O 进行管理的状态基本已经做完,而作为服务端,最开始就是要创建 socket 这些老套的东西。

代码语言:c
复制
int init_sock(short port) {

	int fd = socket(AF_INET, SOCK_STREAM, 0); //创建socket
	fcntl(fd, F_SETFL, O_NONBLOCK); //设置非阻塞 I/O

	struct sockaddr_in server_addr;
	memset(&server_addr, 0, sizeof(server_addr));
	server_addr.sin_family = AF_INET;
	server_addr.sin_addr.s_addr = htonl(INADDR_ANY);
	server_addr.sin_port = htons(port);

	bind(fd, (struct sockaddr*)&server_addr, sizeof(server_addr)); //绑定socket

	if (listen(fd, 20) < 0) { //监听接受队列的长度
		printf("listen failed : %s\n", strerror(errno));
	}

	return fd;  //返回连接符
}

最后

最后就是一个总体函数使用,main 函数来了。

代码语言:c
复制
int main(int argc, char *argv[]) {

	unsigned short port = SERVER_PORT;
	if (argc == 2) {
		port = atoi(argv[1]);
	}

	int sockfd = init_sock(port);  //获取连接符

    //分配内存
	struct ntyreactor *reactor = (struct ntyreactor*)malloc(sizeof(struct ntyreactor));
	ntyreactor_init(reactor);

	//设置其中数据然后将 socket 添加到相应的管理中
	ntyreactor_addlistener(reactor, sockfd, accept_cb);
	ntyreactor_run(reactor);

	ntyreactor_destory(reactor);
	close(sockfd);
	

	return 0;
}

综上所述,其实不管是什么模型,其实就是定义数据结构后,首先对数据结构进行管理和业务逻辑,然后跟我们操作系统机制进行交互。当然对于 Java 这种封装比较完善的语言,可能就主要就是数据结构管理然后加上业务逻辑。和操作系统管理可能就是跟另一个数据结构进行交互了。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Reactor 模型
  • 代码实现
    • 数据结构定义
      • 对数据结构的操作
        • 回调函数
          • 从服务端最基础做起
          • 最后
          领券
          问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档