前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >【Linux线程】从零到一:掌握Linux线程池的设计与实现

【Linux线程】从零到一:掌握Linux线程池的设计与实现

作者头像
Eternity._
发布2024-10-26 09:43:10
1250
发布2024-10-26 09:43:10
举报
文章被收录于专栏:登神长阶

🔍前言:在当今这个信息化高速发展的时代,多线程并发编程已经成为开发高性能应用不可或缺的技能。而在Linux这一广泛应用的操作系统中,线程池作为一种高效管理线程资源的机制,更是成为了众多开发者关注的焦点

线程池通过预先创建并维护一定数量的线程,使得线程可以被重复利用,从而避免了频繁创建和销毁线程所带来的性能损耗。在Linux环境下,线程池的应用更是广泛,无论是服务器端的并发处理,还是客户端的响应速度提升,都离不开线程池的助力

在本文中,我们将从线程池的基本概念入手,逐步深入到线程池的实现细节。我们会结合Linux操作系统的特点,为大家讲解如何在Linux环境下构建和管理线程池。同时,我们还将分享一些在实际项目中应用线程池的经验和教训,帮助大家更好地理解和运用线程池技术

希望本文能够成为大家学习Linux线程池路上的得力助手,助力大家在多线程并发编程的道路上越走越远。

📒1. 日志

Linux日志是Linux系统中非常重要的资源,它们记录了系统运行的各种信息,包括系统错误、警告、安全事件等

日志记录了系统运行的各种信息,对于排查问题和维护系统安全非常重要。通过备份和归档日志文件,可以在需要的时候快速地查找和分析问题。同时,定期清理日志文件可以释放磁盘空间,保留必要的日志信息用于后续的分析和排查问题

我们可以模拟实现一下实现日志的基本功能

代码示例:(日志 Log.hpp)

代码语言:javascript
复制
#pragma once
#include <iostream>
#include <unistd.h>
#include <pthread.h>
#include <string>
#include <ctime>
#include <stdarg.h>
#include <sys/types.h>
#include <fcntl.h>
#include <sys/stat.h>


using namespace std;

// 日志的信息
enum
{
    Debug = 0,
    Info,
    Warning,
    Error,
    Fatal
};

// 日志文件输出方式
enum
{
    Screen = 10,
    OneFile,
    ClassFile
};

string LevelToString(int level)
{
    switch(level)
    {
        case Debug:
            return "Debug";
        case Info:
            return "Info";
        case Warning:
            return "Warning";
        case Error:
            return "Error";
        case Fatal:
            return "Fatal";
        default:
            return "Unkonw";
    }
}

const int defaultstyle = Screen;
const string default_filename = "log.";
const string logdir = "log";

class Log
{
public:
    Log()
        :style(defaultstyle)
        ,filename(default_filename)
    {
    	// 建立专门存放日志文件的目录
        mkdir(logdir.c_str(), 0775);
    }  

    void Enable(int sty)
    {
        style = sty;
    }
    
    // 日志的时间格式
    string TimeStampExLocalTime()
    {
        time_t currtime = time(nullptr);
        struct tm *curr = localtime(&currtime);
        char time_buffer[1024];
        snprintf(time_buffer, sizeof(time_buffer), "%d-%d-%d_%d:%d:%d",\
          curr->tm_year + 1900, curr->tm_mon + 1, curr->tm_mday, curr->tm_hour,\
          curr->tm_min, curr->tm_sec);
        
        return time_buffer;
    }
	
	// 写入一个日志文件
    void WriteLogToOneFile(string &logname, string &message)
    {
        umask(0);
        int fd = open(logname.c_str(), O_WRONLY|O_CREAT|O_APPEND, 0664);
        if(fd < 0) return;
        write(fd, message.c_str(), sizeof(message));
        close(fd);
    }
	
	// 写入一类日志文件
    void WriteLogToClassFile(const string &levelstr, string &message)
    {
    	// 分级建立目录
        string logname = logdir;
        logname += "/";
        logname += filename;
        logname += levelstr;
		
		// 转化成写入一个日志文件
        WriteLogToOneFile(logname, message);
    }
	
	// 将信息写入文件
    void WriteLog(string &levelstr, string &message)
    {
    	// 根据style选择写入方式
        switch(style)
        {
            case Screen:
                cout << message << endl;
                break;
            case OneFile:
                WriteLogToClassFile("all", message);
                break;
            case ClassFile:
                WriteLogToClassFile(levelstr, message);
                break;
            default:
                cout << "Unknow" << endl;
                break;
        }
    }
	
