前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >【Linux线程】Linux多线程实践:深入生产者消费者模型

【Linux线程】Linux多线程实践:深入生产者消费者模型

作者头像
Eternity._
发布2024-10-23 08:30:27
1160
发布2024-10-23 08:30:27
举报
文章被收录于专栏:登神长阶

🔍前言:在当今的软件开发领域,多线程编程已经成为了一种不可或缺的技术。特别是在Linux操作系统下,多线程编程的应用更是广泛而深入。而在多线程编程中,生产者消费者模型无疑是一个经典且重要的并发编程模式

生产者消费者模型描述了一个或多个生产者线程生成数据,并将其放入缓冲区,同时一个或多个消费者线程从缓冲区中取出数据进行处理的过程。这种模式不仅有效地实现了数据的生成与处理之间的解耦,还通过引入缓冲区来平衡生产者和消费者之间的速度差异,从而提高了系统的整体效率和稳定性

然而,在Linux多线程环境下实现生产者消费者模型并非易事。它涉及到线程的创建与管理、同步机制的选择与实现、以及缓冲区的设计与优化等多个方面。任何一个环节的疏忽都可能导致数据竞争、死锁、饥饿等并发问题的出现

本文旨在为读者提供一个全面而深入的Linux多线程中生产者消费者模型的学习指南。我们将从模型的基本概念出发,逐步深入到Linux多线程编程的实战技巧。通过详细的代码示例和深入的解析,我们将帮助读者掌握如何在Linux多线程环境下实现高效且稳定的生产者消费者模型


📒1. 生产者消费者模型

生产者消费者模型(Producer-Consumer Model)是一种经典的并发编程模式,它描述了两个或多个线程之间的协作关系:生产者线程负责生成数据并将其放入缓冲区,而消费者线程则从缓冲区中取出数据进行处理。这种模式广泛应用于各种并发场景,如文件读写、网络通信、数据处理等

作用:

生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。这个阻塞队列就是用来给生产者和消费者解耦的

生产者和消费者之间的关系:

生产者消费者模型遵守 “321原则”:

生产者消费者模型优点:

  • 解耦
  • 支持并发
  • 支持忙闲不均

📜2. 基于BlockingQueue的生产者消费者模型

在多线程编程中阻塞队列(Blocking Queue)是一种常用于实现生产者和消费者模型的数据结构。其与普通的队列区别在于,当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中被放入了元素;当队列满时,往队列里存放元素的操作也会被阻塞,直到有元素被从队列中取出(以上的操作都是基于不同的线程来说的,线程在对阻塞队列进程操作时会被阻塞)

因为涉及到阻塞操作,我们可以设计一个类,来专门负责加,解锁操作,来简化我们的代码

代码示例:(LockGuard.hpp)

代码语言:javascript
复制
#pragma once
#include <iostream>
#include <pthread.h>
#include <unistd.h>
#include <string>
#include <vector>
#include <functional>

using namespace std;

class Mutex // //封装pthread库的锁
{
public:
    Mutex(pthread_mutex_t *lock)
        :_lock(lock)
    {}

    void lock()
    {
        pthread_mutex_lock(_lock);
    }

    void unlock()
    {
        pthread_mutex_unlock(_lock);
    }

    ~Mutex()
    {}
private:
    pthread_mutex_t *_lock;
};

class LockGuard // 封装了锁后,出了作用域会自动调用析构函数,用来自动解锁
{
public:
    LockGuard(pthread_mutex_t *lock)
        :_mutex(lock)
    {
        _mutex.lock();
    }

    ~LockGuard()
    {
        _mutex.unlock();
    }
private:
    Mutex _mutex;
};

代码示例:(BlockQueue.hpp)

代码语言:javascript
复制
#include <iostream>
#include <pthread.h>
#include <queue>
#include <ctime>
#include <unistd.h>
#include <sys/types.h>
#include "LockGuard.hpp"


using namespace std;

const int defaultcap = 5;  // for test 最大容量

template <class T>
class BlockQueue
{
public:
    BlockQueue(int cap = defaultcap)
        :_capacity(cap)
    {
        pthread_mutex_init(&_mutex, nullptr);
        pthread_cond_init(&_p_cond, nullptr);
        pthread_cond_init(&_c_cond, nullptr);
    }

    bool IsFull()
    {
        return _q.size() == _capacity;
    }

    bool IsEmpty()
    {
        return _q.size() == 0;
    }

