#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <uv.h>
/* TCP port the echo server binds to (used in main()). */
#define DEFAULT_PORT 7000
/* Backlog value handed to uv_listen() in main(). */
#define DEFAULT_BACKLOG 128
/* Event loop shared by every handle in this program; assigned in main(). */
uv_loop_t *loop;
/* IPv4 address the server binds to; filled in by uv_ip4_addr() in main(). */
struct sockaddr_in addr;
/*
 * A write request bundled with the buffer it sends.
 *
 * req has to stay the first member: free_write_req() casts a uv_write_t*
 * back to write_req_t*, and echo_read() casts a write_req_t* to uv_write_t*.
 */
typedef struct {
uv_write_t req;
uv_buf_t buf;
} write_req_t;
/*
 * Release a write request: first the data buffer it carries, then the
 * request wrapper itself.
 *
 * req must point at the uv_write_t stored at the start of a write_req_t.
 */
void free_write_req(uv_write_t *req) {
    write_req_t *wrapper = (write_req_t *) req;
    char *payload = wrapper->buf.base;

    free(payload);
    free(wrapper);
}
/*
 * Allocation callback handed to uv_read_start(): provides the buffer that
 * the next read fills.
 *
 * handle          stream about to be read (not used here)
 * suggested_size  size hint supplied by the caller of this callback
 * buf             out: base/len of the freshly allocated buffer
 *
 * When malloc() fails, base is NULL and len is 0, so the buffer never
 * advertises capacity it does not have.
 */
void alloc_buffer(uv_handle_t *handle, size_t suggested_size, uv_buf_t *buf) {
    (void) handle;
    buf->base = malloc(suggested_size);
    buf->len = (buf->base != NULL) ? suggested_size : 0;
}
/* Close callback passed to uv_close(): frees the memory of the handle. */
void on_close(uv_handle_t* handle) {
free(handle);
}
/*
 * Completion callback for the echo write: reports a failed write on stderr
 * and then releases the request together with its buffer.
 *
 * req     the finished write request (embedded in a write_req_t)
 * status  0 on success, otherwise an error code printable by uv_strerror()
 */
void echo_write(uv_write_t *req, int status) {
    if (status != 0)
        fprintf(stderr, "Write error %s\n", uv_strerror(status));

    free_write_req(req);
}
/*
 * Read callback for a client stream: a client sent data, echo it back to the
 * same client.
 *
 * client  stream the data was read from
 * nread   > 0: number of valid bytes in buf; < 0: error code (UV_EOF
 *         included), the client is closed; 0: nothing to echo
 * buf     buffer produced by alloc_buffer()
 *
 * Ownership of buf->base moves to the write request once the echo has been
 * queued, and echo_write() releases it. On every other path it is freed here.
 */
void echo_read(uv_stream_t *client, ssize_t nread, const uv_buf_t *buf) {
    if (nread > 0) {
        write_req_t *req = malloc(sizeof *req);
        if (req == NULL) {
            fprintf(stderr, "Out of memory, dropping %ld bytes\n", (long) nread);
            free(buf->base);
            return;
        }
        req->buf = uv_buf_init(buf->base, (unsigned int) nread);
        /* Write the received bytes straight back to the client. */
        int r = uv_write(&req->req, client, &req->buf, 1, echo_write);
        if (r != 0) {
            /* The request was never queued, so echo_write() will not run. */
            fprintf(stderr, "Write error %s\n", uv_strerror(r));
            free_write_req(&req->req);
        }
        return;
    }
    if (nread < 0) {
        if (nread != UV_EOF)
            fprintf(stderr, "Read error %s\n", uv_err_name((int) nread));
        uv_close((uv_handle_t *) client, on_close);
    }
    /* Nothing was queued for writing, so the read buffer is released here. */
    free(buf->base);
}
/*
 * Connection callback of the listening handle: the listening socket became
 * readable, i.e. a new connection is waiting.
 *
 * server  the listening stream
 * status  < 0 when the connection attempt failed
 *
 * Allocates a TCP handle for the client, accepts the connection into it and
 * starts reading. The handle is freed by on_close() once it has been closed.
 * If no client handle can be allocated the pending connection is left
 * unaccepted.
 */
void on_new_connection(uv_stream_t *server, int status) {
    if (status < 0) {
        fprintf(stderr, "New connection error %s\n", uv_strerror(status));
        return;
    }

    uv_tcp_t *client = malloc(sizeof *client);
    if (client == NULL) {
        fprintf(stderr, "Out of memory, cannot accept connection\n");
        return;
    }

    int r = uv_tcp_init(loop, client);
    if (r != 0) {
        /* The handle was never initialised, so it is freed directly. */
        fprintf(stderr, "Client init error %s\n", uv_strerror(r));
        free(client);
        return;
    }

    /* Take the connection and watch the connection socket for readability. */
    if (uv_accept(server, (uv_stream_t *) client) == 0 &&
        uv_read_start((uv_stream_t *) client, alloc_buffer, echo_read) == 0)
        return;

    uv_close((uv_handle_t *) client, on_close);
}
int main() {
loop = uv_default_loop();
// 初始化
uv_tcp_t server;
uv_tcp_init(loop, &server);
// 设置ip4 地址
uv_ip4_addr("0.0.0.0", DEFAULT_PORT, &addr);
// bind地址到server上
uv_tcp_bind(&server, (const struct sockaddr*)&addr, 0);
// 把socket变成监听socket
int r = uv_listen((uv_stream_t*) &server, DEFAULT_BACKLOG, on_new_connection);
if (r) {
fprintf(stderr, "Listen error %s\n", uv_strerror(r));
return 1;
}
// 等待监听socket可读 读取连接 然后再监听连接socket echo回显数据
return uv_run(loop, UV_RUN_DEFAULT);
}
先看一下要操作的结构体:
// Default port number
#define DEFAULT_PORT 7000
// Size of the listening socket's pending-connection queue (backlog)
#define DEFAULT_BACKLOG 128
uv_loop_t *loop;
struct sockaddr_in addr;
// Wrapper struct for a write request: the request plus the buffer it sends
typedef struct {
uv_write_t req;
uv_buf_t buf;
} write_req_t;
/* uv_write_t is a subclass of uv_req_t. */
// As shown, it starts with the base request fields (UV_REQ_FIELDS)
struct uv_write_s {
UV_REQ_FIELDS
uv_write_cb cb;
uv_stream_t* send_handle; /* TODO: make private and unix-only in v2.x. */
uv_stream_t* handle;
UV_WRITE_PRIVATE_FIELDS
};
/*
 * Fields placed at the start of a request struct (struct uv_write_s expands
 * this macro first).
 *
 * The last line of the macro body must not end in a backslash: a trailing
 * continuation splices the following #define into this macro.
 */
