有时候,很多变量都需要在线程间共享,这样的变量称为共享变量,可以通过数据的共享,完成线程之间的交互。然而多个线程并发的操作共享变量,会带来⼀些问题。
共享变量被保护起来后就可以称之为临界资源
例如如下代码:
// 操作共享变量会有问题的售票系统代码
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <pthread.h>
int ticket = 100;
void *route(void *arg)
{
char *id = (char*)arg;
while ( 1 ) {
if ( ticket > 0 ) {
usleep(1000);
printf("%s sells ticket:%d\n", id, ticket);
ticket--;
} else {
break;
}
}
}
int main()
{
pthread_t t1, t2, t3, t4;
pthread_create(&t1, NULL, route, "thread 1");
pthread_create(&t2, NULL, route, "thread 2");
pthread_create(&t3, NULL, route, "thread 3");
pthread_create(&t4, NULL, route, "thread 4");
pthread_join(t1, NULL);
pthread_join(t2, NULL);
pthread_join(t3, NULL);
pthread_join(t4, NULL);
}会出现以下可能的结果:
thread 4 sells ticket:100
...
thread 4 sells ticket:1
thread 2 sells ticket:0
thread 1 sells ticket:-1
thread 3 sells ticket:-2为什么会出现票数为负数的情况?
--ticket 操作本⾝就不是⼀个原子操作要解决以上问题,需要做到三点:
要做到这三点,本质上就是需要⼀把锁。Linux上提供的这把锁叫互斥量。
如下图所示:

初始化互斥量有两种方法:
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZERint pthread_mutex_init(pthread_mutex_t *restrict mutex, const pthread_mutexattr_t *restrict attr);参数:
销毁互斥量需要注意:
PTHREAD_ MUTEX_ INITIALIZER 初始化的互斥量不需要销毁 int pthread_mutex_destroy(pthread_mutex_t *mutex);int pthread_mutex_lock(pthread_mutex_t *mutex);
int pthread_mutex_unlock(pthread_mutex_t *mutex);调⽤ pthread_ lock 时,可能会遇到以下情况:
pthread_ lock调用会陷⼊阻塞(执行流被挂起),等待互斥量解锁。通过对互斥量的学习,我们就可以改进上面的售票系统:
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <pthread.h>
#include <sched.h>
int ticket = 100;
pthread_mutex_t mutex;
void *route(void *arg)
{
char *id = (char*)arg;
while ( 1 ) {
pthread_mutex_lock(&mutex);
if ( ticket > 0 ) {
usleep(1000);
printf("%s sells ticket:%d\n", id, ticket);
ticket--;
pthread_mutex_unlock(&mutex);
} else {
pthread_mutex_unlock(&mutex);
break;
}
}
}
int main( void )
{
pthread_t t1, t2, t3, t4;
pthread_mutex_init(&mutex, NULL);
pthread_create(&t1, NULL, route, "thread 1");
pthread_create(&t2, NULL, route, "thread 2");
pthread_create(&t3, NULL, route, "thread 3");
pthread_create(&t4, NULL, route, "thread 4");
pthread_join(t1, NULL);
pthread_join(t2, NULL);
pthread_join(t3, NULL);
pthread_join(t4, NULL);
pthread_mutex_destroy(&mutex);
}通过对公共资源的加锁保护我们就可以保证访问资源的原子性,这样就不会出现之前的错误了。
通过上述互斥量mutex接口的学习,我们就可以基于上述函数对互斥量进行封装:
#pragma once
#include <iostream>
#include <string>
#include <pthread.h>
namespace MutexModule
{
class Mutex
{
public:
// 删除不要的拷⻉和赋值
Mutex(const Mutex &) = delete;
const Mutex &operator=(const Mutex &) = delete;
Mutex()
{
// 初始化锁
int n = pthread_mutex_init(&_mutex, nullptr);
if (n != 0)
std::perror("初始化失败...");
}
void Lock()
{
int n = pthread_mutex_lock(&_mutex);
if (n != 0)
std::perror("Lock失败...");
}
void Unlock()
{
int n = pthread_mutex_unlock(&_mutex);
if (n != 0)
std::perror("Unlock失败...");
}
pthread_mutex_t *MutexPtr() // 获取原始指针
{
return &_mutex;
}
~Mutex()
{
int n = pthread_mutex_destroy(&_mutex);
if (n != 0)
std::perror("_mutex销毁失败...");
}
private:
pthread_mutex_t _mutex;
};
// 采⽤RAII⻛格,进⾏锁管理
class LockGuard
{
public:
LockGuard(Mutex &mutex) : _mutex(mutex)
{
_mutex.Lock();
}
~LockGuard()
{
_mutex.Unlock();
}
private:
Mutex &_mutex;
};
}封装好互斥量后我们再采用RAII智能指针的风格,对锁进行管理。
// 抢票的代码就可以更新成为
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <pthread.h>
#include "Lock.hpp"
using namespace MutexModule;
int ticket = 1000;
Mutex mutex;
void *route(void *arg)
{
char *id = (char *)arg;
while (1)
{
LockGuard lockguard(mutex); // 使⽤RAII⻛格的锁
if (ticket > 0)
{
usleep(1000);
printf("%s sells ticket:%d\n", id, ticket);
ticket--;
}
else
{
break;
}
}
return nullptr;
}
int main(void)
{
pthread_t t1, t2, t3, t4;
pthread_create(&t1, NULL, route, (void *)"thread 1");
pthread_create(&t2, NULL, route, (void *)"thread 2");
pthread_create(&t3, NULL, route, (void *)"thread 3");
pthread_create(&t4, NULL, route, (void *)"thread 4");
pthread_join(t1, NULL);
pthread_join(t2, NULL);
pthread_join(t3, NULL);
pthread_join(t4, NULL);
}多个线程竞争临界资源时,未争夺到的线程需要在一个地方按顺序进行等待,竞争到的线程使用完临界资源释放锁后如果需要再次使用,也需要在该地方进行等待,这就是条件变量。
int pthread_cond_init(pthread_cond_t *restrict cond,const pthread_condattr_t *restrict attr);参数:
int pthread_cond_destroy(pthread_cond_t *cond)int pthread_cond_wait(pthread_cond_t *restrict cond,pthread_mutex_t *restrict mutex);参数:
✨为什么 pthread_cond_wait 需要互斥量?
简单来说就是线程在条件变量下等待时一定在临界资源内,当唤醒时一定需要重新持有锁,这样才能保护公共资源。
int pthread_cond_broadcast(pthread_cond_t *cond);
int pthread_cond_signal(pthread_cond_t *cond);测试代码如下:
#include <iostream>
#include <cstdio>
#include <string>
#include <pthread.h>
#include <unistd.h>
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
void *active(void *arg)
{
std::string name = static_cast<const char *>(arg);
while (true)
{
pthread_mutex_lock(&mutex);
// 没有对于资源是否就绪的判定
pthread_cond_wait(&cond, &mutex); // mutex??
printf("%s is active!\n", name.c_str());
pthread_mutex_unlock(&mutex);
}
}
int main()
{
pthread_t tid1, tid2, tid3;
pthread_create(&tid1, nullptr, active, (void *)"thread-1");
pthread_create(&tid2, nullptr, active, (void *)"thread-2");
pthread_create(&tid3, nullptr, active, (void *)"thread-3");
sleep(1);
printf("Main thread ctrl begin...\n");
while (true)
{
printf("main wakeup thread...\n");
pthread_cond_signal(&cond);//唤醒单个线程
//pthread_cond_broadcast(&cond);//唤醒所有线程
sleep(1);
}
pthread_join(tid1, nullptr);
pthread_join(tid2, nullptr);
pthread_join(tid3, nullptr);
}

