前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >I/O 多路复用, select, poll, epoll

I/O 多路复用, select, poll, epoll

原创
作者头像
ge3m0r
发布2024-05-16 23:08:01
880
发布2024-05-16 23:08:01

简述 I/O

I/O 是应用程序必然逃不掉的一个话题。大家在计算机基础学习中,学过计组,操作系统和计网,而想要把 I/O 研究深入肯定要将对这三个计算机基础方面有所深入。

我们谈及高性能的时候逃不掉的五个角度就是 cpu,内存,操作系统,网络带宽和应用程序。当然我们平时最大,当然几乎所有性能方面的分析都离不开这五个层面的分析,cpu 层面的 numa 架构,三级缓存,多核并发。内存层面的内存应用,垃圾回收,内存泄漏等等,当然,在大数据领域磁盘的 I/O 成为了一个很重要指标,但是本质上这不是软件设计领域问题,而是现实工程领域的问题。网络带宽较为简单,带宽大小,基本脱离软件领域设计硬件层面,而操作系统涉及 I/O 就必然离不开select,poll 和 epoll了。

从宏观角度,这三个其实都会设计阻塞,包括基于 epoll 设计的 reactor 模型也会涉及阻塞,当然 windows 有基于异步的 IOCP 模型。不过鉴于大多数应用在服务端还是基于 Linux ,所以不多讨论。

select

select 是通过将文件连接后所有的文件描述符放入一个集合中,当调用 select 函数会把所有文件描述符集合拷贝到内核,然后内核遍历整个集合,有事件发生时候,对这个 socket 进行标记,然后将集合拷贝到内存,这个文件描述符也拷贝到内存。

可以看到这个过程非常粗粝暴力,像极了我们做算法题的样子。

在开始代码之前,我觉得还是要不代码重复部分拿出来,调用系统 api 这样是有一定规范,也就是套路的,每次都写套路一部分多少会有重复。

代码语言:c
复制
#define bind_port  8888
#define BUFFER_LENGTH 1024
#define POLL_SIZE     1024
#define EPOLL_SIZE    1024
#define addr_ip 127.0.0.1
int create_sockfd(int port, const char* ip )
{
    int sockfd = socket(AF_INET, SOCK_STREAM, 0);
    if(sockfd < 0){
		perror("socket\n");
		return -1;
	}
    struct addr_in addr;
    memset(&addr, 0 ,sizeof(struct addr_in));

    addr.sin_family = AF_INET;
	addr.sin_port = htons(port);
	addr.sin_addr.s_addr = ip;

    if(bind(sockfd, (struct sockaddr*)&addr, sizeof(struct sockaddr_in)) < 0){
		 perror("bind");
		 return 2;
	}

	if(listen(sockfd, 5) < 0){
		perror("listen");
		return 3;
	}
    return sockfd;

}

上述是一个函数常规操作,socket 生成文件描述符,绑定 ip,端口,监听。

select 函数实现如下:

