前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >详解管道

详解管道

原创
作者头像
用户10786285
发布2023-10-30 16:40:57
3170
发布2023-10-30 16:40:57
举报
文章被收录于专栏:1++的Linux

👍作者主页: 进击的1++ 🤩 专栏链接:【1++的Linux】

一,进程间通信的目的

  1. 数据传输:一个进程需要将它的数据发送给另一个进程
  2. 资源共享:多个进程之间共享同样的资源。
  3. 通知事件:一个进程需要向另一个或一组进程发送消息,通知它(它们)发生了某种事件(如进程终止时要通知父进程)。
  4. 进程控制:有些进程希望完全控制另一个进程的执行(如Debug进程),此时控制进程希望能够拦截另一个进程的所有陷入和异常,并能够及时知道它的状态改变。

进程间通信的必要性:

若没有进程间通信,那么也就无法使用并发能力,无法实现进程间协同。传输数据,消息通知等。

进程是具有独立性的,虚拟地址空间和页表保证了其独立性,因此,进程间通信的成本是比较高的。

想要让两进程间能够通信,那么其必定要能够看到同一份 “内存” 。这份所谓的“内存”不能属于任何一个进程,它应该是共享的。

进程间通信的发展:

  1. 管道
  2. System V进程间通信
  3. POSIX进程间通信

管道:

匿名管道pipe

命名管道

System V IPC:

System V 消息队列

System V 共享内存

System V 信号量

POSIX IPC:

消息队列

共享内存

信号量

互斥量

条件变量

读写锁

二,管道

管道是Unix中最古老的进程间通信的形式。是Linux原生就能够提供的。其有一个入口,一个出口,是单向通信的,也可以说是一种特出的半双工通信。

管道的原理:

我们在上面提到,两进程之间能够进行通信,那么两进程之间就得都能看到同一份资源?那么怎么让两进程看到同一份资源呢?

在fork之后,创建出来的子进程会继承父进程的大多数内容,这其中就包括文件描述符表,那么文件对象会被拷贝给子进程吗?显然是不会的,这样做是没有意义的。

我们在创建子进程之前分别以读写方式打开同一个文件,子进程继承之后,其也能够这个文件的文件描述符,有了文件描述符,我们是不是就能够访问这个文件了!!!我们让父进程进行写,那么就关闭其读的那个文件描述符,让子进程读,那么就关闭其写的那个文件描述符。这样,父子进程间就能够就行通信了。这样通信方式我们叫做匿名管道。

管道的本质是一种文件。

下面我们来简单的实现一个匿名管道:

使用pipe系统调用来创建匿名管道。

代码语言:c++
复制
#include<iostream>
#include<fcntl.h>
#include<unistd.h>
#include<cassert>
#include<cstring>
#include<sys/types.h>
#include<sys/wait.h>
using namespace std;

int main()
{
    //创建匿名管道
    int pipefd[2];//0读--1写
    int n=pipe(pipefd);
    assert(n!=-1);
    cout<<"creat pipe success"<<endl;
    (void)n;
    //创建子进程
    pid_t pid=fork();
    assert(pid!=-1);
    if(pid==0)//子进程
    {
        //子进程负责读
        close(pipefd[1]);//关闭写端
        char buffer[1024];
        while(true)
        {
            //sleep(3);
            ssize_t n=read(pipefd[0],buffer,sizeof(buffer)-1);
            assert(n!=-1);
           if(n>0)
           {
             buffer[n]='\0';
             cout<<"child get a message["<<getpid()<<"]"<<"father#"<<buffer<<endl;
           }
            if(n==0)
            {
                cout<<"write quite,me quite"<<endl;
                break;
            }
        }

        close(pipefd[0]);//可有可无
        exit(0);
    }

    //父进程写
    close(pipefd[0]);//关闭读
    const char* message="I am sending message";
    int count=0;
   while(true)
   {
     ssize_t n=write(pipefd[1],message,strlen(message));
     sleep(1);
     count++;
     if(count==5) break;
   }

   //读写完成,退出
  close(pipefd[1]);
  pid_t ret= waitpid(pid,nullptr,0);
  assert(ret>0);
  return 0;

    return 0;
}

运行结果:

写慢读快时

在这里插入图片描述
在这里插入图片描述

我们发现写慢读快时,读端不会继续写,而是停下来等待写入。

当我们让写快,读慢时(即读时休眠时间长一些)

在这里插入图片描述
在这里插入图片描述

一次会将管道中的所有数据都读出来。管道的大小是有限制的,当管道被写满时,便不会再写,而是等待读。