    bool Push(const T &in) // 生产者
    {
        LockGuard lockguard(&_mutex);
        // pthread_mutex_lock(&_mutex);
        // if(IsFull())
        while(IsFull())
        {
            // 阻塞等待
            pthread_cond_wait(&_p_cond, &_mutex);
        }
        _q.push(in);
        // if(_q.size() > _productor_water_line) pthread_cond_signal(&_c_cond);
        pthread_cond_signal(&_c_cond);
        // pthread_mutex_unlock(&_mutex);

        return true;
    }

    bool Pop(T *out) // 消费者
    {
        LockGuard lockguard(&_mutex);
        // pthread_mutex_lock(&_mutex);
        // if(IsEmpty())
        while(IsEmpty())
        {
            // 阻塞等待
            pthread_cond_wait(&_c_cond, &_mutex);
        }
        *out = _q.front();
        _q.pop();
        pthread_cond_signal(&_p_cond);
        // pthread_mutex_unlock(&_mutex);

        return true;
    }

    ~BlockQueue()
    {
        pthread_mutex_destroy(&_mutex);
        pthread_cond_destroy(&_p_cond);
        pthread_cond_destroy(&_c_cond);
    }
private:
    queue<T> _q;
    int _capacity; // 列表的容量,满了就不能生存,空了就不能消费
    pthread_mutex_t _mutex; // 生产者消费者共用的一把锁
    pthread_cond_t _p_cond; // 生产者
    pthread_cond_t _c_cond; // 消费者

    int _productor_water_line; // _productor_water_line == _capacity / 3 * 2;
    int _consumer_water_line; // _consumer_water_line == _capacity / 3;
};

关于BlockQueue.hpp的代码,还是比较好理解的,我们唯一要注意的是我们生产者和消费者在进行阻塞等待时,在检测临界资源时,我们要尽可能使用while,而不是if,当我们唤醒资源时,可能会被唤醒多个等待的资源,而对应条件的判断,但是当锁被申请时,其他资源又将继续申请锁,此时的条件很都可能是不满足的,所以这可能会导致代码出错,我们也把这种情况称为:伪唤醒

当我们实现了BlockQueue阻塞队列之后,我们也可是实现一个阻塞式的任务队列,这里就不过多展开了,我这里提供一个码云链接,有兴趣的可以了解一下

阻塞式的任务队列

📝3. POSIX信号量

POSIX信号量和SystemV信号量作用相同,都是用于同步操作,达到无冲突的访问共享资源目的,POSIX信号量是一种在POSIX标准中定义的进程间同步和互斥的方法。它允许进程之间通过信号量来实现临界区的互斥访问,从而避免竞争条件和死锁等问题

  • 信号量本质是一把计数器
  • 申请信号本质就是预定资源

POSIX信号量的相关函数

初始化信号量:

代码语言:javascript
复制
int sem_init(sem_t *sem, int pshared, unsigned int value);

参数:

  • sem:指向要初始化的信号量对象的指针
  • pshared:指示信号量是否可以在进程间共享。如果 pshared 为 0,信号量将仅在当前进程内的线程间共享。如果 pshared 非零,信号量可以在进程间共享(这通常需要特定的权限和配置)
  • value:信号量的初始值。这个值必须大于或等于 0

销毁信号量:

代码语言:javascript
复制
int sem_destroy(sem_t *sem);

等待信号量:

功能:等待信号量,会将信号量的值减1

代码语言:javascript
复制
int sem_wait(sem_t *sem);

sem_wait 是 POSIX 信号量(semaphore)API 中的一个函数,用于对信号量进行“等待”操作,也就是尝试对信号量进行“减1”操作。如果信号量的当前值大于0,sem_wait 会成功地将信号量的值减1,并立即返回。然而,如果信号量的当前值为0,sem_wait 会阻塞调用线程,直到信号量的值变为大于0(通常是通过另一个线程调用 sem_post 来实现的)

发布信号量:

功能:发布信号量,表示资源使用完毕,可以归还资源了。将信号量值加1

代码语言:javascript
复制
int sem_post(sem_t *sem);

sem_post 是 POSIX 信号量(semaphore)API 中的一个函数,用于对信号量进行“发布”或“增加”操作。具体来说,sem_post 会将信号量的值加1,并可能唤醒一个或多个正在等待该信号量的线程(如果它们因为调用 sem_wait 而被阻塞)

📚4. 基于环形队列的生产消费模型

环形队列

环形队列通过循环利用数组中的空间来实现队列的操作。其特点在于队列的头部和尾部相连,形成一个闭环,使得队列元素在固定大小的数组中循环排列。这种数据结构避免了普通队列在队尾指针到达数组末尾后无法再添加元素的问题,从而能够更高效地利用数组空间