#define UV_REQ_FIELDS \
  /* public */ \
  void* data; \
  /* read-only */ \
  uv_req_type type; \
  /* private */ \
  void* reserved[6]; \
  UV_REQ_PRIVATE_FIELDS
/* No extra private request fields in this configuration. */
#define UV_REQ_PRIVATE_FIELDS /* empty */
/*
 * Private fields appended to struct uv_write_s: queue links, the buffer
 * array with the index of the next buffer to write, an error slot, and a
 * small inline array of four buffers.
 *
 * The last line of the macro body must not end in a backslash, otherwise
 * the line that follows the macro is spliced into it.
 */
#define UV_WRITE_PRIVATE_FIELDS \
  void* queue[2]; \
  unsigned int write_index; \
  uv_buf_t* bufs; \
  unsigned int nbufs; \
  int error; \
  uv_buf_t bufsml[4];
// Next, the tcp handle type: as shown, it is laid out like the stream type
// (handle fields, then stream fields, then the TCP private fields)
/*
 * uv_tcp_t is a subclass of uv_stream_t.
 *
 * Represents a TCP stream or TCP server.
 */
struct uv_tcp_s {
UV_HANDLE_FIELDS
UV_STREAM_FIELDS
UV_TCP_PRIVATE_FIELDS
};
/*
 * Fields every stream handle carries: the write-queue byte counter, the
 * allocation and read callbacks, followed by the private stream fields.
 */
#define UV_STREAM_FIELDS \
/* number of bytes queued for writing */ \
size_t write_queue_size; \
uv_alloc_cb alloc_cb; \
uv_read_cb read_cb; \
/* private */ \
UV_STREAM_PRIVATE_FIELDS
/*
 * Private stream state: pending connect/shutdown requests, the I/O watcher,
 * the write and write-completed queues, the connection callback, a delayed
 * error code, the fd accepted but not yet handed to the user, and any
 * further queued fds.
 *
 * The last line of the macro body must not end in a backslash: a trailing
 * continuation swallows the #ifndef line that follows and leaves its #endif
 * unbalanced.
 */
#define UV_STREAM_PRIVATE_FIELDS \
  uv_connect_t *connect_req; \
  uv_shutdown_t *shutdown_req; \
  uv__io_t io_watcher; \
  void* write_queue[2]; \
  void* write_completed_queue[2]; \
  uv_connection_cb connection_cb; \
  int delayed_error; \
  int accepted_fd; \
  void* queued_fds; \
  UV_STREAM_PRIVATE_PLATFORM_FIELDS
/* Default to no platform-specific stream fields unless already defined. */
#ifndef UV_STREAM_PRIVATE_PLATFORM_FIELDS
# define UV_STREAM_PRIVATE_PLATFORM_FIELDS /* empty */
#endif
/* A TCP handle adds no private fields of its own. */
#define UV_TCP_PRIVATE_FIELDS /* empty */
再看一下具体的一些函数:
/*
 * Initialise a TCP handle without choosing an address family: forwards to
 * uv_tcp_init_ex() with AF_UNSPEC and returns its result.
 */
int uv_tcp_init(uv_loop_t* loop, uv_tcp_t* tcp) {
return uv_tcp_init_ex(loop, tcp, AF_UNSPEC);
}
/*
 * Initialise a TCP handle, optionally creating its socket right away.
 *
 * loop   loop the handle is attached to
 * tcp    handle to initialise
 * flags  the low 8 bits carry the address family (AF_INET, AF_INET6 or
 *        AF_UNSPEC); every higher bit must be zero
 *
 * Returns 0 on success, UV_EINVAL for an unsupported family or stray high
 * bits, or the error from maybe_new_socket().
 */
int uv_tcp_init_ex(uv_loop_t* loop, uv_tcp_t* tcp, unsigned int flags) {
  /* The address family lives in the low 8 bits. */
  const int domain = flags & 0xFF;
  int rc;

  /* Only these three families are accepted. */
  switch (domain) {
    case AF_INET:
    case AF_INET6:
    case AF_UNSPEC:
      break;
    default:
      return UV_EINVAL;
  }

  /* Reject any bit set above the low 8. */
  if ((flags & ~0xFF) != 0)
    return UV_EINVAL;

  /* Generic stream initialisation, tagging the handle as UV_TCP. */
  uv__stream_init(loop, (uv_stream_t*) tcp, UV_TCP);

  /* Without a concrete family there is no socket to create yet. */
  if (domain == AF_UNSPEC)
    return 0;

  /* A concrete IPv4/IPv6 family was requested: create the socket now. On
   * failure the handle is taken off the handle queue again before the
   * error is reported. */
  rc = maybe_new_socket(tcp, domain, 0);
  if (rc != 0) {
    QUEUE_REMOVE(&tcp->handle_queue);
  }
  return rc;
}
看下创建新socket
/*
 * Make sure the handle has a socket of the given family, creating one only
 * when it has none yet.
 *
 * handle  TCP handle to prepare
 * domain  address family; AF_UNSPEC only records the flags
 * flags   handle flags to merge in; with UV_HANDLE_BOUND an existing, still
 *         unbound socket gets bound to an arbitrary port
 *
 * Returns 0 on success, the result of new_socket() when a socket had to be
 * created, or UV__ERR(errno) when getsockname()/bind() fails.
 */
