前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >libuv源码学习笔记:tcp-echo-server

libuv源码学习笔记:tcp-echo-server

原创
作者头像
晨星成焰
发布2024-10-20 18:06:06
1340
发布2024-10-20 18:06:06
举报
文章被收录于专栏:网络编程

前言

选择libuv库里的tcp-echo-server作为源码阅读开头的主要动机是

  • 简单性:回声服务端的功能非常简单,接收客户端发送的数据并原封不动地返回。这种功能的简单性使得代码更易于理解和跟踪。
  • 基础概念:通过实现一个回声服务端,可以学习到libuv类网络通信的基本操作,如套接字(socket)的创建、监听、接受连接请求、读写数据等。
  • 错误处理:尽管回声服务端相对简单,但它仍然涉及到一些常见的错误处理逻辑,比如处理连接失败、读写超时等问题。这可以帮助新手理解如何在实际应用中处理这些常见问题。
  • 可以逐步增加复杂度:一旦掌握了回声服务端的工作原理,可以在此基础上逐步增加复杂度,例如添加多线程支持以处理多个并发连接,或者实现更复杂的协议逻辑。
  • 可移植性和标准化:回声服务端通常基于标准的网络协议实现,这意味着可以通过这个过程了解到一些跨平台编程的基本知识,并且学会编写符合网络标准的代码。

首先我们要知道libuv的tcp-echo-server的main函数里干了什么

代码语言:cpp
复制
int main()
{
    loop = uv_default_loop(); // 获取默认的事件循环

    uv_tcp_t server; // 声明一个TCP句柄
    uv_tcp_init(loop, &server); // 初始化服务端TCP句柄

    uv_ip4_addr("0.0.0.0", DEFAULT_PORT, &addr); // 设置地址和端口

    uv_tcp_bind(&server, (const struct sockaddr*)&addr, 0); // 绑定地址
    
    // 开始监听,并指定新连接到达后的回调
    int r = uv_listen((uv_stream_t*)&server, DEFAULT_BACKLOG, on_new_connection); 
    if (r) {
        fprintf(stderr, "Listen error %s\n", uv_strerror(r)); // 如果有错误发生则打印错误信息
        return 1; // 返回错误码
    }
    return uv_run(loop, UV_RUN_DEFAULT); // 启动事件循环
}
  1. 初始化libuv事件循环 (loop = uv_default_loop();)
  2. 初始化服务器TCP句柄 (uv_tcp_init(loop, &server);)
  3. 设置服务器地址和端口 (uv_ip4_addr("0.0.0.0", DEFAULT_PORT, &addr);)
  4. 绑定服务器到地址 (uv_tcp_bind(&server, (const struct sockaddr*)&addr, 0);)
  5. 开始监听新的连接 (uv_listen((uv_stream_t*)&server, DEFAULT_BACKLOG, on_new_connection);)
  • 如果监听成功:
    • 启动事件循环 (uv_run(loop, UV_RUN_DEFAULT);)
  • 如果监听失败:
    • 打印错误信息 (fprintf(stderr, "Listen error %s\n", uv_strerror(r));)
    • 返回错误码 (return 1;)

流程图
流程图

看一下 uv_loop_t 结构体和uv_default_loop函数

事件循环(Event Loop)

事件循环是 libuv 功能的核心部分,它负责对 I/O 进行轮询,并基于不同的事件源执行它们的回调函数。libuv 的设计目标之一是为了让异步 I/O 操作变得简单易用,同时保持高性能。

uv_loop_t 是 libuv 中的一个核心结构,用于管理事件循环及其相关的所有资源。它是一个句柄类型,代表了一个事件循环实例。每个 uv_loop_t 对象管理着同一事件循环上的所有资源,并在整个事件循环的生命周期内都是可用的。

uv_loop_t 的作用

uv_loop_t 作为事件循环所有资源的统一入口,所有在事件循环上运行的各类 Handle 和 Request 实例都被注册到 uv_loop_t 内部定义的数据结构中。这使得 libuv 能够有效地管理各种 I/O 操作和其他异步任务。

uv_default_loop中uv_loop_init()的作用

这个函数就是将uv_loop_t初始化,给这个loop对象初始化一些默认的成员变量,比如初始化定时器、工作队列、观察者队列等。

代码语言:cpp
复制
/* Handle types. */
typedef struct uv_loop_s uv_loop_t;

struct uv_loop_s {
    /* 用户数据 - 可以用这个字段存储任意数据。 */
    void* data; // 用户可以用来存储指向任意数据的指针。

    /* 活跃状态句柄计数器 */
    unsigned int active_handles; // 当前活跃的句柄数量。libuv 使用这个字段来跟踪事件循环中的活跃句柄数量,当所有句柄都被关闭时,事件循环也会停止。

    void* handle_queue[2]; // handle 双向队列 存储在 loop 中的哨兵头节点 句柄队列。用于存放等待处理的句柄,包括需要关闭的句柄和需要激活的句柄。

    union {
        void* unused; // 保留字段
        unsigned int count; // 活跃请求的数量。用于跟踪当前活跃的请求(如定时器、异步请求等)的数量。
    } active_reqs; // req 资源计数器 请求的活跃状态。这是一个联合体,可以用来存储活跃请求的数量或者作为未使用的内存。

    /* 为将来扩展保留的内部存储空间。 */
    void* internal_fields; // 为将来版本可能添加的新功能预留的空间。

    /* 内部标志位,用来指示事件循环何时停止。 */
    unsigned int stop_flag; // 停止标志。当设置此标志时,表示事件循环应该停止。

    UV_LOOP_PRIVATE_FIELDS // 私有字段。这是 libuv 内部使用的字段,对于用户来说是不可见的,具体实现细节可能因版本和平台而不同。
};