当把写端关掉,读端进程会直接退出。

当把读端关掉,OS会关掉写进程。

因此管道可以让进程间协同,提供了访问控制。 管道提供的是面向流式的通信服务,其生命周期随进程。

从管道读数据是一次性操作,数据一旦被读,它就从管道中被抛弃,释放空间以便写更多的数据。

在这里插入图片描述
在这里插入图片描述

站在内核的角度,管道的本质就是两个进程对同一个文件对象,一个进行写入,一个进行读取。

看待管道和看待文件一样,使用也是一样的,这也符合:Linux下一切皆文件的思想。

一个父进程可以和一个子进程通信,那么一个父进程能否和多个子进程分别通信?---可以的!

代码如下:

代码语言:c++
复制
#pragma once
#include<iostream>
#include<functional>
#include<vector>
#include<string>
#include<unordered_map>
#include<unistd.h>
#include <utility> 
#include<cassert>
typedef std::function<void()> func;
std::vector<func> callbacks;
std::unordered_map<int,std::string> desc;
void readSQL()
{
    std::cout << "sub process[" << getpid() << " ] 执行访问数据库的任务\n" << std::endl;
}

void execule()
{
    std::cout << "sub process[" << getpid() << " ] 执行url解析\n" << std::endl;
}

void cal()
{
    std::cout << "sub process[" << getpid() << " ] 执行加密任务\n" << std::endl;
}

void save()
{
    std::cout << "sub process[" << getpid() << " ] 执行数据持久化任务\n" << std::endl;
}

void lod()
{ desc.insert({callbacks.size(), "readSQL: 读取数据库"});
    callbacks.push_back(readSQL);

    desc.insert({callbacks.size(), "execule: 进行url解析"});
    callbacks.push_back(execule);

    desc.insert({callbacks.size(), "cal: 进行加密计算"});
    callbacks.push_back(cal);

    desc.insert({callbacks.size(), "save: 进行数据的文件保存"});
    callbacks.push_back(save);

}

void showHandler()
{
    for(auto& e:desc)
    {
        std::cout<<e.first<<'\t'<<e.second<<std::endl;
    }
}

int Handersize()
{
    return callbacks.size();
}
代码语言:c++
复制
#include<unistd.h>
#include<cstring>
#include<sys/types.h>
#include<sys/wait.h>
#include"task.hpp"
#define PROCESS_NUM 5

using namespace std;

int waitcommand(int waitfd,bool& quite)
{
    int command=0;
    ssize_t n=read(waitfd,&command,sizeof(command));
    if(n==0)
    {
        quite=true;
        return -1;
    }

    return command;

}

void SendCommand(int who,int fd,int command)
{
    ssize_t s=write(fd,&command,sizeof(command));
    std::cout<<"main process call"<<who<<"excule"<<desc[command]<<std::endl;
}

int main()
{
    lod();
    //pid  :  fd
    std::vector<std::pair<int,int>> slots;
    //多个子进程
    for(int i=0;i<PROCESS_NUM;i++)
    {
        //创建管道
        int pipefd[2];
        int n=pipe(pipefd);
        assert(n!=-1);
        //创建子进程
        pid_t pid=fork();
        assert(pid!=-1);
        if(pid==0)//子进程
        {
            //子进程读
            close(pipefd[1]);
            while(true)
            {
                //等命令
                bool quite=false;
                int command=waitcommand(pipefd[0],quite);//不写就等待
                if(command==-1)
                {
                    //std::cout<<"退出"<<std::endl;
                    break;
                }
                else if(command>=0&&command<=Handersize())
                {
                    callbacks[command]();
                }
                else
                {
                    std::cout<<"非法输入"<<std::endl;
                }
            }
            //退出
            close(pipefd[0]);
            std::cout<<"write quite,me quite"<<std::endl;
            exit(1);
            
        }

        //父进程写
        close(pipefd[0]);
        slots.push_back(std::make_pair(pid,pipefd[1]));
    }

    //随机派发命令
    srand((unsigned)time(nullptr));
    while(true)
    {
        int command=rand()%Handersize();//选择命令
        int choice=rand()%slots.size();//选择子进程
        //指派任务
        SendCommand(slots[choice].first,slots[choice].second,command);
        sleep(2);
    }

    //关闭所有写
    for(auto& e:slots)
    {
        close(e.second);
    }

    //回收所有子进程
     for(auto& e:slots)
    {
        waitpid(e.first,nullptr,0);
    }

    return 0;
}
在这里插入图片描述
在这里插入图片描述