static int maybe_new_socket(uv_tcp_t* handle, int domain, unsigned long flags) {
  struct sockaddr_storage bound_addr;
  socklen_t bound_len;
  int has_port;

  if (domain != AF_UNSPEC) {
    /* No fd attached yet: create a brand-new socket. */
    if (uv__stream_fd(handle) == -1)
      return new_socket(handle, domain, flags);

    /* An fd exists. Binding work is only needed when the caller asks for a
     * bound handle and the handle is not already marked as bound. */
    if ((flags & UV_HANDLE_BOUND) && !(handle->flags & UV_HANDLE_BOUND)) {
      /* Ask the socket whether it already has a port. */
      bound_len = sizeof(bound_addr);
      memset(&bound_addr, 0, sizeof(bound_addr));
      if (getsockname(uv__stream_fd(handle),
                      (struct sockaddr*) &bound_addr,
                      &bound_len))
        return UV__ERR(errno);

      has_port =
          (bound_addr.ss_family == AF_INET6 &&
           ((struct sockaddr_in6*) &bound_addr)->sin6_port != 0) ||
          (bound_addr.ss_family == AF_INET &&
           ((struct sockaddr_in*) &bound_addr)->sin_port != 0);

      /* Not bound yet: bind to an arbitrary port for now. */
      if (!has_port &&
          bind(uv__stream_fd(handle),
               (struct sockaddr*) &bound_addr,
               bound_len))
        return UV__ERR(errno);
    }
  }

  handle->flags |= flags;
  return 0;
}
/*
 * Create a stream socket of the given family and attach it to the handle.
 *
 * handle  TCP handle that receives the new fd
 * domain  address family passed to uv__socket()
 * flags   handle flags for uv__stream_open(); with UV_HANDLE_BOUND the new
 *         socket is also bound to an arbitrary port
 *
 * Returns 0 on success, or a negative error from uv__socket(),
 * uv__stream_open(), getsockname() or bind(). The fd is closed on every
 * failure after it was created.
 */
static int new_socket(uv_tcp_t* handle, int domain, unsigned long flags) {
  struct sockaddr_storage any_addr;
  socklen_t any_len;
  int fd;
  int rc;

  /* Create the socket. */
  fd = uv__socket(domain, SOCK_STREAM, 0);
  if (fd < 0)
    return fd;

  /* Attach the socket to the handle. */
  rc = uv__stream_open((uv_stream_t*) handle, fd, flags);
  if (rc != 0) {
    uv__close(fd);
    return rc;
  }

  if (!(flags & UV_HANDLE_BOUND))
    return 0;

  /* Bind the new socket to an arbitrary port: read back its current
   * (zeroed) address and bind to exactly that. */
  any_len = sizeof(any_addr);
  memset(&any_addr, 0, sizeof(any_addr));
  if (getsockname(uv__stream_fd(handle), (struct sockaddr*) &any_addr, &any_len) ||
      bind(uv__stream_fd(handle), (struct sockaddr*) &any_addr, any_len)) {
    uv__close(fd);
    return UV__ERR(errno);
  }

  return 0;
}
ip地址赋值:
/*
 * Fill an IPv4 socket address from a dotted-quad string and a port.
 *
 * ip    textual IPv4 address
 * port  port in host byte order; stored via htons()
 * addr  out: zeroed first, then filled in
 *
 * Returns the result of uv_inet_pton().
 */
int uv_ip4_addr(const char* ip, int port, struct sockaddr_in* addr) {
  memset(addr, 0, sizeof(*addr));
#ifdef SIN6_LEN
  addr->sin_len = sizeof(*addr);
#endif
  addr->sin_family = AF_INET;
  /* Port goes in network byte order. */
  addr->sin_port = htons(port);
  /* The address is parsed directly into the sockaddr. */
  return uv_inet_pton(AF_INET, ip, &addr->sin_addr.s_addr);
}
/*
 * Convert a textual address to binary form, dispatching on the family.
 *
 * af   AF_INET or AF_INET6
 * src  textual address; for AF_INET6 anything from the first '%' onwards is
 *      cut off before parsing
 * dst  out: binary address
 *
 * Returns UV_EINVAL for NULL arguments or an over-long IPv6 prefix,
 * UV_EAFNOSUPPORT for any other family, otherwise the result of
 * inet_pton4()/inet_pton6().
 */
int uv_inet_pton(int af, const char* src, void* dst) {
  char trimmed[UV__INET6_ADDRSTRLEN];
  const char* percent;
  int prefix_len;

  if (src == NULL || dst == NULL)
    return UV_EINVAL;

  if (af == AF_INET)
    return (inet_pton4(src, dst));

  if (af != AF_INET6)
    return UV_EAFNOSUPPORT;

  percent = strchr(src, '%');
  if (percent == NULL)
    return inet_pton6(src, dst);

  /* Copy only the part in front of the '%' and parse that. */
  prefix_len = percent - src;
  if (prefix_len > UV__INET6_ADDRSTRLEN-1)
    return UV_EINVAL;
  memcpy(trimmed, src, prefix_len);
  trimmed[prefix_len] = '\0';
  return inet_pton6(trimmed, dst);
}
再看下 bind:
/*
 * Bind a TCP handle to an address.
 *
 * handle  must be of type UV_TCP
 * addr    AF_INET or AF_INET6 socket address
 * flags   forwarded unchanged to uv__tcp_bind()
 *
 * Returns UV_EINVAL for a non-TCP handle or an unsupported family,
 * otherwise the result of uv__tcp_bind().
 */
int uv_tcp_bind(uv_tcp_t* handle,
                const struct sockaddr* addr,
                unsigned int flags) {
  unsigned int addrlen;

  if (handle->type != UV_TCP)
    return UV_EINVAL;

  /* The address length follows from the family. */
  switch (addr->sa_family) {
    case AF_INET:
      addrlen = sizeof(struct sockaddr_in);
      break;
    case AF_INET6:
      addrlen = sizeof(struct sockaddr_in6);
      break;
    default:
      return UV_EINVAL;
  }

  return uv__tcp_bind(handle, addr, addrlen, flags);
}
/*
 * Platform implementation behind uv_tcp_bind(): makes sure the handle has a
 * socket, sets SO_REUSEADDR (and IPV6_V6ONLY for IPv6 addresses where
 * available), then calls bind().
 *
 * An EADDRINUSE failure from bind() is not returned here; it is stored in
 * tcp->delayed_error (uv_tcp_listen() returns that value). Returns 0 in that
 * case and on success, UV_EINVAL for invalid flag/family combinations, or
 * UV__ERR(errno) for other failures.
 */