static uv_loop_t default_loop_struct; // 用于存储默认事件循环的结构体
static uv_loop_t* default_loop_ptr = NULL; // 指向默认事件循环的指针

uv_loop_t* uv_default_loop(void) {
    // 防止重复初始化
    if (default_loop_ptr != NULL)
        return default_loop_ptr;

    // 初始化uv事件循环  uv_loop_init里有具体的初始化细节
    if (uv_loop_init(&default_loop_struct))
        return NULL;

    // 将初始化后的事件循环结构体,赋值给默认事件循环指针并返回
    default_loop_ptr = &default_loop_struct;
    return default_loop_ptr;
}

int uv_loop_init(uv_loop_t* loop) {
    uv__loop_internal_fields_t* lfields; // 用于存放内部字段的指针
    struct heap* timer_heap; // 用于存放定时器堆的指针
    int err; // 存放错误码的变量

    /* 初始化 libuv 本身 */
    uv__once_init(); // 初始化一次性的全局资源

    /* 创建一个 I/O 完成端口 */
    loop->iocp = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 1);
    if (loop->iocp == NULL)
        return uv_translate_sys_error(GetLastError()); // 如果创建失败,则返回错误码

    lfields = (uv__loop_internal_fields_t*)uv__calloc(1, sizeof(*lfields));
    if (lfields == NULL)
        return UV_ENOMEM; // 分配内部字段内存,如果失败返回内存不足错误
    loop->internal_fields = lfields; // 将分配的内存赋值给事件循环的内部字段

    err = uv_mutex_init(&lfields->loop_metrics.lock); // 初始化互斥锁
    if (err)
        goto fail_metrics_mutex_init; // 如果初始化失败,跳转到错误处理部分

    memset(&lfields->loop_metrics.metrics,
        0,
        sizeof(lfields->loop_metrics.metrics)); // 初始化度量结构体

    /* 防止未初始化内存访问,必须在首次调用 uv_update_time 之前初始化 loop->time */
    loop->time = 0;
    uv_update_time(loop); // 更新当前时间戳

    QUEUE_INIT(&loop->wq); // 初始化工作队列
    QUEUE_INIT(&loop->handle_queue); // 初始化句柄队列
    loop->active_reqs.count = 0; // 初始化活跃请求计数器
    loop->active_handles = 0; // 初始化活跃句柄计数器

    loop->pending_reqs_tail = NULL; // 初始化挂起请求尾指针

    loop->endgame_handles = NULL; // 初始化结束游戏句柄列表

    loop->timer_heap = timer_heap = uv__malloc(sizeof(*timer_heap)); // 分配定时器堆内存
    if (timer_heap == NULL) {
        err = UV_ENOMEM;
        goto fail_timers_alloc; // 如果分配失败,跳转到错误处理部分
    }

    heap_init(timer_heap); // 初始化定时器堆

    loop->check_handles = NULL; // 初始化检查句柄列表
    loop->prepare_handles = NULL; // 初始化准备句柄列表
    loop->idle_handles = NULL; // 初始化空闲句柄列表

    loop->next_prepare_handle = NULL; // 初始化下一个准备句柄指针
    loop->next_check_handle = NULL; // 初始化下一个检查句柄指针
    loop->next_idle_handle = NULL; // 初始化下一个空闲句柄指针

    memset(&loop->poll_peer_sockets, 0, sizeof loop->poll_peer_sockets); // 初始化轮询对等方套接字列表

    loop->timer_counter = 0; // 初始化定时器计数器
    loop->stop_flag = 0; // 初始化停止标志

    err = uv_mutex_init(&loop->wq_mutex); // 初始化工作队列互斥锁
    if (err)
        goto fail_mutex_init; // 如果初始化失败,跳转到错误处理部分

    err = uv_async_init(loop, &loop->wq_async, uv__work_done); // 初始化异步句柄用于执行工作
    if (err)
        goto fail_async_init; // 如果初始化失败,跳转到错误处理部分

    uv__handle_unref(&loop->wq_async); // 使工作队列异步句柄不受引用计数影响
    loop->wq_async.flags |= UV_HANDLE_INTERNAL; // 标记为内部句柄

    err = uv__loops_add(loop); // 将事件循环添加到 libuv 的内部列表中
    if (err)
        goto fail_async_init; // 如果添加失败,跳转到错误处理部分

    return 0; // 成功返回

fail_async_init:
    uv_mutex_destroy(&loop->wq_mutex); // 如果异步初始化失败,销毁工作队列互斥锁

fail_mutex_init:
    uv__free(timer_heap); // 销毁定时器堆内存
    loop->timer_heap = NULL; // 清空定时器堆指针

fail_timers_alloc:
    uv_mutex_destroy(&lfields->loop_metrics.lock); // 销毁度量互斥锁

fail_metrics_mutex_init:
    uv__free(lfields); // 销毁内部字段内存
    loop->internal_fields = NULL; // 清空内部字段指针
    CloseHandle(loop->iocp); // 关闭 I/O 完成端口
    loop->iocp = INVALID_HANDLE_VALUE; // 清空 I/O 完成端口指针

    return err; // 返回错误码
}


uv_tcp_t和uv_tcp_init

uv_tcp_t是 libuv 中用于表示 TCP 句柄的数据结构。

uv_tcp_inituv_tcp_init_ex 函数负责初始化 TCP 句柄,并根据指定的地址族创建 socket。

代码语言:cpp
复制
typedef struct uv_tcp_s uv_tcp_t;


struct uv_tcp_s {
  UV_HANDLE_FIELDS  //通用句柄字段
  UV_STREAM_FIELDS  //通用流读写处理字段
  UV_TCP_PRIVATE_FIELDS  //TCP 句柄特有的私有字段
};