	// 打印日志信息
    void LogMessage(int level, const char *format, ...) // 日志接口
    {
        char right_buffer[1024];
        va_list args;
        va_start(args, format);
        // args 指向了可变参数部分
        vsnprintf(right_buffer, sizeof(right_buffer), format, args);
        va_end(args);

        char left_buffer[1024];
        string currtime = TimeStampExLocalTime(); // 时间
        string levelstr = LevelToString(level); // 等级
        string idstr = to_string(getpid()); // id
        snprintf(left_buffer, sizeof(left_buffer), "[%s][%s][%s]", \
        levelstr.c_str(), currtime.c_str(), idstr.c_str());

        // printf("%s: %s\n", left_buffer, right_buffer);
        string loginfo = left_buffer;
        loginfo += right_buffer;
        WriteLog(levelstr, loginfo);
    }

    ~Log()
    {}
private:
    int style; // 日志的输出风格
    string filename; // 文件名称
};

实现功能:

代码语言:javascript
复制
#include "Log.hpp"                                                                                                                                                                    

using namespace std;

int main()
{
    Log lg;
    // lg.Enable(ClassFile);
    lg.Enable(Screen);
    lg.LogMessage(Debug, "%s:%d-%.2f", "爱你", 1314, 5.20);
    lg.LogMessage(Info, "%s:%d-%.2f", "爱你", 1314, 5.20);
    lg.LogMessage(Warning, "%s:%d-%.2f", "爱你", 1314, 5.20);
    lg.LogMessage(Error, "%s:%d-%.2f", "爱你", 1314, 5.20);
    lg.LogMessage(Fatal, "%s:%d-%.2f", "爱你", 1314, 5.20);
    lg.LogMessage(Info, "%s:%d-%.2f", "爱你", 1314, 5.20);

	return 0;
}

style == ClassFile

在这里插入图片描述
在这里插入图片描述

style == Screen

在这里插入图片描述
在这里插入图片描述

模拟实现日志代码


📜2. 线程池

一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量

这些线程在执行完一个任务后,并不会被销毁,而是会继续等待并执行新的任务。线程池通过重用线程,减少了线程创建和销毁的开销,从而提高了程序的执行效率


线程池的应用场景:

  • 需要大量的线程来完成任务,且完成任务的时间比较短。 WEB服务器完成网页请求这样的任务,使用线程池技术是非常合适的。因为单个任务小,而任务数量巨大,你可以想象一个热门网站的点击次数。 但对于长时间的任务,比如一个Telnet连接请求,线程池的优点就不明显了。因为Telnet会话时间比线程的创建时间大多了
  • 对性能要求苛刻的应用,比如要求服务器迅速响应客户请求
  • 接受突发性的大量请求,但不至于使服务器因此产生大量线程的应用。突发性大量客户请求,在没有线程池情况下,将产生大量线程,虽然理论上大部分操作系统线程数目最大值不是问题,短时间内产生大量线程可能使内存到达极限,出现错误

在我们模拟实现线程池的时候,我们会用到我们前面自己模拟实现的一些文件,这些代码我就不再一次展示出来,在最后我会给出代码的了解,大家可以结合者理解,下面我们演示的代码采用的是单例模式设计的线程池

代码示例:(线程池 ThreadPool.hpp)

代码语言:javascript
复制
#pragma once
#include <iostream>
#include <unistd.h>
#include <pthread.h>
#include <string>
#include <functional>
#include <ctime>
#include <queue>
#include <stdarg.h>
#include <sys/types.h>
#include "Task.hpp"

using namespace std;

class ThreadData
{
public:
    ThreadData(const string name)
        :threadname(name)   
    {}
    ~ThreadData()
    {}
public:
    string threadname;
};

const int default_value = 5; // 多线程的最大数量

template <class T>
class ThreadPool
{
// 单例模式
private:
    ThreadPool(int thread_num = default_value)
        :_thread_num(thread_num)
    {
        pthread_mutex_init(&_mutex, nullptr);
       pthread_cond_init(&_cond, nullptr);

        for(int i = 0; i < _thread_num; i++)
        {
            string threadname = "thread-";
            threadname += to_string(i+1);

            ThreadData td(threadname);
            // Thread<ThreadData> t(threadname, bind(&ThreadPool<T>::ThreadRun, this, placeholders::_1), td);
            
            _threads.emplace_back(threadname, bind(&ThreadPool<T>::ThreadRun, this, placeholders::_1), td);
        }
    }