#pragma once
#include <iostream>
#include <pthread.h>
#include "Mutex.hpp"
namespace CondModule
{
using namespace MutexModule;
class Cond
{
public:
Cond()
{
int n = ::pthread_cond_init(&_cond, nullptr);
if(n!=0)
std::perror("Cond初始化失败...");
}
void Wait(Mutex &mutex)
{
int n = ::pthread_cond_wait(&_cond, mutex.LockPtr());
if(n!=0)
std::perror("cond wait失败...");
}
void Notify()
{
int n = ::pthread_cond_signal(&_cond);
if(n!=0)
std::perror("notify失败...");
}
void NotifyAll()
{
int n = ::pthread_cond_broadcast(&_cond);
if(n!=0)
std::perror("notifyall失败...");
}
~Cond()
{
int n = ::pthread_cond_destroy(&_cond);
if(n!=0)
std::perror("cond销毁失败...");
}
private:
pthread_cond_t _cond;
};
}为了让条件变量更具有通⽤性,建议封装的时候,不要在Cond类内部引⽤对应的封装互斥量,要不然后⾯组合的时候,会因为代码耦合的问题难以初始化,因为⼀般⽽⾔Mutex和Cond基本是⼀起创建的。
• 等待条件代码
pthread_mutex_lock(&mutex);
while (条件为假)
pthread_cond_wait(cond, mutex);
修改条件
pthread_mutex_unlock(&mutex);
生产消费者模型可以总结为
321原则;3种关系:生产者与生产者,生产者与消费者,消费者与消费者;2种角色:生产者与消费者;1个交易场所。
在多线程编程中阻塞队列(Blocking Queue)是⼀种常⽤于实现⽣产者和消费者模型的数据结构。其与普通的队列区别在于,当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中被放入了元素;当队列满时,往队列⾥存放元素的操作也会被阻塞,直到有元素被从队列中取出(以上的操作都是基于不同的线程来说的,线程在对阻塞队列进程操作时会被阻塞):