代码语言:c
复制
void handle_next(int sockfd){
    fd_set rfds, rset;             //声明文件描述符集合
    FD_ZERO(&rfds);         //初始化为 0
	FD_SET(sockfd, &rfds); //将sockfd 添加进去

	int max_fd = sockfd;    //设置最大 sockfd
	int i = 0;
	while(1){
		rset = rfds;

	    int nready = select(max_fd + 1, &rset, NULL, NULL, NULL);  //将文件集合拷贝到内核,然后内核返回发生事件文件描述符数量
	    if(nready < 0){ //没有事件发生
			printf("select error: %d\n", errno);
			continue;
	    }

		if(FD_ISSET(sockfd, &rset)){  //sockfd 在设置的文件描述符集合中
			struct sockaddr_in client_addr;
		    memset(&client_addr, 0, sizeof(struct sockaddr_in));
		    socklen_t client_len = sizeof(client_addr);

			int clientfd = accept(sockfd, (struct sockaddr*)&client_addr, &client_len);  //接受数据
		    if(clientfd < 0) continue;//没接收到

			char str[INET_ADDRSTRLEN] = {0};
			printf("recvied from %s at port, sockfd: %d, clientfd:%d", inet_ntop(AF_INET, &client_addr.sin_addr, str, sizeof(str)), ntohs(client_addr.sin_port), sockfd, clientfd);

			if(max_fd == FD_SETSIZE){  //文件集合数超出最大,select 有限制最大 1024 个,因为使用bitmap 进行标记
				printf("clientfd --> out range\n");
				break;
			}
			FD_SET(clientfd, & rfds);  //将接收数据返回的fd加入文件描述符集合
			if(clientfd > max_fd) max_fd = clientfd;

			printf(" sockfd: %d, max_fd:%d, clientfd:%d",  sockfd, max_fd clientfd);

			if(--nready == 0) continue;  //先接收数据所以先减后比较
	
		}
			
		for(i = sockfd + 1; i <= max_fd; i++){ //没有在 rset 中说明fd 已经进行了 accept
			if(FD_SET(i, &rset)){

			char buffer[BUFFER_LENGTH] = {0};
		    int ret = recv(clientfd, buffer, BUFFER_LENGTH, 0); //接受数据
			if(ret < 0){
				if(errno == EAGAIN || errno == EWOULDBLOCK){
					printf("read all data");
				}
				FD_CLR(i, &rfds);
				close(i);
			}else if(ret == 0){
				printf("disconnect %d\n", i);
				FD_CLR(i, &rfds);
				close(i);
				break;
			}else{
				printf("recv:%s, %d bytes\n", buffer, ret);
			}

			if(--nready == 0) break;
		   }
	   }
	}
}

poll

poll 跟 select 从设计层面基本没有区别,唯一的区别就是 select 使用 bitmap 来存储 fd 受限于设计字节数文件描述符集合最多有 1024 个,而 poll 使用动态数组存储事件状态和 fd ,所以在 fd 数量方面不受限制。

代码语言:c
复制
void handle_next(int sockfd)
{
    //poll 使用
    struct pollfd fds[POLL_SIZE] = {0};  //文件描述符使用动态数组
	fds[0].fd = sockfd; //初始化fd
	fds[0].events = POLLIN; //fd状态

	int max_fd = 0, i = 0;
	for (i = 1;i < POLL_SIZE;i ++) {  //初始化fd
		fds[i].fd = -1;
	}

	while (1) {
		int nready = poll(fds, max_fd+1, 5); //类似select ,这里设计了阻塞 I/O 和非阻塞 I/O ,最后一个参数表示等待事件的时间 -1 表示一直阻塞等到有 I/O 产生,而 0 表示每次询问都回答,而 5 表示每 5 ms 进行返回。
		if (nready <= 0) continue;

		if ((fds[0].revents & POLLIN) == POLLIN) { //实际发生的事情是 POLLIN 可读事件
			
			struct sockaddr_in client_addr;
			memset(&client_addr, 0, sizeof(struct sockaddr_in));
			socklen_t client_len = sizeof(client_addr);
		
			int clientfd = accept(sockfd, (struct sockaddr*)&client_addr, &client_len);  //从操作系统读取数据
			if (clientfd <= 0) continue;

			char str[INET_ADDRSTRLEN] = {0};
			printf("recvived from %s at port %d, sockfd:%d, clientfd:%d\n", inet_ntop(AF_INET, &client_addr.sin_addr, str, sizeof(str)),
				ntohs(client_addr.sin_port), sockfd, clientfd);

			fds[clientfd].fd = clientfd;
			fds[clientfd].events = POLLIN;

			if (clientfd > max_fd) max_fd = clientfd;

			if (--nready == 0) continue;
		}

		for (i = sockfd + 1;i <= max_fd;i ++) {
			if (fds[i].revents & (POLLIN|POLLERR)) {  //可读或错误事件
				char buffer[BUFFER_LENGTH] = {0};
				int ret = recv(i, buffer, BUFFER_LENGTH, 0); //接受数据
				if (ret < 0) {
					if (errno == EAGAIN || errno == EWOULDBLOCK) {
						printf("read all data");
					}
					
					//close(i);
					fds[i].fd = -1;
				} else if (ret == 0) {
					printf(" disconnect %d\n", i);
					
					close(i);
					fds[i].fd = -1;
					break;
				} else {
					printf("Recv: %s, %d Bytes\n", buffer, ret);
				}
				if (--nready == 0) break;
			}
		}
	}
}