int uv__tcp_bind(uv_tcp_t* tcp,
const struct sockaddr* addr,
unsigned int addrlen,
unsigned int flags) {
int err;
int on;
/* Cannot set IPv6-only mode on non-IPv6 socket. */
if ((flags & UV_TCP_IPV6ONLY) && addr->sa_family != AF_INET6)
return UV_EINVAL;
err = maybe_new_socket(tcp, addr->sa_family, 0);
if (err)
return err;
// Allow the socket address to be reused
on = 1;
if (setsockopt(tcp->io_watcher.fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)))
return UV__ERR(errno);
#ifndef __OpenBSD__
#ifdef IPV6_V6ONLY
if (addr->sa_family == AF_INET6) {
on = (flags & UV_TCP_IPV6ONLY) != 0;
if (setsockopt(tcp->io_watcher.fd,
IPPROTO_IPV6,
IPV6_V6ONLY,
&on,
sizeof on) == -1) {
#if defined(__MVS__)
if (errno == EOPNOTSUPP)
return UV_EINVAL;
#endif
return UV__ERR(errno);
}
}
#endif
#endif
// Call bind to attach the address to the socket
errno = 0;
if (bind(tcp->io_watcher.fd, addr, addrlen) && errno != EADDRINUSE) {
if (errno == EAFNOSUPPORT)
/* OSX, other BSDs and SunoS fail with EAFNOSUPPORT when binding a
* socket created with AF_INET to an AF_INET6 address or vice versa. */
return UV_EINVAL;
return UV__ERR(errno);
}
tcp->delayed_error = UV__ERR(errno);
tcp->flags |= UV_HANDLE_BOUND;
if (addr->sa_family == AF_INET6)
tcp->flags |= UV_HANDLE_IPV6;
return 0;
}
再看下listen:
/*
 * Start listening on a stream, dispatching on the stream type.
 *
 * stream   TCP or named-pipe handle
 * backlog  forwarded to the type-specific listen function
 * cb       connection callback
 *
 * Returns UV_EINVAL for any other stream type, otherwise the result of
 * uv_tcp_listen()/uv_pipe_listen(). The handle is marked active only when
 * that result is 0.
 */
int uv_listen(uv_stream_t* stream, int backlog, uv_connection_cb cb) {
  int rc;

  if (stream->type == UV_TCP) {
    /* The case this walkthrough follows. */
    rc = uv_tcp_listen((uv_tcp_t*)stream, backlog, cb);
  } else if (stream->type == UV_NAMED_PIPE) {
    rc = uv_pipe_listen((uv_pipe_t*)stream, backlog, cb);
  } else {
    rc = UV_EINVAL;
  }

  /* Activate the handle on success. */
  if (rc == 0) {
    uv__handle_start(stream);
  }

  return rc;
}
/*
 * Put a TCP handle into listening mode and start watching it for incoming
 * connections.
 *
 * Returns tcp->delayed_error if it is set, an error from maybe_new_socket(),
 * UV__ERR(errno) if listen() fails, otherwise 0.
 */
int uv_tcp_listen(uv_tcp_t* tcp, int backlog, uv_connection_cb cb) {
/* Cached result of the UV_TCP_SINGLE_ACCEPT lookup; -1 means not read yet. */
static int single_accept_cached = -1;
unsigned long flags;
int single_accept;
int err;
if (tcp->delayed_error)
return tcp->delayed_error;
single_accept = uv__load_relaxed(&single_accept_cached);
if (single_accept == -1) {
const char* val = getenv("UV_TCP_SINGLE_ACCEPT");
single_accept = (val != NULL && atoi(val) != 0); /* Off by default. */
uv__store_relaxed(&single_accept_cached, single_accept);
}
if (single_accept)
tcp->flags |= UV_HANDLE_TCP_SINGLE_ACCEPT;
flags = 0;
#if defined(__MVS__)
/* on zOS the listen call does not bind automatically
if the socket is unbound. Hence the manual binding to
an arbitrary port is required to be done manually
*/
flags |= UV_HANDLE_BOUND;
#endif
err = maybe_new_socket(tcp, AF_INET, flags);
if (err)
return err;
// Turn the socket into a listening socket
if (listen(tcp->io_watcher.fd, backlog))
return UV__ERR(errno);
tcp->connection_cb = cb;
tcp->flags |= UV_HANDLE_BOUND;
// Activate the I/O watcher of this handle and set the callback that runs
// when the socket becomes readable
/* Start listening for connections. */
tcp->io_watcher.cb = uv__server_io;
uv__io_start(tcp->loop, &tcp->io_watcher, POLLIN);
return 0;
}
listen完了之后run loop等待socket可读,新连接到来,执行指定回调
/* With kqueue, each accepted connection decrements the watcher's rcount;
 * elsewhere the macro expands to nothing. */