#define UV_TCP_PRIVATE_FIELDS                                                 \
  SOCKET socket;                                                              \
  int delayed_error;                                                          \
  union {                                                                     \
    struct { uv_tcp_server_fields } serv;                                     \
    struct { uv_tcp_connection_fields } conn;                                 \
  } tcp;

//SOCKET socket:
//TCP 句柄对应的 socket 文件描述符。
//int delayed_error:
//用于存储延迟错误的状态。
//union { struct { uv_tcp_server_fields } serv; struct { uv_tcp_connection_fields } conn; } tcp:
//用于区分 TCP 服务器句柄和 TCP 连接句柄的不同字段。

int uv_tcp_init(uv_loop_t* loop, uv_tcp_t* handle) {
    return uv_tcp_init_ex(loop, handle, AF_UNSPEC); // 调用 uv_tcp_init_ex 函数,使用 AF_UNSPEC 作为地址族
}

int uv_tcp_init_ex(uv_loop_t* loop, uv_tcp_t* handle, unsigned int flags) {
    int domain; // 存储地址族(协议族)

    /* 使用低八位来存储地址族 */
    domain = flags & 0xFF;
    if (domain != AF_INET && domain != AF_INET6 && domain != AF_UNSPEC)
        return UV_EINVAL; // 如果地址族不是 AF_INET、AF_INET6 或 AF_UNSPEC,则返回无效参数错误

    if (flags & ~0xFF)
        return UV_EINVAL; // 如果标志位中除了低八位以外还有其他位被设置了,则返回无效参数错误

    uv__stream_init(loop, (uv_stream_t*)handle, UV_TCP); // 初始化流句柄为 TCP 类型
    handle->tcp.serv.accept_reqs = NULL; // 初始化 accept 请求队列为 NULL
    handle->tcp.serv.pending_accepts = NULL; // 初始化挂起的 accept 请求队列为 NULL
    handle->socket = INVALID_SOCKET; // 初始化 socket 描述符为无效值
    handle->reqs_pending = 0; // 初始化待处理请求计数为 0
    handle->tcp.serv.func_acceptex = NULL; // 初始化 accept 函数为 NULL
    handle->tcp.conn.func_connectex = NULL; // 初始化 connect 函数为 NULL
    handle->tcp.serv.processed_accepts = 0; // 初始化已处理的 accept 计数为 0
    handle->delayed_error = 0; // 初始化延迟错误为 0

    /* 如果在此点之后有任何失败,我们需要从句柄队列中移除句柄,
     * 因为它在 uv__stream_init 中被加入到了句柄队列中。
     */

    if (domain != AF_UNSPEC) { // 如果指定了地址族
        SOCKET sock; // 创建一个 socket 描述符
        DWORD err; // 存储错误码

        sock = socket(domain, SOCK_STREAM, 0); // 创建一个 socket
        if (sock == INVALID_SOCKET) { // 如果创建失败
            err = WSAGetLastError(); // 获取错误码
            QUEUE_REMOVE(&handle->handle_queue); // 从句柄队列中移除句柄
            return uv_translate_sys_error(err); // 返回转换后的错误码
        }

        err = uv__tcp_set_socket(handle->loop, handle, sock, domain, 0); // 设置 socket 描述符
        if (err) { // 如果设置失败
            closesocket(sock); // 关闭 socket
            QUEUE_REMOVE(&handle->handle_queue); // 从句柄队列中移除句柄
            return uv_translate_sys_error(err); // 返回转换后的错误码
        }
    }

    return 0; // 成功返回 0
}

uv_ip4_addr

uv_ip4_addr 类似常规网络编程下的绑定地址的前置操作

uv_inet_pton(AF_INET, ip, &(addr->sin_addr.s_addr));:将字符串形式的 IP 地址转换为二进制格式,并填充到sin_addr字段。

代码语言:cpp
复制

int uv_ip4_addr(const char* ip, int port, struct sockaddr_in* addr) {
  memset(addr, 0, sizeof(*addr)); // 清零结构体
  addr->sin_family = AF_INET; // 设置地址族为 IPv4
  addr->sin_port = htons(port); // 设置端口号
#ifdef SIN6_LEN
  addr->sin_len = sizeof(*addr); // 设置长度(IPv6 特性)
#endif
  //许多网络库里都有的操作
  return uv_inet_pton(AF_INET, ip, &(addr->sin_addr.s_addr)); // 将 IP 地址转换为二进制格式,以便使用
}
  

uv_tcp_bind

实现了将一个 TCP 句柄绑定到指定地址的功能。首先验证输入参数,然后创建或重用 socket,并设置必要的 socket 选项。接着,尝试绑定 socket 到指定地址,并根据结果设置相应的标志。最后返回绑定的结果。

代码语言:cpp
复制
int uv_tcp_bind(uv_tcp_t* handle,
    const struct sockaddr* addr,
    unsigned int flags) {
    unsigned int addrlen;

    // 检查句柄类型是否为 TCP
    if (handle->type != UV_TCP)
        return UV_EINVAL; // 如果不是 TCP 类型,则返回无效参数错误

    // 检查句柄是否正在关闭
    if (uv__is_closing(handle)) {
        return UV_EINVAL; // 如果句柄正在关闭,则返回无效参数错误
    }

    // 根据地址族确定地址长度
    if (addr->sa_family == AF_INET) {
        addrlen = sizeof(struct sockaddr_in); // IPv4 地址长度
    }
    else if (addr->sa_family == AF_INET6) {
        addrlen = sizeof(struct sockaddr_in6); // IPv6 地址长度
    }
    else {
        return UV_EINVAL; // 如果地址族既不是 AF_INET 也不是 AF_INET6,则返回无效参数错误
    }

    // 调用内部函数绑定地址 执行类似网络编程中调用bind函数的操作
    return uv__tcp_bind(handle, addr, addrlen, flags); // 返回绑定结果
}