epoll

最后到了大名鼎鼎的 epoll ,epoll 其实很简单,就是在操作系统内核中将事件对象维护在一个红黑树中,通过函数表现就是 epoll_ctl(),当有文件描述符发生事件,会从红黑树形成一个就绪队列,然后用户不用来回拷贝红黑树,就绪队列形成链表,返回用户,通过函数表现就是 epoll_wait()。

上代码:

代码语言:c
复制
void handle_next(int sockfd)
{
    int epoll_fd = epoll_create(EPOLL_SIZE);          //用户端创建这样大小的epoll 红黑树
	struct epoll_event ev, events[EPOLL_SIZE] = {0}; //epoll 事件

	ev.events = EPOLLIN;  //可读事件
	ev.data.fd = sockfd;     //epoll 
	epoll_ctl(epoll_fd, EPOLL_CTL_ADD, sockfd, &ev);  //将 sockfd 添加到 epoll_fd 这个红黑树中

	while (1) {

		int nready = epoll_wait(epoll_fd, events, EPOLL_SIZE, -1);  //查看就绪队列,返回就绪事件内容
		if (nready == -1) {
			printf("epoll_wait\n");
			break;
		}

		int i = 0;
		for (i = 0;i < nready;i ++) {
			if (events[i].data.fd == sockfd) {
				
				struct sockaddr_in client_addr;
				memset(&client_addr, 0, sizeof(struct sockaddr_in));
				socklen_t client_len = sizeof(client_addr);
			
				int clientfd = accept(sockfd, (struct sockaddr*)&client_addr, &client_len);
				if (clientfd <= 0) continue;
	
				char str[INET_ADDRSTRLEN] = {0};
				printf("recvived from %s at port %d, sockfd:%d, clientfd:%d\n", inet_ntop(AF_INET, &client_addr.sin_addr, str, sizeof(str)),
					ntohs(client_addr.sin_port), sockfd, clientfd);

				ev.events = EPOLLIN | EPOLLET; //可读且是边缘触发  边缘触发就是有可读事件应用只会苏醒一次
				ev.data.fd = clientfd;
				epoll_ctl(epoll_fd, EPOLL_CTL_ADD, clientfd, &ev);  //添加 clientfd
			} else {
				int clientfd = events[i].data.fd;

				char buffer[BUFFER_LENGTH] = {0};
				int ret = recv(clientfd, buffer, BUFFER_LENGTH, 0);//接受数据
				if (ret < 0) {
					if (errno == EAGAIN || errno == EWOULDBLOCK) {
						printf("read all data");
					}
					
					close(clientfd);
					
					ev.events = EPOLLIN | EPOLLET;
					ev.data.fd = clientfd;
					epoll_ctl(epoll_fd, EPOLL_CTL_DEL, clientfd, &ev);  //接收后删除fd
				} else if (ret == 0) {
					printf(" disconnect %d\n", clientfd);
					
					close(clientfd);

					ev.events = EPOLLIN | EPOLLET;
					ev.data.fd = clientfd;
					epoll_ctl(epoll_fd, EPOLL_CTL_DEL, clientfd, &ev);
					
					break;
				} else {
					printf("Recv: %s, %d Bytes\n", buffer, ret);
				}
				
			}
		}

	}

}

至此,I/O 多路复用就结束了,所谓多路复用就是将所有文件描述维护起来,当然这个过程主要是内核层面完成,用户层面是不是也可以进行维护呢?这就涉及大名鼎鼎的 reactor 反应堆模型了,fd 这么乱,我要管管了,这是程序员版的我来了,我看见,我征服!

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 简述 I/O
  • select
  • poll
  • epoll
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档