#if defined(UV_HAVE_KQUEUE)
# define UV_DEC_BACKLOG(w) w->rcount--;
#else
# define UV_DEC_BACKLOG(w) /* no-op */
#endif /* defined(UV_HAVE_KQUEUE) */
// Readable event on a listening stream: accept connections in a loop and
// hand each one to the user's connection callback
void uv__server_io(uv_loop_t* loop, uv__io_t* w, unsigned int events) {
uv_stream_t* stream;
int err;
stream = container_of(w, uv_stream_t, io_watcher);
assert(events & POLLIN);
assert(stream->accepted_fd == -1);
assert(!(stream->flags & UV_HANDLE_CLOSING));
// Register interest in readable events again
uv__io_start(stream->loop, &stream->io_watcher, POLLIN);
/* connection_cb can close the server socket while we're
* in the loop so check it on each iteration.
*/
while (uv__stream_fd(stream) != -1) {
assert(stream->accepted_fd == -1);
#if defined(UV_HAVE_KQUEUE)
if (w->rcount <= 0)
return;
#endif /* defined(UV_HAVE_KQUEUE) */
// Take one pending connection
err = uv__accept(uv__stream_fd(stream));
if (err < 0) {
if (err == UV_EAGAIN || err == UV__ERR(EWOULDBLOCK))
return; /* Not an error. */
if (err == UV_ECONNABORTED)
continue; /* Ignore. Nothing we can do about that. */
if (err == UV_EMFILE || err == UV_ENFILE) {
err = uv__emfile_trick(loop, uv__stream_fd(stream));
if (err == UV_EAGAIN || err == UV__ERR(EWOULDBLOCK))
break;
}
stream->connection_cb(stream, err);
continue;
}
// Store the accepted fd, then run the user's new-connection callback
UV_DEC_BACKLOG(w)
stream->accepted_fd = err;
stream->connection_cb(stream, 0);
// uv_accept changes accepted_fd
if (stream->accepted_fd != -1) {
/* The user hasn't yet accepted called uv_accept() */
uv__io_stop(loop, &stream->io_watcher, POLLIN);
return;
}
if (stream->type == UV_TCP &&
(stream->flags & UV_HANDLE_TCP_SINGLE_ACCEPT)) {
/* Give other processes a chance to accept connections. */
// todo
struct timespec timeout = { 0, 1 };
nanosleep(&timeout, NULL);
}
}
}
#undef UV_DEC_BACKLOG
看下2个accept函数:
// Returns the connection socket with non-blocking and close-on-exec already
// set, or a negative error code
int uv__accept(int sockfd) {
int peerfd;
int err;
(void) &err;
assert(sockfd >= 0);
/* Retry the accept call while it is interrupted by a signal (EINTR). */
do
#ifdef uv__accept4
peerfd = uv__accept4(sockfd, NULL, NULL, SOCK_NONBLOCK|SOCK_CLOEXEC);
#else
peerfd = accept(sockfd, NULL, NULL);
#endif
while (peerfd == -1 && errno == EINTR);
if (peerfd == -1)
return UV__ERR(errno);
#ifndef uv__accept4
/* Without accept4 the two properties are set in separate calls; the new fd
 * is closed if either call fails. */
err = uv__cloexec(peerfd, 1);
if (err == 0)
err = uv__nonblock(peerfd, 1);
if (err != 0) {
uv__close(peerfd);
return err;
}
#endif
return peerfd;
}
// Meant to be called from the user's connection callback: uv_accept attaches
// the accepted connection socket to the client handle, after which the
// listening handle no longer holds it
//
// Returns UV_EAGAIN when no accepted fd is pending, UV_EINVAL for an
// unsupported client type, otherwise the result of opening the fd on client.
int uv_accept(uv_stream_t* server, uv_stream_t* client) {
int err;
assert(server->loop == client->loop);
if (server->accepted_fd == -1)
return UV_EAGAIN;
switch (client->type) {
case UV_NAMED_PIPE:
case UV_TCP:
err = uv__stream_open(client,
server->accepted_fd,
UV_HANDLE_READABLE | UV_HANDLE_WRITABLE);
if (err) {
/* TODO handle error */
uv__close(server->accepted_fd);
goto done;
}
break;
case UV_UDP:
err = uv_udp_open((uv_udp_t*) client, server->accepted_fd);
if (err) {
uv__close(server->accepted_fd);
goto done;
}
break;
default:
return UV_EINVAL;
}
client->flags |= UV_HANDLE_BOUND;
done:
/* Process queued fds */
if (server->queued_fds != NULL) {
uv__stream_queued_fds_t* queued_fds;
queued_fds = server->queued_fds;
/* Read first */
server->accepted_fd = queued_fds->fds[0];
/* All read, free */
assert(queued_fds->offset > 0);
if (--queued_fds->offset == 0) {
uv__free(queued_fds);
server->queued_fds = NULL;
} else {
/* Shift rest */
memmove(queued_fds->fds,
queued_fds->fds + 1,
queued_fds->offset * sizeof(*queued_fds->fds));
}
} else {
// accepted_fd is reset here; uv__server_io checks it after the callback
server->accepted_fd = -1;
if (err == 0)
uv__io_start(server->loop, &server->io_watcher, POLLIN);
}
return err;
}
然后看下 得到新连接之后业务逻辑做了什么:
/*
 * Connection callback of the listening handle: a new connection is waiting.
 *
 * server  the listening stream
 * status  < 0 when the connection attempt failed
 *
 * Allocates a TCP handle for the client (the client is a tcp handle too),
 * accepts the connection into it and starts reading. The handle is freed by
 * on_close() once it has been closed. If no client handle can be allocated
 * the pending connection is left unaccepted.
 */
void on_new_connection(uv_stream_t *server, int status) {
    if (status < 0) {
        fprintf(stderr, "New connection error %s\n", uv_strerror(status));
        return;
    }

    uv_tcp_t *client = malloc(sizeof *client);
    if (client == NULL) {
        fprintf(stderr, "Out of memory, cannot accept connection\n");
        return;
    }

    int r = uv_tcp_init(loop, client);
    if (r != 0) {
        /* The handle was never initialised, so it is freed directly. */
        fprintf(stderr, "Client init error %s\n", uv_strerror(r));
        free(client);
        return;
    }

    /* Accept the connection, then activate reading on the client. */
    if (uv_accept(server, (uv_stream_t *) client) == 0 &&
        uv_read_start((uv_stream_t *) client, alloc_buffer, echo_read) == 0)
        return;

    uv_close((uv_handle_t *) client, on_close);
}
/*
 * Validate the arguments and the stream state, then start reading.
 *
 * Returns UV_EINVAL for a NULL argument or a closing stream, UV_EALREADY
 * when the stream is already reading, UV_ENOTCONN when it is not readable,
 * otherwise the result of uv__read_start().
 */
int uv_read_start(uv_stream_t* stream,
                  uv_alloc_cb alloc_cb,
                  uv_read_cb read_cb) {
  /* The NULL checks come first so the flag test never touches a NULL stream. */
  if (stream == NULL || alloc_cb == NULL || read_cb == NULL ||
      (stream->flags & UV_HANDLE_CLOSING) != 0)
    return UV_EINVAL;

  if ((stream->flags & UV_HANDLE_READING) != 0)
    return UV_EALREADY;

  if ((stream->flags & UV_HANDLE_READABLE) == 0)
    return UV_ENOTCONN;

  return uv__read_start(stream, alloc_cb, read_cb);
}
/*
 * Mark the stream as reading, store the two callbacks, and start watching
 * the stream's fd for readability. Always returns 0.
 */