int uv__tcp_bind(uv_tcp_t* tcp,
                 const struct sockaddr* addr,
                 unsigned int addrlen,
                 unsigned int flags) {
  int err;
  int on;

  /* 验证是否设置了 IPv6-only 标志,如果是,则确保地址族为 AF_INET6 */
  if ((flags & UV_TCP_IPV6ONLY) && addr->sa_family != AF_INET6)
    return UV_EINVAL; /* 如果设置了 IPv6-only 但是地址不是 AF_INET6,返回错误 */

  /* 创建一个新的 socket(如果还没有创建的话),并设置地址族 */
  err = maybe_new_socket(tcp, addr->sa_family, 0);
  if (err)
    return err; /* 如果创建 socket 失败,返回错误码 */

  /* 设置 SO_REUSEADDR 选项,允许快速重启 */
  on = 1;
  if (setsockopt(tcp->io_watcher.fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)))
    return UV__ERR(errno); /* 如果设置 socket 选项失败,返回错误码 */

  #ifndef __OpenBSD__
  #ifdef IPV6_V6ONLY
    /* 如果地址族为 AF_INET6,则设置 IPv6-only 选项 */
    if (addr->sa_family == AF_INET6) {
      on = (flags & UV_TCP_IPV6ONLY) != 0;
      if (setsockopt(tcp->io_watcher.fd,
                     IPPROTO_IPV6,
                     IPV6_V6ONLY,
                     &on,
                     sizeof on) == -1) {

#if defined(__MVS__)
        /* 如果操作系统不支持此选项,则返回错误 */
        if (errno == EOPNOTSUPP)
          return UV_EINVAL;
#endif
        return UV__ERR(errno); /* 如果设置 IPv6-only 选项失败,返回错误码 */
      }
    }
  #endif
  #endif

  /* 清除 errno,避免 bind() 的错误被忽略 */
  errno = 0;

  /* 绑定 socket 到指定地址 */
  err = bind(tcp->io_watcher.fd, addr, addrlen);
//bind的定义:int bind(int sockfd, const struct sockaddr *addr, socklen_t addrlen);
//从定义可推测 tcp->io_watcher.fd 的底层是socket描述符
  if (err == -1 && errno != EADDRINUSE) {
    if (errno == EAFNOSUPPORT) {
      /* 如果操作系统不支持绑定给定地址族的地址,则返回错误 */
      /* 在某些系统(如 macOS、其他 BSD 和 Solaris)上,当尝试将 AF_INET socket 绑定到 AF_INET6 地址或反之亦然时,会返回 EAFNOSUPPORT。 */
      return UV_EINVAL;
    }
    return UV__ERR(errno); /* 如果绑定失败且不是因为地址已被使用,则返回错误码 */
  }

  /* 设置延迟错误,如果 bind() 返回 EADDRINUSE,则记录错误 */
  tcp->delayed_error = (err == -1) ? UV__ERR(errno) : 0;

  /* 设置标志位,表示已绑定 */
  tcp->flags |= UV_HANDLE_BOUND;

  /* 如果地址族为 AF_INET6,则设置 IPv6 标志位 */
  if (addr->sa_family == AF_INET6)
    tcp->flags |= UV_HANDLE_IPV6;

  /* 成功绑定,返回 0 */
  return 0;
}

根据uv__tcp_bind里的bind函数使用方式可以看出

//tcp->io_watcher.fd 表示底层的 socket 文件描述符

IO 观察者(io_watcher)

在 libuv 内部,对所有 I/O 操作进行了统一的抽象。在底层操作系统 I/O 操作的基础上,结合事件循环机制,实现了 IO 观察者。对应的数据结构是 uv__io_s,通过它可以知道 I/O 相关的信息,如可读、可写等。

uv_tcp_t 结构体中,io_watcher 成员就是一个 uv__io_s 类型的对象,它使 TCP 句柄具备了监测 I/O 状态的能力。当 TCP 连接的状态发生变化(如变为可读或可写),事件循环会根据这些变化触发相应的回调函数。

例如监测一个TCP连接,那么TCP handle就算是一个 IO 观察者,其实它是一个句柄的同时又是一个 IO 观察者。

uv_listen

uv_listen 函数的作用是在 libuv 中设置一个 stream 为监听模式。

uv_listen中的uv__tcp_listen

主要完成了以下功能:

  • 检查和处理输入参数的有效性。
  • 设置监听标志,并更新连接回调函数。
  • 分配和初始化必要的资源,以便能够接受新的连接请求。
  • 确保在开始监听前,socket 已经绑定到一个地址,并且没有其他错误。
代码语言:cpp
复制
int uv_listen(uv_stream_t* stream, int backlog, uv_connection_cb cb) {
  int err;

  /* 检查 stream 是否正在关闭 */
  if (uv__is_closing(stream)) {
    return UV_EINVAL; /* 如果 stream 正在关闭,则返回无效参数错误 */
  }

  /* 初始化错误码 */
  err = ERROR_INVALID_PARAMETER;

  /* 根据 stream 的类型选择不同的监听函数 */
  switch (stream->type) {
    case UV_TCP:
      /* 如果是 TCP 类型的 stream,则调用 uv__tcp_listen */
      err = uv__tcp_listen((uv_tcp_t*)stream, backlog, cb);
      break;
    case UV_NAMED_PIPE:
      /* 如果是命名管道类型的 stream,则调用 uv__pipe_listen */
      err = uv__pipe_listen((uv_pipe_t*)stream, backlog, cb);
      break;
    default:
      /* 如果是未知类型,则断言失败 */
      assert(0); /* 断言失败,不应该到达这里 */
  }

  /* 将系统错误码转换为 libuv 错误码 */
  return uv_translate_sys_error(err);
}

