首页
学习
活动
专区
圈层
工具
发布
社区首页 >专栏 >【TinyWebServer】半同步半反应堆线程池

【TinyWebServer】半同步半反应堆线程池

作者头像
小文要打代码
发布2025-06-11 11:14:31
发布2025-06-11 11:14:31
1840
举报
文章被收录于专栏:C++学习历程C++学习历程

服务器编程基本框架

主要由I/O单元,逻辑单元和网络存储单元组成,其中每个单元之间通过请求队列进行通信,从而协同完成任务。

其中I/O单元用于处理客户端连接,读写网络数据;逻辑单元用于处理业务逻辑的线程;网络存储单元指本地数据库和文件等。

五种I/O模型

  • 阻塞IO:调用者调用了某个函数,等待这个函数返回,期间什么也不做,不停的去检查这个函数有没有返回,必须等这个函数返回才能进行下一步动作
  • 非阻塞IO:非阻塞等待,每隔一段时间就去检测IO事件是否就绪。没有就绪就可以做其他事。非阻塞I/O执行系统调用总是立即返回,不管时间是否已经发生,若时间没有发生,则返回-1,此时可以根据errno区分这两种情况,对于accept,recv和send,事件未发生时,errno通常被设置成eagain
  • 信号驱动IO:linux用套接口进行信号驱动IO,安装一个信号处理函数,进程继续运行并不阻塞,当IO时间就绪,进程收到SIGIO信号。然后处理IO事件。
  • IO复用:linux用select/poll函数实现IO复用模型,这两个函数也会使进程阻塞,但是和阻塞IO所不同的是这两个函数可以同时阻塞多个IO操作。而且可以同时对多个读操作、写操作的IO函数进行检测。知道有数据可读或可写时,才真正调用IO操作函数
  • 异步IO:linux中,可以调用aio_read函数告诉内核描述字缓冲区指针和缓冲区的大小、文件偏移及通知的方式,然后立即返回,当内核将数据拷贝到缓冲区后,再通知应用程序。

注意:阻塞I/O,非阻塞I/O,信号驱动I/O和I/O复用都是同步I/O。同步I/O指内核向应用程序通知的是就绪事件,比如只通知有客户端连接,要求用户代码自行执行I/O操作,异步I/O是指内核向应用程序通知的是完成事件,比如读取客户端的数据后才通知应用程序,由内核完成I/O操作。

事件处理模式

reactor模式

  • reactor模式中,主线程(I/O处理单元)只负责监听文件描述符上是否有事件发生,有的话立即通知工作线程(逻辑单元 ),读写数据、接受新连接及处理客户请求均在工作线程中完成。通常由同步I/O实现。
  • Reactor 模型有三个重要的组件: 多路复用器:由操作系统提供,在 linux 上一般是 select, poll, epoll 等系统调用。 事件分发器:将多路复用器中返回的就绪事件分到对应的处理函数中 事件处理器:负责处理特定事件的处理函数 具体流程如下: 注册读就绪事件和相应的事件处理器 事件分离器等待事件 事件到来,激活分离器,分离器调用事件对应的处理器 事件处理器完成实际的读操作,处理读到的数据,注册新的事件,然后返还控制权

  • 工作流程: ①主线程往epoll内核事件表中注册socket上有数据可读 ②主线程调用epoll_wait等待socket上有数据可读 ③当socket上有数据可读时,epoll_wait通知主线程。主线程则将socket可读事件放入请求队列 ④睡眠在请求请求队列上的某个工作线程被唤醒,它从socket读取数据,并处理客户请求,然后往epoll内核事件表中注册该socket上的写就绪时间 ⑤主线程调用epoll_wait等到socket可写 ⑥当socket可写时,epoll_wait通知主线程。主线程将socket可写事件放入请求队列 ⑦睡眠在请求队列上的某个工作线程被唤醒,它向socket上写入服务器处理客户请求的结果

proactor模式

proactor模式中,主线程和内核负责处理读写数据、接受新连接等I/O操作,工作线程仅负责业务逻辑,如处理客户请求。通常由异步I/O实现。

