libpthread.so.0
进行了封装,使得我们用户能通过库里的接口进程线程的创建,等待,终止等等
pthread_t
类型是用户空间线程库对线程的抽象(本质就是一个虚拟地址),用于在用户空间管理线程的创建、销毁等操作。而LWP则是内核管理轻量级进程的抽象,用于在内核空间进行线程的调度和管理。
在Linux系统中,线程库(如pthread库)会将pthread_t
映射到对应的LWP上,以便内核进行线程的调度。当创建一个线程时,线程库会分配一个pthread_t
标识符,并在内核中创建一个对应的LWP。线程库会负责将pthread_t
与LWP进行映射,以便在用户空间对线程进行操作。
struct pthread
:在Linux系统中,struct pthread
是指代线程控制块(Thread Control Block,TCB)的结构体。它包含了线程的状态信息、线程的调度信息、线程的栈信息等。struct pthread
结构体用于描述线程的属性和状态,是操作系统用来管理线程的数据结构。
__thread
关键字创建线程局部变量。使用后全局变量会发生拷贝到线程内(放到类型前面使用:__thread int a;
),线程会使用线程局部的那个。
#include <iostream>
#include <thread> // C++里的库
#include <unistd.h>
#include <sys/types.h>
using namespace std;
void *task(void *argv)
{
int count = 5;
while (true)
{
cout << "I'm a new thread ,pid : " << getpid() << ". count:" << count << "count's address:" << &count << endl;
sleep(1);
count--;
}
return nullptr;
}
int main()
{
pthread_t tid1;
pthread_t tid2;
pthread_create(&tid1, nullptr, task, nullptr);
pthread_create(&tid2, nullptr, task, nullptr); // 这里我们两个线程执行一个函数,里面有临时变量,看二者地址如何
pthread_join(tid1, nullptr);
pthread_join(tid2, nullptr);
return 0;
}
能证明独立栈的存在
我们利用上次自己封装的Thread来写一段多线程抢票代码
Thread.hpp
#ifndef __THREAD_HPP__
#define __THREAD_HPP__
#include <iostream>
#include <string>
#include <unistd.h>
#include <functional>
#include <pthread.h>
namespace ThreadModule
{
template <typename T>
using func_t = std::function<void(T &)>;
// typedef std::function<void(const T&)> func_t;
template <typename T>
class Thread
{
public:
void Excute()
{
_func(_data);
}
public:
Thread(func_t<T> func, T &data, const std::string &name = "none-name")
: _func(func), _data(data), _threadname(name), _stop(true)
{
}
static void *threadroutine(void *args) // 类成员函数,形参是有this指针的!!
{
Thread<T> *self = static_cast<Thread<T> *>(args);
self->Excute();
return nullptr;
}
bool Start()
{
int n = pthread_create(&_tid, nullptr, threadroutine, this);
if (!n)
{
_stop = false;
return true;
}
else
{
return false;
}
}
void Detach()
{
if (!_stop)
{
pthread_detach(_tid);
}
}
void Join()
{
if (!_stop)
{
pthread_join(_tid, nullptr);
}
}
std::string name()
{
return _threadname;
}
void Stop()
{
_stop = true;
}
~Thread() {}
private:
pthread_t _tid;
std::string _threadname;
T &_data; // 为了让所有的线程访问同一个全局变量
func_t<T> _func;
bool _stop;
};
} // namespace ThreadModule
#endif
#include "Thread.hpp"
using namespace MyThread;
class ThreadData
{
public:
ThreadData(int &tickets, const std::string &name)
: _tickets(tickets), _name(name), _total(0)
{
}
~ThreadData()
{
}
public:
int &_tickets; // 所有的线程,最后都会引用同一个全局的g_tickets
std::string _name; // 进程的名字
int _total; // 这个进程抢了多少票
};
int g_tickets = 10000; // 共享资源,没有保护的, 临界资源
const int num = 4; // 线程数量
void route(ThreadData *td)
{
while (true)
{
if (td->_tickets > 0)
{
usleep(1000);
printf("%s running, get tickets: %d\n", td->_name.c_str(), td->_tickets);
td->_total++;
td->_tickets--;
}
else
{
break;
}
}
}
int main()
{
std::vector<Thread<ThreadData *>> threads; // 所有的线程存在一个数组里
std::vector<ThreadData *> datas; // 所有的数据也是
// 1. 创建一批线程
for (int i = 0; i < num; i++)
{
std::string name = "thread-00" + std::to_string(i + 1);
ThreadData *td = new ThreadData(g_tickets, name);
threads.emplace_back(route, td, name);
datas.emplace_back(td); // 创建完后,都插入
}
for (auto &e : threads)
{
e.Start();
}
for (auto &e : threads)
{
e.Join();
}
return 0;
}
最后一运行发现:
为什么会抢到负数?:对全局的g_tickets
的判断不是原子的
此时,当第一个进程从内存里把g_tickets读到CPU的寄存器中,进行判断,此时1>0
成立。然后因为sleep(),线程挂起(带走自己是上下文数据),CPU调度线程让下一个来了,又是同样的,因为把g_tickets读到CPU的寄存器中(还是1)……
最后,新线程都在等待队列里面时_tickets 都是1,然后遇到了 td->_tickets--;
这条语句,都开始执行,先从内存读数据- ->>每次自减后都要写会回物理内存,那么就会导致,下一个线程执行 td-> _tickets–时,又会从内存里把已经减过一次的数据读回来
- -
直接给了多线程并发访问时,更多的切换机会其实
td->_tickets--;
不是原子的。本质上是这三步
那么最后的汇编语句大概率也是三条语句,在这三条语句之间都有可能发生时间片到了导致线程切换
汇编语句只有一句,那么就是原子的
要解决以上问题,需要做到三点:
要做到这三点,本质上就是需要一把锁。Linux上提供的这把锁叫互斥量
关于静态变量与全局变量的小知识:
总的来说,全局变量是整个程序可见的变量,可以在不同的文件中共享;而静态变量是局部的,只能在定义它的函数或文件内部使用。根据需求,可以选择使用全局变量或静态变量来存储数据。
初始化:
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER
静态初始化的互斥锁是在编译时就已经初始化好了,而不是在运行时动态初始化。
PTHREAD_MUTEX_INITIALIZER
宏会将互斥锁初始化为一个静态的、已经被初始化的状态,这样就可以不用显式调用pthread_mutex_init
来初始化互斥锁 不需要显式调用pthread_mutex_destroy
函数来销毁互斥锁。这是因为静态初始化的互斥锁是在编译时就已经初始化好了,并且通常会在程序结束时自动被系统释放
pthread_mutex_init
函数来创建并初始化互斥锁。这种方式允许在程序运行时根据需要动态创建和初始化互斥锁,而不是在编译时静态初始化。函数原型:
int pthread_mutex_init(pthread_mutex_t *restrict mutex, const pthread_mutexattr_t *restrict attr);
参数说明:
mutex
:要初始化的互斥锁,传入一个指向 pthread_mutex_t
类型的指针。attr
:互斥锁的属性,通常传入 NULL
,表示使用默认属性进行初始化。返回值:
销毁互斥量:
销毁互斥锁是在不再需要使用互斥锁时释放其资源的重要操作。在销毁互斥锁时需要注意以下几点:
PTHREAD_MUTEX_INITIALIZER
初始化的静态互斥锁不需要销毁:静态互斥锁在程序结束时会自动被系统释放,因此不需要显式调用 pthread_mutex_destroy
函数来销毁这种互斥锁。
pthread_mutex_destroy
函数销毁了一个互斥锁,该互斥锁的状态将不再可预测,不应再被用于加锁和解锁操作。
函数原型:
int pthread_mutex_destroy(pthread_mutex_t *mutex);
参数说明:
mutex
:要销毁的互斥锁,传入一个指向 pthread_mutex_t
类型的指针。返回值:
互斥量加锁和解锁:
在多线程编程中,互斥锁(mutex)是一种用于保护共享资源的同步机制。互斥锁需要在访问共享资源之前进行加锁操作,访问完成后进行解锁操作,以确保同一时刻只有一个线程可以访问共享资源,避免数据竞争和不确定行为的发生。
pthread_mutex_lock 函数:
int pthread_mutex_lock(pthread_mutex_t *mutex);
mutex
是要加锁的互斥锁。当调用
pthread_mutex_lock
函数时,如果互斥量处于未锁定状态,那么该函数会成功将互斥量锁定,并且立即返回成功。这意味着当前线程已经获得了对互斥量的独占访问权限。 然而,如果在调用pthread_mutex_lock
函数时,其他线程已经锁定了互斥量,或者有其他线程同时尝试锁定互斥量但未竞争成功,那么当前线程的调用将会被阻塞(即执行流被挂起),直到互斥量被解锁为止。这种行为确保了只有一个线程能够同时访问临界区,避免了数据竞争和不确定行为的发生。 只有一个线程会申请锁成功,成功的会接着执行。其余申请锁失败都会阻塞在那
pthread_mutex_unlock 函数:
int pthread_mutex_unlock(pthread_mutex_t *mutex);
mutex
是要解锁的互斥锁。解决方案1:出现的并发访问的问题,本质是因为多个执行流执行访问全局数据的代码导致的。保护全局共享资源的本质是通过保护临界区完成的。那我们就加锁让一个线程去抢票(全局互斥锁)
int g_tickets = 1000; // 共享资源,没有保护的, 临界资源
const int num = 4; // 线程数量
pthread_mutex_t gmutex = PTHREAD_MUTEX_INITIALIZER;
void route(ThreadData *td)
{
while (true)
{
pthread_mutex_lock(&gmutex); // 要在这里进行加锁,让一次只有一个线程(竞争力强的那个)能进里面
if (td->_tickets > 0) // 每个线程内部,访问临界资源的代码,就叫做临界区
{
usleep(1000);
printf("%s running, get tickets: %d\n", td->_name.c_str(), td->_tickets);
td->_tickets--;
pthread_mutex_unlock(&gmutex);
td->_total++;
}
else
{
pthread_mutex_unlock(&gmutex);
break;
}
}
}
但是如果我们换个操作系统就有可能发生,全部都是一个相同的线程来抢票(它的竞争力太强了) 竞争锁是自由竞争的,竞争锁的能力太强的线程,会导致其他线程抢不到锁 — 造成了其他线程的饥饿问题 下面我们会利用同步来解决
局部互斥锁
#include "Thread.hpp"
using namespace MyThread;
class ThreadData
{
public:
ThreadData(int &tickets, const std::string &name, pthread_mutex_t &mutex)
: _tickets(tickets), _name(name), _total(0), _mutex(mutex)
{
}
~ThreadData()
{
}
public:
int &_tickets; // 所有的线程,最后都会引用同一个全局的g_tickets
std::string _name; // 进程的名字
int _total; // 这个进程抢了多少票
pthread_mutex_t &_mutex; // 传一个动态锁过来,因为是引用,所以都是同一个锁
};
int g_tickets = 1000; // 共享资源,没有保护的, 临界资源
const int num = 4; // 线程数量
// pthread_mutex_t gmutex = PTHREAD_MUTEX_INITIALIZER;
void route(ThreadData *td)
{
while (true)
{
// pthread_mutex_lock(&gmutex); // 要在这里进行加锁,让一次只有一个线程(竞争力强的那个)能进里面
pthread_mutex_lock(&td->_mutex);
if (td->_tickets > 0) // 每个线程内部,访问临界资源的代码,就叫做临界区
{
usleep(1000);
printf("%s running, get tickets: %d\n", td->_name.c_str(), td->_tickets);
td->_tickets--;
pthread_mutex_unlock(&td->_mutex);
td->_total++;
}
else
{
pthread_mutex_unlock(&td->_mutex);
break;
}
}
}
int main()
{
pthread_mutex_t mutex;
pthread_mutex_init(&mutex, nullptr); // 进行初始化
std::vector<Thread<ThreadData *>> threads; // 所有的线程存在一个数组里
std::vector<ThreadData *> datas; // 所有的数据也是
// 1. 创建一批线程
for (int i = 0; i < num; i++)
{
std::string name = "thread-00" + std::to_string(i + 1);
ThreadData *td = new ThreadData(g_tickets, name, mutex);
threads.emplace_back(route, td, name);
datas.emplace_back(td); // 创建完后,都插入
}
for (auto &e : threads)
{
e.Start();
}
for (auto &e : threads)
{
e.Join();
}
pthread_mutex_destroy(&mutex);
return 0;
}
先来复习一下线程的状态 除了正在执行(running)和挂起(blocked/sleeping/waiting)状态外,还有几种常见的线程状态:
在操作系统中,挂起、等待和阻塞是相关但不完全相同的概念:
为了实现互斥锁操作,大多数体系结构都提供了swap或exchange指令(汇编指令),该指令的作用是把寄存器和内存单元的数据相交换,由于只有一条指令,保证了原子性。 现在我们把lock和unlock的伪代码改一下
lock:
movb $0, al ; 将值0加载到al寄存器中
xchgb al, mutex ; 将al寄存器的值和mutex的值进行交换
cmpb $0, al ; 比较al寄存器的值和0
jne wait ; 如果al寄存器的值不等于0,则跳转到等待(wait)标签
ret ; 返回,表示加锁成功,会去执行下面的代码
wait:
suspend ; 挂起线程等待
jmp lock ; 跳转到lock标签,重新尝试加锁
unlock:
movb $1, mutex ; 将值1写入mutex,表示解锁
wakeup ; 唤醒等待mutex的线程
ret ; 返回,表示解锁成功
本来我们定义的
mutex
是在内存中的。数据在内存里,所有线程都能访问,属未共享的。但是如果转移到CPU内部寄存器中,就属于一个线程私有 当线程1竞争成功时,1被交换到寄存器内,也就是线程1的上下文中。CPU寄存器硬件只有一套,但是CPU寄存器内的是数据线程的硬件上下文 而且我们执行的是交换,不是拷贝,这保证了mutex
只有一个。加之交换是原子的,即便线程被切换的时机是随时的,发生了切换,但是那时mutex已经到了某个线程的上下文中了,凭借这个值,就能执行下方代码,而其他线程就阻塞了
那现在还有个问题:在临界区内部,正在访问临界区的线程可以被OS切换调度吗?——答案是可以的。正在执行的线程是可以被操作系统(OS)切换调度的。即使一个线程已经获取了锁并进入了临界区,仍然有可能被操作系统暂时挂起
现在假设有一个线程 A 正在访问临界区(已经获取了锁),而其他线程 B、C、D 正在等待获取这个锁。在这种情况下,
int pthread_mutex_lock(pthread_mutex_t *mutex);
这条语句对于其他线程只有两种情况是有意义的(锁被释放,或者没线程申请到了锁):
临界区的代码对于其他线程是原子的,因为只有一个线程能够同时访问临界区。其他线程在等待获取锁的过程中不会执行临界区的代码,从而确保了临界区操作的原子性和线程安全性。
概念
线程安全是针对线程执行时,各个线程的相互关系。而重入是属于函数的特点
常见的线程不安全的情况
strtok
函数(线程不安全),如果该函数被多个线程同时调用,可能会导致出现奇怪的结果。常见的线程安全的情况
常见不可重入的情况
常见可重入的情况
死锁是指在并发系统中的一种状态,其中每个进程都在等待系统资源,但这些资源被其他进程占用,导致所有进程都无法继续执行,形成一种互相等待的僵局状态。 死锁是多线程对锁不合理的使用,导致代码不会继续向后正常推进
死锁是在并发系统中常见的一种问题,指的是多个进程或线程因竞争系统资源而陷入无限等待对方释放资源的状态,导致所有进程都无法继续执行,形成一种僵局。死锁的发生通常总是伴随着系统资源的互相占用和互相等待。
死锁发生的必要条件通常包括:
当满足以上四个条件时,就会发生死锁。
避免死锁的最有效方式是破坏死锁的四个必要条件:
还可以采取以下具体措施来避免死锁:
在了解线程同步之前先明确几个概念:串行、并发和并行。描述了多任务处理的不同方式。
在多核处理器中,可以实现并行处理,即同时在多个核心上执行不同的任务,以提高整体系统的执行效率。而并发则更多指的是在单个处理器上通过快速切换实现多任务间的交替执行
线程同步是指多个线程之间协调和控制其执行顺序,以避免出现竞态条件(Race Condition)和数据竞争(Data Race)等问题。
在保证数据安全的前提下,让线程能够按照某种特定的顺序访问临界资源,从而有效避免饥饿问题,叫做同步
同步:在保证数据安全的前提下,让线程能够按照某种特定的顺序访问临界资源,从而有效避免饥饿问题,叫做同步 竞态条件:因为时序问题,而导致程序异常,我们称之为竞态条件。由于多个线程的操作顺序不确定或不对称而导致的错误结果或异常情况。当多个线程在对共享资源进行读写操作时,如果它们的操作顺序不正确,可能会导致程序出现意外的结果
条件变量是一种线程同步的高级机制,它允许线程在某个特定条件下等待。条件变量通常与互斥锁一起使用,用于线程之间的协调和通信。条件变量允许一个线程在某个条件不满足时等待,当条件满足时,其他线程可以通知等待的线程继续执行。
条件变量是多线程编程中用于线程间协调和通信的一种机制。它通常与互斥锁一起使用,用于等待某个条件的发生并在条件满足时唤醒等待的线程。条件变量的接口函数包括初始化、销毁、等待条件满足和唤醒等待等操作。
初始化条件变量
静态初始化条件变量
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
上述代码使用了宏PTHREAD_COND_INITIALIZER
来进行静态初始化,这样就可以在定义条件变量时直接初始化,无需调用pthread_cond_init
函数。这种方式适用于条件变量的属性使用默认值的情况。
注意事项:
pthread_cond_init
函数。pthread_cond_destroy
函数来销毁,因为它们不会分配额外的资源,只是简单的初始化。动态初始化
int pthread_cond_init(pthread_cond_t *restrict cond, const pthread_condattr_t *restrict attr);
cond
:要初始化的条件变量attr
:条件变量的属性,通常为NULL表示使用默认属性销毁条件变量
int pthread_cond_destroy(pthread_cond_t *cond);
cond
:要销毁的条件变量等待条件满足
使当前线程等待在指定的条件变量上,直到条件满足或被其他线程唤醒。
当线程调用
pthread_cond_wait()
时,它会暂时离开临界区,因为pthread_cond_wait()
会自动释放传递给它的互斥锁。这是为了允许其他线程能够访问和修改与条件变量相关联的共享数据,同时避免死锁。 具体来说,当线程调用pthread_cond_wait()
时,会发生以下步骤:
&gmutex
),确保它是锁定的。&gcond
)被其他线程触发。pthread_cond_signal()
或 pthread_cond_broadcast()
被调用),线程会被唤醒。pthread_cond_wait()
会自动重新竞争之前释放的互斥锁。pthread_cond_wait()
调用之后的代码。 因此,在调用 pthread_cond_wait()
时,线程会短暂地离开临界区,等待条件变量被触发,然后再重新进入临界区。这种机制确保了线程在访问共享数据时能够正确地同步,并避免了竞态条件和其他并发问题。
int pthread_cond_wait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex);
cond
:要在这个条件变量上等待mutex
:与条件变量关联的互斥量,用于在等待条件变量之前解锁,等待结束后再次上锁在调用
pthread_cond_wait
函数时需要传入一个互斥锁(mutex),这是因为条件变量(condition variable)通常与互斥锁一起使用,以确保线程在等待条件时能够正确同步和避免竞态条件(race condition) 在使用条件变量时,通常会遵循以下步骤:
pthread_mutex_lock
函数对互斥锁进行加锁,以确保对共享资源的访问是互斥的,避免多个线程同时访问共享资源。
pthread_cond_wait
函数时,会先释放互斥锁,然后等待在条件变量上的信号。
所以就是:线程A得到锁,执行等待条件->释放锁,等条件变化 - -> 另一个线程又申请到锁,又在等条件变化…… 最后所有线程都在条件那里等着 在使用条件变量时,线程在等待条件变化时会先释放之前获取的互斥锁,然后等待在条件变量上的信号。当条件满足时,线程被唤醒后需要重新获取之前释放的互斥锁,这是因为在等待条件变化时释放互斥锁是条件变量机制的一部分。先释放再获取的 具体原因包括:
因此,在使用条件变量时,线程需要在等待条件变化时释放互斥锁,等待条件满足后重新获取互斥锁,以确保线程能够正确同步共享资源的访问。这样可以避免竞争条件和确保线程安全地访问共享资源。
pthread_mutex_unlock
函数对互斥锁进行解锁,释放资源(释放的是互斥锁的相关资源)。唤醒等待
pthread_cond_broadcast
:唤醒所有等待在指定条件变量上的线程。pthread_cond_signal
:唤醒等待在指定条件变量上的一个线程(如果有多个线程等待,则唤醒其中一个)int pthread_cond_broadcast(pthread_cond_t *cond);
int pthread_cond_signal(pthread_cond_t *cond);
cond
:要唤醒等待的条件变量#include <iostream>
#include <string>
#include <cstring>
#include <vector>
#include <pthread.h>
#include <unistd.h>
void *Master(void *args)
{
std::string name = static_cast<char *>(args);
while (true)
{
std::cout << name << std::endl;
sleep(1);
}
}
void StartMaster(std::vector<pthread_t> *tids)
{
pthread_t tid;
int n = pthread_create(&tid, nullptr, Master, (void *)"Master Thread");
if (n == 0)
{
std::cout << "create master success" << std::endl;
}
tids->emplace_back(tid);
}
void *Slaver(void *args)
{
std::string name = static_cast<char *>(args);
while (true)
{
std::cout << name << std::endl;
sleep(1);
}
}
void StartSlaver(std::vector<pthread_t> *tids, int num)
{
for (int i = 0; i < num; i++)
{
pthread_t tid;
char *name = new char[20];
snprintf(name, 20, "salver-00%d", i + 1);
int n = pthread_create(&tid, nullptr, Slaver, name);
if (n == 0)
{
std::cout << "create success: " << name << std::endl;
tids->emplace_back(tid);
}
}
}
void WaitThread(std::vector<pthread_t> tids)
{
for (auto &tid : tids)
{
pthread_join(tid, nullptr);
}
}
int main()
{
std::vector<pthread_t> tids; // 这里放所有的线程的tid
StartMaster(&tids); // 启动主线程
StartSlaver(&tids, 5); // 启动新线程
WaitThread(tids); // 等待新线程
return 0;
}
我们写了这样的一份代码,会发现最一开始输出是乱的
这是因为,所有的进行都向一个文件进行写入(标准输出流),那么此时标准输出流就是共享资源,是临界资源
使用条件变量来解决
#include <iostream>
#include <string>
#include <cstring>
#include <vector>
#include <pthread.h>
#include <unistd.h>
pthread_mutex_t gmutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t gcond = PTHREAD_COND_INITIALIZER; // 创建一个锁和条件变量
void *Master(void *args) // 我们选择在主线程里面进行条件的唤醒
{
std::string name = static_cast<char *>(args);
while (true)
{
// std::cout << name << std::endl;
sleep(1);
pthread_cond_signal(&gcond); // 唤醒其中一个队列首部的线程
// pthread_cond_broadcast(&gcond); // 唤醒队列中所有的线程
std::cout << "master 唤醒一个线程..." << std::endl;
}
}
void StartMaster(std::vector<pthread_t> *tids)
{
pthread_t tid;
int n = pthread_create(&tid, nullptr, Master, (void *)"Master Thread");
if (n == 0)
{
std::cout << "create master success" << std::endl;
}
tids->emplace_back(tid);
}
void *Slaver(void *args)
{
std::string name = static_cast<char *>(args);
while (true)
{
// 1. 加锁
pthread_mutex_lock(&gmutex);
// 2. 一般条件变量是在加锁和解锁之间使用的
pthread_cond_wait(&gcond, &gmutex); // gmutex:这个是,是用来被释放互斥锁的
std::cout << name << std::endl;
sleep(1);
pthread_mutex_unlock(&gmutex);
// 3.解锁
}
}
void StartSlaver(std::vector<pthread_t> *tids, int num)
{
for (int i = 0; i < num; i++)
{
pthread_t tid;
char *name = new char[20];
snprintf(name, 20, "salver-00%d", i + 1);
int n = pthread_create(&tid, nullptr, Slaver, name);
if (n == 0)
{
std::cout << "create success: " << name << std::endl;
tids->emplace_back(tid);
}
}
}
void WaitThread(std::vector<pthread_t> tids)
{
for (auto &tid : tids)
{
pthread_join(tid, nullptr);
}
}
int main()
{
std::vector<pthread_t> tids; // 这里放所有的线程的tid
StartMaster(&tids); // 启动主线程
StartSlaver(&tids, 5); // 启动新线程
WaitThread(tids); // 等待新线程
return 0;
}
就是在slave thread的执行函数里进行加锁和条件等待 在master thread的执行函数里进行唤醒
超市(交易场所):
生产者(Producer):
消费者(Consumer):
3种关系:
生产者 vs 生产者 — 互斥
多个生产者线程可能同时试图向共享缓冲区(如队列或数组)中写入数据。为了防止数据竞争和不一致,我们需要使用互斥机制来确保同一时间只有一个生产者线程能够访问共享资源。 互斥通常通过互斥锁(Mutex)来实现。当一个生产者线程获得互斥锁时,其他生产者线程将被阻塞,直到锁被释放。这样,每个生产者线程在写入缓冲区时都能独占资源,从而避免了数据竞争。
消费者 vs 消费者 — 互斥
多个消费者线程可能同时试图从共享缓冲区中读取数据。为了确保数据的正确性和一致性,我们同样需要使用互斥机制来防止多个消费者线程同时访问缓冲区。 互斥锁在这里同样起到关键作用。当一个消费者线程获得互斥锁时,其他消费者线程将被阻塞,直到锁被释放。这样,每个消费者线程在读取缓冲区时都能独占资源,避免了潜在的冲突和不一致。
生产者 vs 消费者 — 互斥 && 同步
生产者线程和消费者线程需要共享一个缓冲区。这要求我们使用互斥机制来确保同一时间只有一个线程(生产者或消费者)能够访问缓冲区,以避免数据竞争和不一致。 但是,仅仅互斥是不够的。我们还需要使用同步机制来确保生产者和消费者之间的协调。例如,当缓冲区为空时,消费者线程应该被阻塞,直到生产者线程向其中添加了数据。同样地,当缓冲区满时,生产者线程也应该被阻塞,直到消费者线程从中取走了数据。 同步通常通过条件变量(Condition Variables)来实现。生产者线程在添加数据到缓冲区后,会向条件变量发送信号(signal),以唤醒等待的消费者线程。类似地,消费者线程在取走数据后,也会向条件变量发送信号,以唤醒等待的生产者线程。通过这种方式,生产者和消费者线程能够协调地工作,确保缓冲区的有效使用和数据的一致性。
优点:
在多线程编程中阻塞队列(Blocking Queue)是一种常用于实现生产者和消费者模型的数据结构。 其与普通的队列区别在于,当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中被放入了元素;当队列满时,往队列里存放元素的操作也会被阻塞,直到有元素被从队列中取出(以上的操作都是基于不同的线程来说的,线程在对阻塞队列进程操作时会被阻塞)
这里有个疑问,明明我们放任务和拿任务时都是串行的(加了锁,一次只有一个线程),为什么生产消费模型优点还是并发性呢?
我们来尝试实现一个BQ
#ifndef __BLOCK_QUEUE_HPP__
#define __BLOCK_QUEUE_HPP__
#include <iostream>
#include <string>
#include <queue>
#include <pthread.h>
template <class T>
class BlockQueue
{
private:
bool IsFull()
{
return _block_queue.size() == _cap;
}
bool IsEmpty()
{
return _block_queue.empty();
}
public:
BlockQueue(int cap) : _cap(cap)
{
_consum_wait_num = 0;
_product_wait_num = 0;
pthread_mutex_init(&_mutex, nullptr);
pthread_cond_init(&_product_cond, nullptr);
pthread_cond_init(&_consum_cond, nullptr);
}
void Enqueue(T &in) // 生产者用的接口
{
pthread_mutex_lock(&_mutex);
while (IsFull())
{
// 生产线程去等待,是在临界区中休眠的!你现在还持有锁呢!!!
// 1. pthread_cond_wait调用是: a. 让调用进程等待 b. 自动释放曾经持有的_mutex锁
_product_wait_num++;
pthread_cond_wait(&_product_cond, &_mutex);
_product_wait_num--;
}
// 进行生产
_block_queue.push(in);
// 通知消费者来消费
if (_consum_wait_num > 0)
{
pthread_cond_signal(&_consum_cond);
}
pthread_mutex_unlock(&_mutex); // 其实解锁和唤醒条件顺序无所谓,先唤醒后那边等着,解锁后直接竞争
// 如果先解锁,后唤醒:先解锁没任何效果,因为都在wait那里等,一唤醒就直接得到锁
}
void Pop(T *out) // 消费者用的接口
{
pthread_mutex_lock(&_mutex);
while (IsEmpty())
{
// 消费线程去等待,是在临界区中休眠的!你现在还持有锁呢!!!
// 1. pthread_cond_wait调用是: a. 让调用进程等待 b. 自动释放曾经持有的_mutex锁——
_consum_wait_num++;
pthread_cond_wait(&_consum_cond, &_mutex);
_consum_wait_num--;
}
// 进行消费
*out = _block_queue.front();
_block_queue.pop();
// 通知生产者来生产
if (_product_wait_num > 0)
{
pthread_cond_signal(&_product_cond);
}
pthread_mutex_unlock(&_mutex);
}
~BlockQueue()
{
pthread_mutex_destroy(&_mutex);
pthread_cond_destroy(&_product_cond);
pthread_cond_destroy(&_consum_cond);
}
private:
std::queue<T> _block_queue; // 阻塞队列
int _cap; // 总上限
pthread_mutex_t _mutex; // 保护_block_queue的锁
pthread_cond_t _product_cond; // 专门给生产者提供的条件变量
pthread_cond_t _consum_cond; // 专门给消费者提供的条件变量
int _product_wait_num; // 等待的生产者数量
int _consum_wait_num; // 等待的消费者数量
};
#endif
Thread.hpp与BlockQueue.hpp我们上面已经进行展示了,接下来只进行剩下二者
Task.hpp
#pragma once
#include <iostream>
#include <string>
class Task
{
public:
Task() {}
Task(int a, int b) : _a(a), _b(b), _result(0)
{
}
void Excute()
{
_result = _a + _b;
}
std::string ResultToString()
{
return std::to_string(_a) + "+" + std::to_string(_b) + "=" + std::to_string(_result);
}
std::string DebugToString()//测试的时候使用
{
return std::to_string(_a) + "+" + std::to_string(_b) + "=?";
}
private:
int _a;
int _b;
int _result;
};
Main.cc
#include "BlockQueue.hpp"
#include "Thread.hpp"
#include "Task.hpp"
#include <string>
#include <vector>
#include <unistd.h>
using namespace ThreadModule;
int a = 10;
using blockqueue_t = BlockQueue<Task>;
void Consumer(blockqueue_t &bq)
{
while (true)
{
// 1.从Blockqueue里面取出任务
Task t;
bq.Pop(&t);
// 2.开始执行任务
t.Excute();
std::cout << "Consumer Consum result is : " << t.ResultToString() << std::endl;
sleep(2);
}
}
void Productor(blockqueue_t &bq)
{
int cnt = 1;
srand(time(nullptr));
while (true)
{
int a = rand() % 10;
int b = rand() % 5;
Task t(a, b);
bq.Enqueue(t);
cnt++;
}
}
void StartComm(std::vector<Thread<blockqueue_t>> *threads, int num, blockqueue_t &bq, func_t<blockqueue_t> func)
{
for (int i = 0; i < num; i++)
{
std::string name = "thread-" + std::to_string(i + 1);
threads->emplace_back(func, bq, name);
threads->back().Start();
}
}
void StartConsumer(std::vector<Thread<blockqueue_t>> *threads, int num, blockqueue_t &bq)
{
StartComm(threads, num, bq, Consumer);
}
void StartProductor(std::vector<Thread<blockqueue_t>> *threads, int num, blockqueue_t &bq)
{
StartComm(threads, num, bq, Productor);
}
void WaitAllThread(std::vector<Thread<blockqueue_t>> &threads)
{
for (auto &thread : threads)
{
thread.Join();
}
}
int main()
{
blockqueue_t *bq = new blockqueue_t(5);
std::vector<Thread<blockqueue_t>> threads;
StartProductor(&threads, 1, *bq);
StartConsumer(&threads, 1, *bq);
WaitAllThread(threads);
return 0;
}
今天也是到这里啦!!!