int uv__tcp_listen(uv_tcp_t* handle, int backlog, uv_connection_cb cb) {
  unsigned int i, simultaneous_accepts;
  uv_tcp_accept_t* req;
  int err;

  /* 确保 backlog 大于 0 */
  assert(backlog > 0);

  /* 如果 handle 已经处于监听状态,则更新连接回调 */
  if (handle->flags & UV_HANDLE_LISTENING) {
    handle->stream.serv.connection_cb = cb;
  }

  /* 如果 handle 处于读取状态,则返回错误 */
  if (handle->flags & UV_HANDLE_READING) {
    return WSAEISCONN;
  }

  /* 如果 handle 有延迟错误,则返回延迟错误 */
  if (handle->delayed_error) {
    return handle->delayed_error;
  }

  /* 如果 handle 尚未绑定,则尝试绑定到任意 IP 地址 */
  if (!(handle->flags & UV_HANDLE_BOUND)) {
    err = uv__tcp_try_bind(handle,
                           (const struct sockaddr*) &uv_addr_ip4_any_,
                           sizeof(uv_addr_ip4_any_),
                           0);
    if (err)
      return err;
    if (handle->delayed_error)
      return handle->delayed_error;
  }

  /* 获取 acceptex 函数,如果不存在则返回错误 */
  if (!handle->tcp.serv.func_acceptex) {
    if (!uv__get_acceptex_function(handle->socket, &handle->tcp.serv.func_acceptex)) {
      return WSAEAFNOSUPPORT;
    }
  }

  /* 如果 handle 使用共享 socket,则跳过 listen 调用 */
  /* 否则,执行 listen 调用 */
  if (!(handle->flags & UV_HANDLE_SHARED_TCP_SOCKET) &&
      listen(handle->socket, backlog) == SOCKET_ERROR) {
    return WSAGetLastError();
  }

  /* 设置监听标志 */
  handle->flags |= UV_HANDLE_LISTENING;
  handle->stream.serv.connection_cb = cb;
  INCREASE_ACTIVE_COUNT(loop, handle);

  /* 计算同时接受的连接数 */
  simultaneous_accepts = handle->flags & UV_HANDLE_TCP_SINGLE_ACCEPT ? 1
    : uv_simultaneous_server_accepts;

  /* 如果 accept 请求数组为空,则分配内存 */
  if (handle->tcp.serv.accept_reqs == NULL) {
    handle->tcp.serv.accept_reqs =
      uv__malloc(uv_simultaneous_server_accepts * sizeof(uv_tcp_accept_t));
    if (!handle->tcp.serv.accept_reqs) {
      uv_fatal_error(ERROR_OUTOFMEMORY, "uv__malloc");
    }

    /* 初始化所有的 accept 请求 */
    for (i = 0; i < simultaneous_accepts; i++) {
      req = &handle->tcp.serv.accept_reqs[i];
      UV_REQ_INIT(req, UV_ACCEPT);
      req->accept_socket = INVALID_SOCKET;
      req->data = handle;

      req->wait_handle = INVALID_HANDLE_VALUE;
      if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
        req->event_handle = CreateEvent(NULL, 0, 0, NULL);
        if (req->event_handle == NULL) {
          uv_fatal_error(GetLastError(), "CreateEvent");
        }
      } else {
        req->event_handle = NULL;
      }

      /* 将 accept 请求加入队列 */
      uv__tcp_queue_accept(handle, req);
    }

    /* 初始化剩余未使用的 accept 请求 */
    for (i = simultaneous_accepts; i < uv_simultaneous_server_accepts; i++) {
      req = &handle->tcp.serv.accept_reqs[i];
      UV_REQ_INIT(req, UV_ACCEPT);
      req->accept_socket = INVALID_SOCKET;
      req->data = handle;
      req->wait_handle = INVALID_HANDLE_VALUE;
      req->event_handle = NULL;
    }
  }

  /* 成功返回 */
  return 0;
}

自定义的一些回调函数

  • 监听新连接
    • 当服务器接收到新的连接请求时,调用 on_new_connection 回调函数。
  • 接受连接
    • 使用 uv_accept 接受连接,并初始化客户端句柄。
    • 开始读取客户端的数据,调用 echo_read 函数。
  • 读取并写回数据
    • 当从客户端读取到数据时,创建一个 write_req_t 结构体,并将数据写回客户端。
    • 写入完成后调用 echo_write 函数释放内存。
  • 处理错误
    • 如果在读取过程中发生错误(除了正常的连接关闭),关闭客户端连接,并释放资源。
    • 如果读取到的数据量为零(即客户端关闭连接),关闭客户端连接,并释放资源。
代码语言:cpp
复制
typedef struct { // 定义写请求结构体
    uv_write_t req; // 写请求句柄
    uv_buf_t buf;   // 缓冲区结构体
} write_req_t;

void free_write_req(uv_write_t* req); // 自定义释放写请求的函数
// 函数实现
void free_write_req(uv_write_t* req) {
    write_req_t* wr = (write_req_t*)req; // 转换为自定义类型
    free(wr->buf.base); // 释放缓冲区内存
    free(wr); // 释放写请求内存
}

void alloc_buffer(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf); // 分配缓冲区函数声明
// 函数实现
void alloc_buffer(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) {
    buf->base = (char*)malloc(suggested_size); // 分配内存
    buf->len = suggested_size; // 设置长度
}

void on_close(uv_handle_t* handle); // 关闭句柄后的回调函数声明
// 函数实现
void on_close(uv_handle_t* handle) {
    free(handle); // 释放内存
}