Proactor模式的工作流程 ①主线程调用aio_read函数向内核注册socket上读完成事件,并告诉内核用户读缓冲区的位置,以及读操作完成时如何通知应用程序(这里以信号为例) ②主线程继续处理其他逻辑 ③当socket上的数据被读入用户缓冲区后,内核将向应用程序发送一个信号,以通知应用程序数据已经可用 ④应用程序预先定义好的信号处理函数选择一个工作线程来处理客户请求。工作线程处理完客户请求之后,调用aio_write函数向内核注册socket上的写完成事件,并告诉内核用户写缓冲区的位置,以及写操作完成时如何通知应用程序(这里以信号为例) ⑤主线程继续处理其他逻辑 ⑥当用户缓冲区的数据被写入socket之后,内核将向应用程序发送一个信号,以通知应用程序数据已经发送完毕 ⑦应用程序预先定义好的信号处理函数选择一个工作线程来做善后处理,比如决定是否关闭socket

在上图中,连接socket上的读写事件是通过aio_read/aio_write向内核注册的,因此内核将通过信号来向应用程序报告连接socket上的读写事件。所以,主线程的epoll_wait调用仅能用来检测监听socket上的连接请求事件,而不能用来检测连接socket的读写事件。


同步I/O模拟proactor模式

同步I/O模型的工作流程如下(epoll_wait为例):

  • 主线程往epoll内核事件表注册socket上的读就绪事件。
  • 主线程调用epoll_wait等待socket上有数据可读
  • 当socket上有数据可读,epoll_wait通知主线程,主线程从socket循环读取数据,直到没有更多数据可读,然后将读取到的数据封装成一个请求对象并插入请求队列。
  • 睡眠在请求队列上某个工作线程被唤醒,它获得请求对象并处理客户请求,然后往epoll内核事件表中注册该socket上的写就绪事件
  • 主线程调用epoll_wait等待socket可写。
  • 当socket上有数据可写,epoll_wait通知主线程。主线程往socket上写入服务器处理客户请求的结果。

并发编程模式

并发编程方法的实现有多线程和多进程两种,但这里涉及的并发模式指I/O处理单元与逻辑单元的协同完成任务的方法。

  • 半同步/半异步模式
  • 领导者/追随者模式

半同步/半反应堆

半同步/半反应堆并发模式是半同步/半异步的变体,将半异步具体化为某种事件处理模式.

并发模式中的同步和异步

  • 同步指的是程序完全按照代码序列的顺序执行
  • 异步指的是程序的执行需要由系统事件驱动

半同步/半异步模式工作流程

  • 同步线程用于处理客户逻辑
  • 异步线程用于处理I/O事件
  • 异步线程监听到客户请求后,就将其封装成请求对象并插入请求队列中
  • 请求队列将通知某个工作在同步模式的工作线程来读取并处理该请求对象

半同步/半反应堆工作流程(以Proactor模式为例)

  • 主线程充当异步线程,负责监听所有socket上的事件
  • 若有新请求到来,主线程接收之以得到新的连接socket,然后往epoll内核事件表中注册该socket上的读写事件
  • 如果连接socket上有读写事件发生,主线程从socket上接收数据,并将数据封装成请求对象插入到请求队列中
  • 所有工作线程睡眠在请求队列上,当有任务到来时,通过竞争(如互斥锁)获得任务的接管权

pthread_create陷阱

首先看一下该函数的函数原型。

代码语言:javascript
复制
#include <pthread.h>
    int pthread_create (pthread_t *thread_tid,                 //返回新生成的线程的id
    const pthread_attr_t *attr,         //指向线程属性的指针,通常设置为NULL
    void * (*start_routine) (void *),   //处理线程函数的地址
    void *arg);                         //start_routine()中的参数

函数原型中的第三个参数,为函数指针,指向处理线程函数的地址。该函数,要求为静态函数。如果处理线程函数为类成员函数时,需要将其设置为静态成员函数

this指针的锅

pthread_create的函数原型中第三个参数的类型为函数指针,指向的线程处理函数参数类型为(void *),若线程函数为类成员函数,则this指针会作为默认的参数被传进函数中,从而和线程函数参数(void*)不能匹配,不能通过编译。

静态成员函数就没有这个问题,里面没有this指针。

线程池分析

线程池的设计模式为半同步/半反应堆,其中反应堆具体为Proactor事件处理模式。

具体的,主线程为异步线程,负责监听文件描述符,接收socket新连接,若当前监听的socket发生了读写事件,然后将任务插入到请求队列。工作线程从请求队列中取出任务,完成读写数据的处理。

线程池类定义

具体定义可以看代码。需要注意,线程处理函数和运行函数设置为私有属性。

