进程的多个线程共享 同一地址空间,因此Text Segment、Data Segment都是共享的,如果定义一个函数,在各线程中都可以调用,如果定义一个全局变量,在各线程中都可以访问到,除此之外,各线程还共享以下进程资源和环境:
进程和线程的关系如下图:
功能:创建一个新的线程
原型
int pthread_create(pthread_t *thread, const pthread_attr_t *attr, void *
(*start_routine)(void*), void *arg);
参数
thread:返回线程ID
attr:设置线程的属性,attr为NULL表示使用默认属性
start_routine:是个函数地址,线程启动后要执行的函数
arg:传给线程启动函数的参数
返回值:成功返回0;失败返回错误码
错误检查:
#include <unistd.h>
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <pthread.h>
void *rout(void *arg)
{
int i;
for (;;)
{
printf("I'am thread 1\n");
sleep(1);
}
}
int main(void)
{
pthread_t tid;
int ret;
if ((ret = pthread_create(&tid, NULL, rout, NULL)) != 0)
{
fprintf(stderr, "pthread_create : %s\n", strerror(ret));
exit(EXIT_FAILURE);
}
int i;
for (;;)
{
printf("I'am main thread\n");
sleep(1);
}
}
pthread_t pthread_self(void);
pthread_t 到底是什么类型呢?取决于实现。对于Linux目前实现的NPTL实现而言,pthread_t类型的线程ID,本质就是一个进程地址空间上的一个地址
如果需要只终止某个线程而不终止整个进程,可以有三种方法:
功能:线程终止
原型
void pthread_exit(void *value_ptr);
参数
value_ptr:value_ptr不要指向一个局部变量。
返回值:无返回值,跟进程一样,线程结束的时候无法返回到它的调用者(自身)
需要注意,pthread_exit或者return返回的指针所指向的内存单元必须是全局的或者是用malloc分配的,不能在线程函数的栈上分配,因为当其它线程得到这个返回指针时线程函数已经退出了
功能:取消一个执行中的线程
原型
int pthread_cancel(pthread_t thread);
参数
thread:线程ID
返回值:成功返回0;失败返回错误码
为什么需要线程等待?
功能:等待线程结束
原型
int pthread_join(pthread_t thread, void **value_ptr);
参数
thread:线程ID
value_ptr:它指向一个指针,后者指向线程的返回值
返回值:成功返回0;失败返回错误码
调用该函数的线程将挂起等待,直到id为thread的线程终止。thread线程以不同的方法终止,通过pthread_join得到的终止状态是不同的,总结如下:
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <pthread.h>
void *thread1(void *arg)
{
printf("thread 1 returning ... \n");
int *p = (int *)malloc(sizeof(int));
*p = 1;
return (void *)p;
}
void *thread2(void *arg)
{
printf("thread 2 exiting ...\n");
int *p = (int *)malloc(sizeof(int));
*p = 2;
pthread_exit((void *)p);
}
void *thread3(void *arg)
{
while (1)
{ //
printf("thread 3 is running ...\n");
sleep(1);
}
return NULL;
}
int main(void)
{
pthread_t tid;
void *ret;
// thread 1 return
pthread_create(&tid, NULL, thread1, NULL);
pthread_join(tid, &ret);
printf("thread return, thread id %X, return code:%d\n", tid, *(int *)ret);
free(ret);
// thread 2 exit
pthread_create(&tid, NULL, thread2, NULL);
pthread_join(tid, &ret);
printf("thread return, thread id %X, return code:%d\n", tid, *(int *)ret);
free(ret);
// thread 3 cancel by other
pthread_create(&tid, NULL, thread3, NULL);
sleep(3);
pthread_cancel(tid);
pthread_join(tid, &ret);
if (ret == PTHREAD_CANCELED)
printf("thread return, thread id %X, return code:PTHREAD_CANCELED\n", tid);
else
printf("thread return, thread id %X, return code:NULL\n", tid);
}
3.4.4 分离线程
int pthread_detach(pthread_t thread);
可以是线程组内其他线程对目标线程进行分离,也可以是线程自己分离:
pthread_detach(pthread_self());
joinable和分离是冲突的,一个线程不能既是joinable又是分离的
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <pthread.h>
void *thread_run(void *arg)
{
pthread_detach(pthread_self());
printf("%s\n", (char *)arg);
return NULL;
}
int main(void)
{
pthread_t tid;
if (pthread_create(&tid, NULL, thread_run, "thread1 run...") != 0)
{
printf("create thread error\n");
return 1;
}
int ret = 0;
sleep(1); // 很重要,要让线程先分离,再等待
if (pthread_join(tid, NULL) == 0)
{
printf("pthread wait success\n");
ret = 0;
}
else
{
printf("pthread wait failed\n");
ret = 1;
}
return ret;
}
进程线程间的互斥相关背景概念
互斥量mutex
// 操作共享变量会有问题的售票系统代码
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <pthread.h>
int ticket = 100;
void *route(void *arg)
{
char *id = (char *)arg;
while (1)
{
if (ticket > 0)
{
usleep(1000);
printf("%s sells ticket:%d\n", id, ticket);
ticket--;
}
else
{
break;
}
}
}
int main(void)
{
pthread_t t1, t2, t3, t4;
pthread_create(&t1, NULL, route, "thread 1");
pthread_create(&t2, NULL, route, "thread 2");
pthread_create(&t3, NULL, route, "thread 3");
pthread_create(&t4, NULL, route, "thread 4");
pthread_join(t1, NULL);
pthread_join(t2, NULL);
pthread_join(t3, NULL);
pthread_join(t4, NULL);
}
为什么可能无法获得争取结果?
取出ticket--部分的汇编代码
objdump -d a.out > test.objdump
152 40064b: 8b 05 e3 04 20 00 mov 0x2004e3(%rip),%eax # 600b34 <ticket>
153 400651: 83 e8 01 sub $0x1,%eax
154 400654: 89 05 da 04 20 00 mov %eax,0x2004da(%rip) # 600b34 <ticket>
- - 操作并不是原子操作,而是对应三条汇编指令:
要解决以上问题,需要做到三点:
要做到这三点,本质上就是需要一把锁。Linux上提供的这把锁叫互斥量
初始化互斥量有两种方法:
方法1,静态分配:
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER
方法2,动态分配:
int pthread_mutex_init(pthread_mutex_t *restrict mutex, const pthread_mutexattr_t *restrictattr);
参数:
mutex:要初始化的互斥量
attr:NULL
销毁互斥量需要注意:
int pthread_mutex_destroy(pthread_mutex_t *mutex);
int pthread_mutex_lock(pthread_mutex_t *mutex);
int pthread_mutex_unlock(pthread_mutex_t *mutex);
返回值:成功返回0,失败返回错误号
调用 pthread_ lock 时,可能会遇到以下情况:
改进上面的售票系统:
死锁是指在一组进程中的各个进程均占有不会释放的资源,但因互相申请被其他进程所站用不会释放的资源而处于的一种永久等待状态
int pthread_cond_init(pthread_cond_t *restrict cond,const pthread_condattr_t *restrictattr);
参数:
cond:要初始化的条件变量
attr:NULL
int pthread_cond_destroy(pthread_cond_t *cond)
int pthread_cond_wait(pthread_cond_t *restrict cond,pthread_mutex_t *restrict mutex);
参数:
cond:要在这个条件变量上等待
mutex:互斥量,后面详细解释
int pthread_cond_broadcast(pthread_cond_t *cond);
int pthread_cond_signal(pthread_cond_t *cond);
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <pthread.h>
pthread_cond_t cond;
pthread_mutex_t mutex;
void *r1(void *arg)
{
while (1)
{
pthread_cond_wait(&cond, &mutex);
printf("活动\n");
}
}
void *r2(void *arg)
{
while (1)
{
pthread_cond_signal(&cond);
sleep(1);
}
}
int main(void)
{
pthread_t t1, t2;
pthread_cond_init(&cond, NULL);
pthread_mutex_init(&mutex, NULL);
pthread_create(&t1, NULL, r1, NULL);
pthread_create(&t2, NULL, r2, NULL);
pthread_join(t1, NULL);
pthread_join(t2, NULL);
pthread_mutex_destroy(&mutex);
pthread_cond_destroy(&cond);
}
为什么 pthread_cond_wait 需要互斥量?
按照上面的说法,我们设计出如下的代码:先上锁,发现条件不满足,解锁,然后等待在条件变量上不就行了,如下代码:
// 错误的设计
pthread_mutex_lock(&mutex);
while (condition_is_false)
{
pthread_mutex_unlock(&mutex);
// 解锁之后,等待之前,条件可能已经满足,信号已经发出,但是该信号可能被错过
pthread_cond_wait(&cond);
pthread_mutex_lock(&mutex);
}
pthread_mutex_unlock(&mutex);
pthread_mutex_lock(&mutex);
while (条件为假)
pthread_cond_wait(cond, mutex);
修改条件
pthread_mutex_unlock(&mutex);
pthread_mutex_lock(&mutex);
设置条件为真
pthread_cond_signal(cond);
pthread_mutex_unlock(&mutex);
生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。这个阻塞队列就是用来给生产者和消费者解耦的
6.3 基于BlockingQueue的生产者消费者模型
在多线程编程中阻塞队列(Blocking Queue)是一种常用于实现生产者和消费者模型的数据结构。其与普通的队列区别在于,当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中被放入了元素;当队列满时,往队列里存放元素的操作也会被阻塞,直到有元素被从队列中取出(以上的操作都是基于不同的线程来说的,线程在对阻塞队列进程操作时会被阻塞)
#include <iostream>
#include <queue>
#include <stdlib.h>
#include <pthread.h>
#define NUM 8
class BlockQueue
{
private:
std::queue<int> q;
int cap;
pthread_mutex_t lock;
pthread_cond_t full;
pthread_cond_t empty;
private:
void LockQueue()
{
pthread_mutex_lock(&lock);
}
void UnLockQueue()
{
pthread_mutex_unlock(&lock);
}
void ProductWait()
{
pthread_cond_wait(&full, &lock);
}
void ConsumeWait()
{
pthread_cond_wait(&empty, &lock);
}
void NotifyProduct()
{
pthread_cond_signal(&full);
}
void NotifyConsume()
{
pthread_cond_signal(&empty);
}
bool IsEmpty()
{
return (q.size() == 0 ? true : false);
}
bool IsFull()
{
return (q.size() == cap ? true : false);
}
public:
BlockQueue(int _cap = NUM) : cap(_cap)
{
pthread_mutex_init(&lock, NULL);
pthread_cond_init(&full, NULL);
pthread_cond_init(&empty, NULL);
}
void PushData(const int &data)
{
LockQueue();
while (IsFull())
{
NotifyConsume();
std::cout << "queue full, notify consume data, product stop." << std::endl;
ProductWait();
}
q.push(data);
// NotifyConsume();
UnLockQueue();
}
void PopData(int &data)
{
LockQueue();
while (IsEmpty())
{
NotifyProduct();
std::cout << "queue empty, notify product data, consume stop." << std::endl;
ConsumeWait();
}
data = q.front();
q.pop();
// NotifyProduct();
UnLockQueue();
}
~BlockQueue()
{
pthread_mutex_destroy(&lock);
pthread_cond_destroy(&full);
pthread_cond_destroy(&empty);
}
};
void *consumer(void *arg)
{
BlockQueue *bqp = (BlockQueue *)arg;
int data;
for (;;)
{
bqp->PopData(data);
std::cout << "Consume data done : " << data << std::endl;
}
}
// more faster
void *producter(void *arg)
{
BlockQueue *bqp = (BlockQueue *)arg;
srand((unsigned long)time(NULL));
for (;;)
{
int data = rand() % 1024;
bqp->PushData(data);
std::cout << "Prodoct data done: " << data << std::endl;
// sleep(1);
}
}
int main()
{
BlockQueue bq;
pthread_t c, p;
pthread_create(&c, NULL, consumer, (void *)&bq);
pthread_create(&p, NULL, producter, (void *)&bq);
pthread_join(c, NULL);
pthread_join(p, NULL);
return 0;
}
POSIX信号量和SystemV信号量作用相同,都是用于同步操作,达到无冲突的访问共享资源目的。 但POSIX可以用于线程间同步
#include <semaphore.h>
int sem_init(sem_t *sem, int pshared, unsigned int value);
参数:
pshared:0表示线程间共享,非零表示进程间共享
value:信号量初始值
int sem_destroy(sem_t *sem);
功能:等待信号量,会将信号量的值减1
int sem_wait(sem_t *sem); //P()
功能:发布信号量,表示资源使用完毕,可以归还资源了。将信号量值加1。
int sem_post(sem_t *sem);//V()
环形队列采用数组模拟,用模运算来模拟环状特性
环形结构起始状态和结束状态都是一样的,不好判断为空或者为满,所以可以通过加计数器或者标记位来判断满或者空。另外也可以预留一个空的位置,作为满的状态
=
但是我们现在有信号量这个计数器,就很简单的进行多线程间的同步过程
#include <iostream>
#include <vector>
#include <stdlib.h>
#include <semaphore.h>
#include <pthread.h>
#include <unistd.h>
#define NUM 16
class RingQueue
{
private:
std::vector<int> q;
int cap;
sem_t data_sem;
sem_t space_sem;
int consume_step;
int product_step;
public:
RingQueue(int _cap = NUM) : q(_cap), cap(_cap)
{
sem_init(&data_sem, 0, 0);
sem_init(&space_sem, 0, cap);
consume_step = 0;
product_step = 0;
}
void PutData(const int &data)
{
sem_wait(&space_sem); // P
q[consume_step] = data;
consume_step++;
consume_step %= cap;
sem_post(&data_sem); // V
}
void GetData(int &data)
{
sem_wait(&data_sem);
data = q[product_step];
product_step++;
product_step %= cap;
sem_post(&space_sem);
}
~RingQueue()
{
sem_destroy(&data_sem);
sem_destroy(&space_sem);
}
};
void *consumer(void *arg)
{
RingQueue *rqp = (RingQueue *)arg;
int data;
for (;;)
{
rqp->GetData(data);
std::cout << "Consume data done : " << data << std::endl;
sleep(1);
}
}
// more faster
void *producter(void *arg)
{
RingQueue *rqp = (RingQueue *)arg;
srand((unsigned long)time(NULL));
for (;;)
{
int data = rand() % 1024;
rqp->PutData(data);
std::cout << "Prodoct data done: " << data << std::endl;
// sleep(1);
}
}
int main()
{
RingQueue rq;
pthread_t c, p;
pthread_create(&c, NULL, consumer, (void *)&rq);
pthread_create(&p, NULL, producter, (void *)&rq);
pthread_join(c, NULL);
pthread_join(p, NULL);
}
/* 线程池: * 一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。 * 线程池的应用场景: * 1. 需要大量的线程来完成任务,且完成任务的时间比较短。 WEB服务器完成网页请求这样的任务,使用线程池技术是非常合适的。因为单个任务小,而任务数量巨大,你可以想象一个热门网站的点击次数。 但对于长时间的任务,比如一个Telnet连接请求,线程池的优点就不明显了。因为Telnet会话时间比线程的创建时间大多了。 * 2. 对性能要求苛刻的应用,比如要求服务器迅速响应客户请求。 * 3. 接受突发性的大量请求,但不至于使服务器因此产生大量线程的应用。突发性大量客户请求,在没有线程池情况下,将产生大量线程,虽然理论上大部分操作系统线程数目最大值不是问题,短时间内产生大量线程可能使内存到达极限,出现错误. * 线程池示例: * 1. 创建固定数量线程池,循环从任务队列中获取任务对象, * 2. 获取到任务对象后,执行任务对象中的任务接口 */
#ifndef __M_TP_H__
#define __M_TP_H__
#include <iostream>
#include <queue>
#include <pthread.h>
#include <unistd.h>
#define MAX_THREAD 5
typedef bool (*handler_t)(int);
class ThreadTask
{
private:
int _data;
handler_t _handler;
public:
ThreadTask() : _data(-1), _handler(NULL) {}
ThreadTask(int data, handler_t handler)
{
_data = data;
_handler = handler;
}
void SetTask(int data, handler_t handler)
{
_data = data;
_handler = handler;
}
void Run()
{
_handler(_data);
}
};
class ThreadPool
{
private:
int _thread_max;
int _thread_cur;
bool _tp_quit;
std::queue<ThreadTask *> _task_queue;
pthread_mutex_t _lock;
pthread_cond_t _cond;
private:
void LockQueue()
{
pthread_mutex_lock(&_lock);
}
void UnLockQueue()
{
pthread_mutex_unlock(&_lock);
}
void WakeUpOne()
{
pthread_cond_signal(&_cond);
}
void WakeUpAll()
{
pthread_cond_broadcast(&_cond);
}
void ThreadQuit()
{
_thread_cur--;
UnLockQueue();
pthread_exit(NULL);
}
void ThreadWait()
{
if (_tp_quit)
{
ThreadQuit();
}
pthread_cond_wait(&_cond, &_lock);
}
bool IsEmpty()
{
return _task_queue.empty();
}
static void *thr_start(void *arg)
{
ThreadPool *tp = (ThreadPool *)arg;
while (1)
{
tp->LockQueue();
while (tp->IsEmpty())
{
tp->ThreadWait();
}
ThreadTask *tt;
tp->PopTask(&tt);
tp->UnLockQueue();
tt->Run();
delete tt;
}
return NULL;
}
public:
ThreadPool(int max = MAX_THREAD) : _thread_max(max), _thread_cur(max),
_tp_quit(false)
{
pthread_mutex_init(&_lock, NULL);
pthread_cond_init(&_cond, NULL);
}
~ThreadPool()
{
pthread_mutex_destroy(&_lock);
pthread_cond_destroy(&_cond);
}
bool PoolInit()
{
pthread_t tid;
for (int i = 0; i < _thread_max; i++)
{
int ret = pthread_create(&tid, NULL, thr_start, this);
if (ret != 0)
{
std::cout << "create pool thread error\n";
return false;
}
}
return true;
}
bool PushTask(ThreadTask *tt)
{
LockQueue();
if (_tp_quit)
{
UnLockQueue();
return false;
}
_task_queue.push(tt);
WakeUpOne();
UnLockQueue();
return true;
}
bool PopTask(ThreadTask **tt)
{
*tt = _task_queue.front();
_task_queue.pop();
return true;
}
bool PoolQuit()
{
LockQueue();
_tp_quit = true;
UnLockQueue();
while (_thread_cur > 0)
{
WakeUpAll();
usleep(1000);
}
return true;
}
};
#endif
#include <stdlib.h>
#include <time.h>
#include <stdio.h>
#include <unistd.h>
#include "threadpool.hpp"
bool handler(int data)
{
srand(time(NULL));
int n = rand() % 5;
printf("Thread: %p Run Tast: %d--sleep %d sec\n", pthread_self(), data, n);
sleep(n);
return true;
}
int main()
{
int i;
ThreadPool pool;
pool.PoolInit();
for (i = 0; i < 10; i++)
{
ThreadTask *tt = new ThreadTask(i, handler);
pool.PushTask(tt);
}
pool.PoolQuit();
return 0;
}
g++ -std=c++11 main.cpp -o mytest -pthread -lrt
单例模式是一种 "经典的, 常用的, 常考的" 设计模式.
IT行业这么火, 涌入的人很多. 俗话说林子大了啥鸟都有. 大佬和菜鸡们两极分化的越来越严重. 为了让菜鸡们不太拖大佬的后腿, 于是大佬们针对一些经典的常见的场景, 给定了一些对应的解决方案, 这个就是 设计模式
某些类, 只应该具有一个对象(实例), 就称之为单例.
例如一个男人只能有一个媳妇.
在很多服务器开发场景中, 经常需要让服务器加载很多的数据 (上百G) 到内存中. 此时往往要用一个单例的类来管理这些数据
洗碗的例子
懒汉方式最核心的思想是 "延时加载". 从而能够优化服务器的启动速度
template <typename T>
class Singleton
{
static T data;
public:
static T *GetInstance()
{
return &data;
}
};
只要通过 Singleton 这个包装类来使用 T 对象, 则一个进程中只有一个 T 对象的实例
template <typename T>
class Singleton
{
static T *inst;
public:
static T *GetInstance()
{
if (inst == NULL)
{
inst = new T();
}
return inst;
}
};
存在一个严重的问题, 线程不安全.
第一次调用 GetInstance 的时候, 如果两个线程同时调用, 可能会创建出两份 T 对象的实例.
但是后续再次调用, 就没有问题了.
// 懒汉模式, 线程安全
template <typename T>
class Singleton
{
volatile static T *inst; // 需要设置 volatile 关键字, 否则可能被编译器优化.
static std::mutex lock;
public:
static T *GetInstance()
{
if (inst == NULL)
{ // 双重判定空指针, 降低锁冲突的概率, 提高性能.
lock.lock(); // 使用互斥锁, 保证多线程情况下也只调用一次 new.
if (inst == NULL)
{
inst = new T();
}
lock.unlock();
}
return inst;
}
};
注意事项:
不是.
原因是, STL 的设计初衷是将性能挖掘到极致, 而一旦涉及到加锁保证线程安全, 会对性能造成巨大的影响.
而且对于不同的容器, 加锁方式的不同, 性能可能也不同(例如hash表的锁表和锁桶).
因此 STL 默认不是线程安全. 如果需要在多线程环境下使用, 往往需要调用者自行保证线程安全.
对于 unique_ptr, 由于只是在当前代码块范围内生效, 因此不涉及线程安全问题.
对于 shared_ptr, 多个对象需要共用一个引用计数变量, 所以会存在线程安全问题. 但是标准库实现的时候考虑到了这个问题, 基于原子操作(CAS)的方式保证 shared_ptr 能够高效, 原子的操作引用计数.
在编写多线程的时候,有一种情况是十分常见的。那就是,有些公共数据修改的机会比较少。相比较改写,它们读的机会反而高的多。通常而言,在读的过程中,往往伴随着查找的操作,中间耗时很长。给这种代码段加锁,会极大地降低我们程序的效率。那么有没有一种方法,可以专门处理这种多读少写的情况呢? 有,那就是读写锁。
int pthread_rwlockattr_setkind_np(pthread_rwlockattr_t *attr, int pref);
pref 共有 3 种选择
int pthread_rwlock_init(pthread_rwlock_t *restrict rwlock,const pthread_rwlockattr_t*restrict attr);
int pthread_rwlock_destroy(pthread_rwlock_t *rwlock);
int pthread_rwlock_rdlock(pthread_rwlock_t *rwlock);
int pthread_rwlock_wrlock(pthread_rwlock_t *rwlock);
int pthread_rwlock_unlock(pthread_rwlock_t *rwlock);
#include <vector>
#include <sstream>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <unistd.h>
#include <pthread.h>
volatile int ticket = 1000;
pthread_rwlock_t rwlock;
void *reader(void *arg)
{
char *id = (char *)arg;
while (1)
{
pthread_rwlock_rdlock(&rwlock);
if (ticket <= 0)
{
pthread_rwlock_unlock(&rwlock);
break;
}
printf("%s: %d\n", id, ticket);
pthread_rwlock_unlock(&rwlock);
usleep(1);
}
return nullptr;
}
void *writer(void *arg)
{
char *id = (char *)arg;
while (1)
{
pthread_rwlock_wrlock(&rwlock);
if (ticket <= 0)
{
pthread_rwlock_unlock(&rwlock);
break;
}
printf("%s: %d\n", id, --ticket);
pthread_rwlock_unlock(&rwlock);
usleep(1);
}
return nullptr;
}
struct ThreadAttr
{
pthread_t tid;
std::string id;
};
std::string create_reader_id(std::size_t i)
{
// 利用 ostringstream 进行 string 拼接
std::ostringstream oss("thread reader ", std::ios_base::ate);
oss << i;
return oss.str();
}
std::string create_writer_id(std::size_t i)
{
// 利用 ostringstream 进行 string 拼接
std::ostringstream oss("thread writer ", std::ios_base::ate);
oss << i;
return oss.str();
}
void init_readers(std::vector<ThreadAttr> &vec)
{
for (std::size_t i = 0; i < vec.size(); ++i)
{
vec[i].id = create_reader_id(i);
pthread_create(&vec[i].tid, nullptr, reader, (void *)vec[i].id.c_str());
}
}
void init_writers(std::vector<ThreadAttr> &vec)
{
for (std::size_t i = 0; i < vec.size(); ++i)
{
vec[i].id = create_writer_id(i);
pthread_create(&vec[i].tid, nullptr, writer, (void *)vec[i].id.c_str());
}
}
void join_threads(std::vector<ThreadAttr> const &vec)
{
// 我们按创建的 逆序 来进行线程的回收
for (std::vector<ThreadAttr>::const_reverse_iterator it = vec.rbegin(); it !=
vec.rend();
++it)
{
pthread_t const &tid = it->tid;
pthread_join(tid, nullptr);
}
}
void init_rwlock()
{
#if 0 // 写优先
pthread_rwlockattr_t attr;
pthread_rwlockattr_init(&attr);
pthread_rwlockattr_setkind_np(&attr, PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP);
pthread_rwlock_init(&rwlock, &attr);
pthread_rwlockattr_destroy(&attr);
#else // 读优先,会造成写饥饿
pthread_rwlock_init(&rwlock, nullptr);
#endif
}
int main()
{
// 测试效果不明显的情况下,可以加大 reader_nr
// 但也不能太大,超过一定阈值后系统就调度不了主线程了
const std::size_t reader_nr = 1000;
const std::size_t writer_nr = 2;
std::vector<ThreadAttr> readers(reader_nr);
std::vector<ThreadAttr> writers(writer_nr);
init_rwlock();
init_readers(readers);
init_writers(writers);
join_threads(writers);
join_threads(readers);
pthread_rwlock_destroy(&rwlock);
}
test: test.cc g++ -std=c++11 -Wall -Werror ^ -o @ -lpthread