void echo_write(uv_write_t* req, int status); // 写完成后的回调函数声明
// 函数实现
void echo_write(uv_write_t* req, int status) {
    if (status) { // 如果有错误发生
        fprintf(stderr, "Write error %s\n", uv_strerror(status)); // 打印错误信息
    }
    free_write_req(req); // 释放写请求
}

void echo_read(uv_stream_t* client, ssize_t nread, const uv_buf_t* buf); // 读取数据后的回调函数声明
// 函数实现
void echo_read(uv_stream_t* client, ssize_t nread, const uv_buf_t* buf) {
    if (nread > 0) { // 如果读取的数据量大于0
        write_req_t* req = (write_req_t*)malloc(sizeof(write_req_t)); // 分配写请求内存
        req->buf = uv_buf_init(buf->base, nread); // 初始化缓冲区
        uv_write((uv_write_t*)req, client, &req->buf, 1, echo_write); // 异步写入数据,并指定写完成后的回调
        return;
    }
    if (nread < 0) { // 如果有错误发生
        if (nread != UV_EOF) // 如果不是正常关闭连接
            fprintf(stderr, "Read error %s\n", uv_err_name(nread)); // 打印错误信息
        uv_close((uv_handle_t*)client, on_close); // 关闭客户端连接,并指定关闭后的回调
    }

    free(buf->base); // 释放读取的数据缓冲区
}

void on_new_connection(uv_stream_t* server, int status); // 新连接到达后的回调函数声明
// 函数实现
void on_new_connection(uv_stream_t* server, int status) {
    if (status < 0) { // 如果有错误发生
        fprintf(stderr, "New connection error %s\n", uv_strerror(status)); // 打印错误信息
        return; // 返回
    }

    uv_tcp_t* client = (uv_tcp_t*)malloc(sizeof(uv_tcp_t)); // 分配客户端内存
    uv_tcp_init(loop, client); // 初始化客户端
    if (uv_accept(server, (uv_stream_t*)client) == 0) { // 接受连接
        uv_read_start((uv_stream_t*)client, alloc_buffer, echo_read); // 开始读取数据,并指定读取后的回调
    }
    else {
        uv_close((uv_handle_t*)client, on_close); // 如果接受失败,则关闭客户端
    }
}

关注一下读写相关api的

uv__tcp_read_start

代码语言:cpp
复制
int uv__tcp_read_start(uv_tcp_t* handle, uv_alloc_cb alloc_cb, uv_read_cb read_cb) {
  uv_loop_t* loop = handle->loop; // 获取当前 handle 所属的事件循环

  // 设置读取标志
  handle->flags |= UV_HANDLE_READING;
  handle->read_cb = read_cb; // 设置读取回调函数
  handle->alloc_cb = alloc_cb; // 设置缓冲区分配回调函数
  INCREASE_ACTIVE_COUNT(loop, handle); // 增加活跃句柄计数

  /* 如果读取被停止然后再次启动,可能仍然有一个读请求在等待。 */
  if (!(handle->flags & UV_HANDLE_READ_PENDING)) {
    if (handle->flags & UV_HANDLE_EMULATE_IOCP && // 如果使用 IOCP 模拟
        handle->read_req.event_handle == NULL) { // 并且事件句柄还未创建
      handle->read_req.event_handle = CreateEvent(NULL, 0, 0, NULL); // 创建事件句柄
      if (handle->read_req.event_handle == NULL) { // 如果创建失败
        uv_fatal_error(GetLastError(), "CreateEvent"); // 报告错误
      }
    }
    uv__tcp_queue_read(loop, handle); // 将读请求加入队列
  }

  return 0; // 成功返回
}

uv__tcp_write

代码语言:cpp
复制
int uv__tcp_write(uv_loop_t* loop, uv_write_t* req, uv_tcp_t* handle,
 const uv_buf_t bufs[], unsigned int nbufs, uv_write_cb cb) {
  int result;
  DWORD bytes;

  UV_REQ_INIT(req, UV_WRITE); // 初始化写请求
  req->handle = (uv_stream_t*) handle; // 设置请求的句柄
  req->cb = cb; // 设置写完成后的回调函数

  /* 准备重叠结构 */
  memset(&(req->u.io.overlapped), 0, sizeof(req->u.io.overlapped)); // 清空重叠结构
  if (handle->flags & UV_HANDLE_EMULATE_IOCP) { // 如果使用 IOCP 模拟
    req->event_handle = CreateEvent(NULL, 0, 0, NULL); // 创建事件句柄
    if (req->event_handle == NULL) { // 如果创建失败
      uv_fatal_error(GetLastError(), "CreateEvent"); // 报告错误
    }
    req->u.io.overlapped.hEvent = (HANDLE) ((ULONG_PTR) req->event_handle | 1); // 设置重叠结构的事件句柄
    req->wait_handle = INVALID_HANDLE_VALUE; // 设置等待句柄为无效
  }

  result = WSASend(handle->socket, (WSABUF*) bufs, nbufs, &bytes, 0, &req->u.io.overlapped, NULL); // 发送数据

  if (UV_SUCCEEDED_WITHOUT_IOCP(result == 0)) { // 如果发送立即完成
    req->u.io.queued_bytes = 0; // 设置已排队字节数为0
    handle->reqs_pending++; // 增加待处理请求计数
    handle->stream.conn.write_reqs_pending++; // 增加写请求计数
    REGISTER_HANDLE_REQ(loop, handle, req); // 注册请求到事件循环
    uv__insert_pending_req(loop, (uv_req_t*) req); // 插入待处理请求
  } else if (UV_SUCCEEDED_WITH_IOCP(result == 0)) { // 如果发送排队成功
    req->u.io.queued_bytes = uv__count_bufs(bufs, nbufs); // 计算已排队字节数
    handle->reqs_pending++; // 增加待处理请求计数
    handle->stream.conn.write_reqs_pending++; // 增加写请求计数
    REGISTER_HANDLE_REQ(loop, handle, req); // 注册请求到事件循环
    handle->write_queue_size += req->u.io.queued_bytes; // 更新写入队列大小
    if (handle->flags & UV_HANDLE_EMULATE_IOCP && // 如果使用 IOCP 模拟
        !RegisterWaitForSingleObject(&req->wait_handle, req->event_handle, post_write_completion, (void*) req, INFINITE, WT_EXECUTEINWAITTHREAD | WT_EXECUTEONLYONCE)) { // 注册等待对象
      SET_REQ_ERROR(req, GetLastError()); // 设置请求错误
      uv__insert_pending_req(loop, (uv_req_t*)req); // 插入待处理请求
    }
  } else { // 如果发送失败
    req->u.io.queued_bytes = 0; // 设置已排队字节数为0
    handle->reqs_pending++; // 增加待处理请求计数
    handle->stream.conn.write_reqs_pending++; // 增加写请求计数
    REGISTER_HANDLE_REQ(loop, handle, req); // 注册请求到事件循环
    SET_REQ_ERROR(req, WSAGetLastError()); // 设置请求错误
    uv__insert_pending_req(loop, (uv_req_t*) req); // 插入待处理请求
  }

  return 0; // 成功返回
}

