多路转接属于 IO 复用方式的一种。系统提供 select()
函数来实现多路复用输入/输出模型。select
系统调用是用来让我们的程序监视多个文件描述符的状态变化的。程序会停在 select
这里等待,直到被监视的文件描述符有一个或多个发生了状态改变。
select 只负责等待,而且一次可以等待多个文件描述符。其中接口如下:
其中第一个参数 nfds
表示 select 等待的多个文件描述符的最大值+1,例如需要 select 等待的 fd 有 1、2、3、4、5,那么 nfds 这个参数就是 6.
返回值如果大于0,代表有 n 个 fd 就绪了;如果返回值等于 0,代表超时,表示没有错误,也没有 fd 就绪;如果小于 0,表示等待出错。
关于最后一个参数 struct timeval
,我们需要另外介绍一下,在 Linux 中有对应的接口可以让我们获取时间,例如 gettimeofday()
可以获取特定时区下的特定时间,如下:
其中它的参数中也有 struct timeval
结构,该结构中的字段如下:
其中 tv_sec
表示时间戳,以秒为单位;tv_usec
以微秒为单位。
所以回到 select 接口中,最后一个参数 struct timeval
表示给 select 设置等待方式,例如设为 struct timeval timeout = {5, 0}
表示每隔 5 秒,timeout 一次,也就是在这 5 秒期间,没有任何一个文件描述符就绪,select 就会直接返回,然后再重新进入,设置 5 秒的时候,就重复刚才的工作。如果在等待 5 秒期间有文件描述符就绪了,那么就会立即返回。如果我们设为 {0, 0} 代表立马返回,非阻塞的一种。-1 表示阻塞等待。
另外,如果我们设置了,这个参数是一个输入输出型参数。例如我们设置每隔 5 秒 timeout 一次,可是刚过去 2 秒就有文件描述符就绪了,此时 timeout 输出时就变成了 3 秒。
最后,第二、三、四个参数都是同一个类型 fd_set
,fd_set
是内核提供的一种数据类型,它是位图。我们目前关心的 fd 上面的读写事件,要么特定的 fd 上读事件就绪,要么特定的 fd 上写事件就绪,要么特定的 fd 上有异常事件。所以对于任何一个文件描述符,如果只准它关心一种事件,那么就是这三种的其中一种。所以如果我们关心特定一个 fd 上读事件就绪,就让 select 来通知我们,我们就应该把文件描述符设置进第二个参数中。如果我们关心写事件就绪,就把文件描述符设置进第三个参数中。如果我们既关心读又关心写,我们可以同时设置进第二和第三个参数中。
下面我们单独拿第二个参数 readfds
来讲,这个集合也是输入输出型参数。当它是输入时,表示的是,用户告诉内核,我给你的一个或者多个 fd,你要帮我关心 fd 上面的读事件,如果事件就绪了,你就要告诉我!当它是输出时,也就是返回时,内核告诉用户,你让我关心的多个 fd 中,有哪些已经就绪了,你赶紧读取吧!其中这个位图传入的时候,比特位的位置,就表示文件描述符编号,比特位的内容,0 或者 1,就表示是否需要内核关心! 当有 fd 就绪时,操作系统就直接修改该位图中的内容,将已经就绪的 fd 在该位图的位置不变,也就是还是 1,将没有就绪的位置清0,也就是返回输出的时候,0 还是 1,表示哪些用户关心的 fd 上面的读事件已经就绪了!所以 fd_set
是一张位图,是为了让用户和内核传递 fd 是否就绪的信息的!
所以这就注定了使用 select 的时候,一定会有大量的位图操作,所以操作系统给我们提供了一系列的位图操作接口,如下:
最后我们知道,fd_set
是一个位图,并且是一个具体的类型,所以 fd_set
就一定有具体的大小,只要有实际的大小,那么 fd_set
就一定有它位图中比特位的个数,也就是说 select 等待多个文件描述符一定是有上限的!下面我们验证一下 select 最多可以等待多少个文件描述符,如下代码:
int main()
{
std::cout << "fd_set bits num: " << sizeof(fd_set) * 8 << std::endl;
return 0;
}
结果如下:
所以在我们当前机器的 select 能够等待的文件描述符个数是 1024 个。
下面我们写一段简单的代码使用 select 完成多个文件描述符的等待,详细解析参考代码注释,代码如下:
封装的 socket 套接字 Socket.hpp:
#pragma once
#include <iostream>
#include <string>
#include <cstring>
#include <unistd.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include "log.hpp"
enum
{
SocketErr = 2,
BindErr,
ListenErr,
};
const int backlog = 10;
class Sock
{
public:
Sock()
{}
~Sock()
{}
public:
void Socket()
{
_sockfd = socket(AF_INET, SOCK_STREAM, 0);
if(_sockfd < 0)
{
lg(Fatal, "socket error, %s: %d", strerror(errno), errno);
exit(SocketErr);
}
int opt = 1;
setsockopt(_sockfd, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &opt, sizeof(opt));
}
void Bind(uint16_t port)
{
sockaddr_in local;
memset(&local, 0, sizeof(local));
local.sin_addr.s_addr = INADDR_ANY;
local.sin_family = AF_INET;
local.sin_port = htons(port);
if(bind(_sockfd, (const sockaddr*)&local, sizeof(local)) < 0)
{
lg(Fatal, "bind error, %s: %d", strerror(errno), errno);
exit(BindErr);
}
}
void Listen()
{
if(listen(_sockfd, backlog) < 0)
{
lg(Fatal, "listen error, %s: %d", strerror(errno), errno);
exit(ListenErr);
}
}
int Accept(std::string* client_ip, uint16_t* client_port)
{
sockaddr_in peer;
socklen_t len = sizeof(peer);
int newfd = accept(_sockfd, (sockaddr*)&peer, &len);
if(newfd < 0)
{
lg(Warning, "accept error, %s: %d", strerror(errno), errno);
return -1;
}
char buffer[64];
inet_ntop(AF_INET, &peer.sin_addr, buffer, sizeof(buffer));
*client_ip = buffer;
*client_port = ntohs(peer.sin_port);
return newfd;
}
void Close()
{
close(_sockfd);
}
bool Connect(std::string serverip, uint16_t serverport)
{
sockaddr_in peer;
memset(&peer, 0, sizeof(peer));
inet_pton(AF_INET, serverip.c_str(), &(peer.sin_addr));
peer.sin_family = AF_INET;
peer.sin_port = htons(serverport);
int n = connect(_sockfd, (const sockaddr*)&peer, sizeof(peer));
if(n < 0)
{
lg(Fatal, "connect error, %s: %d", strerror(errno), errno);
return false;
}
return true;
}
int GetFd()
{
return _sockfd;
}
private:
int _sockfd;
};
对 select 封装的 SelectServer.hpp:
#pragma once
#include <iostream>
#include <string>
#include <sys/select.h>
#include <sys/time.h>
#include "Socket.hpp"
#include "log.hpp"
static const uint16_t defaultport = 8888;
static const int fd_set_max = (sizeof(fd_set) * 8);
int default_fd = -1;
class SelectServer
{
public:
SelectServer(uint16_t port = defaultport)
: _port(port)
{
for (int i = 0; i < fd_set_max; ++i)
{
fd_array[i] = default_fd;
}
}
bool Init()
{
_listenSock.Socket();
_listenSock.Bind(8888);
_listenSock.Listen();
return true;
}
void Start()
{
int listenSock = _listenSock.GetFd();
fd_array[0] = listenSock;
while (true)
{
fd_set rfds;
FD_ZERO(&rfds); // 清空集合
int maxfd = fd_array[0];
for (int i = 0; i < fd_set_max; ++i)
{
if (fd_array[i] == default_fd)
continue;
FD_SET(fd_array[i], &rfds); // 向集合添加指定fd
// 更新最大的 fd
if (maxfd < fd_array[i])
{
maxfd = fd_array[i];
lg(Info, "max fd update, max fd is: %d", maxfd);
}
}
struct timeval timeout = {2, 0}; // 输入输出,可能要进行周期重复设置
// select 告诉我们就绪了,接下来的一次读取,我们读取 fd 的时候,不会被阻塞
// rfds 是输入输出型参数,所以在输入时可能是 1111,返回时可能只有一个fd就绪,那么就被覆盖成 0001
// 所以 rfds 原来的位图中的值就不见了,也就是需要内核关心的fd不见了!
// 所以就要求 select 每次返回处理完之后,回到循环开始,每一次都要把 rfds 的参数重新设置!
int n = select(maxfd + 1, &rfds, nullptr, nullptr, &timeout);
switch (n)
{
case 0:
// std::cout << "time out, timeout: " << timeout.tv_sec << "." << timeout.tv_usec << std::endl;
break;
case -1:
std::cerr << "select error" << std::endl;
break;
default:
// 有事件就绪了,交给事件派发器 Dispatcher
std::cout << "get a new link!" << std::endl;
Dispatcher(rfds);
break;
}
}
}
~SelectServer()
{
_listenSock.Close();
}
private:
Sock _listenSock;
uint16_t _port;
// 辅助数组,为了将合法的文件描述符添加到 rfds 中
int fd_array[fd_set_max];
};
事件派发器 Dispatcher()
void Dispatcher(fd_set &rfds)
{
for (int i = 0; i < fd_set_max; ++i)
{
int fd = fd_array[i];
if (fd == default_fd)
continue;
// 如果当前 fd 在 rfds 中已经就绪
if (FD_ISSET(fd, &rfds))
{
// 处理 listen 套接字
if (fd == _listenSock.GetFd())
{
// 连接管理器
Accepter();
}
// 其他文件描述符就绪, 也就是读事件就绪
else
{
Recver(fd, i);
}
}
}
}
连接管理器 Accepter()
void Accepter()
{
// 连接事件就绪
std::string clientip;
uint16_t clientport = 0;
int sock = _listenSock.Accept(&clientip, &clientport); // 这里不会阻塞,因为事件已经就绪
if (sock < 0) return;
lg(Info, "accept success, %s: %d", clientip.c_str(), clientport);
// 将已经就绪的 sock 添加到辅助数组中即可,当 select 下一次设置的时候就会将该 fd 设置到 rfds 中!
int pos = 1;
for (; pos < fd_set_max; ++pos)
{
if (fd_array[pos] != default_fd)
continue;
else
break;
}
if (pos == fd_set_max)
{
lg(Warning, "server is full, close %d now!", sock);
close(sock);
}
else
{
fd_array[pos] = sock;
}
}
读事件处理器 Recver()
void Recver(int fd, int pos)
{
char buffer[1024];
ssize_t n = read(fd, buffer, sizeof(buffer) - 1);
if (n > 0)
{
buffer[n] = 0;
std::cout << "get a message: " << buffer << std::endl;
}
else if (n == 0)
{
lg(Info, "client quit, me too, close fd is: %d", fd);
close(fd);
fd_array[pos] = default_fd; // 本质从 rfds 中移除
}
else
{
lg(Warning, "recv error, fd is: %d", fd);
close(fd);
fd_array[pos] = default_fd; // 本质从 rfds 中移除
}
}
poll 也是多路转接方案的一种,它主要解决的就是 select 中的等待 fd 有上限的问题,以及每次都要对关心的 fd 进行事件重置的问题。
下面我们看看 poll 的接口:
首先 poll 的返回值和 select 的返回值一模一样。
第三个参数 timeout 其实是一个整型,表示的是时间,单位为毫秒,含义和 select 中的 timeout 一样。
而我们发现,poll 的第一个参数,专门设计了一个结构体 struct pollfd
,其实我们可以理解成这个结构体是一个数组,而第一个参数就表示第一个元素的地址。
第二个参数 nfds 代表第一个参数的数组中有多少个元素。
我们知道,多路转接无非包括两点,第一,用户告诉内核;第二,内核告诉用户。poll 和 select 一样,只不过 select 用位图,而 poll 用结构体数组。所以 poll 在用户传给内核的时候,表示告诉内核需要关心 struct pollfd
结构体中的 fd 中的 events 事件;当返回时,代表 struct pollfd
结构体中的 fd 中的 revents 事件就绪了。所以,poll 最大的特点是将输入和输出事件进行了分离!
但是当我们告诉内核需要关心 events 事件的时候,内核怎么知道是关心读事件还是写事件还是其他事件呢?当内核返回用户也一样。那么我们可以看到 events 和 revents 都是 short 类型,都是 16 个比特位,也就是在 Linux 中,使用了比特位传参!所以它把事件设置成位图的形式,如下,其实它们都是宏:
所以,poll 的本质是将读写事件分离,然后传入用户定的数组元素的大小,通过 events 和 revents 以位图的方式来传递就绪和关心标记位的解决方案!
下面我们直接对 selectSever.hpp 做修改,改成一个 pollSever.hpp,代码如下:
#pragma once
#include <iostream>
#include <string>
#include <sys/time.h>
#include <poll.h>
#include "Socket.hpp"
#include "log.hpp"
static const uint16_t defaultport = 8888;
static const int fd_num_max = 64;
static const int default_fd = -1;
static const int non_event = 0;
class PollServer
{
public:
PollServer(uint16_t port = defaultport)
: _port(port)
{
for (int i = 0; i < fd_num_max; ++i)
{
_event_fds[i].fd = default_fd;
_event_fds[i].events = non_event;
_event_fds[i].revents = non_event;
}
}
bool Init()
{
_listenSock.Socket();
_listenSock.Bind(8888);
_listenSock.Listen();
return true;
}
void Accepter()
{
// 连接事件就绪
std::string clientip;
uint16_t clientport = 0;
int sock = _listenSock.Accept(&clientip, &clientport); // 这里不会阻塞,因为事件已经就绪
if (sock < 0) return;
lg(Info, "accept success, %s: %d", clientip.c_str(), clientport);
// 将已经就绪的 sock 添加到 _event_fds 中
// 并将它的 events 设置为读事件 POLLIN
int pos = 1;
for (; pos < fd_num_max; ++pos)
{
if (_event_fds[pos].fd != default_fd)
continue;
else
break;
}
if (pos == fd_num_max)
{
lg(Warning, "server is full, close %d now!", sock);
close(sock);
// 可以选择扩容...
}
else
{
_event_fds[pos].fd = sock;
_event_fds[pos].events = POLLIN;
}
}
void Recver(int fd, int pos)
{
char buffer[1024];
ssize_t n = read(fd, buffer, sizeof(buffer) - 1);
if (n > 0)
{
buffer[n] = 0;
std::cout << "get a message: " << buffer << std::endl;
}
else if (n == 0)
{
lg(Info, "client quit, me too, close fd is: %d", fd);
close(fd);
_event_fds[pos].fd = default_fd; // 本质从 结构体数组 中移除
}
else
{
lg(Warning, "recv error, fd is: %d", fd);
close(fd);
_event_fds[pos].fd = default_fd; // 本质从 结构体数组 中移除
}
}
void Dispatcher()
{
for (int i = 0; i < fd_num_max; ++i)
{
int fd = _event_fds[i].fd;
if (fd == default_fd)
continue;
// 如果当前 fd 在 _event_fds 中已经就绪
if (_event_fds[i].revents & POLLIN)
{
// 处理 listen 套接字
if (fd == _listenSock.GetFd())
{
// 连接管理器
Accepter();
}
// 其他文件描述符就绪, 也就是读事件就绪
else
{
Recver(fd, i);
}
}
}
}
void Start()
{
_event_fds[0].fd = _listenSock.GetFd();
_event_fds[0].events = POLLIN; // listen 套接字只关心获取连接,即读事件
int timeout = 2000; // 2s
while (true)
{
int n = poll(_event_fds, fd_num_max, timeout);
switch (n)
{
case 0:
std::cout << "time out..." << std::endl;
break;
case -1:
std::cerr << "poll error" << std::endl;
break;
default:
// 有事件就绪了,交给事件派发器 Dispatcher
std::cout << "get a new link!" << std::endl;
Dispatcher( );
break;
}
}
}
~PollServer()
{
_listenSock.Close();
}
private:
Sock _listenSock;
uint16_t _port;
struct pollfd _event_fds[fd_num_max];
};
_event_fds
这个数组的大小是由我们自己定的,所以我们可以定的非常大,大到内存扛不住,所以此时就是操作系统的问题了,不是 poll 接口本身的问题。而 select 等待 fd 有上限的问题,本质上是接口本身的问题,所以 poll 本质上是解决了 select 等待 fd 有上限的问题。所以 poll 和 select 都避免不开遍历的问题,也就是在效率上没有本质的提升。于是又出现了另一个接口 epoll,我们下一篇再介绍。