前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Swoole 源码分析之 WebSocket 模块

Swoole 源码分析之 WebSocket 模块

原创
作者头像
码农先森
发布2024-06-26 22:37:16
900
发布2024-06-26 22:37:16
举报
文章被收录于专栏:Swoole源码分析

大家好,我是码农先森。

引言

WebSocket 是一种在单个 TCP 连接上进行全双工通信的协议。它允许客户端和服务器之间进行实时数据传输。

与传统的 HTTP 请求-响应模型不同,WebSocket 可以保持双向通信通道,从而使得服务器能够主动向客户端推送数据。

Swoole 中的 WebSocket 服务

下面这段代码是从Swoole 官方网站上的引用,从代码中可以看出创建了一个 WebScoket 对象且设置对应的 IP 地址及监听端口,同时还设置了四个回调方法处理对应的事件。

最后,调用 $server->start() 真正的启动 WebScoket 服务。

代码语言:php
复制
$server = new Swoole\Websocket\Server('127.0.0.1', 9502);

$server->on('start', function ($server) {
    echo "Websocket Server is started at ws://127.0.0.1:9502\n";
});

$server->on('open', function($server, $req) {
    echo "connection open: {$req->fd}\n";
});

$server->on('message', function($server, $frame) {
    echo "received message: {$frame->data}\n";
    $server->push($frame->fd, json_encode(['hello', 'world']));
});

$server->on('close', function($server, $fd) {
    echo "connection close: {$fd}\n";
});

$server->start();

那么接下来,我们就从源码角度来分析 Swoole 对 WebSocket 的实现。

源码拆解

这个函数的主要作用是启动 Server 服务。

代码语言:php
复制
static void php_swoole_server_onStart(Server *serv) {
	// 锁定 Server 对象操作
    serv->lock();
    
    // 从 Server 对象中获取到 onStart 回调函数
    zval *zserv = (zval *) serv->private_data_2;
    ServerObject *server_object = server_fetch_object(Z_OBJ_P(zserv));
    auto fci_cache = server_object->property->callbacks[SW_SERVER_CB_onStart];

	...

	// 通过 zend::function::call 调用 PHP 层注册的 onStart 处理函数,并传递参数
    if (fci_cache && UNEXPECTED(!zend::function::call(fci_cache, 1, zserv, nullptr, serv->is_enable_coroutine()))) {
        php_swoole_error(E_WARNING, "%s->onStart handler error", SW_Z_OBJCE_NAME_VAL_P(zserv));
    }
    
    // 解锁 Server 对象操作
    serv->unlock();
}

这个函数主要作用是 WebSocket 服务针对客户端建立连接时事件的处理。

代码语言:php
复制
void swoole_websocket_onOpen(Server *serv, HttpContext *ctx) {
    // 通过 session_id 获取与特定客户端连接相关的 Connection 对象
    Connection *conn = serv->get_connection_by_session_id(ctx->fd);
    if (!conn) {
        swoole_error_log(SW_LOG_TRACE, SW_ERROR_SESSION_NOT_EXIST, "session[%ld] is closed", ctx->fd);
        return;
    }
    
    // Server 对象中获取在 PHP 层设置的回调函数 onOpen。
    zend_fcall_info_cache *fci_cache = php_swoole_server_get_fci_cache(serv, conn->server_fd, SW_SERVER_CB_onOpen);
    if (fci_cache) {
        zval args[2];
        args[0] = *((zval *) serv->private_data_2);
        args[1] = *ctx->request.zobject;
        // 通过 zend::function::call 调用 PHP 层注册的 onOpen 处理函数,并传递参数
        if (UNEXPECTED(!zend::function::call(fci_cache, 2, args, nullptr, serv->is_enable_coroutine()))) {
            php_swoole_error(E_WARNING, "%s->onOpen handler error", ZSTR_VAL(swoole_websocket_server_ce->name));
            serv->close(ctx->fd, false);
        }
    }
}

这个函数主要作用是 WebSocket 服务器针对客户端发送消息事件的处理。