命名管道:

命名管道与匿名管道的原理相同,都是通过让两个进程看到同一份资源,从而实现通信,但命名管道不再局限于父子进程之间,而是任意两个进程之间实现通信。

两进程看到相同的资源,是通过管道文件的路径从而实现的。

命名管道的本质也是一种文件,但不是普通的文件,普通的文件我们在读写时,会将内存数据刷新到磁盘中,但是我们的管道是不会的。因此其效率也是很高的。

管道文件的创建:

  1. mkfifo filename
  2. int mkfifo(const char *filename,mode_t mode);

下面是我们实现的命名管道的代码:

// 服务端接收消息

代码语言:c++
复制
#include"comm.hpp"
#include"Log.hpp"

static void getmessage(int fd)
{
      char buffer[1024];
   while(true)
   {
        int n=read(fd,buffer,sizeof(buffer-1));
        assert(n!=-1);
        if(n>0)
        {
            buffer[n]='\0';
            std::cout<<"["<<getpid()<<"]"<<"client say: "<<buffer<<std::endl;
        }
        else if(n==0)
        {
            std::cout<<"["<<getpid()<<"]"<<"client quit,me quit"<<std::endl;
            break;
        }
   }
}

int main()
{
    //创建管道
    int n=mkfifo(ipc_path.c_str(),0666);
    if(n<0)
    {
        perror("mkfifo");
        exit(1);
    }
    log("管道创建成功",DEBUG)<<"step1"<<std::endl;

    //打开管道进行读
    int fd=open(ipc_path.c_str(),O_RDONLY);
    if(fd<0)
    {
        perror("open");
        exit(2);
    }
    log("打开管道成功",DEBUG)<<"step2"<<std::endl;
    for(int i=0;i<Process_Num;i++)
    {
        int pid=fork();
        assert(pid>=0);
        if(pid==0)
        {
            getmessage(fd);
            exit(1);
        }

    }
    for(int i=0;i<Process_Num;i++)
    {
        waitpid(-1,nullptr,0);
        std::cout<<"等待成功"<<std::endl;
    }
    close(fd);
    log("关闭管道成功",DEBUG)<<"step3"<<std::endl;
    unlink(ipc_path.c_str());
    log("删除管道成功",DEBUG)<<"step4"<<std::endl;

    return 0;
}

//客户端发送消息

代码语言:c++
复制
#include<iostream>
#include"comm.hpp"
#include"Log.hpp"
#include<cstring>
int main()
{
    int fd=open(ipc_path.c_str(),O_WRONLY);
    if(fd<0)
    {
        perror("open");
        exit(3);
    }
    log("client 打开管道成功",DEBUG)<<"step5"<<std::endl;
    std::string buffer;
    while(true)
    {
        std::cout<<"client say:"<<std::endl;
        std::getline(std::cin,buffer);
        int n=write(fd,buffer.c_str(),buffer.size());

    }
    close(fd);
    return 0;

}
代码语言:c++
复制
#pragma once
#include<iostream>
#include"comm.hpp"
#ifndef _LOG_H_
#define _LOG_H_

#define DEBUG 0
#define NOTICE 1
#define WARNING 2
#define ERROR 3

std::string mes[4]={
    "DEBUG","NOTICE","WARNING","ERROR"
};

std::ostream &log(std::string message,int level)
{
    std::cout<<"|"<<unsigned(time(nullptr))<<"|"<<mes[level]<<"|"<<message;
    return std::cout;
}


#endif
代码语言:c++
复制
#pragma once
#include<iostream>
#include<sys/types.h>
#include<sys/stat.h>
#include<string>
#include<cassert>
#include<fcntl.h>
#include<unistd.h>
#include<sys/wait.h>
#define Process_Num 4

std::string ipc_path="./fifo.ipc";

一个普通的全局的静态函数与普通函数的区别是:用static修饰的函数,限定在本源码文件中,不能被本源码文件以外的代码文件调用。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 一,进程间通信的目的
  • 二,管道
相关产品与服务
消息队列
腾讯云消息队列 TDMQ 是分布式架构中的重要组件,提供异步通信的基础能力,通过应用解耦降低系统复杂度,提升系统可用性和可扩展性。TDMQ 产品系列提供丰富的产品形态,包含 CKafka、RocketMQ、RabbitMQ、Pulsar、CMQ 五大产品,覆盖在线和离线场景,满足金融、互联网、教育、物流、能源等不同行业和场景的需求。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档