uv_run

传入uv_loop_t事假循环的句柄,还有一个运行的模式,它的模式有3种,分别为默认模式、单次模式、不等待模式。

  • 默认模式UV_RUN_DEFAULT:运行事件循环,直到不再有活动的和引用的句柄或请求为止。
  • 单次模式UV_RUN_ONCE:轮询一次I/O,如果没有待处理的回调,则进入阻塞状态,完成处理后返回零,不继续运行事件循环。
  • 不等待模式UV_RUN_NOWAIT:对I/O进行一次轮询,但如果没有待处理的回调,则不会阻塞。

注意,这个函数不是线程安全的。

代码语言:cpp
复制
typedef enum {
  UV_RUN_DEFAULT = 0,
  UV_RUN_ONCE,
  UV_RUN_NOWAIT
} uv_run_mode;

代码语言:cpp
复制
int uv_run(uv_loop_t* loop, uv_run_mode mode) {
  int timeout;
  int r;
  int ran_pending;

  r = uv__loop_alive(loop);
  if (!r)
    uv__update_time(loop);

  /* 这是一个while循环 */
  while (r != 0 && loop->stop_flag == 0) {
    
    /* 更新时间并开始倒计时 */
    uv__update_time(loop);
    uv__run_timers(loop);
    /* 处理挂起的handle */
    ran_pending = uv__run_pending(loop);
    /* 运行idle handle */
    uv__run_idle(loop);
    /* 运行prepare handle */
    uv__run_prepare(loop);

    timeout = 0;
    if ((mode == UV_RUN_ONCE && !ran_pending) || mode == UV_RUN_DEFAULT)
      timeout = uv_backend_timeout(loop);
    
    /* 计算要阻塞的时间,开始阻塞 */
    uv__io_poll(loop, timeout);

    /* 程序执行到这里表示被唤醒了,被唤醒的原因可能是I/O可读可写、或者超时了,检查handle是否可以操作 */
    uv__run_check(loop);
    /* 看看是否有close的handle */
    uv__run_closing_handles(loop);

    /* 单次模式 */
    if (mode == UV_RUN_ONCE) {
      /* UV_RUN_ONCE implies forward progress: at least one callback must have
       * been invoked when it returns. uv__io_poll() can return without doing
       * I/O (meaning: no callbacks) when its timeout expires - which means we
       * have pending timers that satisfy the forward progress constraint.
       *
       * UV_RUN_NOWAIT makes no guarantees about progress so it's omitted from
       * the check.
       */
      uv__update_time(loop);
      uv__run_timers(loop);
    }

    /* handle保活处理 */
    r = uv__loop_alive(loop);
    if (mode == UV_RUN_ONCE || mode == UV_RUN_NOWAIT)
      break;
  }

  /* The if statement lets gcc compile it to a conditional store. Avoids
   * dirtying a cache line.
   */
  if (loop->stop_flag != 0)
    loop->stop_flag = 0;

  return r;
}

额外介绍

uv_loop_close()

释放所有内部循环资源。仅当循环完成执行并且所有打开的句柄和请求已关闭时才调用此函数,否则它将返回UV_EBUSY。此函数返回后,用户可以释放为循环分配的内存。

代码语言:cpp
复制
int uv_loop_close(uv_loop_t* loop) {
  QUEUE* q;
  uv_handle_t* h;
#ifndef NDEBUG
  void* saved_data;
#endif

  /* 如果存在处于活跃状态的请求,则返回UV_EBUSY */
  if (uv__has_active_reqs(loop))
    return UV_EBUSY;

  /* 如果存在处于活跃状态的handle,则返回UV_EBUSY */
  QUEUE_FOREACH(q, &loop->handle_queue) {
    h = QUEUE_DATA(q, uv_handle_t, handle_queue);
    if (!(h->flags & UV_HANDLE_INTERNAL))
      return UV_EBUSY;
  }

  /* 关闭事件循环 */
  uv__loop_close(loop);

#ifndef NDEBUG
  saved_data = loop->data;
  memset(loop, -1, sizeof(*loop));
  loop->data = saved_data;
#endif
  if (loop == default_loop_ptr)
    default_loop_ptr = NULL;

  return 0;
}

注意,这个函数也不是线程安全的。

完整的Demo:tcp-echo-server代码

代码语言:cpp
复制
#include <stdio.h> 
#include <stdlib.h>
#include <string.h> 
#include <uv.h>     // libuv 库的头文件

#define DEFAULT_PORT 7000// 默认监听端口
#define DEFAULT_BACKLOG 128 // 连接队列的默认大小

