前往小程序,Get更优阅读体验!
立即前往
发布
社区首页 >专栏 >初识Linux · 线程池

初识Linux · 线程池

作者头像
_lazy
发布2024-12-20 08:52:20
发布2024-12-20 08:52:20
6100
代码可运行
举报
文章被收录于专栏:Initial programmingInitial programming
运行总次数:0
代码可运行

前言:

前文我们介绍了基于线程同步和互斥两种关系的一种模型->生产消费模型,那么之前在学习进程的时候我们已经编写过了进程池,同理,学习线程的时候我们也要编写线程池。

那么对于线程池的编写,我们也不废话,直接进入主题。


thread pool成员变量分析

对于一个线程池来说,锁和条件变量肯定是少不了的,对于线程处理的任务来说,也需要一个队列用来表示任务队列,我们可以用环形队列也可以使用阻塞队列,这里我们就使用阻塞队列好了。

那么线程池存在那么多的线程,谁工作了谁休眠了总得有个数吧?所以我们不妨设置一个sleep_thread_num和一个thread_num,用来表示线程池中线程的情况。

最后不妨使用一个变量用来表示线程池是用运作起来了:

代码语言:javascript
代码运行次数:0
复制
template <typename T>
class ThreadPool
{
public: 
    


private:
    int _thread_num;
    int _sleep_thread_num;
    std::vector<Thread> _threads;
    std::queue<T> _task_queue;
    pthread_mutex_t _mutex;
    pthread_cond_t _cond;
    bool _isrunning;
};

对于这里来说,我们不妨使用一下自己刚编写的线程库。

成员变量有了,接下来就是接口编写了。


接口编写

一个类说最重要的接口是构造函数不过分吧?所以对于构造函数俩说,锁和条件变量因为都是局部的,所以我们使用函数pthread_mutex_init这种函数进行解决。

代码语言:javascript
代码运行次数:0
复制
    ThreadPool(int thread_num = gthread_num)
        :_thread_num(thread_num), _isrunning(false), _sleep_thread_num(0)
    {
        pthead_mutex_init(&_mutex);
        pthead_cond_init(&_cond);

    }
    ~ThreadPool()
    {
        pthread_mutex_destroy(&_mutex);
        pthread_cond_destroy(&_cond);
    }

对于两个表示线程数量的线程成员,一个最开始设置为默认值,当然,用户也可以自己传过来值初始化这个线程,其次是互斥锁和条件变量的初始化,对于任务队列我们不急着初始化,到时候使用到了再初始化也不迟。

其次是线程池有没有running起来,刚构造线程池的时候连线程都没有创建呢,也就不可能将参数设置为true了。

接下来的函数就是,线程池构造好了,得对任务队列初始化吧?既然是对任务队列初始化,那么处理任务的函数我们应该要有吧?有了处理任务的函数,说白了,必要条件还得是让thread创建出来吧?所以需要使用到我们自己封装的线程的start的。

当然了,还有更多的函数,我们先来解决任务处理函数吧:

代码语言:javascript
代码运行次数:0
复制
   void HandlerTask(const std::string &name) // this
    {
        while (true)
        {
            // 取任务
            LockQueue();
            while (IsEmpty() && _isrunning)
            {
                _sleep_thread_num++;
                LOG(INFO, "%s thread sleep begin!\n", name.c_str());
                Sleep();
                LOG(INFO, "%s thread wakeup!\n", name.c_str());
                _sleep_thread_num--;
            }
            // 判定一种情况
            if (IsEmpty() && !_isrunning)
            {
                UnlockQueue();
                LOG(INFO, "%s thread quit\n", name.c_str());
                break;
            }

            // 有任务
            T t = _task_queue.front();
            _task_queue.pop();
            UnlockQueue();

            // 处理任务
            t(); // 处理任务,此处不用/不能在临界区中处理
            // std::cout << name << ": " << t.result() << std::endl;
            LOG(DEBUG, "hander task done, task is : %s\n", t.result().c_str());
        }
    }

对于init函数,我们得保证绑定的函数是我们想要的,所以我们使用一下C++11里面新语法,bind:

代码语言:javascript
代码运行次数:0
复制
void Init()
{
    func_t func = std::bind(&ThreadPool::HandlerTask, this, std::placeholders::_1);
    for (int i = 0; i < _thread_num; i++)
    {
        std::string threadname = "thread-" + std::to_string(i + 1);
        _threads.emplace_back(threadname, func);
    }
}

其中bind的用法如果是忘记了的,不妨复习一下。

现在顺序表里面存的是线程,里面插入了对应的线程名字,那么我们还得start吧?虽然我们加入了名字,但是甚至没有phread_create,所以线程是没有创建的,那么肯定就要用到对应的start函数:

代码语言:javascript
代码运行次数:0
复制
void Start()
{
    _isrunning = true;
    for (auto &thread : _threads)
    {
        LOG(DEBUG, "start thread %s done.\n", thread.Name().c_str());
        thread.Start();
    }
}

同理,有了start就应该由stop函数:

代码语言:javascript
代码运行次数:0
复制
    void Stop()
    {
        LockQueue();
        _isrunning = false;
        WakeupAll();
        UnlockQueue();
        LOG(INFO, "Thread Pool Stop Success!\n");
    }

其中应该_isrunning是一个临界资源,所以需要加锁。

其中部分小函数为:

代码语言:javascript
代码运行次数:0
复制
void LockQueue()
    {
        pthread_mutex_lock(&_mutex);
    }
    void UnlockQueue()
    {
        pthread_mutex_unlock(&_mutex);
    }
    void Wakeup()
    {
        pthread_cond_signal(&_cond);
    }
    void WakeupAll()
    {
        pthread_cond_broadcast(&_cond);
    }
    void Sleep()
    {
        pthread_cond_wait(&_cond, &_mutex);
    }
    bool IsEmpty()
    {
        return _task_queue.empty();
    }

加任务为:

代码语言:javascript
代码运行次数:0
复制
void Equeue(const T &in)
{
    LockQueue();
    if (_isrunning)
    {
        _task_queue.push(in);
        if (_sleep_thread_num > 0)
            Wakeup();
    }
    UnlockQueue();
}

对于日志的编写已经读写锁和自旋锁的问题我们下两篇文章介绍,


感谢阅读!

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2024-12-20,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 前言:
  • thread pool成员变量分析
  • 接口编写
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档