int uv__read_start(uv_stream_t* stream,
uv_alloc_cb alloc_cb,
uv_read_cb read_cb) {
assert(stream->type == UV_TCP || stream->type == UV_NAMED_PIPE ||
stream->type == UV_TTY);
/* The UV_HANDLE_READING flag is irrelevant of the state of the tcp - it just
* expresses the desired state of the user.
*/
stream->flags |= UV_HANDLE_READING;
/* TODO: try to do the read inline? */
/* TODO: keep track of tcp state. If we've gotten a EOF then we should
* not start the IO watcher.
*/
assert(uv__stream_fd(stream) >= 0);
assert(alloc_cb);
stream->read_cb = read_cb;
stream->alloc_cb = alloc_cb;
// Register interest in the connection socket becoming readable; the loop
// then waits for the client to send data
uv__io_start(stream->loop, &stream->io_watcher, POLLIN);
uv__handle_start(stream);
uv__stream_osx_interrupt_select(stream);
return 0;
}
然后看 收到客户的消息后的回调:
/*
 * Read callback for a client stream: echoes received data back to the same
 * client.
 *
 * client  stream the data was read from
 * nread   > 0: number of valid bytes in buf; < 0: error code (UV_EOF
 *         included), the client is closed; 0: nothing to echo
 * buf     buffer produced by alloc_buffer()
 *
 * Ownership of buf->base moves to the write request once the echo has been
 * queued, and echo_write() releases it. On every other path it is freed here.
 */
void echo_read(uv_stream_t *client, ssize_t nread, const uv_buf_t *buf) {
    /* Data was read successfully. */
    if (nread > 0) {
        write_req_t *req = malloc(sizeof *req);
        if (req == NULL) {
            fprintf(stderr, "Out of memory, dropping %ld bytes\n", (long) nread);
            free(buf->base);
            return;
        }
        req->buf = uv_buf_init(buf->base, (unsigned int) nread);
        /* Write straight back to the client's socket. */
        int r = uv_write(&req->req, client, &req->buf, 1, echo_write);
        if (r != 0) {
            /* The request was never queued, so echo_write() will not run. */
            fprintf(stderr, "Write error %s\n", uv_strerror(r));
            free_write_req(&req->req);
        }
        return;
    }
    if (nread < 0) {
        if (nread != UV_EOF)
            fprintf(stderr, "Read error %s\n", uv_err_name((int) nread));
        uv_close((uv_handle_t *) client, on_close);
    }
    /* Nothing was queued for writing, so the read buffer is released here. */
    free(buf->base);
}
再看下 uv_write:
/*
 * Queue a write of nbufs buffers on a stream: forwards to uv_write2() with
 * no send handle and returns its result.
 */
int uv_write(uv_write_t* req,
uv_stream_t* handle,
const uv_buf_t bufs[],
unsigned int nbufs,
uv_write_cb cb) {
return uv_write2(req, handle, bufs, nbufs, NULL, cb);
}
/*
 * Queue a write request on a stream, optionally passing a handle along.
 *
 * Initialises req, copies the buffer descriptors into it, appends it to the
 * stream's write queue, and either writes immediately (when nothing was
 * queued before) or starts watching the fd for writability.
 *
 * Returns 0 once the request is queued. UV_EBADF, UV_EPIPE, UV_EINVAL,
 * UV_ENOSYS and UV_ENOMEM are all returned before the request is queued.
 */
