生产者-消费者模型(Producer-Consumer Model) 是多线程编程中的经典并发控制模型,主要用于协调多个线程之间的数据访问,防止竞争条件(race condition)和资源浪费,提高程序的并发能力。 抛开概念,我们用生活中的例子来举例——超市就是最好的例子。 超市充当着生产商和消费者的中间资源。 超市从生产商进货,生产商需要向超市提供货物,消费者在超市购物,超市需要向消费者提供商品。 如此一来,超市就成立生产者和消费者之间的桥梁。消费者就和生产者有一定的隔离,解决了生产者与消费者之间的强耦合。 得益于超市做缓冲区,整个生产消费的过程十分的高效,即便消费者没有在超市找到想要的商品,也可以借助超市向生产者进行反馈,从而做到生产对应的商品,也就是允许生产消费步调不一致。 生产消费者模型的本质就是:忙闲不均. 同时我们要知道超市不可能只面向单一的生产消费者,无论是哪一个,超市都会面向多个。也就是说,超市会被多个生产者消费者看到。 那么生产者、消费者间排列组合有什么关系呢? 生产者与生产者 还是以超市为例,多个生产者间存在互斥的关系,每个生产者都希望自己的产品能更多的出现到超市中,可以超市的空间始终是有限的,一个产品多了势必会影响到另一份的产品。比如可口与百事,统一与康师傅。 由此再一次得出结论:生产者与生产者存在互斥关系 消费者与消费者 假设超市准备打烊了,此时进来了两名顾客,他们看上了同一件商品,但是该商品以及卖的只省一件了,那么两名顾客必然就存在竞争的关系,也就是互斥。 由此得出:消费者与消费者间存在互斥关系. 生产者与消费者 依旧是同一个超市,某天顾客A进入超市打算购买商品A,但是此时的商品A已经卖完了,于是呢顾客A就去通知超市去备货,然后顾客A就离开了,走了一段路后,顾客A心想一定要买到商品A,于是又返回超市询问老板,商品A到了没有,可是还没有到。于是一整天的时间顾客A就在超市间往返,超市老板看他这么辛苦便要了顾客A的联系方式,声称一有货就通知他,于是顾客便回家安心等待了。 这个故事告诉我们什么呢? 超市老板添加顾客A的联系方式是为了将商品信息同步给生产者,这表明了生产者与消费者之间存在同步关系.同时,在超市的备货期间顾客是不能进行消费的,这表明了生产者与消费者之间存在互斥关系 由此得出:生产者与消费者间存在同步、互斥关系.
生产消费者模型的最根本特点——321原则 3种关系:
该模型通常包括三个核心组件:
阻塞队列Blocking Queue
是一种特殊的队列,在具有先进先出的基础上,还拥有队列大小的固定的这一特点。
堵塞队列可以为满,也可以为空。
首先我们肯定要创建一个堵塞队列的类。
那么这个堵塞队列应该具有什么样的属性呢?
从功能上出发,我们的堵塞队列需要完成的任务有:生产者向堵塞队列中入队,消费者从堵塞队列中出队,也就是说我们的类必须具有插入Push
和删除Pop
操作。同时因为阻塞队列是定长的,我能需要判断阻塞队列是否为空为满,那么IsFull和IsEmpty
操作也是必要的。
了解我方法后,类的属性其实也出来了。因为阻塞队列是一个队列,我们可以调用STL
的queue
来充当属性之一,除此之外还有定长的属性我还需要定义一个变量cap
来确定队列的长度。最后因为阻塞队列是一个公共资源,我们需要加锁mutex
也是必要的,同时我们因为我们在队列中有数据时需要通知消费者在队列中没有数据时选哟通知生产者,也就表明我们还需要两个条件变量cond
。
分析完后,阻塞队列具有的属性就包括了以下内容:
方法:
BlockQueue.hpp
#pragma once
#include <mutex>
#include <pthread.h>
#include <unistd.h>
#include <queue>
#define NUM 5
using namespace std;
namespace yui
{
template<class T>
class BlockQueue{
private:
bool isFull(){
return _blockQueue.size() == _cap;
}
bool isEmpty(){
return _blockQueue.empty();
}
public:
BlockQueue(int cap = NUM):_cap(cap)
{
pthread_mutex_init(&_mutex,nullptr);
pthread_cond_init(&_producer,nullptr);
pthread_cond_init(&_consumer,nullptr);
}
void push(const T& inData){
//访问公共资,加锁
pthread_mutex_lock(&_mutex);
//开始判断阻塞队列是否为满
while(isFull()){
//如果队列为满,生产者就必须开始等待
// pthread_cond_signal(&_consumer);
pthread_cond_wait(&_producer,&_mutex);
}
//运行到这里队列肯定没满,将数据入队,通知消费者消费。
_blockQueue.push(inData);
pthread_cond_signal(&_consumer);
pthread_mutex_unlock(&_mutex);//解锁
}
void pop(T* outData){
//加锁
pthread_mutex_lock(&_mutex);
while(isEmpty()){
// pthread_cond_signal(&_producer);
pthread_cond_wait(&_consumer,&_mutex);
}
*outData = _blockQueue.front();
_blockQueue.pop();
pthread_cond_signal(&_producer);
pthread_mutex_unlock(&_mutex);
}
~BlockQueue()
{
pthread_mutex_destroy(&_mutex);
pthread_cond_destroy(&_producer);
pthread_cond_destroy(&_consumer);
}
private:
queue<T> _blockQueue; // 阻塞队列内核
int _cap; // 队列长度
pthread_mutex_t _mutex; //互斥锁
pthread_cond_t _producer;
pthread_cond_t _consumer;
};
}
main.cc
#include <iostream>
#include <cstdlib>
#include <ctime>
#include "BlockQueue.hpp"
using namespace std;
void *runProducer(void *arg)
{
yui::BlockQueue<int> *bq = static_cast<yui::BlockQueue<int> *>(arg);
while (true)
{
//sleep(1);
int num = rand() % 100;
bq->push(num);
cout << "生产者生产了一个数据:" << num << endl;
cout << "--------------------------" << endl;
}
pthread_exit((void *)0); // 退出线程
}
void *runConsumer(void *arg)
{
yui::BlockQueue<int> *bq = static_cast<yui::BlockQueue<int> *>(arg);
while (true)
{
// sleep(1);
int num = 0;
bq->pop(&num);
cout << "消费者消费了一个数据:" << num << endl;
cout << "-------------------------" << endl;
}
pthread_exit((void *)0); // 退出线程
}
int main()
{
srand((unsigned int)time(nullptr));
yui::BlockQueue<int> *bq = new yui::BlockQueue<int>(5); // 创建阻塞队列
// 创建生产消费者线程
pthread_t producer;
pthread_t consumer;
if (pthread_create(&producer, nullptr, runProducer, bq) < 0)
{
perror("线程创建失败");
return 1;
}
if (pthread_create(&consumer, nullptr, runConsumer, bq) < 0)
{
perror("线程创建失败");
return 1;
}
if(pthread_join(producer,nullptr)<0){
perror("线程回收失败!");
return 1;
}
if(pthread_join(consumer,nullptr)<0){
perror("线程回收失败!");
return 1;
}
delete bq;
return 0;
}
运行结果:
通过上面的运行结果,我们可以看到生产者疯狂地生产,消费者疯狂地消费。这样地结果不方便我们观察堵塞队列地特点,为此我们可以通过休眠的方式来仔细观察。 方法1:消费者每隔一秒消费一次,生产者疯狂生产 修改代码:
void *runProducer(void *arg)
{
yui::BlockQueue<int> *bq = static_cast<yui::BlockQueue<int> *>(arg);
while (true)
{
//sleep(1);
int num = rand() % 100;
bq->push(num);
cout << "生产者生产了一个数据:" << num << endl;
cout << "--------------------------" << endl;
}
pthread_exit((void *)0); // 退出线程
}
void *runConsumer(void *arg)
{
yui::BlockQueue<int> *bq = static_cast<yui::BlockQueue<int> *>(arg);
while (true)
{
sleep(1);
int num = 0;
bq->pop(&num);
cout << "消费者消费了一个数据:" << num << endl;
cout << "-------------------------" << endl;
}
pthread_exit((void *)0); // 退出线程
}
运行结果:
阻塞队列大小是5,由此生产者生产5次后,就进入堵塞状态了,之后生产者与消费者步调一致,消费者每消费一个生产者就生产一个。 注意:此时消费的数据,是阻塞队列中队头的数据,也就是最先生产的数据。 策略2:生产者每隔一秒生产一次,消费者疯狂消费 预期结果为 刚开始阻塞队列为空,消费者无法进行消费,只能阻塞等待,一秒后,生产者生产了一个数据,并立即通知消费者进行消费,两者协同工作,消费者消费的就是生产者刚刚生产的数据 修改代码:
void *runProducer(void *arg)
{
yui::BlockQueue<int> *bq = static_cast<yui::BlockQueue<int> *>(arg);
while (true)
{
sleep(1);
int num = rand() % 100;
bq->push(num);
cout << "生产者生产了一个数据:" << num << endl;
cout << "--------------------------" << endl;
}
pthread_exit((void *)0); // 退出线程
}
void *runConsumer(void *arg)
{
yui::BlockQueue<int> *bq = static_cast<yui::BlockQueue<int> *>(arg);
while (true)
{
// sleep(1);
int num = 0;
bq->pop(&num);
cout << "消费者消费了一个数据:" << num << endl;
cout << "-------------------------" << endl;
}
pthread_exit((void *)0); // 退出线程
}
符合预期。
关于多生产消费模型,我们需要对上面的代码进行大改吗?
答案是完全不需要,我们刚刚写的代码就可以满足多生产消费的情境。
可能有些读者会认为需要使用到pthread_cond_broadcast
来唤醒所有线程。其实是不必的,假设在只生产了一个数据的情况下,唤醒所有的线程,会导致只有一个线程进行合法操作,其他线程都是非法操作了。
int main()
{
srand((unsigned int)time(nullptr));
yui::BlockQueue<int> *bq = new yui::BlockQueue<int>(5); // 创建阻塞队列
// 创建生产消费者线程
pthread_t pro[2];
pthread_t con[3];
for(int i = 0; i < 2; i++)
pthread_create(pro + i, nullptr, runProducer, bq);
for(int i = 0; i < 3; i++)
pthread_create(con + i, nullptr, runConsumer, bq);
for(int i = 0; i < 2; i++)
pthread_join(pro[i], nullptr);
for(int i = 0; i < 3; i++)
pthread_join(con[i], nullptr);
delete bq;
return 0;
}
Task.hpp
#pragma once
enum{
ZERODIVERROR = 1,
CHARERROR
};
template<class T>
class Task{
public:
Task(T a,T b,char ch):_a(a),_b(b),_ch(ch)
{}
Task(){}
~Task()
{}
int calculator(T& ret){
int flag = 0;
switch (_ch)
{
case '+':
ret = _a+_b;
break;
case '-':
ret = _a-_b;
break;
case '*':
ret = _a*_b;
break;
case '/':
if(_b != 0)
ret = _a/_b;
else
flag = ZERODIVERROR;
break;
default:
flag = CHARERROR;
break;
}
return flag;
}
public:
T _a;
T _b;
char _ch;
};
BlockQueue.hpp
#pragma once
#include <mutex>
#include <pthread.h>
#include <unistd.h>
#include <queue>
#define NUM 5
using namespace std;
namespace yui
{
template<class T>
class BlockQueue{
private:
bool isFull(){
return _blockQueue.size() == _cap;
}
bool isEmpty(){
return _blockQueue.empty();
}
public:
BlockQueue(int cap = NUM):_cap(cap)
{
pthread_mutex_init(&_mutex,nullptr);
pthread_cond_init(&_producer,nullptr);
pthread_cond_init(&_consumer,nullptr);
}
void push(const T& inData){
//访问公共资,加锁
pthread_mutex_lock(&_mutex);
//开始判断阻塞队列是否为满
while(isFull()){
//如果队列为满,生产者就必须开始等待
// pthread_cond_signal(&_consumer);
pthread_cond_wait(&_producer,&_mutex);
}
//运行到这里队列肯定没满,将数据入队,通知消费者消费。
_blockQueue.push(inData);
pthread_cond_signal(&_consumer);
pthread_mutex_unlock(&_mutex);//解锁
}
void pop(T* outData){
//加锁
pthread_mutex_lock(&_mutex);
while(isEmpty()){
// pthread_cond_signal(&_producer);
pthread_cond_wait(&_consumer,&_mutex);
}
*outData = _blockQueue.front();
_blockQueue.pop();
pthread_cond_signal(&_producer);
pthread_mutex_unlock(&_mutex);
}
~BlockQueue()
{
pthread_mutex_destroy(&_mutex);
pthread_cond_destroy(&_producer);
pthread_cond_destroy(&_consumer);
}
private:
queue<T> _blockQueue; // 阻塞队列内核
int _cap; // 队列长度
pthread_mutex_t _mutex; //互斥锁
pthread_cond_t _producer;
pthread_cond_t _consumer;
};
}
main.cc
#include <iostream>
#include <cstdlib>
#include <ctime>
#include "BlockQueue.hpp"
#include "task.hpp"
using namespace std;
char chs[4] = {'+', '-', '*', '/'};
void *runProducer(void *arg)
{
yui::BlockQueue<Task<int>> *bq = static_cast<yui::BlockQueue<Task<int>> *>(arg);
while (true)
{
sleep(1);
int a = rand() % 100;
int b = rand() % 100;
char c = chs[rand() % 4];
bq->push(Task<int>(a, b, c));
cout << "生产者生产了一个问题:" << a << c << b << "= ?" << endl;
cout << "--------------------------" << endl;
}
pthread_exit((void *)0); // 退出线程
}
void *runConsumer(void *arg)
{
yui::BlockQueue<Task<int>> *bq = static_cast<yui::BlockQueue<Task<int>> *>(arg);
while (true)
{
// sleep(1);
Task<int> *tmp = new Task<int>;
bq->pop(tmp);
int ret = 0;
if (tmp->calculator(ret) == 0)
{
cout << "消费者解决问题得:" << tmp->_a << tmp->_ch << tmp->_b << " = " << ret << endl;
cout << "--------------------------" << endl;
}
else if (tmp->calculator(ret) == ZERODIVERROR)
{
cout << "问题存在错误:" << tmp->_a << tmp->_ch << tmp->_b << " = ?" << "存在除0错误" << endl;
cout << "--------------------------" << endl;
}
else if (tmp->calculator(ret) == CHARERROR)
{
cout << "问题存在错误:" << tmp->_a << tmp->_ch << tmp->_b << " = ?" << "存在未定义字符:"<<tmp->_ch<< endl;
cout << "--------------------------" << endl;
}
}
pthread_exit((void *)0); // 退出线程
}
int main()
{
srand((unsigned int)time(nullptr));
yui::BlockQueue<Task<int>> *bq = new yui::BlockQueue<Task<int>>(5); // 创建阻塞队列
// 创建生产消费者线程
pthread_t producer;
pthread_t consumer;
if (pthread_create(&producer, nullptr, runProducer, bq) < 0)
{
perror("线程创建失败");
return 1;
}
if (pthread_create(&consumer, nullptr, runConsumer, bq) < 0)
{
perror("线程创建失败");
return 1;
}
if (pthread_join(producer, nullptr) < 0)
{
perror("线程回收失败!");
return 1;
}
if (pthread_join(consumer, nullptr) < 0)
{
perror("线程回收失败!");
return 1;
}
delete bq;
return 0;
}
这只是生产消费者模型的一个非常简单的应用,在以后的编程中,你可以把Task
类中的任务写成其他,比如网络请求、并行IO等等。