环形结构起始状态和结束状态都是一样的,不好判断为空或者为满,所以可以通过加计数器或者标记位来判断满或者空。另外也可以预留一个空的位置,作为满的状态

实现原理与条件

  • 生产者不能将消费者套圈:即生产者不能超越消费者太多,否则会导致数据被覆盖。这可以通过信号量或条件变量来控制生产者的生产速度
  • 消费者不能超过生产者:消费者消费的数据量必须小于等于生产者生产的数据量。这可以通过信号量或条件变量来控制消费者的消费速度
  • 为空时生产者先运行:当队列为空时,消费者无法从队列中获取数据,因此生产者应该先运行并生产数据
  • 为满时消费者先运行:当队列为满时,生产者无法向队列中添加数据,因此消费者应该先运行并消费数据以腾出空间

代码示例:(RingQueue.hpp)

代码语言:javascript
复制
#pragma once
#include <iostream>
#include <pthread.h>
#include <unistd.h>
#include <vector>
#include <semaphore.h>
#include <ctime>
#include <sys/types.h>

using namespace std;

const int defaultsize = 5;

template <class T>
class RingQueue
{
private:
    void P(sem_t &sem)
    {
        sem_wait(&sem);
    }

    void V(sem_t &sem)
    {
        sem_post(&sem);
    }

public:
    RingQueue(int size = defaultsize)
        :_ringqueue(size)
        ,_size(size)
        ,_p_step(0)
        ,_c_step(0)
    {
        sem_init(&_space_sem, 0, size);
        sem_init(&_data_sem, 0, 0);
    }

    void Push(const T &in)
    {
        P(_space_sem);
        // pthread_mutex_lock(&_p_mutex);
        _ringqueue[_p_step] = in;
        _p_step++;
        _p_step %= _size;
        // pthread_mutex_unlock(&_p_mutex);
        V(_data_sem);
    }

    void Pop(T *out)
    {
        P(_data_sem);
        // pthread_mutex_lock(&_c_mutex);
        *out = _ringqueue[_c_step];
        _c_step++;
        _c_step %= _size;
        // pthread_mutex_unlock(&_c_mutex);
        V(_space_sem);
    }

    ~RingQueue()
    {
        sem_destroy(&_space_sem);
        sem_destroy(&_data_sem);
    }
private:
    vector<T> _ringqueue;
    int _size;

    int _p_step;
    int _c_step;
	
	pthread_mutex_t _p_mutex;	
	pthread_mutex_t _c_mutex;	

    sem_t _space_sem; // 空间信号
    sem_t _data_sem; // 数据信号
};

我们同样也可是实现一个环形式的任务队列,我依然在这里提供一个码云链接

环形式的任务队列

关于环形队列实现多线程时,我们要注意是先申请信号量还是先申请锁,考虑到效率的问题,我们如果先申请锁,在前一个锁没有释放之前,后面的线程是无法继续申请信号量的,只有等到前面的锁释放之后,才能申请。而我们先申请信号量的话,我们在等待期间就可以完成对信号量的申请,这可以极大的提高效率

📖5. 总结

通过本文的学习,我们深入了解了Linux多线程中生产者消费者模型的基本原理、实现方法和优化技巧。从模型的基本概念出发,我们逐步掌握了线程同步机制、以及并发问题处理等关键知识点

在生产者消费者模型的实现过程中,我们深刻体会到了Linux多线程编程的复杂性和挑战性。然而,正是这些挑战促使我们不断探索和实践,从而积累了宝贵的经验和技能

在未来的软件开发中,愿你能够灵活运用生产者消费者模型等并发编程技术,构建出更加高效、稳定、可扩展的系统,为推动信息技术的发展贡献自己的力量

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2024-10-22,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 📒1. 生产者消费者模型
  • 📜2. 基于BlockingQueue的生产者消费者模型
  • 📝3. POSIX信号量
    • POSIX信号量的相关函数
    • 📚4. 基于环形队列的生产消费模型
      • 环形队列
        • 实现原理与条件
        • 📖5. 总结
        相关产品与服务
        容器服务
        腾讯云容器服务(Tencent Kubernetes Engine, TKE)基于原生 kubernetes 提供以容器为核心的、高度可扩展的高性能容器管理服务,覆盖 Serverless、边缘计算、分布式云等多种业务部署场景,业内首创单个集群兼容多种计算节点的容器资源管理模式。同时产品作为云原生 Finops 领先布道者,主导开源项目Crane,全面助力客户实现资源优化、成本控制。
        领券
        问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档