int uv_write2(uv_write_t* req,
uv_stream_t* stream,
const uv_buf_t bufs[],
unsigned int nbufs,
uv_stream_t* send_handle,
uv_write_cb cb) {
int empty_queue;
assert(nbufs > 0);
assert((stream->type == UV_TCP ||
stream->type == UV_NAMED_PIPE ||
stream->type == UV_TTY) &&
"uv_write (unix) does not yet support other types of streams");
if (uv__stream_fd(stream) < 0)
return UV_EBADF;
if (!(stream->flags & UV_HANDLE_WRITABLE))
return UV_EPIPE;
if (send_handle) {
if (stream->type != UV_NAMED_PIPE || !((uv_pipe_t*)stream)->ipc)
return UV_EINVAL;
/* XXX We abuse uv_write2() to send over UDP handles to child processes.
* Don't call uv__stream_fd() on those handles, it's a macro that on OS X
* evaluates to a function that operates on a uv_stream_t with a couple of
* OS X specific fields. On other Unices it does (handle)->io_watcher.fd,
* which works but only by accident.
*/
if (uv__handle_fd((uv_handle_t*) send_handle) < 0)
return UV_EBADF;
#if defined(__CYGWIN__) || defined(__MSYS__)
/* Cygwin recvmsg always sets msg_controllen to zero, so we cannot send it.
See https://github.com/mirror/newlib-cygwin/blob/86fc4bf0/winsup/cygwin/fhandler_socket.cc#L1736-L1743 */
return UV_ENOSYS;
#endif
}
/* It's legal for write_queue_size > 0 even when the write_queue is empty;
* it means there are error-state requests in the write_completed_queue that
* will touch up write_queue_size later, see also uv__write_req_finish().
* We could check that write_queue is empty instead but that implies making
* a write() syscall when we know that the handle is in error mode.
*/
empty_queue = (stream->write_queue_size == 0);
// Initialise the write request
/* Initialize the req */
uv__req_init(stream->loop, req, UV_WRITE);
req->cb = cb;
// Point the request's handle at the stream
req->handle = stream;
req->error = 0;
req->send_handle = send_handle;
QUEUE_INIT(&req->queue);
req->bufs = req->bufsml;
if (nbufs > ARRAY_SIZE(req->bufsml))
req->bufs = uv__malloc(nbufs * sizeof(bufs[0]));
if (req->bufs == NULL)
return UV_ENOMEM;
// Copy the buffer descriptors into the request
memcpy(req->bufs, bufs, nbufs * sizeof(bufs[0]));
req->nbufs = nbufs;
req->write_index = 0;
// Add the number of bytes that still have to be written
stream->write_queue_size += uv__count_bufs(bufs, nbufs);
/* Append the request to write_queue. */
// Insert into the pending-write queue
QUEUE_INSERT_TAIL(&stream->write_queue, &req->queue);
/* If the queue was empty when this function began, we should attempt to
* do the write immediately. Otherwise start the write_watcher and wait
* for the fd to become writable.
*/
if (stream->connect_req) {
/* Still connecting, do nothing. */
}
// If nothing was pending before this request, write right away
else if (empty_queue) {
uv__write(stream);
}
else {
/*
* blocking streams should never have anything in the queue.
* if this assert fires then somehow the blocking stream isn't being
* sufficiently flushed in uv__write.
*/
assert(!(stream->flags & UV_HANDLE_BLOCKING_WRITES));
// Otherwise watch the socket for writability and write once earlier
// requests have drained
uv__io_start(stream->loop, &stream->io_watcher, POLLOUT);
uv__stream_osx_interrupt_select(stream);
}
return 0;
}
可以看下stream上io事件触发后 可写事件是怎么处理的:
// Covered earlier in the article, where the focus was on the read events
//
// I/O callback of a stream's watcher: finishes a pending connect, otherwise
// runs the read path and then the write path for the reported events.
static void uv__stream_io(uv_loop_t* loop, uv__io_t* w, unsigned int events) {
uv_stream_t* stream;
stream = container_of(w, uv_stream_t, io_watcher);
assert(stream->type == UV_TCP ||
stream->type == UV_NAMED_PIPE ||
stream->type == UV_TTY);
assert(!(stream->flags & UV_HANDLE_CLOSING));
if (stream->connect_req) {
uv__stream_connect(stream);
return;
}
assert(uv__stream_fd(stream) >= 0);
/* Ignore POLLHUP here. Even if it's set, there may still be data to read. */
if (events & (POLLIN | POLLERR | POLLHUP))
uv__read(stream);
if (uv__stream_fd(stream) == -1)
return; /* read_cb closed stream. */
/* Short-circuit iff POLLHUP is set, the user is still interested in read
* events and uv__read() reported a partial read but not EOF. If the EOF
* flag is set, uv__read() called read_cb with err=UV_EOF and we don't
* have to do anything. If the partial read flag is not set, we can't
* report the EOF yet because there is still data to read.
*/
if ((events & POLLHUP) &&
(stream->flags & UV_HANDLE_READING) &&
(stream->flags & UV_HANDLE_READ_PARTIAL) &&
!(stream->flags & UV_HANDLE_READ_EOF)) {
uv_buf_t buf = { NULL, 0 };
uv__stream_eof(stream, &buf);
}
if (uv__stream_fd(stream) == -1)
return; /* read_cb closed stream. */
// Writable events are handled here
if (events & (POLLOUT | POLLERR | POLLHUP)) {
uv__write(stream);
uv__write_callbacks(stream);
/* Write queue drained. */
if (QUEUE_EMPTY(&stream->write_queue))
uv__drain(stream);
}
}
可以看到也是调用 uv__write:
/*
 * Try to write the request at the head of the stream's write queue.
 *
 * A request that is fully written is finished. A request that is only
 * partly written stays queued and the fd is watched for writability again.
 * A hard error is recorded on the request, which is then finished, and the
 * fd stops being watched for writability.
 */
static void uv__write(uv_stream_t* stream) {
struct iovec* iov;
QUEUE* q;
uv_write_t* req;
int iovmax;
int iovcnt;
ssize_t n;
int err;
start:
assert(uv__stream_fd(stream) >= 0);
if (QUEUE_EMPTY(&stream->write_queue))
return;
// Take the request at the head of the pending-write queue
q = QUEUE_HEAD(&stream->write_queue);
req = QUEUE_DATA(q, uv_write_t, queue);
assert(req->handle == stream);
/*
* Cast to iovec. We had to have our own uv_buf_t instead of iovec
* because Windows's WSABUF is not an iovec.
*/
assert(sizeof(uv_buf_t) == sizeof(struct iovec));
iov = (struct iovec*) &(req->bufs[req->write_index]);
iovcnt = req->nbufs - req->write_index;
iovmax = uv__getiovmax();
/* Limit iov count to avoid EINVALs from writev() */
if (iovcnt > iovmax)
iovcnt = iovmax;
/*
* Now do the actual writev. Note that we've been updating the pointers
* inside the iov each time we write. So there is no need to offset it.
*/
// Not the focus here: this branch sends a handle's fd along with the data
// via sendmsg()
if (req->send_handle) {
int fd_to_send;
struct msghdr msg;
struct cmsghdr *cmsg;
union {
char data[64];
struct cmsghdr alias;
} scratch;
if (uv__is_closing(req->send_handle)) {
err = UV_EBADF;
goto error;
}
fd_to_send = uv__handle_fd((uv_handle_t*) req->send_handle);
memset(&scratch, 0, sizeof(scratch));
assert(fd_to_send >= 0);
msg.msg_name = NULL;
msg.msg_namelen = 0;
msg.msg_iov = iov;
msg.msg_iovlen = iovcnt;
msg.msg_flags = 0;
msg.msg_control = &scratch.alias;
msg.msg_controllen = CMSG_SPACE(sizeof(fd_to_send));
cmsg = CMSG_FIRSTHDR(&msg);
cmsg->cmsg_level = SOL_SOCKET;
cmsg->cmsg_type = SCM_RIGHTS;
cmsg->cmsg_len = CMSG_LEN(sizeof(fd_to_send));
/* silence aliasing warning */
{
void* pv = CMSG_DATA(cmsg);
int* pi = pv;
*pi = fd_to_send;
}
do
n = sendmsg(uv__stream_fd(stream), &msg, 0);
while (n == -1 && RETRY_ON_WRITE_ERROR(errno));
/* Ensure the handle isn't sent again in case this is a partial write. */
if (n >= 0)
req->send_handle = NULL;
} else {
// The write this walkthrough follows: the stream's fd is the client
// socket at this point
do
n = uv__writev(uv__stream_fd(stream), iov, iovcnt);
while (n == -1 && RETRY_ON_WRITE_ERROR(errno));
}
if (n == -1 && !IS_TRANSIENT_WRITE_ERROR(errno, req->send_handle)) {
err = UV__ERR(errno);
goto error;
}
if (n >= 0 && uv__write_req_update(stream, req, n)) {
uv__write_req_finish(req);
return; /* TODO(bnoordhuis) Start trying to write the next request. */
}
/* If this is a blocking stream, try again. */
if (stream->flags & UV_HANDLE_BLOCKING_WRITES)
goto start;
/* We're not done. */
// Register interest in writable events again
uv__io_start(stream->loop, &stream->io_watcher, POLLOUT);
/* Notify select() thread about state change */
uv__stream_osx_interrupt_select(stream);
return;
error:
req->error = err;
uv__write_req_finish(req);
uv__io_stop(stream->loop, &stream->io_watcher, POLLOUT);
if (!uv__io_active(&stream->io_watcher, POLLIN))
uv__handle_stop(stream);
uv__stream_osx_interrupt_select(stream);
}
/*
 * Write n buffers to fd: write() for exactly one buffer, writev() for any
 * other count.
 *
 * Returns whatever the underlying call returns.
 */