代码语言:php
复制
int swoole_websocket_onMessage(Server *serv, RecvData *req) {
    SessionId fd = req->info.fd;
    uchar flags = 0;
    zend_long opcode = 0;
    // 从接收到的数据中获取客户端的 session_id,并根据 session_id 获取对应的端口信息
    auto port = serv->get_port_by_session_id(fd);
    if (!port) {
        return SW_ERR;
    }

    zval zdata;
    char frame_header[2];
    // 从接收到的数据中解析出 WebSocket 消息的帧头信息和消息内容
    memcpy(frame_header, &req->info.ext_flags, sizeof(frame_header));

    php_swoole_get_recv_data(serv, &zdata, req);

    // 解析出 WebSocket 消息的标志位和操作码
    flags = frame_header[0];
    opcode = frame_header[1];

    // 根据操作码和服务的设置,判断是否需要特殊处理 Close、Ping 或 Pong 类型的消息
    if ((opcode == WebSocket::OPCODE_CLOSE && !port->open_websocket_close_frame) ||
        (opcode == WebSocket::OPCODE_PING && !port->open_websocket_ping_frame) ||
        (opcode == WebSocket::OPCODE_PONG && !port->open_websocket_pong_frame)) {
        if (opcode == WebSocket::OPCODE_PING) {
			...
        }
        zval_ptr_dtor(&zdata);
        return SW_OK;
    }

	...

	// Server 对象中获取在 PHP 层设置的回调函数 onMessage
    zend_fcall_info_cache *fci_cache =
        php_swoole_server_get_fci_cache(serv, req->info.server_fd, SW_SERVER_CB_onMessage);
    zval args[2];

    args[0] = *(zval *) serv->private_data_2;
    // 构造一个 WebSocket 消息帧的数据结构,并将结果存储在 args[1]
    php_swoole_websocket_construct_frame(&args[1], opcode, &zdata, flags);
    zend_update_property_long(swoole_websocket_frame_ce, SW_Z8_OBJ_P(&args[1]), ZEND_STRL("fd"), fd);

    // 通过 zend::function::call 调用 PHP 层注册的 onMessage 处理函数,并传递相应参数
    if (UNEXPECTED(!zend::function::call(fci_cache, 2, args, nullptr, serv->is_enable_coroutine()))) {
        php_swoole_error(E_WARNING, "%s->onMessage handler error", ZSTR_VAL(swoole_websocket_server_ce->name));
        serv->close(fd, false);
    }

    // 释放 zdata 和 args[1] 占用的内存
    zval_ptr_dtor(&zdata);
    zval_ptr_dtor(&args[1]);

    return SW_OK;
}

这个函数的主要作用是关闭 Server 服务。

代码语言:php
复制
void php_swoole_server_onClose(Server *serv, DataHead *info) {
    ...
    
    // Server 对象中获取在 PHP 层设置的回调函数 onClose
    auto *fci_cache = php_swoole_server_get_fci_cache(serv, info->server_fd, SW_SERVER_CB_onClose);
    Connection *conn = serv->get_connection_by_session_id(session_id);
    if (!conn) {
        return;
    }
    
    // 检查当前的 WebSocket 连接状态是否为非活动状态
    if (conn->websocket_status != swoole::websocket::STATUS_ACTIVE) {
        // 获取与当前连接相关的监听端口信息
        ListenPort *port = serv->get_port_by_server_fd(info->server_fd);
       	// 如果该端口开启了 WebSocket 协议,且设置了 onDisconnect 回调函数
        if (port && port->open_websocket_protocol &&
            php_swoole_server_isset_callback(serv, port, SW_SERVER_CB_onDisconnect)) {
            // 获取 onDisconnect 回调函数
            fci_cache = php_swoole_server_get_fci_cache(serv, info->server_fd, SW_SERVER_CB_onDisconnect);
        }
    }
    if (fci_cache) {
    
        ...

		// 通过 zend::function::call 调用 PHP 层注册的 onDisconnect 处理函数,并传递相应参数
        if (UNEXPECTED(!zend::function::call(fci_cache, argc, args, nullptr, serv->enable_coroutine))) {
            php_swoole_error(E_WARNING, "%s->onClose handler error", SW_Z_OBJCE_NAME_VAL_P(zserv));
        }
        
		...
    }
	
	...

}

这个函数的作用是断开 WebSocket 客户端的连接,并发送关闭帧。

代码语言:php
复制
static PHP_METHOD(swoole_websocket_server, disconnect) {
	// 从 ZEND_THIS 中获取 Server 对象
    Server *serv = php_swoole_server_get_and_check_server(ZEND_THIS);
    
    ...
    
    // 清空全局的 WebSocket 缓冲区
    swoole_websocket_buffer->clear();
    
    // 将关闭帧数据打包到 WebSocket 缓冲区中
    if (WebSocket::pack_close_frame(swoole_websocket_buffer, code, data, length, 0) < 0) {
        RETURN_FALSE;
    }
    
    // 调用 swoole_websocket_server_close 函数来关闭客户端连接,并返回结果
    RETURN_BOOL(swoole_websocket_server_close(serv, fd, swoole_websocket_buffer, 1));
}

这个函数的作用是在 WebSocket 服务中关闭客户端连接的操作。

代码语言:php
复制
static sw_inline bool swoole_websocket_server_close(Server *serv, SessionId fd, String *buffer, bool real_close) {
    // 尝试将数据推送给客户端,用于判断是否已经关闭连接
    bool ret = swoole_websocket_server_push(serv, fd, buffer);
    if (!ret || !real_close) {
        return ret;
    }
    
    // 获取到客户端连接相关的 Connection 对象
    Connection *conn = serv->get_connection_by_session_id(fd);
    if (conn) {
        // 将该连接的 websocket_status 改变为 WebSocket::STATUS_CLOSING
        conn->websocket_status = WebSocket::STATUS_CLOSING;
        // 立即关闭连接
        return serv->close(fd, false);
    } else {
        return false;
    }
}

总结

  • 在 Swoole 中 WebSocket 服务是继承于 Http 服务。
  • 在实际的使用过程中是通过 Http 服务来握手升级成 WebSocket 服务。
  • WebSocket 协议的出现解决了通过传统轮询方式来通信的效率问题。
  • 同时也为 PHP 在双向通信解决方式上提供了新的解决方案。

欢迎关注、分享、点赞、收藏、在看,我是微信公众号「码农先森」作者。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 引言
  • Swoole 中的 WebSocket 服务
  • 源码拆解
  • 总结
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档