选择libuv库里的tcp-echo-server作为源码阅读开头的主要动机是
首先我们要知道libuv的tcp-echo-server的main函数里干了什么
int main()
{
loop = uv_default_loop(); // 获取默认的事件循环
uv_tcp_t server; // 声明一个TCP句柄
uv_tcp_init(loop, &server); // 初始化服务端TCP句柄
uv_ip4_addr("0.0.0.0", DEFAULT_PORT, &addr); // 设置地址和端口
uv_tcp_bind(&server, (const struct sockaddr*)&addr, 0); // 绑定地址
// 开始监听,并指定新连接到达后的回调
int r = uv_listen((uv_stream_t*)&server, DEFAULT_BACKLOG, on_new_connection);
if (r) {
fprintf(stderr, "Listen error %s\n", uv_strerror(r)); // 如果有错误发生则打印错误信息
return 1; // 返回错误码
}
return uv_run(loop, UV_RUN_DEFAULT); // 启动事件循环
}
loop = uv_default_loop();
)uv_tcp_init(loop, &server);
)uv_ip4_addr("0.0.0.0", DEFAULT_PORT, &addr);
)uv_tcp_bind(&server, (const struct sockaddr*)&addr, 0);
)uv_listen((uv_stream_t*)&server, DEFAULT_BACKLOG, on_new_connection);
)uv_run(loop, UV_RUN_DEFAULT);
)fprintf(stderr, "Listen error %s\n", uv_strerror(r));
)return 1;
)事件循环是 libuv 功能的核心部分,它负责对 I/O 进行轮询,并基于不同的事件源执行它们的回调函数。libuv 的设计目标之一是为了让异步 I/O 操作变得简单易用,同时保持高性能。
uv_loop_t 是 libuv 中的一个核心结构,用于管理事件循环及其相关的所有资源。它是一个句柄类型,代表了一个事件循环实例。每个 uv_loop_t 对象管理着同一事件循环上的所有资源,并在整个事件循环的生命周期内都是可用的。
uv_loop_t 作为事件循环所有资源的统一入口,所有在事件循环上运行的各类 Handle 和 Request 实例都被注册到 uv_loop_t 内部定义的数据结构中。这使得 libuv 能够有效地管理各种 I/O 操作和其他异步任务。
这个函数就是将uv_loop_t初始化,给这个loop对象初始化一些默认的成员变量,比如初始化定时器、工作队列、观察者队列等。
/* Handle types. */
typedef struct uv_loop_s uv_loop_t;
struct uv_loop_s {
/* 用户数据 - 可以用这个字段存储任意数据。 */
void* data; // 用户可以用来存储指向任意数据的指针。
/* 活跃状态句柄计数器 */
unsigned int active_handles; // 当前活跃的句柄数量。libuv 使用这个字段来跟踪事件循环中的活跃句柄数量,当所有句柄都被关闭时,事件循环也会停止。
void* handle_queue[2]; // handle 双向队列 存储在 loop 中的哨兵头节点 句柄队列。用于存放等待处理的句柄,包括需要关闭的句柄和需要激活的句柄。
union {
void* unused; // 保留字段
unsigned int count; // 活跃请求的数量。用于跟踪当前活跃的请求(如定时器、异步请求等)的数量。
} active_reqs; // req 资源计数器 请求的活跃状态。这是一个联合体,可以用来存储活跃请求的数量或者作为未使用的内存。
/* 为将来扩展保留的内部存储空间。 */
void* internal_fields; // 为将来版本可能添加的新功能预留的空间。
/* 内部标志位,用来指示事件循环何时停止。 */
unsigned int stop_flag; // 停止标志。当设置此标志时,表示事件循环应该停止。
UV_LOOP_PRIVATE_FIELDS // 私有字段。这是 libuv 内部使用的字段,对于用户来说是不可见的,具体实现细节可能因版本和平台而不同。
};
static uv_loop_t default_loop_struct; // 用于存储默认事件循环的结构体
static uv_loop_t* default_loop_ptr = NULL; // 指向默认事件循环的指针
uv_loop_t* uv_default_loop(void) {
// 防止重复初始化
if (default_loop_ptr != NULL)
return default_loop_ptr;
// 初始化uv事件循环 uv_loop_init里有具体的初始化细节
if (uv_loop_init(&default_loop_struct))
return NULL;
// 将初始化后的事件循环结构体,赋值给默认事件循环指针并返回
default_loop_ptr = &default_loop_struct;
return default_loop_ptr;
}
int uv_loop_init(uv_loop_t* loop) {
uv__loop_internal_fields_t* lfields; // 用于存放内部字段的指针
struct heap* timer_heap; // 用于存放定时器堆的指针
int err; // 存放错误码的变量
/* 初始化 libuv 本身 */
uv__once_init(); // 初始化一次性的全局资源
/* 创建一个 I/O 完成端口 */
loop->iocp = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 1);
if (loop->iocp == NULL)
return uv_translate_sys_error(GetLastError()); // 如果创建失败,则返回错误码
lfields = (uv__loop_internal_fields_t*)uv__calloc(1, sizeof(*lfields));
if (lfields == NULL)
return UV_ENOMEM; // 分配内部字段内存,如果失败返回内存不足错误
loop->internal_fields = lfields; // 将分配的内存赋值给事件循环的内部字段
err = uv_mutex_init(&lfields->loop_metrics.lock); // 初始化互斥锁
if (err)
goto fail_metrics_mutex_init; // 如果初始化失败,跳转到错误处理部分
memset(&lfields->loop_metrics.metrics,
0,
sizeof(lfields->loop_metrics.metrics)); // 初始化度量结构体
/* 防止未初始化内存访问,必须在首次调用 uv_update_time 之前初始化 loop->time */
loop->time = 0;
uv_update_time(loop); // 更新当前时间戳
QUEUE_INIT(&loop->wq); // 初始化工作队列
QUEUE_INIT(&loop->handle_queue); // 初始化句柄队列
loop->active_reqs.count = 0; // 初始化活跃请求计数器
loop->active_handles = 0; // 初始化活跃句柄计数器
loop->pending_reqs_tail = NULL; // 初始化挂起请求尾指针
loop->endgame_handles = NULL; // 初始化结束游戏句柄列表
loop->timer_heap = timer_heap = uv__malloc(sizeof(*timer_heap)); // 分配定时器堆内存
if (timer_heap == NULL) {
err = UV_ENOMEM;
goto fail_timers_alloc; // 如果分配失败,跳转到错误处理部分
}
heap_init(timer_heap); // 初始化定时器堆
loop->check_handles = NULL; // 初始化检查句柄列表
loop->prepare_handles = NULL; // 初始化准备句柄列表
loop->idle_handles = NULL; // 初始化空闲句柄列表
loop->next_prepare_handle = NULL; // 初始化下一个准备句柄指针
loop->next_check_handle = NULL; // 初始化下一个检查句柄指针
loop->next_idle_handle = NULL; // 初始化下一个空闲句柄指针
memset(&loop->poll_peer_sockets, 0, sizeof loop->poll_peer_sockets); // 初始化轮询对等方套接字列表
loop->timer_counter = 0; // 初始化定时器计数器
loop->stop_flag = 0; // 初始化停止标志
err = uv_mutex_init(&loop->wq_mutex); // 初始化工作队列互斥锁
if (err)
goto fail_mutex_init; // 如果初始化失败,跳转到错误处理部分
err = uv_async_init(loop, &loop->wq_async, uv__work_done); // 初始化异步句柄用于执行工作
if (err)
goto fail_async_init; // 如果初始化失败,跳转到错误处理部分
uv__handle_unref(&loop->wq_async); // 使工作队列异步句柄不受引用计数影响
loop->wq_async.flags |= UV_HANDLE_INTERNAL; // 标记为内部句柄
err = uv__loops_add(loop); // 将事件循环添加到 libuv 的内部列表中
if (err)
goto fail_async_init; // 如果添加失败,跳转到错误处理部分
return 0; // 成功返回
fail_async_init:
uv_mutex_destroy(&loop->wq_mutex); // 如果异步初始化失败,销毁工作队列互斥锁
fail_mutex_init:
uv__free(timer_heap); // 销毁定时器堆内存
loop->timer_heap = NULL; // 清空定时器堆指针
fail_timers_alloc:
uv_mutex_destroy(&lfields->loop_metrics.lock); // 销毁度量互斥锁
fail_metrics_mutex_init:
uv__free(lfields); // 销毁内部字段内存
loop->internal_fields = NULL; // 清空内部字段指针
CloseHandle(loop->iocp); // 关闭 I/O 完成端口
loop->iocp = INVALID_HANDLE_VALUE; // 清空 I/O 完成端口指针
return err; // 返回错误码
}
uv_tcp_t是 libuv 中用于表示 TCP 句柄的数据结构。
uv_tcp_init的uv_tcp_init_ex 函数负责初始化 TCP 句柄,并根据指定的地址族创建 socket。
typedef struct uv_tcp_s uv_tcp_t;
struct uv_tcp_s {
UV_HANDLE_FIELDS //通用句柄字段
UV_STREAM_FIELDS //通用流读写处理字段
UV_TCP_PRIVATE_FIELDS //TCP 句柄特有的私有字段
};
#define UV_TCP_PRIVATE_FIELDS \
SOCKET socket; \
int delayed_error; \
union { \
struct { uv_tcp_server_fields } serv; \
struct { uv_tcp_connection_fields } conn; \
} tcp;
//SOCKET socket:
//TCP 句柄对应的 socket 文件描述符。
//int delayed_error:
//用于存储延迟错误的状态。
//union { struct { uv_tcp_server_fields } serv; struct { uv_tcp_connection_fields } conn; } tcp:
//用于区分 TCP 服务器句柄和 TCP 连接句柄的不同字段。
int uv_tcp_init(uv_loop_t* loop, uv_tcp_t* handle) {
return uv_tcp_init_ex(loop, handle, AF_UNSPEC); // 调用 uv_tcp_init_ex 函数,使用 AF_UNSPEC 作为地址族
}
int uv_tcp_init_ex(uv_loop_t* loop, uv_tcp_t* handle, unsigned int flags) {
int domain; // 存储地址族(协议族)
/* 使用低八位来存储地址族 */
domain = flags & 0xFF;
if (domain != AF_INET && domain != AF_INET6 && domain != AF_UNSPEC)
return UV_EINVAL; // 如果地址族不是 AF_INET、AF_INET6 或 AF_UNSPEC,则返回无效参数错误
if (flags & ~0xFF)
return UV_EINVAL; // 如果标志位中除了低八位以外还有其他位被设置了,则返回无效参数错误
uv__stream_init(loop, (uv_stream_t*)handle, UV_TCP); // 初始化流句柄为 TCP 类型
handle->tcp.serv.accept_reqs = NULL; // 初始化 accept 请求队列为 NULL
handle->tcp.serv.pending_accepts = NULL; // 初始化挂起的 accept 请求队列为 NULL
handle->socket = INVALID_SOCKET; // 初始化 socket 描述符为无效值
handle->reqs_pending = 0; // 初始化待处理请求计数为 0
handle->tcp.serv.func_acceptex = NULL; // 初始化 accept 函数为 NULL
handle->tcp.conn.func_connectex = NULL; // 初始化 connect 函数为 NULL
handle->tcp.serv.processed_accepts = 0; // 初始化已处理的 accept 计数为 0
handle->delayed_error = 0; // 初始化延迟错误为 0
/* 如果在此点之后有任何失败,我们需要从句柄队列中移除句柄,
* 因为它在 uv__stream_init 中被加入到了句柄队列中。
*/
if (domain != AF_UNSPEC) { // 如果指定了地址族
SOCKET sock; // 创建一个 socket 描述符
DWORD err; // 存储错误码
sock = socket(domain, SOCK_STREAM, 0); // 创建一个 socket
if (sock == INVALID_SOCKET) { // 如果创建失败
err = WSAGetLastError(); // 获取错误码
QUEUE_REMOVE(&handle->handle_queue); // 从句柄队列中移除句柄
return uv_translate_sys_error(err); // 返回转换后的错误码
}
err = uv__tcp_set_socket(handle->loop, handle, sock, domain, 0); // 设置 socket 描述符
if (err) { // 如果设置失败
closesocket(sock); // 关闭 socket
QUEUE_REMOVE(&handle->handle_queue); // 从句柄队列中移除句柄
return uv_translate_sys_error(err); // 返回转换后的错误码
}
}
return 0; // 成功返回 0
}
uv_ip4_addr 类似常规网络编程下的绑定地址的前置操作
uv_inet_pton(AF_INET, ip, &(addr->sin_addr.s_addr));:将字符串形式的 IP 地址转换为二进制格式,并填充到sin_addr字段。
int uv_ip4_addr(const char* ip, int port, struct sockaddr_in* addr) {
memset(addr, 0, sizeof(*addr)); // 清零结构体
addr->sin_family = AF_INET; // 设置地址族为 IPv4
addr->sin_port = htons(port); // 设置端口号
#ifdef SIN6_LEN
addr->sin_len = sizeof(*addr); // 设置长度(IPv6 特性)
#endif
//许多网络库里都有的操作
return uv_inet_pton(AF_INET, ip, &(addr->sin_addr.s_addr)); // 将 IP 地址转换为二进制格式,以便使用
}
实现了将一个 TCP 句柄绑定到指定地址的功能。首先验证输入参数,然后创建或重用 socket,并设置必要的 socket 选项。接着,尝试绑定 socket 到指定地址,并根据结果设置相应的标志。最后返回绑定的结果。
int uv_tcp_bind(uv_tcp_t* handle,
const struct sockaddr* addr,
unsigned int flags) {
unsigned int addrlen;
// 检查句柄类型是否为 TCP
if (handle->type != UV_TCP)
return UV_EINVAL; // 如果不是 TCP 类型,则返回无效参数错误
// 检查句柄是否正在关闭
if (uv__is_closing(handle)) {
return UV_EINVAL; // 如果句柄正在关闭,则返回无效参数错误
}
// 根据地址族确定地址长度
if (addr->sa_family == AF_INET) {
addrlen = sizeof(struct sockaddr_in); // IPv4 地址长度
}
else if (addr->sa_family == AF_INET6) {
addrlen = sizeof(struct sockaddr_in6); // IPv6 地址长度
}
else {
return UV_EINVAL; // 如果地址族既不是 AF_INET 也不是 AF_INET6,则返回无效参数错误
}
// 调用内部函数绑定地址 执行类似网络编程中调用bind函数的操作
return uv__tcp_bind(handle, addr, addrlen, flags); // 返回绑定结果
}
int uv__tcp_bind(uv_tcp_t* tcp,
const struct sockaddr* addr,
unsigned int addrlen,
unsigned int flags) {
int err;
int on;
/* 验证是否设置了 IPv6-only 标志,如果是,则确保地址族为 AF_INET6 */
if ((flags & UV_TCP_IPV6ONLY) && addr->sa_family != AF_INET6)
return UV_EINVAL; /* 如果设置了 IPv6-only 但是地址不是 AF_INET6,返回错误 */
/* 创建一个新的 socket(如果还没有创建的话),并设置地址族 */
err = maybe_new_socket(tcp, addr->sa_family, 0);
if (err)
return err; /* 如果创建 socket 失败,返回错误码 */
/* 设置 SO_REUSEADDR 选项,允许快速重启 */
on = 1;
if (setsockopt(tcp->io_watcher.fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)))
return UV__ERR(errno); /* 如果设置 socket 选项失败,返回错误码 */
#ifndef __OpenBSD__
#ifdef IPV6_V6ONLY
/* 如果地址族为 AF_INET6,则设置 IPv6-only 选项 */
if (addr->sa_family == AF_INET6) {
on = (flags & UV_TCP_IPV6ONLY) != 0;
if (setsockopt(tcp->io_watcher.fd,
IPPROTO_IPV6,
IPV6_V6ONLY,
&on,
sizeof on) == -1) {
#if defined(__MVS__)
/* 如果操作系统不支持此选项,则返回错误 */
if (errno == EOPNOTSUPP)
return UV_EINVAL;
#endif
return UV__ERR(errno); /* 如果设置 IPv6-only 选项失败,返回错误码 */
}
}
#endif
#endif
/* 清除 errno,避免 bind() 的错误被忽略 */
errno = 0;
/* 绑定 socket 到指定地址 */
err = bind(tcp->io_watcher.fd, addr, addrlen);
//bind的定义:int bind(int sockfd, const struct sockaddr *addr, socklen_t addrlen);
//从定义可推测 tcp->io_watcher.fd 的底层是socket描述符
if (err == -1 && errno != EADDRINUSE) {
if (errno == EAFNOSUPPORT) {
/* 如果操作系统不支持绑定给定地址族的地址,则返回错误 */
/* 在某些系统(如 macOS、其他 BSD 和 Solaris)上,当尝试将 AF_INET socket 绑定到 AF_INET6 地址或反之亦然时,会返回 EAFNOSUPPORT。 */
return UV_EINVAL;
}
return UV__ERR(errno); /* 如果绑定失败且不是因为地址已被使用,则返回错误码 */
}
/* 设置延迟错误,如果 bind() 返回 EADDRINUSE,则记录错误 */
tcp->delayed_error = (err == -1) ? UV__ERR(errno) : 0;
/* 设置标志位,表示已绑定 */
tcp->flags |= UV_HANDLE_BOUND;
/* 如果地址族为 AF_INET6,则设置 IPv6 标志位 */
if (addr->sa_family == AF_INET6)
tcp->flags |= UV_HANDLE_IPV6;
/* 成功绑定,返回 0 */
return 0;
}
根据uv__tcp_bind里的bind函数使用方式可以看出
//tcp->io_watcher.fd 表示底层的 socket 文件描述符
在 libuv 内部,对所有 I/O 操作进行了统一的抽象。在底层操作系统 I/O 操作的基础上,结合事件循环机制,实现了 IO 观察者。对应的数据结构是 uv__io_s,通过它可以知道 I/O 相关的信息,如可读、可写等。
在 uv_tcp_t 结构体中,io_watcher 成员就是一个 uv__io_s 类型的对象,它使 TCP 句柄具备了监测 I/O 状态的能力。当 TCP 连接的状态发生变化(如变为可读或可写),事件循环会根据这些变化触发相应的回调函数。
例如监测一个TCP连接,那么TCP handle就算是一个 IO 观察者,其实它是一个句柄的同时又是一个 IO 观察者。
uv_listen 函数的作用是在 libuv 中设置一个 stream 为监听模式。
uv_listen中的uv__tcp_listen
主要完成了以下功能:
int uv_listen(uv_stream_t* stream, int backlog, uv_connection_cb cb) {
int err;
/* 检查 stream 是否正在关闭 */
if (uv__is_closing(stream)) {
return UV_EINVAL; /* 如果 stream 正在关闭,则返回无效参数错误 */
}
/* 初始化错误码 */
err = ERROR_INVALID_PARAMETER;
/* 根据 stream 的类型选择不同的监听函数 */
switch (stream->type) {
case UV_TCP:
/* 如果是 TCP 类型的 stream,则调用 uv__tcp_listen */
err = uv__tcp_listen((uv_tcp_t*)stream, backlog, cb);
break;
case UV_NAMED_PIPE:
/* 如果是命名管道类型的 stream,则调用 uv__pipe_listen */
err = uv__pipe_listen((uv_pipe_t*)stream, backlog, cb);
break;
default:
/* 如果是未知类型,则断言失败 */
assert(0); /* 断言失败,不应该到达这里 */
}
/* 将系统错误码转换为 libuv 错误码 */
return uv_translate_sys_error(err);
}
int uv__tcp_listen(uv_tcp_t* handle, int backlog, uv_connection_cb cb) {
unsigned int i, simultaneous_accepts;
uv_tcp_accept_t* req;
int err;
/* 确保 backlog 大于 0 */
assert(backlog > 0);
/* 如果 handle 已经处于监听状态,则更新连接回调 */
if (handle->flags & UV_HANDLE_LISTENING) {
handle->stream.serv.connection_cb = cb;
}
/* 如果 handle 处于读取状态,则返回错误 */
if (handle->flags & UV_HANDLE_READING) {
return WSAEISCONN;
}
/* 如果 handle 有延迟错误,则返回延迟错误 */
if (handle->delayed_error) {
return handle->delayed_error;
}
/* 如果 handle 尚未绑定,则尝试绑定到任意 IP 地址 */
if (!(handle->flags & UV_HANDLE_BOUND)) {
err = uv__tcp_try_bind(handle,
(const struct sockaddr*) &uv_addr_ip4_any_,
sizeof(uv_addr_ip4_any_),
0);
if (err)
return err;
if (handle->delayed_error)
return handle->delayed_error;
}
/* 获取 acceptex 函数,如果不存在则返回错误 */
if (!handle->tcp.serv.func_acceptex) {
if (!uv__get_acceptex_function(handle->socket, &handle->tcp.serv.func_acceptex)) {
return WSAEAFNOSUPPORT;
}
}
/* 如果 handle 使用共享 socket,则跳过 listen 调用 */
/* 否则,执行 listen 调用 */
if (!(handle->flags & UV_HANDLE_SHARED_TCP_SOCKET) &&
listen(handle->socket, backlog) == SOCKET_ERROR) {
return WSAGetLastError();
}
/* 设置监听标志 */
handle->flags |= UV_HANDLE_LISTENING;
handle->stream.serv.connection_cb = cb;
INCREASE_ACTIVE_COUNT(loop, handle);
/* 计算同时接受的连接数 */
simultaneous_accepts = handle->flags & UV_HANDLE_TCP_SINGLE_ACCEPT ? 1
: uv_simultaneous_server_accepts;
/* 如果 accept 请求数组为空,则分配内存 */
if (handle->tcp.serv.accept_reqs == NULL) {
handle->tcp.serv.accept_reqs =
uv__malloc(uv_simultaneous_server_accepts * sizeof(uv_tcp_accept_t));
if (!handle->tcp.serv.accept_reqs) {
uv_fatal_error(ERROR_OUTOFMEMORY, "uv__malloc");
}
/* 初始化所有的 accept 请求 */
for (i = 0; i < simultaneous_accepts; i++) {
req = &handle->tcp.serv.accept_reqs[i];
UV_REQ_INIT(req, UV_ACCEPT);
req->accept_socket = INVALID_SOCKET;
req->data = handle;
req->wait_handle = INVALID_HANDLE_VALUE;
if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
req->event_handle = CreateEvent(NULL, 0, 0, NULL);
if (req->event_handle == NULL) {
uv_fatal_error(GetLastError(), "CreateEvent");
}
} else {
req->event_handle = NULL;
}
/* 将 accept 请求加入队列 */
uv__tcp_queue_accept(handle, req);
}
/* 初始化剩余未使用的 accept 请求 */
for (i = simultaneous_accepts; i < uv_simultaneous_server_accepts; i++) {
req = &handle->tcp.serv.accept_reqs[i];
UV_REQ_INIT(req, UV_ACCEPT);
req->accept_socket = INVALID_SOCKET;
req->data = handle;
req->wait_handle = INVALID_HANDLE_VALUE;
req->event_handle = NULL;
}
}
/* 成功返回 */
return 0;
}
typedef struct { // 定义写请求结构体
uv_write_t req; // 写请求句柄
uv_buf_t buf; // 缓冲区结构体
} write_req_t;
void free_write_req(uv_write_t* req); // 自定义释放写请求的函数
// 函数实现
void free_write_req(uv_write_t* req) {
write_req_t* wr = (write_req_t*)req; // 转换为自定义类型
free(wr->buf.base); // 释放缓冲区内存
free(wr); // 释放写请求内存
}
void alloc_buffer(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf); // 分配缓冲区函数声明
// 函数实现
void alloc_buffer(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) {
buf->base = (char*)malloc(suggested_size); // 分配内存
buf->len = suggested_size; // 设置长度
}
void on_close(uv_handle_t* handle); // 关闭句柄后的回调函数声明
// 函数实现
void on_close(uv_handle_t* handle) {
free(handle); // 释放内存
}
void echo_write(uv_write_t* req, int status); // 写完成后的回调函数声明
// 函数实现
void echo_write(uv_write_t* req, int status) {
if (status) { // 如果有错误发生
fprintf(stderr, "Write error %s\n", uv_strerror(status)); // 打印错误信息
}
free_write_req(req); // 释放写请求
}
void echo_read(uv_stream_t* client, ssize_t nread, const uv_buf_t* buf); // 读取数据后的回调函数声明
// 函数实现
void echo_read(uv_stream_t* client, ssize_t nread, const uv_buf_t* buf) {
if (nread > 0) { // 如果读取的数据量大于0
write_req_t* req = (write_req_t*)malloc(sizeof(write_req_t)); // 分配写请求内存
req->buf = uv_buf_init(buf->base, nread); // 初始化缓冲区
uv_write((uv_write_t*)req, client, &req->buf, 1, echo_write); // 异步写入数据,并指定写完成后的回调
return;
}
if (nread < 0) { // 如果有错误发生
if (nread != UV_EOF) // 如果不是正常关闭连接
fprintf(stderr, "Read error %s\n", uv_err_name(nread)); // 打印错误信息
uv_close((uv_handle_t*)client, on_close); // 关闭客户端连接,并指定关闭后的回调
}
free(buf->base); // 释放读取的数据缓冲区
}
void on_new_connection(uv_stream_t* server, int status); // 新连接到达后的回调函数声明
// 函数实现
void on_new_connection(uv_stream_t* server, int status) {
if (status < 0) { // 如果有错误发生
fprintf(stderr, "New connection error %s\n", uv_strerror(status)); // 打印错误信息
return; // 返回
}
uv_tcp_t* client = (uv_tcp_t*)malloc(sizeof(uv_tcp_t)); // 分配客户端内存
uv_tcp_init(loop, client); // 初始化客户端
if (uv_accept(server, (uv_stream_t*)client) == 0) { // 接受连接
uv_read_start((uv_stream_t*)client, alloc_buffer, echo_read); // 开始读取数据,并指定读取后的回调
}
else {
uv_close((uv_handle_t*)client, on_close); // 如果接受失败,则关闭客户端
}
}
int uv__tcp_read_start(uv_tcp_t* handle, uv_alloc_cb alloc_cb, uv_read_cb read_cb) {
uv_loop_t* loop = handle->loop; // 获取当前 handle 所属的事件循环
// 设置读取标志
handle->flags |= UV_HANDLE_READING;
handle->read_cb = read_cb; // 设置读取回调函数
handle->alloc_cb = alloc_cb; // 设置缓冲区分配回调函数
INCREASE_ACTIVE_COUNT(loop, handle); // 增加活跃句柄计数
/* 如果读取被停止然后再次启动,可能仍然有一个读请求在等待。 */
if (!(handle->flags & UV_HANDLE_READ_PENDING)) {
if (handle->flags & UV_HANDLE_EMULATE_IOCP && // 如果使用 IOCP 模拟
handle->read_req.event_handle == NULL) { // 并且事件句柄还未创建
handle->read_req.event_handle = CreateEvent(NULL, 0, 0, NULL); // 创建事件句柄
if (handle->read_req.event_handle == NULL) { // 如果创建失败
uv_fatal_error(GetLastError(), "CreateEvent"); // 报告错误
}
}
uv__tcp_queue_read(loop, handle); // 将读请求加入队列
}
return 0; // 成功返回
}
int uv__tcp_write(uv_loop_t* loop, uv_write_t* req, uv_tcp_t* handle,
const uv_buf_t bufs[], unsigned int nbufs, uv_write_cb cb) {
int result;
DWORD bytes;
UV_REQ_INIT(req, UV_WRITE); // 初始化写请求
req->handle = (uv_stream_t*) handle; // 设置请求的句柄
req->cb = cb; // 设置写完成后的回调函数
/* 准备重叠结构 */
memset(&(req->u.io.overlapped), 0, sizeof(req->u.io.overlapped)); // 清空重叠结构
if (handle->flags & UV_HANDLE_EMULATE_IOCP) { // 如果使用 IOCP 模拟
req->event_handle = CreateEvent(NULL, 0, 0, NULL); // 创建事件句柄
if (req->event_handle == NULL) { // 如果创建失败
uv_fatal_error(GetLastError(), "CreateEvent"); // 报告错误
}
req->u.io.overlapped.hEvent = (HANDLE) ((ULONG_PTR) req->event_handle | 1); // 设置重叠结构的事件句柄
req->wait_handle = INVALID_HANDLE_VALUE; // 设置等待句柄为无效
}
result = WSASend(handle->socket, (WSABUF*) bufs, nbufs, &bytes, 0, &req->u.io.overlapped, NULL); // 发送数据
if (UV_SUCCEEDED_WITHOUT_IOCP(result == 0)) { // 如果发送立即完成
req->u.io.queued_bytes = 0; // 设置已排队字节数为0
handle->reqs_pending++; // 增加待处理请求计数
handle->stream.conn.write_reqs_pending++; // 增加写请求计数
REGISTER_HANDLE_REQ(loop, handle, req); // 注册请求到事件循环
uv__insert_pending_req(loop, (uv_req_t*) req); // 插入待处理请求
} else if (UV_SUCCEEDED_WITH_IOCP(result == 0)) { // 如果发送排队成功
req->u.io.queued_bytes = uv__count_bufs(bufs, nbufs); // 计算已排队字节数
handle->reqs_pending++; // 增加待处理请求计数
handle->stream.conn.write_reqs_pending++; // 增加写请求计数
REGISTER_HANDLE_REQ(loop, handle, req); // 注册请求到事件循环
handle->write_queue_size += req->u.io.queued_bytes; // 更新写入队列大小
if (handle->flags & UV_HANDLE_EMULATE_IOCP && // 如果使用 IOCP 模拟
!RegisterWaitForSingleObject(&req->wait_handle, req->event_handle, post_write_completion, (void*) req, INFINITE, WT_EXECUTEINWAITTHREAD | WT_EXECUTEONLYONCE)) { // 注册等待对象
SET_REQ_ERROR(req, GetLastError()); // 设置请求错误
uv__insert_pending_req(loop, (uv_req_t*)req); // 插入待处理请求
}
} else { // 如果发送失败
req->u.io.queued_bytes = 0; // 设置已排队字节数为0
handle->reqs_pending++; // 增加待处理请求计数
handle->stream.conn.write_reqs_pending++; // 增加写请求计数
REGISTER_HANDLE_REQ(loop, handle, req); // 注册请求到事件循环
SET_REQ_ERROR(req, WSAGetLastError()); // 设置请求错误
uv__insert_pending_req(loop, (uv_req_t*) req); // 插入待处理请求
}
return 0; // 成功返回
}
传入uv_loop_t事假循环的句柄,还有一个运行的模式,它的模式有3种,分别为默认模式、单次模式、不等待模式。
注意,这个函数不是线程安全的。
typedef enum {
UV_RUN_DEFAULT = 0,
UV_RUN_ONCE,
UV_RUN_NOWAIT
} uv_run_mode;
int uv_run(uv_loop_t* loop, uv_run_mode mode) {
int timeout;
int r;
int ran_pending;
r = uv__loop_alive(loop);
if (!r)
uv__update_time(loop);
/* 这是一个while循环 */
while (r != 0 && loop->stop_flag == 0) {
/* 更新时间并开始倒计时 */
uv__update_time(loop);
uv__run_timers(loop);
/* 处理挂起的handle */
ran_pending = uv__run_pending(loop);
/* 运行idle handle */
uv__run_idle(loop);
/* 运行prepare handle */
uv__run_prepare(loop);
timeout = 0;
if ((mode == UV_RUN_ONCE && !ran_pending) || mode == UV_RUN_DEFAULT)
timeout = uv_backend_timeout(loop);
/* 计算要阻塞的时间,开始阻塞 */
uv__io_poll(loop, timeout);
/* 程序执行到这里表示被唤醒了,被唤醒的原因可能是I/O可读可写、或者超时了,检查handle是否可以操作 */
uv__run_check(loop);
/* 看看是否有close的handle */
uv__run_closing_handles(loop);
/* 单次模式 */
if (mode == UV_RUN_ONCE) {
/* UV_RUN_ONCE implies forward progress: at least one callback must have
* been invoked when it returns. uv__io_poll() can return without doing
* I/O (meaning: no callbacks) when its timeout expires - which means we
* have pending timers that satisfy the forward progress constraint.
*
* UV_RUN_NOWAIT makes no guarantees about progress so it's omitted from
* the check.
*/
uv__update_time(loop);
uv__run_timers(loop);
}
/* handle保活处理 */
r = uv__loop_alive(loop);
if (mode == UV_RUN_ONCE || mode == UV_RUN_NOWAIT)
break;
}
/* The if statement lets gcc compile it to a conditional store. Avoids
* dirtying a cache line.
*/
if (loop->stop_flag != 0)
loop->stop_flag = 0;
return r;
}
额外介绍
释放所有内部循环资源。仅当循环完成执行并且所有打开的句柄和请求已关闭时才调用此函数,否则它将返回UV_EBUSY
。此函数返回后,用户可以释放为循环分配的内存。
int uv_loop_close(uv_loop_t* loop) {
QUEUE* q;
uv_handle_t* h;
#ifndef NDEBUG
void* saved_data;
#endif
/* 如果存在处于活跃状态的请求,则返回UV_EBUSY */
if (uv__has_active_reqs(loop))
return UV_EBUSY;
/* 如果存在处于活跃状态的handle,则返回UV_EBUSY */
QUEUE_FOREACH(q, &loop->handle_queue) {
h = QUEUE_DATA(q, uv_handle_t, handle_queue);
if (!(h->flags & UV_HANDLE_INTERNAL))
return UV_EBUSY;
}
/* 关闭事件循环 */
uv__loop_close(loop);
#ifndef NDEBUG
saved_data = loop->data;
memset(loop, -1, sizeof(*loop));
loop->data = saved_data;
#endif
if (loop == default_loop_ptr)
default_loop_ptr = NULL;
return 0;
}
注意,这个函数也不是线程安全的。
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <uv.h> // libuv 库的头文件
#define DEFAULT_PORT 7000// 默认监听端口
#define DEFAULT_BACKLOG 128 // 连接队列的默认大小
uv_loop_t* loop; // 初始化 libuv 事件循环
struct sockaddr_in addr; // 地址结构体
typedef struct { // 定义写请求结构体
uv_write_t req; // 写请求句柄
uv_buf_t buf; // 缓冲区结构体
} write_req_t;
void free_write_req(uv_write_t* req); // 自定义释放写请求的函数
// 函数实现
void free_write_req(uv_write_t* req) {
write_req_t* wr = (write_req_t*)req; // 转换为自定义类型
free(wr->buf.base); // 释放缓冲区内存
free(wr); // 释放写请求内存
}
void alloc_buffer(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf); // 分配缓冲区函数声明
// 函数实现
void alloc_buffer(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) {
buf->base = (char*)malloc(suggested_size); // 分配内存
buf->len = suggested_size; // 设置长度
}
void on_close(uv_handle_t* handle); // 关闭句柄后的回调函数声明
// 函数实现
void on_close(uv_handle_t* handle) {
free(handle); // 释放内存
}
void echo_write(uv_write_t* req, int status); // 写完成后的回调函数声明
// 函数实现
void echo_write(uv_write_t* req, int status) {
if (status) { // 如果有错误发生
fprintf(stderr, "Write error %s\n", uv_strerror(status)); // 打印错误信息
}
free_write_req(req); // 释放写请求
}
void echo_read(uv_stream_t* client, ssize_t nread, const uv_buf_t* buf); // 读取数据后的回调函数声明
// 函数实现
void echo_read(uv_stream_t* client, ssize_t nread, const uv_buf_t* buf) {
if (nread > 0) { // 如果读取的数据量大于0
write_req_t* req = (write_req_t*)malloc(sizeof(write_req_t)); // 分配写请求内存
req->buf = uv_buf_init(buf->base, nread); // 初始化缓冲区
uv_write((uv_write_t*)req, client, &req->buf, 1, echo_write); // 异步写入数据,并指定写完成后的回调
return;
}
if (nread < 0) { // 如果有错误发生
if (nread != UV_EOF) // 如果不是正常关闭连接
fprintf(stderr, "Read error %s\n", uv_err_name(nread)); // 打印错误信息
uv_close((uv_handle_t*)client, on_close); // 关闭客户端连接,并指定关闭后的回调
}
free(buf->base); // 释放读取的数据缓冲区
}
void on_new_connection(uv_stream_t* server, int status); // 新连接到达后的回调函数声明
// 函数实现
void on_new_connection(uv_stream_t* server, int status) {
if (status < 0) { // 如果有错误发生
fprintf(stderr, "New connection error %s\n", uv_strerror(status)); // 打印错误信息
return; // 返回
}
uv_tcp_t* client = (uv_tcp_t*)malloc(sizeof(uv_tcp_t)); // 分配客户端内存
uv_tcp_init(loop, client); // 初始化客户端
if (uv_accept(server, (uv_stream_t*)client) == 0) { // 接受连接
uv_read_start((uv_stream_t*)client, alloc_buffer, echo_read); // 开始读取数据,并指定读取后的回调
}
else {
uv_close((uv_handle_t*)client, on_close); // 如果接受失败,则关闭客户端
}
}
int main()
{
loop = uv_default_loop(); // 获取默认的事件循环
uv_tcp_t server; // 声明一个TCP句柄
uv_tcp_init(loop, &server); // 初始化服务端TCP句柄
uv_ip4_addr("0.0.0.0", DEFAULT_PORT, &addr); // 设置地址和端口
uv_tcp_bind(&server, (const struct sockaddr*)&addr, 0); // 绑定地址
// 开始监听,并指定新连接到达后的回调
int r = uv_listen((uv_stream_t*)&server, DEFAULT_BACKLOG, on_new_connection);
if (r) {
fprintf(stderr, "Listen error %s\n", uv_strerror(r)); // 如果有错误发生则打印错误信息
return 1; // 返回错误码
}
return uv_run(loop, UV_RUN_DEFAULT); // 启动事件循环
}
借助AI对tcp-echo-server略微分析了下,其实也不过就是网络编程那几步曲,只要像平常一样调用api就行了。
后面试着调用api写一个回声服务端和客户端,或者结合一些客户端相关开发工具写一个多人聊天室就libuv的tcp-echo-server学习就完结了。
关于windows下的网络编程可以参考我写的这篇
windows环境下C/C++的socket相关网络编程详解以及部分TCP详解
或者网上其他人的文章
参考文章
【libuv高效编程】libuv学习超详细教程3——libuv事件循环 - _杰杰_
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。