static ssize_t uv__writev(int fd, struct iovec* vec, size_t n) {
  if (n != 1)
    return writev(fd, vec, n);

  return write(fd, vec->iov_base, vec->iov_len);
}
分析完读取数据的数量后,如果是读到0,stream内置的io函数会终止handler,而异常数据则需要手动终止,看下调用:
uv_close((uv_handle_t*) client, on_close);
/*
 * Begin closing a handle: flag it as closing, remember close_cb, run the
 * type-specific close routine, then mark the handle close-pending.
 *
 * UV_FS_POLL returns before uv__make_close_pending() is called here; see
 * the comment in that case.
 */
void uv_close(uv_handle_t* handle, uv_close_cb close_cb) {
assert(!uv__is_closing(handle));
handle->flags |= UV_HANDLE_CLOSING;
handle->close_cb = close_cb;
switch (handle->type) {
case UV_NAMED_PIPE:
uv__pipe_close((uv_pipe_t*)handle);
break;
case UV_TTY:
uv__stream_close((uv_stream_t*)handle);
break;
// The echo server's TCP handles take this branch
case UV_TCP:
uv__tcp_close((uv_tcp_t*)handle);
break;
case UV_UDP:
uv__udp_close((uv_udp_t*)handle);
break;
case UV_PREPARE:
uv__prepare_close((uv_prepare_t*)handle);
break;
case UV_CHECK:
uv__check_close((uv_check_t*)handle);
break;
case UV_IDLE:
uv__idle_close((uv_idle_t*)handle);
break;
case UV_ASYNC:
uv__async_close((uv_async_t*)handle);
break;
case UV_TIMER:
uv__timer_close((uv_timer_t*)handle);
break;
case UV_PROCESS:
uv__process_close((uv_process_t*)handle);
break;
case UV_FS_EVENT:
uv__fs_event_close((uv_fs_event_t*)handle);
break;
case UV_POLL:
uv__poll_close((uv_poll_t*)handle);
break;
case UV_FS_POLL:
uv__fs_poll_close((uv_fs_poll_t*)handle);
/* Poll handles use file system requests, and one of them may still be
* running. The poll code will call uv__make_close_pending() for us. */
return;
case UV_SIGNAL:
uv__signal_close((uv_signal_t*) handle);
break;
default:
assert(0);
}
uv__make_close_pending(handle);
}
/* Closing a TCP handle is exactly the generic stream close. */
void uv__tcp_close(uv_tcp_t* handle) {
uv__stream_close((uv_stream_t*)handle);
}
/*
 * Tear down a stream handle: stop watching its fd, stop reading, clear the
 * readable/writable flags, and close the handle's own fd (unless it is a
 * stdio descriptor), any accepted-but-unclaimed fd, and all queued fds.
 */
void uv__stream_close(uv_stream_t* handle) {
unsigned int i;
uv__stream_queued_fds_t* queued_fds;
#if defined(__APPLE__)
/* Terminate select loop first */
if (handle->select != NULL) {
uv__stream_select_t* s;
s = handle->select;
uv_sem_post(&s->close_sem);
uv_sem_post(&s->async_sem);
uv__stream_osx_interrupt_select(handle);
uv_thread_join(&s->thread);
uv_sem_destroy(&s->close_sem);
uv_sem_destroy(&s->async_sem);
uv__close(s->fake_fd);
uv__close(s->int_fd);
uv_close((uv_handle_t*) &s->async, uv__stream_osx_cb_close);
handle->select = NULL;
}
#endif /* defined(__APPLE__) */
// Remove this I/O watcher from the loop
uv__io_close(handle->loop, &handle->io_watcher);
// Stop listening for events
uv_read_stop(handle);
uv__handle_stop(handle);
handle->flags &= ~(UV_HANDLE_READABLE | UV_HANDLE_WRITABLE);
if (handle->io_watcher.fd != -1) {
/* Don't close stdio file descriptors. Nothing good comes from it. */
if (handle->io_watcher.fd > STDERR_FILENO)
uv__close(handle->io_watcher.fd);
handle->io_watcher.fd = -1;
}
if (handle->accepted_fd != -1) {
uv__close(handle->accepted_fd);
handle->accepted_fd = -1;
}
/* Close all queued fds */
if (handle->queued_fds != NULL) {
queued_fds = handle->queued_fds;
for (i = 0; i < queued_fds->offset; i++)
uv__close(queued_fds->fds[i]);
uv__free(handle->queued_fds);
handle->queued_fds = NULL;
}
assert(!uv__io_active(&handle->io_watcher, POLLIN | POLLOUT));
}
总结:比较简单的单进程 tcp echo 服务器程序分析完了,跟我们自己平常写的简单代码一样,都是 监听socket可读,然后写回到客户socket,客户断开后释放资源即可。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。