    ThreadPool(const ThreadPool<T> &tp) = delete;
    const ThreadPool<T> &operator=(const ThreadPool<T>) = delete;
public:
    static ThreadPool<T> *GetInstance()
    {
        if(instance == nullptr)
        {
            LockGuard lockguard(&sig_lock);
            if(instance == nullptr)
            {
                lg.LogMessage(Info, "创建单例成功 ...\n");
                instance = new ThreadPool<T>();
            }
        }
        return instance;
    }
    ThreadPool(int thread_num = default_value)
        :_thread_num(thread_num)
    {
        pthread_mutex_init(&_mutex, nullptr);
        pthread_cond_init(&_cond, nullptr);

        for(int i = 0; i < _thread_num; i++)
        {
            string threadname = "thread-";
            threadname += to_string(i+1);

            ThreadData td(threadname);
            // Thread<ThreadData> t(threadname, bind(&ThreadPool<T>::ThreadRun, this, placeholders::_1), td);
            
            _threads.emplace_back(threadname, bind(&ThreadPool<T>::ThreadRun, this, placeholders::_1), td);
        }
    }

    bool Start()
    {
        for(auto &thread : _threads)
        {
            thread.Start();
            lg.LogMessage(Info, "%s is running ...\n", thread.ThreadName().c_str());
        }
        return true;
    }

    void Threadwait(const ThreadData &td)
    {
        lg.LogMessage(Debug, "no task, %s is sleeping ...\n", td.threadname.c_str());
        pthread_cond_wait(&_cond, &_mutex);
    }

    void ThreadWakeup()
    {
        pthread_cond_signal(&_cond);
    }

    void ThreadRun(ThreadData &td)
    {
        while(true)
        {
            // 取任务
            T t;
            {
                pthread_mutex_lock(&_mutex);
                while(_q.empty())
                {
                    Threadwait(td);
                    lg.LogMessage(Debug, "have task, %s is wakeup ...\n", td.threadname.c_str());
                }
                t = _q.front();
                _q.pop();

                pthread_mutex_unlock(&_mutex);
                sleep(1);
            }
            // 处理任务
            t.Run();
            lg.LogMessage(Debug, "%s handler task %s done, result is : %s\n",td.threadname.c_str(), t.PrintTask().c_str(), t.PrintResult().c_str());
        }
    }

    void Push(T &in)
    {
        lg.LogMessage(Debug, "other thread push a task is : %s\n", in.PrintTask().c_str());
        LockGuard lockguard(&_mutex);
        _q.push(in);
        ThreadWakeup();
    }

    ~ThreadPool()
    {
        pthread_mutex_destroy(&_mutex);
        pthread_cond_destroy(&_cond);
    }

    // for test
    void Wait()
    {
        for(auto &thread : _threads)
        {
            thread.Join();
        }
    }
private:
    queue<T> _q;
    vector<Thread<ThreadData>> _threads;
    int _thread_num;
    pthread_mutex_t _mutex;
    pthread_cond_t _cond;
	
	// 单例模式
    static ThreadPool<T> *instance;
    static pthread_mutex_t sig_lock;
};

// 单例模式
template<class T>
ThreadPool<T> *ThreadPool<T>::instance = nullptr;
template<class T>
pthread_mutex_t ThreadPool<T>::sig_lock = PTHREAD_MUTEX_INITIALIZER;

实现功能:

代码语言:javascript
复制
#include "LockGuard.hpp"
#include "Log.hpp"
#include "Thread.hpp"
#include "Task.hpp"
#include "ThreadPool.hpp"
#include <memory>

using namespace std;

int main()
{
    sleep(10);
    ThreadPool<Task>::GetInstance()->Start();
    srand((uint64_t)time(nullptr));

    while(true)
    {
        int x = rand() % 100 + 1;
        usleep(rand() % 123);
        int y = rand() % 200;
        // int y = 0;
        usleep(rand() % 123);
        char oper = opers[rand() % opers.size()];

        Task t(x, y, oper);
        t.Enable(ok);
        
        ThreadPool<Task>::GetInstance()->Push(t);
        sleep(1);
    }

    ThreadPool<Task>::GetInstance()->Wait();
    return 0;
}
在这里插入图片描述
在这里插入图片描述

模拟实现线程池代码