代码如下:
#pragma once
#include <iostream>
#include <queue>
#include <pthread.h>
#include "Mutex.hpp"
#include "Cond.hpp"
namespace BlockQueueModule
{
using namespace MutexModule;
using namespace CondModule;
static const int gcap = 10;
template <typename T>
class BlockQueue
{
private:
bool IsFull()
{
return _q.size() == _cap;
}
bool IsEmpty()
{
return _q.size() == 0;
}
public:
BlockQueue(int cap = gcap)
: _cap(cap),
_cwait_num(0),
_pwait_num(0)
{
}
void Equeue(const T &in) // 生产者
{
LockGuard lockguard(_mutex);
while(IsFull())//必须得是while
{
std::cout<<"生产者线程进入等待..."<<std::endl;
++_pwait_num;
_productor_cond.Wait(_mutex);
//走到这里线程已经被唤醒,所以要--
std::cout<<"生产者线程被唤醒..."<<std::endl;
--_pwait_num;
}
//现在可以生产数据
_q.push(in);
// 肯定有数据!
if (_cwait_num)//如果消费者那边有线程在等待
{
_consumer_cond.Notify();
}
}
void Pop(T* out)
{
LockGuard lockguard(_mutex);
while(IsEmpty())
{
std::cout<<"消费者线程进入等待..."<<std::endl;
++_cwait_num;
_consumer_cond.Wait(_mutex);//线程被唤醒&&重新申请并持有锁(它会在临界区内醒来!)
std::cout<<"消费者线程被唤醒..."<<std::endl;
--_cwait_num;
}
//现在可以消费数据
*out = _q.front();
_q.pop();
//肯定有空位给消费者生产
if(_pwait_num)//如果生产者那边有线程在等待
{
_productor_cond.Notify();
}
}
~BlockQueue()
{
}
private:
std::queue<T> _q;
int _cap;
Mutex _mutex;
Cond _productor_cond;
Cond _consumer_cond;
int _cwait_num;
int _pwait_num;
};
}这里我们使用了之前封装的锁与条件变量
#include "BlockQueue.hpp"
#include <pthread.h>
#include <unistd.h>
using namespace BlockQueueModule;
void *Consumer(void *args)
{
BlockQueue<int> *bq = static_cast<BlockQueue<int> *>(args);
while(true)
{
int data;
// 1. 从bq拿到数据
bq->Pop(&data);
// 2.做处理
printf("Consumer, 消费了一个数据: %d\n", data);
}
}
void *Productor(void *args)
{
BlockQueue<int> *bq = static_cast<BlockQueue<int> *>(args);
int data = 0;
while (true)
{
sleep(2);
bq->Equeue(data);
printf("producter 生产了一个数据: %d\n", data);
data++;
}
}
int main()
{
// 交易场所,不仅仅可以用来进行传递数据
// 传递任务!!!v1: 对象 v2
BlockQueue<int> *bq = new BlockQueue<int>(5); // 共享资源 -> 临界资源
// 单生产,单消费
pthread_t c1, p1;
pthread_create(&c1, nullptr, Consumer, bq);
pthread_create(&p1, nullptr, Productor, bq);
pthread_join(c1, nullptr);
pthread_join(p1, nullptr);
delete bq;
return 0;
}结果如下:

#include "BlockQueue.hpp"
#include <pthread.h>
#include <unistd.h>
using namespace BlockQueueModule;
void *Consumer(void *args)
{
BlockQueue<int> *bq = static_cast<BlockQueue<int> *>(args);
while(true)
{
int data;
// 1. 从bq拿到数据
bq->Pop(&data);
// 2.做处理
printf("Consumer, 消费了一个数据: %d\n", data);
}
}
void *Productor(void *args)
{
BlockQueue<int> *bq = static_cast<BlockQueue<int> *>(args);
int data = 0;
while (true)
{
sleep(2);
bq->Equeue(data);
printf("producter 生产了一个数据: %d\n", data);
data++;
}
}
int main()
{
// 交易场所,不仅仅可以用来进行传递数据
// 传递任务!!!v1: 对象 v2
BlockQueue<int> *bq = new BlockQueue<int>(5); // 共享资源 -> 临界资源
pthread_t c1, p1,c2,p2, p3;
pthread_create(&c1, nullptr, Consumer, bq);
pthread_create(&c2, nullptr, Consumer, bq);
pthread_create(&p1, nullptr, Productor, bq);
pthread_create(&p2, nullptr, Productor, bq);
pthread_create(&p3, nullptr, Productor, bq);
pthread_join(c1, nullptr);
pthread_join(c2, nullptr);
pthread_join(p1, nullptr);
pthread_join(p2, nullptr);
pthread_join(p3, nullptr);
delete bq;
return 0;
}结果如下:

以上就是有关线程互斥与同步有关的内容啦,线程互斥指的是多个线程访问公共资源,保证有且只有一个执行流进入临界区,访问临界资源,通常对临界资源起保护作用;线程同步指的是在保证数据安全的前提下,让线程能够按照某种特定的顺序访问临界资源,从而有效避免饥饿问题,叫做同步。以上就是今天所有的内容啦~ 完结撒花~ 🥳🎉🎉