前文我们介绍了基于线程同步和互斥两种关系的一种模型->生产消费模型,那么之前在学习进程的时候我们已经编写过了进程池,同理,学习线程的时候我们也要编写线程池。
那么对于线程池的编写,我们也不废话,直接进入主题。
对于一个线程池来说,锁和条件变量肯定是少不了的,对于线程处理的任务来说,也需要一个队列用来表示任务队列,我们可以用环形队列也可以使用阻塞队列,这里我们就使用阻塞队列好了。
那么线程池存在那么多的线程,谁工作了谁休眠了总得有个数吧?所以我们不妨设置一个sleep_thread_num和一个thread_num,用来表示线程池中线程的情况。
最后不妨使用一个变量用来表示线程池是用运作起来了:
template <typename T>
class ThreadPool
{
public:
private:
int _thread_num;
int _sleep_thread_num;
std::vector<Thread> _threads;
std::queue<T> _task_queue;
pthread_mutex_t _mutex;
pthread_cond_t _cond;
bool _isrunning;
};
对于这里来说,我们不妨使用一下自己刚编写的线程库。
成员变量有了,接下来就是接口编写了。
一个类说最重要的接口是构造函数不过分吧?所以对于构造函数俩说,锁和条件变量因为都是局部的,所以我们使用函数pthread_mutex_init这种函数进行解决。
ThreadPool(int thread_num = gthread_num)
:_thread_num(thread_num), _isrunning(false), _sleep_thread_num(0)
{
pthead_mutex_init(&_mutex);
pthead_cond_init(&_cond);
}
~ThreadPool()
{
pthread_mutex_destroy(&_mutex);
pthread_cond_destroy(&_cond);
}
对于两个表示线程数量的线程成员,一个最开始设置为默认值,当然,用户也可以自己传过来值初始化这个线程,其次是互斥锁和条件变量的初始化,对于任务队列我们不急着初始化,到时候使用到了再初始化也不迟。
其次是线程池有没有running起来,刚构造线程池的时候连线程都没有创建呢,也就不可能将参数设置为true了。
接下来的函数就是,线程池构造好了,得对任务队列初始化吧?既然是对任务队列初始化,那么处理任务的函数我们应该要有吧?有了处理任务的函数,说白了,必要条件还得是让thread创建出来吧?所以需要使用到我们自己封装的线程的start的。
当然了,还有更多的函数,我们先来解决任务处理函数吧:
void HandlerTask(const std::string &name) // this
{
while (true)
{
// 取任务
LockQueue();
while (IsEmpty() && _isrunning)
{
_sleep_thread_num++;
LOG(INFO, "%s thread sleep begin!\n", name.c_str());
Sleep();
LOG(INFO, "%s thread wakeup!\n", name.c_str());
_sleep_thread_num--;
}
// 判定一种情况
if (IsEmpty() && !_isrunning)
{
UnlockQueue();
LOG(INFO, "%s thread quit\n", name.c_str());
break;
}
// 有任务
T t = _task_queue.front();
_task_queue.pop();
UnlockQueue();
// 处理任务
t(); // 处理任务,此处不用/不能在临界区中处理
// std::cout << name << ": " << t.result() << std::endl;
LOG(DEBUG, "hander task done, task is : %s\n", t.result().c_str());
}
}
对于init函数,我们得保证绑定的函数是我们想要的,所以我们使用一下C++11里面新语法,bind:
void Init()
{
func_t func = std::bind(&ThreadPool::HandlerTask, this, std::placeholders::_1);
for (int i = 0; i < _thread_num; i++)
{
std::string threadname = "thread-" + std::to_string(i + 1);
_threads.emplace_back(threadname, func);
}
}
其中bind的用法如果是忘记了的,不妨复习一下。
现在顺序表里面存的是线程,里面插入了对应的线程名字,那么我们还得start吧?虽然我们加入了名字,但是甚至没有phread_create,所以线程是没有创建的,那么肯定就要用到对应的start函数:
void Start()
{
_isrunning = true;
for (auto &thread : _threads)
{
LOG(DEBUG, "start thread %s done.\n", thread.Name().c_str());
thread.Start();
}
}
同理,有了start就应该由stop函数:
void Stop()
{
LockQueue();
_isrunning = false;
WakeupAll();
UnlockQueue();
LOG(INFO, "Thread Pool Stop Success!\n");
}
其中应该_isrunning是一个临界资源,所以需要加锁。
其中部分小函数为:
void LockQueue()
{
pthread_mutex_lock(&_mutex);
}
void UnlockQueue()
{
pthread_mutex_unlock(&_mutex);
}
void Wakeup()
{
pthread_cond_signal(&_cond);
}
void WakeupAll()
{
pthread_cond_broadcast(&_cond);
}
void Sleep()
{
pthread_cond_wait(&_cond, &_mutex);
}
bool IsEmpty()
{
return _task_queue.empty();
}
加任务为:
void Equeue(const T &in)
{
LockQueue();
if (_isrunning)
{
_task_queue.push(in);
if (_sleep_thread_num > 0)
Wakeup();
}
UnlockQueue();
}
对于日志的编写已经读写锁和自旋锁的问题我们下两篇文章介绍,
感谢阅读!