上次我们使用了阻塞队列的生产消费模型,在先前的生产者-消费者模型代码中,当一个线程想要操作临界资源时,必须确保临界资源处于满足条件的状态才能进行修改;否则无法修改。例如,在
Enqueue
接口中,当队列已满时,临界资源处于条件不可用的状态,无法继续进行push
操作。此时,线程应该进入条件变量队列cond
中等待。如果队列未满,即临界资源条件已准备好,那么可以继续push
,调用队列_q
的push
接口。 观察代码可以看到,在判断临界资源是否就绪之前,必须先获取锁,因为判断临界资源实质上就是对临界资源的访问,而访问临界资源自然需要加锁以保护。因此,代码通常会先获取锁,然后手动检查临界资源的就绪状态,根据状态判断是等待还是直接操作临界资源。 但是如果事先知道临界资源的状态是否就绪,则无需一上来就加锁。一旦提前知道临界资源的就绪状态,便不再需要手动检查资源状态。在这种情况下,若有一个计数器来表示临界资源中小块资源的数量(如队列中每个空间),线程在访问临界资源前会先请求该计数器。若计数器大于0,则表明队列中有空余位置,可以直接向队列push
数据;若计数器等于0,则说明队列已满,不能继续push
数据,应该阻塞等待,直至计数器再次大于0,方可继续向队列push
数据。
void Enqueue(T &in) // 生产者用的接口
{
pthread_mutex_lock(&_mutex);
while (IsFull())
{
// 生产线程去等待,是在临界区中休眠的!你现在还持有锁呢!!!
// 1. pthread_cond_wait调用是: a. 让调用进程等待 b. 自动释放曾经持有的_mutex锁
_product_wait_num++;
pthread_cond_wait(&_product_cond, &_mutex);
_product_wait_num--;
}
// 进行生产
_block_queue.push(in);
// 通知消费者来消费
if (_consum_wait_num > 0)
{
pthread_cond_signal(&_consum_cond);
}
pthread_mutex_unlock(&_mutex); // 其实解锁和唤醒条件顺序无所谓,先唤醒后那边等着,解锁后直接竞争
// 如果先解锁,后唤醒:先解锁没任何效果,因为都在wait那里等,一唤醒就直接得到锁
}
信号量是一种用于进程间通信和同步的机制。它本质上是一个计数器,用于衡量系统中的资源可用数量。通过信号量,可以实现对临界资源的访问控制,确保多个进程或线程能够安全地共享资源而不发生冲突。
在访问临界资源之前,程序可以通过申请信号量来获取对资源的访问权限。如果信号量的值大于0,表示资源可用,程序可以继续访问资源;如果信号量的值等于0,表示资源已被占用,程序需要等待,直到资源可用为止。
信号量并不仅仅是简单的计数器,它是通过原子操作实现的,确保信号量的操作是线程安全的。常用的信号量操作包括P操作(等待操作)和V操作(释放操作),也称为PV操作。P操作会将信号量的值减1,用于占用资源;V操作会将信号量的值加1,用于释放资源。
通过合理地使用信号量和PV操作,可以实现多线程或多进程之间的同步和互斥,避免资源竞争和死锁等并发问题。信号量是操作系统中重要的同步工具,广泛应用于进程间通信、资源管理、线程同步等场景。
system信号量和POSIX信号量都是用于进程间通信和同步的机制,但它们之间存在一些区别。
系统信号量是Linux系统提供的一种进程间通信和同步机制,而POSIX信号量是基于POSIX标准的一种同步机制,二者都可以实现进程或线程间的同步和互斥操作
初始化信号量:
使用sem_init
函数可以初始化信号量,给定的value
值会成为信号量的初始值。如果信号量是线程间共享的,可以被多个线程同时使用;如果是进程间共享的,可以被多个进程使用
#include <semaphore.h>//下面的函数都这此头文件
int sem_init(sem_t *sem, int pshared, unsigned int value);
sem
: 指向要初始化的信号量的指针(我们使用sem_t 类型直接定义)pshared
: 0 表示该信号量为线程间共享;非零值表示信号量为进程间共享value
: 信号量的初始值 销毁信号量:
使用sem_destroy
函数可以销毁之前初始化的信号量。在销毁信号量之前,要确保所有线程或进程都已经停止使用该信号量。
int sem_trywait(sem_t *sem);
sem
: 要销毁的信号量的指针等待信号量:(P操作- -)
使用sem_wait
函数可以等待信号量,即执行P操作。如果信号量的值大于0,则将其减1并立即返回,否则线程(或进程)会阻塞等待信号量变为大于0。
int sem_wait(sem_t *sem);
sem
: 要等待的信号量的指针发布信号量:(V操作++)
使用sem_post
函数可以发布(释放)信号量,即执行V操作。对信号量执行V操作会将其值加1,并唤醒可能正在等待该信号量的线程(或进程)。
int sem_post(sem_t *sem);
sem
: 要发布的信号量的指针之前在阻塞队列里,我们不能实现出队列与入队列的同时进行。现在因为是循环队列我们使用了两个索引,而两个索引不同时可以同时进行出和入 当为空时或者满时,二者只能有一个开始执行。然后就不再相等了,也是能分开进行了
#pragma once
#include <iostream>
#include <string>
#include <vector>
#include <semaphore.h>
#include <pthread.h>
template <typename T>
class RingQueue
{
public:
RingQueue(int cap) : _ringqueue(cap - 1), _cap(cap), _productor_index(0), _consumer_index(0) // vector初始化大小为cap个0
{
sem_init(&_room_sem, 0, _cap); // 这个是生产者的(能用的空间),一开始大小是整个空间的
sem_init(&_data_sem, 0, 0); // 这个是消费者的(能用的数据),一开始是0
pthread_mutex_init(&_productor_mutex, nullptr);
pthread_mutex_init(&_consumer_mutex, nullptr); // 锁的初始化
}
// P+V保证了消费与生产的互斥与同步
// 加锁和解锁保证了之间的互斥
// 我们采取先预定资源,再竞争锁
void Enqueue(const T &in) // 入队列
{
P(_room_sem); // p操作--
Lock(_productor_mutex);
// 到这里就说明一定有空间
_ringqueue[_productor_index++] = in;
_productor_index %= _cap; // 保证循环
Unlock(_productor_mutex);
V(_data_sem); // data++
}
void Pop(T *out) // 出队列 输出型参数
{
// 消费行为
P(_data_sem);
Lock(_consumer_mutex);
*out = _ringqueue[_consumer_index++];
_consumer_index %= _cap;
Unlock(_consumer_mutex);
V(_room_sem);
}
~RingQueue()
{
sem_destroy(&_room_sem);
sem_destroy(&_data_sem); // 处理信号量
pthread_mutex_destroy(&_productor_mutex);
pthread_mutex_destroy(&_consumer_mutex);
}
private:
void P(sem_t &sem) // 预定空间
{
sem_wait(&sem);
}
void V(sem_t &sem) // 还东西
{
sem_post(&sem);
}
void Lock(pthread_mutex_t &mutex)
{
pthread_mutex_lock(&mutex);
}
void Unlock(pthread_mutex_t &mutex)
{
pthread_mutex_unlock(&mutex);
}
private:
std::vector<T> _ringqueue; // 底层是一个数组
int _cap; // 容量上限
int _productor_index;
int _consumer_index; // 生产和消费的下标
sem_t _room_sem; // 生产者关心
sem_t _data_sem; // 消费者关心
// 定义锁,维护多生产多消费之间的互斥关系
pthread_mutex_t _productor_mutex;
pthread_mutex_t _consumer_mutex;
};
Tash.hpp
#pragma once
#include <functional>
#include <iostream>
using Task = std::function<void()>;
void Test()
{
std::cout << "This is the Test Funtion" << std::endl;
}
Thread.hpp
#ifndef __THREAD_HPP__
#define __THREAD_HPP__
#include <iostream>
#include <string>
#include <unistd.h>
#include <functional>
#include <pthread.h>
namespace ThreadModule
{
template <typename T>
using func_t = std::function<void(T &, std::string name)>;
// typedef std::function<void(const T&)> func_t;
template <typename T>
class Thread
{
public:
void Excute()
{
_func(_data, _threadname);
}
public:
Thread(func_t<T> func, T &data, const std::string &name = "none-name")
: _func(func), _data(data), _threadname(name), _stop(true)
{
}
static void *threadroutine(void *args) // 类成员函数,形参是有this指针的!!
{
Thread<T> *self = static_cast<Thread<T> *>(args);
self->Excute();
return nullptr;
}
bool Start()
{
int n = pthread_create(&_tid, nullptr, threadroutine, this);
if (!n)
{
_stop = false;
return true;
}
else
{
return false;
}
}
void Detach()
{
if (!_stop)
{
pthread_detach(_tid);
}
}
void Join()
{
if (!_stop)
{
pthread_join(_tid, nullptr);
}
}
std::string name()
{
return _threadname;
}
void Stop()
{
_stop = true;
}
~Thread() {}
private:
pthread_t _tid;
std::string _threadname;
T &_data; // 为了让所有的线程访问同一个全局变量
func_t<T> _func;
bool _stop;
};
} // namespace ThreadModule
#endif
Main.cc
#include "RingQueue.hpp"
#include "Thread.hpp"
#include "Task.hpp"
#include <string>
#include <vector>
#include <unistd.h>
using namespace ThreadModule;
int a = 10;
using ringqueue_t = RingQueue<Task>;
void Consumer(ringqueue_t &rq, std::string name)
{
while (true)
{
Task t;
rq.Pop(&t);
std::cout << "Consumer :" << " NAME" << name << std::endl;
t();
sleep(2);
}
}
void Productor(ringqueue_t &rq, std::string name)
{
int cnt = 1;
srand(time(nullptr));
while (true)
{
rq.Enqueue(Test);
std::cout << "Productor is : " << cnt << " NAME" << name << std::endl;
// sleep(2);
cnt++;
}
}
void InitComm(std::vector<Thread<ringqueue_t>> *threads, int num, ringqueue_t &rq, func_t<ringqueue_t> func, std::string prename)
{
for (int i = 0; i < num; i++)
{
std::string name = prename + "thread-00" + std::to_string(i + 1);
threads->emplace_back(func, rq, name);
}
}
void InitConsumer(std::vector<Thread<ringqueue_t>> *threads, int num, ringqueue_t &rq)
{
InitComm(threads, num, rq, Consumer, "Cons ");
}
void InitProductor(std::vector<Thread<ringqueue_t>> *threads, int num, ringqueue_t &rq)
{
InitComm(threads, num, rq, Productor, "Prod ");
}
void StartAll(std::vector<Thread<ringqueue_t>> &threads)
{
for (auto &thread : threads)
{
std::cout << "start: " << thread.name() << std::endl;
thread.Start();
}
}
void WaitAllThread(std::vector<Thread<ringqueue_t>> &threads)
{
for (auto &thread : threads)
{
thread.Join();
}
}
int main()
{
ringqueue_t *rq = new ringqueue_t(5);
std::vector<Thread<ringqueue_t>> threads;
InitProductor(&threads, 1, *rq);
InitConsumer(&threads, 1, *rq);
StartAll(threads);
WaitAllThread(threads);
return 0;
}
<stdarg.h>
头文件中定义了一些宏,用于处理 C 语言中的可变参数函数
#define va_start(ap, param) ap = (va_list)¶m
#define va_arg(ap, type) (*(type*)(ap++))
#define va_end(ap) ap = NULL
va_list
:va_list
是一个类型,它用来声明一个变量,这个变量将被用来依次访问可变参数列表中的参数。va_start
:va_start
宏用于初始化 va_list
变量。它接受两个参数:第一个参数是一个 va_list
类型的变量,用来指向参数列表;第二个参数是最后一个确定的参数的后一个参数,即可变参数列表中已知参数的后一个参数。这样就能让 va_list
从可变参数列表的第一个参数开始遍历。va_arg
:va_arg
宏用于先返回参数的值,再访问 va_list
中的下一个参数。它接受两个参数:第一个参数是 va_list
类型的变量;第二个参数是要获取的参数的类型。va_arg
的作用是逐个遍历可变参数列表,返回对应类型的参数值,并将 va_list
向后移动到下一个参数。va_end
:va_end
宏用于清理 va_list
变量,释放资源。一般来说,va_end
应该与对应的 va_start
成对出现,用来正确终止可变参数的处理。va_copy
:va_copy
宏用于将一个 va_list
类型的变量的值复制给另一个 va_list
类型的变量,以便在后续代码中再次访问相同的可变参数列表。va_copy
函数的原型类似于 va_copy(va_list dest, va_list src)
,通过将源 va_list
复制给目标 va_list
,使得目标 va_list
在后续代码中可以重新访问相同的可变参数列表。void Test(int num, ...)
{
va_list arg;
va_start(arg, num);
while (num)
{
int data = va_arg(arg, int);
std::cout << "data: " << data << std::endl;
num--;
}
va_end(arg); // arg = NULL
}
int main()
{
Test(3, 11, 22, 33);
return 0;
}
__VA_ARGS__
是 C/C++ 中的预定义宏,用于表示宏定义中的可变参数部分。在宏定义中,如果我们希望定义一个参数个数不确定的宏,就可以使用 __VA_ARGS__
来代表可变参数的部分。
使用方法
在宏定义中,__VA_ARGS__
常用于定义具有可变参数的宏
#define LOG(format, ...) printf(format, __VA_ARGS__)
在上面的示例中,LOG
宏定义了一个可变参数的输出日志功能。format
是格式化字符串,__VA_ARGS__
表示可变参数部分,当宏被调用时,实际参数会替换 __VA_ARGS__
部分。
工作原理
__VA_ARGS__
会被替换为实际参数列表。使用 ##
连接 format
和 __VA_ARGS__
,以确保在 __VA_ARGS__
为空时,不会产生额外的逗号(一般都会加上)()
Level
、输出日志到文件的函数、获取时间字符串、打印日志消息等。LOG
,用于方便打印日志信息。main
函数,创建了一个线程池 ThreadPool
实例,并向线程池添加任务。Task
,包含了任务的执行、结果转换为字符串等功能。
Thread
,包含了线程的执行函数、启动、分离、等待、停止等功能。
整体流程:在主程序中创建线程池并添加任务,线程池中的线程会从任务队列中获取任务并执行,执行过程中会记录日志信息。日志功能会将信息输出到屏幕或者保存到文件中,日志级别由枚举 Level
定义。
Log.hpp
#pragma once
#include <string>
#include <cstdio>
#include <time.h> //time函数和localtime函数
#include <iostream>
#include <sys/types.h>
#include <unistd.h> //getpid
#include <pthread.h>
#include <stdarg.h>
#include <fstream>
enum Level
{
DEBUG = 0,
INFO,
WARNING,
ERROR,
FATAL // 从上到下,程度依次增大
};
bool isSave = false; // 用来判断日志信息是否需要保存到文件中
std::string file_name = "log.txt";
std::string LevelToString(int level)
{
switch (level)
{
case DEBUG:
return "Debug";
case INFO:
return "Info";
case WARNING:
return "Warning";
case ERROR:
return "Error";
case FATAL:
return "Fatal";
default:
return "Unknown";
}
}
std::string GetTimeString()
{
time_t curr_time = time(nullptr);
struct tm *format_time = localtime(&curr_time); // format:格式
if (format_time == nullptr)
return "None";
char time_buffer[1024];
snprintf(time_buffer, sizeof(time_buffer), "%d-%d-%d %d:%d:%d", // snprintf 会确保在目标字符数组的末尾添加 null 结尾符 '\0',
// 以确保生成的字符串是以 null 结尾的,保证了能当成char*
format_time->tm_year + 1900,
format_time->tm_mon + 1,
format_time->tm_mday,
format_time->tm_hour,
format_time->tm_min,
format_time->tm_sec); // 从上到下,年月日、时分秒
return time_buffer; // 由于 std::string 类的构造函数支持接受以null结尾的C字符串指针作为参数(一般是直接char* 的不是char arr[])
// 因此在返回时会隐式地将 time_buffer 转换为 std::string 对象
}
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
void SaveToFile(std::string &filename, std::string &message)
{
std::ofstream out(filename, std::ios_base::app); // 以追加方式打开
if (!out.is_open())
{
return;
}
out << message;
out.close();
}
void LogMessage(std::string filename, int line, bool isSave, int level, const char *format, ...)
{
std::string levelstr = LevelToString(level);
std::string timestr = GetTimeString();
pid_t log_id = getpid();
va_list arg;
va_start(arg, format);
char buffer[1024];
vsnprintf(buffer, sizeof(buffer), format, arg);
va_end(arg); // 处理可变参数列表
std::string message = "[" + timestr + "]" + "[" + levelstr + "]" +
"[" + std::to_string(log_id) + "]" +
"[" + filename + "]" + "[" + std::to_string(line) + "] " + buffer + "\n";
pthread_mutex_lock(&mutex);
if (!isSave) // 是false就向显示器上打印
{
std::cout << message << std::endl;
}
else // 向文件里保存
{
SaveToFile(file_name, message);
}
pthread_mutex_unlock(&mutex);
}
#define LOG(level, format, ...) \
do \
{ \
LogMessage(__FILE__, __LINE__, isSave, level, format, ##__VA_ARGS__); \
} while (0)
// 在调用LogMessage时,参数一定是具体的,就使用__VA_ARGS__
// \: 反斜杠符号用于将宏定义延续到下一行,使得宏定义可以跨多行书写
#define EnableFile() \
do \
{ \
gIsSave = true; \
} while (0)
// 向文件输入
#define EnableScreen() \
do \
{ \
gIsSave = false; \
} while (0)
// 向屏幕输入
time()
函数:time_t time(time_t *timer)
函数用于获取当前的系统时间(从1970年1月1日0时0分0秒至今的秒数),返回一个 time_t
类型的值。
timer
不为 NULL
,则函数也会将时间戳写入到 timer
指向的变量中。time(NULL)
来获取当前的时间戳。localtime()
函数:struct tm *localtime(const time_t *timer)
函数将时间戳转换为本地时间,返回一个指向 struct tm
结构体的指针。
struct tm
结构体包含了年、月、日、时、分、秒等本地时间信息。localtime()
返回的是一个指向静态分配的结构体的指针tm
,因此在多线程环境下要小心使用。vsnprintf
是 C 语言标准库函数中的一个函数,用于将格式化的字符串输出到缓冲区中,且不超过特定字符数。它与 sprintf
函数相似,但 vsnprintf
可以处理可变参数列表,因此适用于不确定参数个数的情况。函数原型
int vsnprintf(char *str, size_t size, const char *format, va_list ap);
str
:指向要输出的字符缓冲区的指针。size
:要输出的字符数的最大限制(包括终止的 null 字符 \0
)。format
:格式化字符串,包含占位符 %
和格式规格。ap
:va_list
类型的参数列表,用于提供格式化字符串中的替换值。返回值
\0
)。\0
),但不会写入缓冲区。注意事项
sprintf
,但能够处理可变参数列表。Main.cc
#include "ThreadPool.hpp"
#include "Log.hpp"
#include <iostream>
#include "Task.hpp"
#include <memory> //智能指针的
int main()
{
EnableFile(); // 向文件里输入
std::unique_ptr<ThreadPool<Task>> tp = std::make_unique<ThreadPool<Task>>(5); // C++14新特性
tp->Init();
tp->Start();
int tasknum = 10;
while (tasknum)
{
int a = rand() % 10 + 1;
usleep(1234);
int b = rand() % 5 + 1;
Task t(a, b);
LOG(INFO, "main thread push task: %s", t.DebugToString().c_str());
tp->Enqueue(t);
sleep(1);
tasknum--;
}
tp->Stop();
tp->Wait();
return 0;
}
C++14 标准中引入了 std::make_unique
函数,用于动态分配一个类型的对象,并返回一个 std::unique_ptr
智能指针来管理这个对象的生命周期。以下是对 std::make_unique
的详细讲解:
std::make_unique
通过返回一个 std::unique_ptr
来管理动态分配的对象,保证对象的所有权独立且唯一。std::make_unique
会在动态分配内存成功后,立即初始化对象并返回对其的 std::unique_ptr
,确保异常安全性。std::make_unique
创建的对象绑定到智能指针中,避免出现内存泄漏或忘记释放内存等问题。#include <memory>
std::unique_ptr<Type> ptr = std::make_unique<Type>(constructor parameters);
std::unique_ptr<Type>
:std::unique_ptr
是 C++ 中智能指针的一种,用于管理动态分配的对象。<Type>
表示该 std::unique_ptr
指向的对象类型是 Type
。这个智能指针将独占地拥有所指向的对象,保证资源在适当时候被释放。std::make_unique<Type>(constructor parameters)
:std::make_unique
是一个 C++14 新引入的函数模板,用于动态分配内存并初始化对象。<Type>
表示需要创建的对象类型是 Type
。constructor parameters
是传递给 Type
类型对象构造函数的参数。std::make_unique
会在内存分配成功后立即初始化对象,并返回一个指向该对象的 std::unique_ptr
,确保异常安全性和避免内存泄漏。ThreadPool.hpp
// 我们这个线程库是一开始就有固定数量的线程,当来任务时就交给线程来执行
#pragma once
#include <iostream>
#include <vector>
#include <queue>
#include <pthread.h>
#include "Thread.hpp"
#include "Log.hpp"
#include "Task.hpp"
using namespace ThreadModule;
int defaultthreadnum = 5;
template <typename T>
class ThreadPool
{
private:
void LockQueue()
{
pthread_mutex_lock(&_mutex);
}
void UnlockQueue()
{
pthread_mutex_unlock(&_mutex);
}
void ThreadSleep()
{
pthread_cond_wait(&_cond, &_mutex);
}
void ThreadWakeup()
{
pthread_cond_signal(&_cond);
}
void ThreadWakeupAll()
{
pthread_cond_broadcast(&_cond);
}
public:
void HandlerTask(std::string name) // 类的成员方法设计为另一个类的回调方法,这里处理任务
{
LOG(INFO, "%s is running...", name.c_str());
while (true)
{
LockQueue();
while (_task_queue.empty() && _isrunning)
{
_waitnum++; // 每次进来就说明要有线程等了
ThreadSleep();
_waitnum--;
}
// 到这里就说明有任务了
// 如果线程池已经退出了 && 任务队列是空的
if (_task_queue.empty() && !_isrunning)
{
UnlockQueue();
break;
}
// 如果线程池不退出 && 任务队列不是空的
// 如果线程池已经退出 && 任务队列不是空的 --- 处理完所有的任务,然后在退出
T t = _task_queue.front();
_task_queue.pop(); // 已经吧任务拿出来了,在线程里
LOG(DEBUG, "%s get a task", name.c_str());
UnlockQueue();
t(); // 进行处理任务,在锁外就行。我们在Task类里,已经重载了()了
LOG(DEBUG, "%s handler a task, result is: %s", name.c_str(), t.ResultToString().c_str());
}
}
ThreadPool(int threadnum = defaultthreadnum) : _threadnum(threadnum), _waitnum(0), _isrunning(false)
{
pthread_mutex_init(&_mutex, nullptr);
pthread_cond_init(&_cond, nullptr);
LOG(INFO, "ThreadPool Construct()"); // 可变参数列表为空
}
void Init()
{
for (int i = 0; i < _threadnum; i++)
{
std::string name = "thread-00" + std::to_string(i + 1);
//_threads.emplace_back(test, name); // 问题,参数多个this指针,与fun_t 不符合,可以加static
_threads.emplace_back(std::bind(&ThreadPool::HandlerTask, this, std::placeholders::_1), name); // 使用这样解决,还能穿一个对象过去
LOG(INFO, "ThreadPool Init %s ", name.c_str());
}
_isrunning = true;
}
void Start()
{
for (auto &e : _threads)
{
e.Start();
} // 不能在start这后面进行isruning的更改,因为,在未更改前,新线程可能已经运行完Task函数,直接退出了
// 在让线程跑之前,初始化就要做好
}
void Wait()
{
for (auto &e : _threads)
{
e.Join();
LOG(INFO, "%s is quit...", e.name().c_str());
}
}
bool Enqueue(const T &t)
{
bool ret = false;
LockQueue();
if (_isrunning)
{
_task_queue.push(t);
if (_waitnum > 0)
{
ThreadWakeup();
}
LOG(DEBUG, "enqueue task success");
ret = true;
}
UnlockQueue();
return ret;
}
void Stop()
{
LockQueue();
_isrunning = false;
ThreadWakeupAll();
UnlockQueue();
}
~ThreadPool()
{
pthread_mutex_destroy(&_mutex);
pthread_cond_destroy(&_cond);
}
private:
int _threadnum;
std::vector<Thread> _threads; // 一个存的都是线程的vector
std::queue<T> _task_queue; // 任务队列
pthread_mutex_t _mutex; // 锁
pthread_cond_t _cond; // 条件变量
int _waitnum; // 等待的线程数量
bool _isrunning; // 该
};
LockQueue()
和 UnlockQueue()
:用于对任务队列进行加锁和解锁操作。ThreadSleep()
、ThreadWakeup()
和 ThreadWakeupAll()
:用于线程等待和唤醒的操作。HandlerTask(std::string name)
:任务处理函数,线程从任务队列中取出任务并执行。ThreadPool(int threadnum = defaultthreadnum)
:构造函数,初始化线程池。Init()
:初始化线程池,创建固定数量的线程并设置为可运行状态。Start()
:启动线程池中的所有线程。Wait()
:等待所有线程执行完毕。Enqueue(const T &t)
:向任务队列中添加任务。Stop()
:停止线程池中的所有线程。~ThreadPool()
:析构函数,销毁线程池对象,释放资源。ThreadPool
对象后,通过 Init()
初始化线程池,然后调用 Start()
启动线程池中的线程。Enqueue()
往线程池中添加任务,任务将会被线程取出执行。Stop()
停止线程池中的所有线程,最后在析构函数中释放资源。 _threads.emplace_back(std::bind(&ThreadPool::HandlerTask, this, std::placeholders::_1), name);
对于Thread的构造函数要一个fun_t的函数包装器,一个string,如果直接传HandlerTask,有一个this指针,我们使用bind
把一个this对象绑定为第一个参数就行了(这是让另一个类运行本类成员函数的一个方法)
Thread.hpp
#ifndef __THREAD_HPP__
#define __THREAD_HPP__
#include <iostream>
#include <string>
#include <unistd.h>
#include <functional>
#include <pthread.h>
namespace ThreadModule
{
using func_t = std::function<void(std::string)>;
class Thread
{
public:
void Excute()
{
_func(_threadname);
}
public:
Thread(func_t func, const std::string &name = "none-name")
: _func(func), _threadname(name), _stop(true)
{
}
static void *threadroutine(void *args) // 类成员函数,形参是有this指针的!!
{
Thread *self = static_cast<Thread *>(args);
self->Excute();
return nullptr;
}
bool Start()
{
int n = pthread_create(&_tid, nullptr, threadroutine, this);
if (!n)
{
_stop = false;
return true;
}
else
{
return false;
}
}
void Detach()
{
if (!_stop)
{
pthread_detach(_tid);
}
}
void Join()
{
if (!_stop)
{
pthread_join(_tid, nullptr);
}
}
std::string name()
{
return _threadname;
}
void Stop()
{
_stop = true;
}
~Thread() {}
private:
pthread_t _tid;
std::string _threadname;
func_t _func;
bool _stop;
};
} // namespace ThreadModule
#endif
Task.hpp
#pragma once
#include <iostream>
#include <string>
#include <functional>
class Task
{
public:
Task() {}
Task(int a, int b) : _a(a), _b(b), _result(0)
{
}
void Excute()
{
_result = _a + _b;
}
std::string ResultToString()
{
return std::to_string(_a) + "+" + std::to_string(_b) + "=" + std::to_string(_result);
}
std::string DebugToString()
{
return std::to_string(_a) + "+" + std::to_string(_b) + "=?";
}
void operator()()
{
Excute();
}
private:
int _a;
int _b;
int _result;
};
// 我们这个线程库是一开始就有固定数量的线程,当来任务时就交给线程来执行
#pragma once
#include <iostream>
#include <vector>
#include <queue>
#include <pthread.h>
#include "Thread.hpp"
#include "Log.hpp"
#include "Task.hpp"
using namespace ThreadModule;
int defaultthreadnum = 5;
template <typename T>
class ThreadPool
{
private:
void LockQueue()
{
pthread_mutex_lock(&_mutex);
}
void UnlockQueue()
{
pthread_mutex_unlock(&_mutex);
}
void ThreadSleep()
{
pthread_cond_wait(&_cond, &_mutex);
}
void ThreadWakeup()
{
pthread_cond_signal(&_cond);
}
void ThreadWakeupAll()
{
pthread_cond_broadcast(&_cond);
}
void HandlerTask(std::string name) // 类的成员方法设计为另一个类的回调方法,这里处理任务
{
LOG(INFO, "%s is running...", name.c_str());
while (true)
{
LockQueue();
while (_task_queue.empty() && _isrunning)
{
_waitnum++; // 每次进来就说明要有线程等了
ThreadSleep();
_waitnum--;
}
// 到这里就说明有任务了
// 如果线程池已经退出了 && 任务队列是空的
if (_task_queue.empty() && !_isrunning)
{
UnlockQueue();
break;
}
// 如果线程池不退出 && 任务队列不是空的
// 如果线程池已经退出 && 任务队列不是空的 --- 处理完所有的任务,然后在退出
T t = _task_queue.front();
_task_queue.pop(); // 已经吧任务拿出来了,在线程里
LOG(DEBUG, "%s get a task", name.c_str());
UnlockQueue();
t(); // 进行处理任务,在锁外就行。我们在Task类里,已经重载了()了
LOG(DEBUG, "%s handler a task, result is: %s", name.c_str(), t.ResultToString().c_str());
}
}
// 构造函数是要私有的,让唯一的那个static成员变量来用
ThreadPool(int threadnum = defaultthreadnum) : _threadnum(threadnum), _waitnum(0), _isrunning(false)
{
pthread_mutex_init(&_mutex, nullptr);
pthread_cond_init(&_cond, nullptr);
LOG(INFO, "ThreadPool Construct()"); // 可变参数列表为空
}
void Init()
{
for (int i = 0; i < _threadnum; i++)
{
std::string name = "thread-00" + std::to_string(i + 1);
//_threads.emplace_back(test, name); // 问题,参数多个this指针,与fun_t 不符合,可以加static
_threads.emplace_back(std::bind(&ThreadPool::HandlerTask, this, std::placeholders::_1), name); // 使用这样解决,还能穿一个对象过去
LOG(INFO, "ThreadPool Init %s ", name.c_str());
}
_isrunning = true;
}
void Start()
{
for (auto &e : _threads)
{
e.Start();
} // 不能在start这后面进行isruning的更改,因为,在未更改前,新线程可能已经运行完Task函数,直接退出了
// 在让线程跑之前,初始化就要做好
}
// 把赋值重载与拷贝构造删掉
ThreadPool(const ThreadPool<T> &) = delete;
ThreadPool<T> &operator=(const ThreadPool<T> &) = delete;
public:
static ThreadPool<T> *getInstance()
{
if (_instance == nullptr) // 在最外面判断:多个线程都能进去,一旦有一个竞争到了锁,不为空了。后面的线程之间不用进去了
{
pthread_mutex_lock(&_lock);
if (_instance == nullptr)
{
_instance = new ThreadPool<T>();
_instance->Init();
_instance->Start(); // 线程池的初始化与启动
LOG(DEBUG, "创建线程池单例");
pthread_mutex_unlock(&_lock);
return _instance;
}
}
else
{
LOG(DEBUG, "获取线程池单例");
return _instance;
}
}
void Wait()
{
for (auto &e : _threads)
{
e.Join();
LOG(INFO, "%s is quit...", e.name().c_str());
}
}
bool Enqueue(const T &t)
{
bool ret = false;
LockQueue();
if (_isrunning)
{
_task_queue.push(t);
if (_waitnum > 0)
{
ThreadWakeup();
}
LOG(DEBUG, "enqueue task success");
ret = true;
}
UnlockQueue();
return ret;
}
void Stop()
{
LockQueue();
_isrunning = false;
ThreadWakeupAll();
UnlockQueue();
}
~ThreadPool()
{
pthread_mutex_destroy(&_mutex);
pthread_cond_destroy(&_cond);
}
private:
int _threadnum;
std::vector<Thread> _threads; // 一个存的都是线程的vector
std::queue<T> _task_queue; // 任务队列
pthread_mutex_t _mutex; // 锁
pthread_cond_t _cond; // 条件变量
int _waitnum; // 等待的线程数量
bool _isrunning; // 该
static ThreadPool<T> *_instance;
static pthread_mutex_t _lock;
};
template <typename T>
ThreadPool<T> *ThreadPool<T>::_instance = nullptr;
template <typename T>
pthread_mutex_t ThreadPool<T>::_lock = PTHREAD_MUTEX_INITIALIZER; // 类内定义,类外初始化
在C++中,声明操作符重载函数时,可以省略参数的名字,只提供参数的类型。这是因为在操作符重载函数的声明中,参数的名字并不重要,重要的是参数的类型和数量以及函数的返回类型。为了简洁起见,有时候会省略参数的名字
getInstan()
在getInstan()函数里面我们使用了:双重检查锁定(Double-Checked Locking)机制
目的是在多线程环境下提高性能和减少竞争。这种机制的目标是尽量减少加锁的开销,只有在必要的时候才对共享资源加锁。
首先,通过if (_instance == nullptr)
的外层判断,可以减少不必要的锁竞争。如果_instance
不为空,那么直接返回已经存在的实例,避免了不必要的加锁和解锁操作。
在外层判断通过后,通过加锁的方式进入临界区,再次检查_instance == nullptr
,是为了防止多个线程同时通过外层判断后,在竞争锁的过程中,其中一个线程创建了实例,后面的线程则不应该再创建实例。因此,内层的检查是为了保证并发情况下只有一个线程能创建实例。
STL(标准模板库)中的容器本身并不是线程安全的。STL的设计初衷是为了提供高性能和通用性,因此并没有在设计中添加线程安全的机制。在多线程环境下,如果多个线程并发地访问和修改同一个STL容器,可能会引发数据竞争和未定义的行为。
由于加锁机制会对性能造成影响,STL的设计者选择了不提供内置的线程安全机制。因此,如果需要在多线程环境下使用STL容器,调用者需要自行确保线程安全。这通常需要使用互斥锁或其他同步机制来保护对容器的访问,以避免竞态条件和数据竞争。
不同容器的线程安全性实现方式可能有所不同,例如哈希表可能采用锁分区(锁表)和锁桶(锁链)等方式来实现线程安全。因此,调用者在使用STL容器时需要注意不同容器的线程安全性差异,以及需要使用何种同步机制来确保线程安全。
智能指针在多线程环境下的线程安全性取决于具体类型。
原子操作之所以能够保证线程安全,主要是因为它们提供了操作的原子性和可靠性:
悲观锁和乐观锁是两种并发控制的策略,而自旋锁、公平锁和非公平锁则属于具体实现并发控制的方式。
自旋锁是一种基于忙等待的锁,当一个线程尝试获取自旋锁时,如果锁已经被其他线程占用,该线程会进行自旋操作,即不断检查锁的状态是否被释放,而不是立即被挂起等待。这种方式可以减少线程上下文切换的性能开销,适用于临界区内操作时间短暂的情况。
如何衡量临界区内操作时间:
还是看我们的经验来选择合适,恰当的锁
void spin_lock_init(spinlock_t *lock)
这个函数用于初始化一个自旋锁,通常在使用自旋锁之前调用。lock
为指向自旋锁变量的指针。
void spin_lock(spinlock_t *lock);
当一个线程想要进入临界区时,它会调用这个函数来获取自旋锁。如果自旋锁已经被其他线程占用,当前线程会尝试不断地自旋等待,直到获取到锁。lock
为指向自旋锁变量的指针。
void spin_unlock(spinlock_t *lock);
当线程执行完临界区内的操作后,需要调用这个函数来释放自旋锁,使得其他线程可以获取到锁。lock
为指向自旋锁变量的指针。
void spin_lock_destroy(spinlock_t *lock);
当自旋锁不再需要时,可以调用这个函数来销毁自旋锁以释放相关资源。lock
为指向自旋锁变量的指针。
在多线程编程中,有时候会遇到一种常见的情况,即某些共享数据的修改操作相对较少,而读取操作却非常频繁,且读取操作中可能会伴随着耗时较长的查找操作。在这种情况下,如果对整个数据结构进行加锁,那么即使是读取操作也需要等待锁的释放,这会导致程序效率降低。 为了解决这种情况,可以使用读写锁。读写锁允许多个线程同时获取读锁,只有在获取写锁时才会阻塞其他线程。这样一来,在多读少写的情况下,多个线程可以同时获得读锁,从而提高了程序的并发性能,避免了不必要的阻塞。 总结一下,读写锁适用于多读少写的场景,可以通过允许多个线程同时获取读锁来提高程序的并发性能,避免不必要的阻塞,从而提高了程序的效率。
读者写者模型是用于描述多线程对共享数据进行读写操作时的一种经典并发模型。在读者写者模型中,有两类线程:读者和写者。读者线程只对共享数据进行读操作,而写者线程则对共享数据进行写操作。读者在读操作时不会互斥,多个读者可以同时访问共享数据(不会对数据进行修改),但写者在写操作时需要互斥,同时只允许一个写者访问共享数据且不允许其他任何读者或写者访问。
读者写者模型的目标是实现对共享数据的高效访问,保证数据的一致性和并发性。为了实现这一目标,通常会使用锁和条件变量等同步机制来控制读者和写者线程的访问。
int reader_count = 0;
pthread_mutex_t wlock;
pthread_mutex_t rlock;
// 读者线程
void reader() {
lock(&rlock); // 获取读者锁
if (reader_count == 0) {
lock(&wlock); // 如果当前没有读者,则获取写者锁
}
++reader_count; // 增加读者计数
unlock(&rlock); // 释放读者锁
// 这里进行读取操作
lock(&rlock); // 重新获取读者锁
--reader_count; // 减少读者计数
if (reader_count == 0) {
unlock(&wlock); // 如果已经没有读者,释放写者锁
}
unlock(&rlock); // 释放读者锁
}
// 写者线程
void writer() {
lock(&wlock); // 获取写者锁
// 这里进行写入操作
unlock(&wlock); // 释放写者锁
}
在上述伪代码中,我们模拟了读者写者模型的加锁逻辑,主要包括了对读者和写者线程进行互斥和同步控制。下面我们简要解释一下这段伪代码的逻辑:
reader_count
表示当前正在读取数据的读者数量。pthread_mutex_t wlock
和pthread_mutex_t rlock
分别表示写者锁和读者锁,用于读者写者线程的互斥操作。对于读者线程:
rlock
,确保读者线程之间的互斥。
wlock
,确保写者无法进入。
reader_count
计数器,表明有一个读者正在读取数据。
当没有读者在读时,我们就会释放写者锁
对于写者线程:
wlock
,确保写者线程独占对共享数据的访问。pthread_rwlock_init
:初始化读写锁。
函数原型为 int pthread_rwlock_init(pthread_rwlock_t *rwlock, const pthread_rwlockattr_t *attr)
,该函数用于初始化一个读写锁对象rwlock
,可以指定属性attr
,一般情况下可以传入NULL
使用默认属性。
rwlock
:指向读写锁对象的指针。attr
:读写锁的属性对象指针,可以为 NULL
,表示使用默认属性。pthread_rwlock_destroy
:销毁读写锁。
函数原型为 int pthread_rwlock_destroy(pthread_rwlock_t *rwlock)
,用于销毁已经初始化的读写锁对象rwlock
。销毁读写锁后,该读写锁对象不可再使用,需要重新进行初始化。
rwlock
:指向读写锁对象的指针。pthread_rwlock_rdlock
:获取读锁。
函数原型为 int pthread_rwlock_rdlock(pthread_rwlock_t *rwlock)
,该函数用于获取读锁,即允许多个线程同时获取读取权限,但在写锁被获取时将会阻塞。当读线程数较多时,考虑性能可以使用读锁。
rwlock
:指向读写锁对象的指针。pthread_rwlock_wrlock
:获取写锁。
函数原型为 int pthread_rwlock_wrlock(pthread_rwlock_t *rwlock)
,该函数用于获取写锁,即独占地写入数据。一旦有线程获取了写锁,其他线程无法获取读锁或写锁,只能等待写锁的释放。
rwlock
:指向读写锁对象的指针。pthread_rwlock_unlock
:释放锁。
函数原型为 int pthread_rwlock_unlock(pthread_rwlock_t *rwlock)
,用于释放读锁或写锁,让其他线程可以获取读写锁。
rwlock
:指向读写锁对象的指针。我们对于读者里面的加锁就直接使用pthread_rwlock_rdlock,相当于上面的全部过程了 同理:对于写者里面的加锁就直接使用pthread_rwlock_wrlock