看武侠片的时候,经常会看到某个倒霉蛋,不小心拉到了哪根线,然后就出现了漫天箭雨,运气不好就射成箭猪了,没被射死呢,又飞来一堆木头,没点本事就被击飞了,就算躲过一劫,头上也回掉个笼子下来。
这一切的一切,都是因为那个倒霉蛋碰了那根不该碰的线,从而引发了后面一系列的事情。 也可以说,后面一系列机关,都在“盯着”这根线。 这就是观察者模式,那根线,就是“被观察者”,而那些机关,就是“观察者”。
这中间就有些难题了。
其实也很好办,用多线程就好,这个线程,可以是自己设定的,如线程池;也可以是系统设定的,像一些触发信号。这里我们就用线程池来聊聊这个观察者模式。
这次不是伪代码,我一般很少放伪代码。
//Pthread_pool.h
#pragma once
#include <pthread.h>
#include <unistd.h>
#include <list> //据说list不安全,不安全就不安全吧,更不安全的都忍了
#include "Cond.h" //封装过的条件变量类,继承自封装的mutex锁类,所以具有锁和条件变量的双重属性
using namespace std;
class Task //任务接口,每个任务必须实现的接口,以供工作线程调度任务的执行
{
public:
Task() {}
virtual ~Task() {}
virtual int run() = 0; //留给子类实现
};
typedef list<Task*> list_task; //任务队列,用于暂存等待处理的任务,等待线程唤醒时处理,提供一种缓冲机制。
class Pthread_Pool //线程池类
{
public:
Pthread_Pool(unsigned int max = 100, unsigned int min = 10, unsigned int wait = 60);
~Pthread_Pool();
void addTask(Task* task); // 往任务队列中添加新线程
private:
static void* taskThread(void* arg);// 工作线程
void createThread(); // 新建一个线程
void destroyThread(); // 销毁一个线程池
unsigned int maxcount; // 最大线程数
unsigned int mincount; // 最小线程数
unsigned int count; // 当前线程池中线程数
unsigned int waitcount; // 等待线程数
unsigned int waitsec; // 等待时间
list_task taskList; //任务队列
Cond taskCond; //任务锁,线程接任务时使用
Cond cond; //线程锁,创建线程时使用
bool Stop; //线程池是否被允许运作,初始化线程池对象时置0,线程池销毁时置为1
};
#include "Pthread_Pool.h"
//开放接口1
Pthread_Pool::Pthread_Pool(unsigned int max, unsigned int min, unsigned int wait)
{
//配置基本参数
count = 0; //当前线程池为空
waitcount = 0; //没有等待线程
mincount = min; //核心线程数(出厂配置)
maxcount = max; //最大线程数(能承受的最高配置)
waitsec = wait; //线程保活时长(过了时长还没接到任务,那就裁掉)
Stop = false; //允许运作
//上锁,创建一定数量的线程作为初始线程池
cond.lock();
for (unsigned i = 0; i < mincount; i++)
{
createThread(); //跳转到这个函数的实现->->->->->
}
cond.unlock();
}
Pthread_Pool::~Pthread_Pool()
{
destroyThread(); //销毁线程池
}
void Pthread_Pool::createThread()
{
pthread_t tid;
int ret = pthread_create(&tid, NULL, taskThread, (void*)this);
//以执行taskThread()为目的创建线程,跳转到taskThread()函数的实现 ->->->->->
if (ret < 0)
perror("pthread create error");
else
count++;
}
// 工作线程
void* Pthread_Pool::taskThread(void* arg)
{
pthread_detach(pthread_self()); //设置线程自分离属性
Pthread_Pool* pool = (Pthread_Pool*)arg;
while (1)
{
pool->cond.lock();
//如果没有工作线程在等待
if (pool->taskList.empty())
{
if (pool->Stop) //当收到线程池停止运行的消息时
{
pool->count--; //线程数减一
pool->cond.unlock();
pthread_exit(NULL); //本线程强制退出
}
pool->waitcount++; //等待任务的线程数加一
bool bSignal = pool->cond.timewait(pool->waitsec); //新任务等待被唤醒
pool->waitcount--; //没等到,没事干,喝西北风了
// 删除无用线程
if (!bSignal && pool->count > pool->mincount) //如果没事干 && 有多余线程
{
pool->count--; //先裁员一个,不要一次做绝了,反正是在while循环里面,没事干裁员机会多得是
pool->cond.unlock();
pthread_exit(NULL);
}
}
pool->cond.unlock(); //记得要释放锁
//如果有工作线程在等待
if (!pool->taskList.empty())
{
pool->taskCond.lock(); //上任务锁
Task* t = pool->taskList.front(); //获取任务队列中最前端的任务并执行
pool->taskList.pop_front(); //移除被领取的任务
pool->taskCond.unlock();//记得解锁
t->run(); //任务开始
delete t; //弄完就删了
}
}
pthread_exit(NULL);
}
//开放接口2,向任务队列中添加任务
void Pthread_Pool::addTask(Task* task)
{
if (Stop) //线程池是否停止工作
return;
//向任务队列中添加新任务
taskCond.lock(); //上任务锁
taskList.push_back(task); //添加任务
taskCond.unlock(); //记得解锁
cond.lock(); //上线程锁
if (waitcount) //如果有空闲线程
{
cond.signal(); //唤醒一个线程
}
else if (count < maxcount) //如果没有空闲线程,一般来说,走到这里面来,那这个线程池的设计是有点失败了
{
createThread(); //那就创建一个
cond.signal(); //然后唤醒
}
cond.unlock();
}
void Pthread_Pool::destroyThread()
{
printf("destroy?\n");
#if 0 //强行清理
list_task::iterator it = taskList.begin();
for (; it!= taskList.end(); it++)
{
Task* t = *it;
delete t;
t = NULL;
}
taskList.clear();
#endif
// 等待所有线程执行完毕
Stop = true;
while (count > 0)
{
cond.lock();
cond.broadcast(); //广播
cond.unlock();
sleep(1);
}
}
这里面还配置了保证线程同步的锁,而观察者模式的唤醒,即采用条件变量来唤醒,一旦有任务的到来,会判断是否有空余线程,如果有,就直接唤醒一个去处理,如果没有,就会加入到任务队列中去。
先看吧,如果看不懂的话可以在下面评论,观察者模式是需要一些多线程基础的。
观察者模式需要考虑一下开发和运行效率问题。
其实这都不是大问题,现在哪个项目不用多线程来跑啊,总不能因为调试困难就因噎废食吧。
它和责任链最大的区别就是,观察者广播链在传递的过程中,是可以被改变的,而且传播方向千变万化。
比方说我在ATM取钱,由于多次按错,卡被吞了,在这一瞬间要发生什么事情呢?1、摄像头连续抓拍。2、通知监控系统,有人被吞卡。3、初始化ATM屏幕,回到初始状态。
这个模式嘛,没必要刻意去记,到该用的时候,自然而然就会去用,不过,没有多线程基础的话还是先学学多线程基础吧。