uv_loop_t* loop; // 初始化 libuv 事件循环
struct sockaddr_in addr; // 地址结构体

typedef struct { // 定义写请求结构体
    uv_write_t req; // 写请求句柄
    uv_buf_t buf;   // 缓冲区结构体
} write_req_t;

void free_write_req(uv_write_t* req); // 自定义释放写请求的函数
// 函数实现
void free_write_req(uv_write_t* req) {
    write_req_t* wr = (write_req_t*)req; // 转换为自定义类型
    free(wr->buf.base); // 释放缓冲区内存
    free(wr); // 释放写请求内存
}

void alloc_buffer(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf); // 分配缓冲区函数声明
// 函数实现
void alloc_buffer(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) {
    buf->base = (char*)malloc(suggested_size); // 分配内存
    buf->len = suggested_size; // 设置长度
}

void on_close(uv_handle_t* handle); // 关闭句柄后的回调函数声明
// 函数实现
void on_close(uv_handle_t* handle) {
    free(handle); // 释放内存
}

void echo_write(uv_write_t* req, int status); // 写完成后的回调函数声明
// 函数实现
void echo_write(uv_write_t* req, int status) {
    if (status) { // 如果有错误发生
        fprintf(stderr, "Write error %s\n", uv_strerror(status)); // 打印错误信息
    }
    free_write_req(req); // 释放写请求
}

void echo_read(uv_stream_t* client, ssize_t nread, const uv_buf_t* buf); // 读取数据后的回调函数声明
// 函数实现
void echo_read(uv_stream_t* client, ssize_t nread, const uv_buf_t* buf) {
    if (nread > 0) { // 如果读取的数据量大于0
        write_req_t* req = (write_req_t*)malloc(sizeof(write_req_t)); // 分配写请求内存
        req->buf = uv_buf_init(buf->base, nread); // 初始化缓冲区
        uv_write((uv_write_t*)req, client, &req->buf, 1, echo_write); // 异步写入数据,并指定写完成后的回调
        return;
    }
    if (nread < 0) { // 如果有错误发生
        if (nread != UV_EOF) // 如果不是正常关闭连接
            fprintf(stderr, "Read error %s\n", uv_err_name(nread)); // 打印错误信息
        uv_close((uv_handle_t*)client, on_close); // 关闭客户端连接,并指定关闭后的回调
    }

    free(buf->base); // 释放读取的数据缓冲区
}

void on_new_connection(uv_stream_t* server, int status); // 新连接到达后的回调函数声明
// 函数实现
void on_new_connection(uv_stream_t* server, int status) {
    if (status < 0) { // 如果有错误发生
        fprintf(stderr, "New connection error %s\n", uv_strerror(status)); // 打印错误信息
        return; // 返回
    }

    uv_tcp_t* client = (uv_tcp_t*)malloc(sizeof(uv_tcp_t)); // 分配客户端内存
    uv_tcp_init(loop, client); // 初始化客户端
    if (uv_accept(server, (uv_stream_t*)client) == 0) { // 接受连接
        uv_read_start((uv_stream_t*)client, alloc_buffer, echo_read); // 开始读取数据,并指定读取后的回调
    }
    else {
        uv_close((uv_handle_t*)client, on_close); // 如果接受失败,则关闭客户端
    }
}

int main()
{
	loop = uv_default_loop(); // 获取默认的事件循环

	uv_tcp_t server; // 声明一个TCP句柄
	uv_tcp_init(loop, &server); // 初始化服务端TCP句柄

	uv_ip4_addr("0.0.0.0", DEFAULT_PORT, &addr); // 设置地址和端口

	uv_tcp_bind(&server, (const struct sockaddr*)&addr, 0); // 绑定地址

	// 开始监听,并指定新连接到达后的回调
	int r = uv_listen((uv_stream_t*)&server, DEFAULT_BACKLOG, on_new_connection);
	if (r) {
		fprintf(stderr, "Listen error %s\n", uv_strerror(r)); // 如果有错误发生则打印错误信息
		return 1; // 返回错误码
	}
	return uv_run(loop, UV_RUN_DEFAULT); // 启动事件循环
}

总结

借助AI对tcp-echo-server略微分析了下,其实也不过就是网络编程那几步曲,只要像平常一样调用api就行了。

后面试着调用api写一个回声服务端和客户端,或者结合一些客户端相关开发工具写一个多人聊天室就libuv的tcp-echo-server学习就完结了。

关于windows下的网络编程可以参考我写的这篇

windows环境下C/C++的socket相关网络编程详解以及部分TCP详解

或者网上其他人的文章


参考文章

【libuv高效编程】libuv学习超详细教程3——libuv事件循环 - _杰杰_

libuv源码阅读(23)--tcp-echo-server

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 前言
  • 看一下 uv_loop_t 结构体和uv_default_loop函数
    • 事件循环(Event Loop)
      • uv_loop_t 的作用
        • uv_default_loop中uv_loop_init()的作用
        • uv_tcp_t和uv_tcp_init
        • uv_ip4_addr
        • uv_tcp_bind
          • IO 观察者(io_watcher)
          • uv_listen
          • 自定义的一些回调函数
            • 关注一下读写相关api的
              • uv__tcp_read_start
              • uv__tcp_write
          • uv_run
            • uv_loop_close()
            • 完整的Demo:tcp-echo-server代码
            • 总结
            相关产品与服务
            云服务器
            云服务器(Cloud Virtual Machine,CVM)提供安全可靠的弹性计算服务。 您可以实时扩展或缩减计算资源,适应变化的业务需求,并只需按实际使用的资源计费。使用 CVM 可以极大降低您的软硬件采购成本,简化 IT 运维工作。
            领券
            问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档