📝3. 其他常见的各种锁

  • 乐观锁:认为冲突的概率很低,先修改完共享资源再验证是否发生冲突。如果发生冲突,则放弃本次操作并重试。乐观锁全程没有加锁操作,因此也称为无锁编程
  • 悲观锁:认为多线程同时修改共享资源的概率较高,容易出现冲突。因此,在访问共享资源前会先上锁。前面提到的互斥锁、自旋锁、读写锁都属于悲观锁

  • 自旋锁(Spinlock) 用途:保护共享资源,确保在任何时刻只有一个线程或进程可以访问 特点:当线程试图获取锁时,如果锁已被占用,则线程会一直尝试获取锁,处于忙等待状态,直到成功为止。自旋锁适用于临界区较小且共享资源独占时间较短的场景,可以避免上下文切换的开销。但自旋锁不能用于需要睡眠的代码临界区,因为会占用CPU资源

  • 读写锁(Rwlock) 用途:解决读操作较多的场景,减少锁等待,提高并发性 特点:分为读锁和写锁。读锁允许多个线程同时读取共享资源,而写锁则确保只有一个线程可以写入资源。写锁会阻塞其他读写锁。读写锁适用于读操作远多于写操作的场景

  • CAS操作:内存地址、期望值和新值。它将内存地址中的值与期望值进行比较,如果一致,则将内存中的值更新为新值,并返回成功标识;如果不一致,则不更新内存中的值,并返回失败标识。这个操作是原子的,意味着它不会被其他线程打断,从而保证了数据的一致性

📚4. 读者写者问题

读者写者问题(Readers-Writers Problem)是计算机科学中一个经典的并发控制问题。它描述了一个数据对象(如文件或记录)被多个并发进程所共享,其中一些进程只要求读取该数据对象的内容(称为“读者”),而另一些进程则要求对该数据对象进行写操作(称为“写者”)


读写锁:

在编写多线程的时候,有一种情况是十分常见的。那就是,有些公共数据修改的机会比较少。相比较改写,它们读的机会反而高的多。通常而言,在读的过程中,往往伴随着查找的操作,中间耗时很长。给这种代码段加锁,会极大地降低我们程序的效率

在这里插入图片描述
在这里插入图片描述

注意:写独占,读共享,读锁优先级高


核心规则:

  • 写-写互斥:不能有两个写者同时进行写操作
  • 读-写互斥:不能同时有一个线程在读,而另一个线程在写
  • 读-读允许:可以有一个或多个读者在读

读者写者问题和生产者消费者模型大差不差,同样满足我们之前说的321模型


读写锁接口:

初始化

代码语言:javascript
复制
int pthread_rwlock_init(pthread_rwlock_t *restrict rwlock,
const pthread_rwlockattr_t *restrict attr);

销毁

代码语言:javascript
复制
int pthread_rwlock_destroy(pthread_rwlock_t *rwlock); 

加锁和解锁

代码语言:javascript
复制
int pthread_rwlock_rdlock(pthread_rwlock_t *rwlock); // 读者加锁
int pthread_rwlock_wrlock(pthread_rwlock_t *rwlock); // 写者加锁

int pthread_rwlock_unlock(pthread_rwlock_t *rwlock); // 解锁

📖5. 总结

随着本文的逐渐收尾,我们对Linux线程池的学习之旅也即将告一段落。但请记住,这仅仅是一个开始,而非终点。线程池作为并发编程中的核心组件,其深度和广度远超我们在这篇简短文章中所能涵盖的范围

尽管我们已经介绍了线程池的关键概念和实现方法,但仍有许多细节和高级特性等待你去探索和发现。例如,如何根据具体的应用场景调整线程池的大小?如何有效地监控和调试线程池的性能?如何在线程池中加入更多的容错和恢复机制?

我们鼓励你继续深入研究线程池技术,并结合自己的实际项目经验,不断实践和优化。记住,真正的掌握不仅仅在于理解理论知识,更在于能够将其应用于解决实际问题中

愿你在未来的学习和工作中,能够充分利用线程池技术,打造出更加高效、稳定的并发应用!

希望本文能够为你提供有益的参考和启示,让我们一起在编程的道路上不断前行! 谢谢大家支持本篇到这里就结束了,祝大家天天开心!

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2024-10-25,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 📒1. 日志
  • 📜2. 线程池
  • 📝3. 其他常见的各种锁
  • 📚4. 读者写者问题
  • 📖5. 总结
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档