代码语言:javascript
复制
template <typename T>
class threadpool
{
public:
    /*thread_number是线程池中线程的数量,max_requests是请求队列中最多允许的、等待处理的请求的数量*/
    threadpool(int actor_model, connection_pool *connPool, int thread_number = 8, int max_request = 10000);
    ~threadpool();
    bool append(T *request, int state);
    bool append_p(T *request);

private:
    /*工作线程运行的函数,它不断从工作队列中取出任务并执行之*/
    static void *worker(void *arg);
    void run();

private:
    int m_thread_number;        //线程池中的线程数
    int m_max_requests;         //请求队列中允许的最大请求数
    pthread_t *m_threads;       //描述线程池的数组,其大小为m_thread_number
    std::list<T *> m_workqueue; //请求队列
    locker m_queuelocker;       //保护请求队列的互斥锁
    sem m_queuestat;            //是否有任务需要处理
    connection_pool *m_connPool;  //数据库
    int m_actor_model;          //模型切换
};

线程池创建与回收

构造函数中创建线程池,pthread_create函数中将类的对象作为参数传递给静态函数(worker),在静态函数中引用这个对象,并调用其动态方法(run)。

具体的,类对象传递时用this指针,传递给静态函数后,将其转换为线程池类,并调用私有成员函数run。

代码语言:javascript
复制
template <typename T>
threadpool<T>::threadpool( int actor_model, connection_pool *connPool, int thread_number, int max_requests) : m_actor_model(actor_model),m_thread_number(thread_number), m_max_requests(max_requests), m_threads(NULL),m_connPool(connPool)
{
    if (thread_number <= 0 || max_requests <= 0)
        throw std::exception();
    m_threads = new pthread_t[m_thread_number];
    if (!m_threads)
        throw std::exception();
    for (int i = 0; i < thread_number; ++i)
    {
        if (pthread_create(m_threads + i, NULL, worker, this) != 0)
        {
            delete[] m_threads;
            throw std::exception();
        }
        if (pthread_detach(m_threads[i]))
        {
            delete[] m_threads;
            throw std::exception();
        }
    }
}

向请求队列中添加任务

通过list容器创建请求队列,向队列中添加时,通过互斥锁保证线程安全,添加完成后通过信号量提醒有任务要处理,最后注意线程同步。

代码语言:javascript
复制
template <typename T>
bool threadpool<T>::append(T *request, int state)
{
    m_queuelocker.lock();
    if (m_workqueue.size() >= m_max_requests)
    {
        m_queuelocker.unlock();
        return false;
    }
    request->m_state = state;
    m_workqueue.push_back(request);
    m_queuelocker.unlock();
    m_queuestat.post();
    return true;
}

线程处理函数

内部访问私有成员函数run,完成线程处理要求。

代码语言:javascript
复制
template <typename T>
void *threadpool<T>::worker(void *arg)
{
    threadpool *pool = (threadpool *)arg;
    pool->run();
    return pool;
}

run执行任务

主要实现,工作线程从请求队列中取出某个任务进行处理,注意线程同步。

代码语言:javascript
复制
template <typename T>
void threadpool<T>::run()
{
    while (true)
    {
        m_queuestat.wait();
        m_queuelocker.lock();
        if (m_workqueue.empty())
        {
            m_queuelocker.unlock();
            continue;
        }
        T *request = m_workqueue.front();
        m_workqueue.pop_front();
        m_queuelocker.unlock();
        if (!request)
            continue;
        if (1 == m_actor_model)
        {
            if (0 == request->m_state)
            {
                if (request->read_once())
                {
                    request->improv = 1;
                    connectionRAII mysqlcon(&request->mysql, m_connPool);
                    request->process();
                }
                else
                {
                    request->improv = 1;
                    request->timer_flag = 1;
                }
            }
            else
            {
                if (request->write())
                {
                    request->improv = 1;
                }
                else
                {
                    request->improv = 1;
                    request->timer_flag = 1;
                }
            }
        }
        else
        {
            connectionRAII mysqlcon(&request->mysql, m_connPool);
            request->process();
        }
    }
}
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2025-06-10,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 服务器编程基本框架
  • 五种I/O模型
  • 事件处理模式
    • reactor模式
    • proactor模式
  • 同步I/O模拟proactor模式
  • 并发编程模式
    • 半同步/半反应堆
    • pthread_create陷阱
      • this指针的锅
  • 线程池分析
    • 线程池类定义
    • 线程池创建与回收
    • 向请求队列中添加任务
    • 线程处理函数
    • run执行任务
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档