多线程编程时需要考虑多线程竞争资源可能出现的问题,加锁是一种常用的解决方案。
锁的本质属性是为事物提供“访问保护”,例如:大门上的锁,是为了保护房子免于不速之客的到访;自行车的锁,是为了保护自行车只有owner才可以使用;保险柜上的锁,是为了保护里面的合同和金钱等重要东西……
并发编程中经常需要考虑并发资源竞争读写的问题,因为多个流程同时修改、读取同一个资源时往往会发生超出预期的奇怪行为,因此我们的原则是并发执行任务但是资源读取的过程是清楚干净的。
在 c++ 等高级编程语言中,锁也是用来提供“访问保护”的,不过被保护的东西不再是房子、自行车、金钱,而是内存中的各种变量。此外,计算机领域对于“锁”有个响亮的名字——mutex(互斥量)。 从c11开始,c提供了std::mutex类型,对于多线程的加锁操作提供了很好的支持。
线程之间的锁有: 互斥锁、条件锁、自旋锁、读写锁、递归锁。一般而言,锁的功能与性能成反比。
互斥锁用于控制多个线程对他们之间共享资源互斥访问的一个信号量。也就是说是为了避免多个线程在某一时刻同时操作一个共享资源。
例如线程池中的有多个空闲线程和一个任务队列。任何是一个线程都要使用互斥锁互斥访问任务队列,以避免多个线程同时访问任务队列以发生错乱。
在某一时刻,只有一个线程可以获取互斥锁,在释放互斥锁之前其他线程都不能获取该互斥锁。如果其他线程想要获取这个互斥锁,那么这个线程只能以阻塞方式进行等待。
项目 | 内容 |
|---|---|
头文件 | < mutex > |
类型 | std::mutex |
用法 | 在C中,通过构造 std::mutex 的实例创建互斥元,调用成员函数 lock() 来锁定它,调用 unlock() 来解锁不过一般不推荐这种做法,标准C库提供了 std::lock_guard 和 unique_lock 类模板,都是 RAII 风格,它们是在定义时获得锁,在析构时释放锁。它们的主要区别在于 unique_lock 锁机制更加灵活,可以再需要的时候进行 lock 或者 unlock 调用,不非得是析构或者构造时。std::mutex 和 std::lock_guard 都声明在 <mutex>头文件中。 |
对于 std::mutex 对象,任意时刻最多允许一个线程对其进行上锁
mtx.lock()
调用该函数的线程尝试加锁。如果上锁不成功,即:其它线程已经上锁且未释放,则当前线程block。如果上锁成功,则执行后面的操作,操作完成后要调用 mtx.unlock() 释放锁,否则会导致死锁的产生mtx.unlock()
释放锁,std::mutex 还有一个操作:mtx.try_lock(),字面意思就是:“尝试上锁”,与 mtx.lock() 的不同点在于:如果上锁不成功,当前线程不阻塞。虽然 std::mutex 可以对多线程编程中的共享变量提供保护,但是直接使用 std::mutex 的情况并不多。因为仅使用 std::mutex 有时候会发生死锁。
考虑这样一个情况:假设线程1上锁成功,线程2上锁等待。但是线程1上锁成功后,抛出异常并退出,没有来得及释放锁,导致线程2“永久的等待下去”,此时就发生了死锁。
std::lock_guard 只有构造函数和析构函数。简单的来说:当调用构造函数时,会自动调用传入的对象的lock()函数,而当调用析构函数时,自动调用 unlock() 函数(这就是所谓的RAII,读者可自行搜索)。
lock_guard 还有一个构造函数 lock_guard( mutex_type& m, std::adopt_lock_t t ); 其中第二个参数类型为:std::adopt_lock_t。这个构造函数假定:当前线程已经上锁成功,所以不再调用lock()函数。
//用互斥元保护列表
#include <list>
#include <mutex>
std::list<int> some_list;
std::mutex some_mutex;
void add_to_list(int new_value)
{
std::lock_guard<std::mutex> guard(some_mutex);
some_list.push_back(new_value);
}假定有一个全局变量counter,启动两个线程,每个都对该变量自增10000次,最后输出该变量的值。
#include <iostream>
#include <thread>
#include <vector>
#include <mutex>
#include <chrono>
#include <stdexcept>
int counter = 0;
void increase(int time)
{
for (int i = 0; i < time; i++) {
// 当前线程休眠1毫秒
std::this_thread::sleep_for(std::chrono::milliseconds(1));
counter++;
}
}
int main(int argc, char** argv)
{
std::thread t1(increase, 1000);
std::thread t2(increase, 1000);
t1.join();
t2.join();
std::cout << "counter:" << counter << std::endl;
return 0;
}随便运行一次,输出结果
counter:1993没有输出理论上的 2000,出现上述情况的原因是:自增操作"counter++"不是原子操作,而是由多条汇编指令完成的。多个线程对同一个变量进行读写操作就会出现不可预期的操作。以上面的demo1作为例子:假定counter当前值为10,线程1读取到了10,线程2也读取到了10,分别执行自增操作,线程1和线程2分别将自增的结果写回counter,不管写入的顺序如何,counter都会是11,但是线程1和线程2分别执行了一次自增操作,我们期望的结果是12。
定义一个 std::mutex 对象用于保护 counter 变量。对于任意一个线程,如果想访问 counter,首先要进行"加锁"操作,如果加锁成功,则进行 counter 的读写,读写操作完成后 释放锁(重要!!!); 如果“加锁”不成功,则线程阻塞,直到加锁成功。
#include <iostream>
#include <thread>
#include <vector>
#include <mutex>
#include <chrono>
#include <stdexcept>
int counter = 0;
std::mutex mtx; // 保护counter
void increase(int time) {
for (int i = 0; i < time; i++) {
mtx.lock();
// 当前线程休眠1毫秒
std::this_thread::sleep_for(std::chrono::milliseconds(1));
counter++;
mtx.unlock();
}
}
int main(int argc, char** argv) {
std::thread t1(increase, 1000);
std::thread t2(increase, 1000);
t1.join();
t2.join();
std::cout << "counter:" << counter << std::endl;
return 0;
}运行结果为:
counter:2000死锁:死锁是指两个或两个以上的进程(线程)在运行过程中因争夺资源而造成的一种僵局,若无外力作用,这些进程(线程)都将无法向前推进。
以下情况会出现死锁:
mutex m0,m1;
int i = 0;
void fun0()
{
while (i < 100)
{
lock_guard<mutex> g0(m0); //线程0加锁0
lock_guard<mutex> g1(m1); //线程0加锁1
cout << "thread 0 running..." << endl;
}
return;
}
void fun1()
{
while (i < 100)
{
lock_guard<mutex> g1(m1); //线程1加锁1
lock_guard<mutex> g0(m0); //线程1加锁0
cout << "thread 1 running... "<< i << endl;
}
return;
}
int main()
{
thread p0(fun0);
thread p1(fun1);
p0.join();
p1.join();
return 0;
}
mutex m0,m1;
int i = 0;
void fun0()
{
while (i < 100)
{
lock_guard<mutex> g0(m0); //线程0加锁0
lock_guard<mutex> g1(m1); //线程0加锁1
cout << "thread 0 running..." << endl;
}
return;
}
void fun1()
{
while (i < 100)
{
lock_guard<mutex> g0(m0); //线程1加锁0
lock_guard<mutex> g1(m1); //线程1加锁1
cout << "thread 1 running... "<< i << endl;
}
return;
}
int main()
{
thread p0(fun0);
thread p1(fun1);
p0.join();
p1.join();
return 0;
}
需要用到lock函数
mutex m0,m1;
int i = 0;
void fun0()
{
while (i < 100)
{
lock(m0,m1);
lock_guard<mutex> g0(m0, adopt_lock);
lock_guard<mutex> g1(m1, adopt_lock);
cout << "thread 0 running..." << endl;
}
return;
}
void fun1()
{
while (i < 100)
{
lock(m0,m1);
lock_guard<mutex> g0(m0, adopt_lock);
lock_guard<mutex> g1(m1, adopt_lock);
cout << "thread 1 running... "<< i << endl;
}
return;
}
int main()
{
thread p0(fun0);
thread p1(fun1);
p0.join();
p1.join();
return 0;
}
注意到这里的
lock_guard中多了第二个参数adopt_lock,这个参数表示在调用lock_guard时,已经加锁了,防止lock_guard在对象生成时构造函数再次lock()。 lock_guard 避免死锁
可以防止 mutex 由于异常没有释放造成的死锁问题。
#include <iostream>
#include <thread>
#include <vector>
#include <mutex>
#include <chrono>
#include <stdexcept>
int counter = 0;
std::mutex mtx; // 保护counter
void increase_proxy(int time, int id) {
for (int i = 0; i < time; i++) {
// std::lock_guard对象构造时,自动调用mtx.lock()进行上锁
// std::lock_guard对象析构时,自动调用mtx.unlock()释放锁
std::lock_guard<std::mutex> lk(mtx);
// 线程1上锁成功后,抛出异常:未释放锁
if (id == 1) {
throw std::runtime_error("throw excption....");
}
// 当前线程休眠1毫秒
std::this_thread::sleep_for(std::chrono::milliseconds(1));
counter++;
}
}
void increase(int time, int id) {
try {
increase_proxy(time, id);
}
catch (const std::exception& e){
std::cout << "id:" << id << ", " << e.what() << std::endl;
}
}
int main(int argc, char** argv) {
std::thread t1(increase, 10000, 1);
std::thread t2(increase, 10000, 2);
t1.join();
t2.join();
std::cout << "counter:" << counter << std::endl;
return 0;
}
当需要死循环判断某个条件成立与否时【true or false】,我们往往需要开一个线程死循环来判断,这样非常消耗CPU。使用条件变量,可以让当前线程 wait,释放CPU,如果条件改变时,我们再notify退出线程,再次进行判断。
条件锁就是所谓的条件变量,某一个线程因为某个条件未满足时可以使用条件变量使该程序处于阻塞状态。一旦条件满足以“信号量”的方式唤醒一个因为该条件而被阻塞的线程(常和互斥锁配合使用),唤醒后,需要检查变量,避免虚假唤醒。
最为常见就是在线程池中,起初没有任务时任务队列为空,此时线程池中的线程因为“任务队列为空”这个条件处于阻塞状态。一旦有任务进来,就会以信号量的方式唤醒一个线程来处理这个任务。
项目 | 内容 |
|---|---|
头文件 | < condition_variable > |
类型 | std::condition_variable(只和std::mutex一起工作) 和 std::condition_variable_any(符合类似互斥元的最低标准的任何东西一起工作)。 |
用法 | 借由条件变量,一个线程可以唤醒一个或多个其他等待中的线程。 |
想要修改共享变量(即“条件”)的线程必须:
std::mutexstd::condition_variable 执行 notify_one 或 notify_all (当做 notify 动作时,不必持有锁)
即使共享变量是原子性的,它也必须在 mutex 的保护下被修改,这是为了能够将改动正确发布到正在等待的线程。任意要等待 std::condition_variable 的线程必须:
获取 std::unique_lock<std::mutex>,这个 mutex 正是用来保护共享变量(即“条件”)的。
执行 wait , wait_for 或者 wait_until。这些等待动作原子性地释放 mutex,并使得线程的执行暂停。
当获得条件变量的通知,或者超时,或者一个虚假的唤醒,那么线程就会被唤醒,并且获得 mutex。然后线程应该检查条件是否成立,如果是虚假唤醒,就继续等待。
注: 所谓虚假唤醒,就是因为某种未知的罕见的原因,线程被从等待状态唤醒了,但其实共享变量(即条件)并未变为 true。因此此时应继续等待
std::deque<int> q;
std::mutex mu;
std::condition_variable cond;
void function_1() //生产者
{
int count = 10;
while (count > 0)
{
std::unique_lock<std::mutex> locker(mu);
q.push_front(count);
locker.unlock();
cond.notify_one(); // Notify one waiting thread, if there is one.
std::this_thread::sleep_for(std::chrono::seconds(1));
count--;
}
}
void function_2() //消费者
{
int data = 0;
while (data != 1)
{
std::unique_lock<std::mutex> locker(mu);
while (q.empty())
cond.wait(locker); // Unlock mu and wait to be notified
data = q.back();
q.pop_back();
locker.unlock();
std::cout << "t2 got a value from t1: " << data << std::endl;
}
}
int main()
{
std::thread t1(function_1);
std::thread t2(function_2);
t1.join();
t2.join();
return 0;
}上面的代码有三个注意事项:
function_2中,在判断队列是否为空的时候,使用的是while(q.empty()),而不是 if(q.empty()),这是因为 wait() 从阻塞到返回,不一定就是由于 notify_one() 函数造成的,还有可能由于系统的不确定原因唤醒(可能和条件变量的实现机制有关),这个的时机和频率都是不确定的,被称作伪唤醒。如果在错误的时候被唤醒了,执行后面的语句就会错误,所以需要再次判断队列是否为空,如果还是为空,就继续 wait()阻塞;std::unique_lock 而不是 std::lock_guard, 而且事实上也不能使用std::lock_guard。这需要先解释下 wait() 函数所做的事情,可以看到,在 wait() 函数之前,使用互斥锁保护了,如果 wait 的时候什么都没做,岂不是一直持有互斥锁?那生产者也会一直卡住,不能够将数据放入队列中了。所以,wait() 函数会先调用互斥锁的 unlock() 函数,然后再将自己睡眠,在被唤醒后,又会继续持有锁,保护后面的队列操作。**lock_guard** 没有 lock 和 unlock 接口,而 unique_lock 提供了,这就是必须使用 unique_lock 的原因;notify_one() 的时候,不需要处于互斥锁的保护范围内,所以在唤醒条件变量之前可以将锁 unlock()。假设我们有一个两个处理器core1和core2计算机,现在在这台计算机上运行的程序中有两个线程:T1和T2分别在处理器core1和core2上运行,两个线程之间共享着一个资源。
首先我们说明互斥锁的工作原理,互斥锁是是一种 sleep-waiting 的锁。假设线程T1获取互斥锁并且正在core1上运行时,此时线程T2也想要获取互斥锁(pthread_mutex_lock),但是由于T1正在使用互斥锁使得T2被阻塞。当T2处于阻塞状态时,T2被放入到等待队列中去,处理器core2会去处理其他任务而不必一直等待(忙等)。也就是说处理器不会因为线程阻塞而空闲着,它去处理其他事务去了。
而自旋锁就不同了,自旋锁是一种 busy-waiting 的锁。也就是说,如果T1正在使用自旋锁,而T2也去申请这个自旋锁,此时T2肯定得不到这个自旋锁。与互斥锁相反的是,此时运行T2的处理器core2会一直不断地循环检查锁是否可用(自旋锁请求),直到获取到这个自旋锁为止。
从“自旋锁”的名字也可以看出来,如果一个线程想要获取一个被使用的自旋锁,那么它会一致占用CPU请求这个自旋锁使得CPU不能去做其他的事情,直到获取这个锁为止,这就是“自旋”的含义。
当发生阻塞时,互斥锁可以让CPU去处理其他的任务;而自旋锁让CPU一直不断循环请求获取这个锁。 通过两个含义的对比可以我们知道“自旋锁”是比较耗费CPU的。
// 用户空间用 atomic_flag 实现自旋互斥
#include <thread>
#include <vector>
#include <iostream>
#include <atomic>
std::atomic_flag lock = ATOMIC_FLAG_INIT;
void f(int n)
{
for (int cnt = 0; cnt < 100; ++cnt) {
while (lock.test_and_set(std::memory_order_acquire)) // 获得锁
; // 自旋
std::cout << "Output from thread " << n << '\n';
lock.clear(std::memory_order_release); // 释放锁
}
}
int main()
{
std::vector<std::thread> v;
for (int n = 0; n < 10; ++n) {
v.emplace_back(f, n);
}
for (auto& t : v) {
t.join();
}
}
atomic 是C标准程序库中的一个头文件,定义了 C11 标准中的一些表示线程、并发控制时原子操作的类与方法等。此头文件主要声明了两大类原子对象:std::atomic和std::atomic_flag。
atomic_flag 类
是一种简单的原子布尔类型,只支持两种操作:test_and_set(flag=true) 和 clear(flag=false)。std::atomic 类模板std::atomic 既不可复制亦不可移动。atomic 对 int、char、bool 等数据结构进行了原子性封装,在多线程环境中,对 std::atomic 对象的访问不会造成竞争-冒险。利用 std::atomic 可实现数据结构的无锁设计。所谓的原子操作,取的就是“原子是最小的、不可分割的最小个体”的意义,它表示在多个线程访问同一个全局资源的时候,能够确保所有其他的线程都不在同一时间内访问相同的资源。也就是他确保了在同一时刻只有唯一的线程对这个资源进行访问。这有点类似互斥对象对共享资源的访问的保护,但是原子操作更加接近底层,因而效率更高。使用原子操作能大大的提高程序的运行效率。
#include <iostream>
#include <ctime>
#include <vector>
#include <thread>
#include <atomic>
std::atomic<size_t> count(0);
void threadFun()
{
for (int i = 0; i < 10000; i++)
count++;
}
int main(void)
{
clock_t start_time = clock();
// 启动多个线程
std::vector<std::thread> threads;
for (int i = 0; i < 10; i++)
threads.push_back(std::thread(threadFun));
for (auto&thad : threads)
thad.join();
// 检测count是否正确 10000*10 = 100000
std::cout << "count number:" << count << std::endl;
clock_t end_time = clock();
std::cout << "耗时:" << end_time - start_time << "ms" << std::endl;
return 0;
}
先看互斥锁,它只有两个状态,要么是加锁状态,要么是不加锁状态。假如现在一个线程 a 只是想读一个共享变量 i,因为不确定是否会有线程去写它,所以我们还是要对它进行加锁。但是这时又有一个线程b试图去读共享变量 i,发现被锁定了,那么b不得不等到a释放了锁后才能获得锁并读取 i 的值,但是两个读取操作即使是同时发生的,也并不会像写操作那样造成竞争,因为它们不修改变量的值。所以我们期望在多个线程试图读取共享变量的时候,它们可以立刻获取因为读而加的锁,而不是需要等待前一个线程释放。
读写锁可以解决上面的问题。它提供了比互斥锁更好的并行性。因为以读模式加锁后,当有多个线程试图再以读模式加锁时,并不会造成这些线程阻塞在等待锁的释放上。
读写锁是多线程同步的另外一个机制。在一些程序中存在读操作和写操作问题,对某些资源的访问会存在两种可能情况,一种情况是访问必须是排他的,就是独占的意思,这种操作称作写操作,另外一种情况是访问方式是可以共享的,就是可以有多个线程同时去访问某个资源,这种操作称为读操作。这个问题模型是从对文件的读写操作中引申出来的。把对资源的访问细分为读和写两种操作模式,这样可以大大增加并发效率。读写锁比互斥锁适用性更高,并行性也更高。
需要注意的是,这里只是说并行效率比互斥高,并不是速度一定比互斥锁快,读写锁更复杂,系统开销更大。并发性好对于用户体验非常重要,假设互斥锁需要0.5秒,使用读写锁需要0.8秒,在类似学生管理系统的软件中,可能90%的操作都是查询操作。如果突然有20个查询请求,使用的是互斥锁,则最后的查询请求被满足需要10秒,估计没人接收。使用读写锁时,因为读锁能多次获得,所以20个请求中,每个请求都能在1秒左右被满足,用户体验好的多。
项目 | 内容 |
|---|---|
头文件 | < boost/thread/shared_mutex.cpp > |
类型 | boost::shared_lock |
用法 | 你可以使用 boost::shared_ mutex 的实例来实现同步,而不是使用 std::mutex 的实例。对于更新操作,std::lock_guard< boost::shared _mutex> 和 std::unique _lock< boost::shared _mutex> 可用于锁定,以取代相应的 std::mutex 特化。这确保了独占访问,就像 std::mutex 那样。那些不需要更新数据结构的线程能够转而使用 boost::shared_lock< boost::shared _mutex>来获得共享访问。这与 std::unique _lock 用起来正是相同的,除了多个线程在同一时间,同一boost::shared _mutex 上可能会具有共享锁。唯一的限制是,如果任意一个线程拥有一个共享锁,试图获取独占锁的线程会被阻塞,知道其他线程全都撤回它们的锁。同样的,如果一个线程具有独占锁,其他线程都不能获取共享锁或独占锁,直到第一个线程撤回它的锁。 |
简单的说:
shared_lock 是 read_lock。被锁后仍允许其他线程执行同样被 shared_lock 的代码。这是一般做读操作时的需要。unique_lock 是 write_lock。被锁后不允许其他线程执行被 shared_lock 或 unique_lock 的代码。在写操作时,一般用这个,可以同时限制 unique_lock 的写和 share_lock 的读。std::recursive_mutex 与 std::mutex 一样,也是一种可以被上锁的对象,但是和 std::mutex 不同的是,std::recursive_mutex 允许同一个线程对互斥量多次上锁(即递归上锁),来获得对互斥量对象的多层所有权,std::recursive_mutex 释放互斥量时需要调用与该锁层次深度相同次数的 unlock(),可理解为 lock() 次数和 unlock() 次数相同,除此之外,std::recursive_mutex 的特性和 std::mutex 大致相同。
例如函数 a 需要获取锁 mutex,函数 b 也需要获取锁 mutex,同时函数 a 中还会调用函数 b。如果使用std::mutex 必然会造成死锁。但是使用 std::recursive_mutex 就可以解决这个问题。