前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >从零开始:实现进程间管道通信的实例

从零开始:实现进程间管道通信的实例

作者头像
绝活蛋炒饭
发布2024-12-16 16:26:09
发布2024-12-16 16:26:09
22800
代码可运行
举报
文章被收录于专栏:绝活编程学习绝活编程学习
运行总次数:0
代码可运行

1 进程之间通信的引出

进程之间能不能直接进行数据交换?---- 不能,因为进程之间具有独立性

但是,进程之间的数据交换是必须要有的,为什么?因为以下目的必须被实现

  • 数据传输:一个进程需要将它的数据发送给另一个进程
  • 资源共享:多个进程之间共享同样的资源。
  • 通知事件:一个进程需要向另一个或一组进程发送消息,通知它(它们)发生了某种事件(如进程终止时要通知父进程)。
  • 进程控制:有些进程希望完全控制另一个进程的执行(如Debug进程),此时控制进程希望能够拦截另一个进程的所有陷入和异常,并能够及时知道它的状态改变

进程之间的通信是什么?

  • 就是一个进程把自己的数据传递给另一个进程

进程之间的通信要怎么实现?

  1. 一般来说进程之间的通信,就是为了能让两个进程之间能看到同一份内存空间。
  2. 但是这一份内存空间,不能让两个进程之间的任意一方出,所以只能让OS来维护这个内存空间

具体做法:

  1. 管道
  2. 消息队列
  3. 共享内存
  4. 信号量

2 管道

  1. 管道是Unix中最古老的进程间通信的形式。
  2. 我们把从一个进程连接到另一个进程的一个数据流称为一个“管道
  • 管道是一种特殊的文件类型,它并不在磁盘上占用实际的物理空间来存储数据,管道的数据存在于内核的缓冲区中,这些缓冲区是内存的一部分,即:它是内存级的文件,数据交换是在内存中直接进行的,无需将数据写入到磁盘再进行读取。

3 匿名管道

匿名管道(Anonymous Pipe)是进程间通信(IPC)的一种机制,它主要用于本地父子进程之间的通信。

3.1 定义与结构

  • 匿名管道是由内核管理的一块缓冲区,用于实现进程间的数据传输。
  • 它没有名字,因此只能在创建它的进程及其子进程之间使用。

3.2 特点与限制

  • 结构简单:匿名管道的结构相对简单,实现起来也较为容易。
  • 占用资源小:由于匿名管道不需要在文件系统中创建实体,因此它占用的系统资源相对较小。(匿名管道的本质就是一个内存级的资源,只存在于内存中)
  • 仅限于本地父子进程间通信:匿名管道不支持跨网络之间的两个进程之间的通信,也不能用于任意两个进程之间的通信,只能用于本地父子进程之间的通信。(匿名管道没有显示的名称,所以在文件系统中找不到,也无法通过常规的文件操作(open、write等)来访问它,只能在创建它的进程及其子进程中有效。)
  • 单向通信:匿名管道是单向的,即数据只能从一个进程流向另一个进程。如果需要双向通信,则需要创建两个管道。

3.3 应用场景

  • 匿名管道常用于父进程创建子进程,并通过管道向子进程传递数据的场景。例如,在实现两围棋引擎本地对战时,可以使用匿名管道来传递棋局数据。

3.4 创建与使用

  • 在Linux系统中,可以使用pipe()函数来创建一个匿名管道。该函数返回一个文件描述符数组,其中fd[0]用于读取数据,fd[1]用于写入数据。
  • 在Windows系统中,可以使用CreatePipe()函数来创建匿名管道。该函数在创建匿名管道的同时返回两个句柄:管道读句柄和管道写句柄。

综上所述,匿名管道是一种简单、高效的进程间通信方式,特别适用于本地父子进程之间的单向数据传输。然而,由于其限制性和局限性,它并不适用于所有场景。在选择进程间通信方式时,需要根据具体的应用场景和需求来选择最合适的通信方式。


3.5 pipe()接口

pipe() 是一个在 Unix/Linux 系统中用于创建管道的系统调用,允许两个进程之间进行简单的进程间通信(IPC)。它的基本功能是提供一个单向的数据流,支持一个进程将数据写入管道,而另一个进程从管道中读取数据。

参数

  • pipefd:这是一个长度为 2 的整型数组,`pipefd[0]` 用于读取数据,`pipefd[1]` 用于写入数据。调用 `pipe()` 后,数组将被填充两个文件描述符。

返回值

  • - 如果成功,`pipe()` 返回 0。
  • - 如果失败,返回 -1,并将 `errno` 设置为相应的错误代码。

3.6 用fork来来理解管道原理

2.3 站在文件描述符角度-深度理解管道


3.7 snprintf()接口

snprintf 是 C 和 C++ 中用于格式化和存储字符串的函数,它具有安全特性,可以防止缓冲区溢出。它将格式化的字符串写入字符数组(缓冲区),类似于 printf,但通过额外的参数来控制输出缓冲区的大小。

参数

  • char *str:指向将要存储格式化字符串的缓冲区的指针。
  • size_t size:要写入缓冲区的最大字符数,包括空字符。
  • const char *format:格式字符串,用于指定如何格式化后续参数。
  • ...:要格式化的值,与格式字符串中的格式说明符对应。

返回值

  • 该函数返回如果有足够的空间可以写入的字符数,不包括空字符。如果输出因缓冲区大小不足而被截断,返回值将大于或等于 size

2 管道读写的四种情况

2.1 管道内部没有数据 && 子进程不关闭自己写端,读端(父)就要阻塞等待直到pipe内有数据

代码语言:javascript
代码运行次数:0
运行
复制
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <iostream>

using namespace std;

void my_write(int wfd)
{
    const char *str = "hello father ,I am child";
    char buf[1024];
    int cnt = 0;
    pid_t pid = getpid();

    while (1)
    {
        snprintf(buf, sizeof(buf), "message: %s ,pid: %d , cnt: %d \n", str, pid, cnt);
        write(wfd, buf, sizeof(buf));
        cnt++;

        sleep(100);
    }
    close(wfd);
}

void my_read(int rfd)
{
    char buf[1024];

    while(1)
    {
        
        ssize_t n = read(rfd, buf, sizeof(buf)-1);

        cout << "father get message :  " << buf << endl;
        sleep(1);
    }
}

int main()
{
    int pipefd[2]{0};

    int n = pipe(pipefd);

    // 如果失败直接返回
    if (n < 0)
    {
        exit(-1);
    }
    // 成功了,打印一下
    std::cout << "pipefd[0]: " << pipefd[0] << endl;
    std::cout << "pipefd[1]: " << pipefd[1] << endl;

    // 开始创建进程
    pid_t id = fork();

    if (0 == id)
    {
        // 关闭进程的读端,让子进程进程 写
        close(pipefd[0]);
        my_write(pipefd[1]);

        // 直接让子进程退出,不执行下面的代码
        exit(0);
    }

    // 关闭父进程的写端,让父进程进程读

    close(pipefd[1]);
    my_read(pipefd[0]);

    int status = 0;
    int rid = waitpid(id, &status, 0); // 等待子进程,阻塞等待

    if (id == rid)
    {
        std::cout << "error code: " << WEXITSTATUS(status) << endl;
        std::cout << "error signal: " << (status & 0x7F) << endl;
    }
    return 0;
}

2.2 管道内部被写满 && 父进程不关闭自己读端,写端(子)就要阻塞等待直到pipe内有数据

代码语言:javascript
代码运行次数:0
运行
复制
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <iostream>

using namespace std;

void my_write(int wfd)
{
    const char *str = "hello father ,I am child";
    char buf[1024];
    int cnt = 0;
    pid_t pid = getpid();

    while (1)
    {
        snprintf(buf, sizeof(buf), "message: %s ,pid: %d , cnt: %d \n", str, pid, cnt);
        write(wfd, buf, sizeof(buf));
        cnt++;

        sleep(1);
    }
    close(wfd);
}

void my_read(int rfd)
{
    char buf[1024];

    while(1)
    {
        
        ssize_t n = read(rfd, buf, sizeof(buf)-1);

        cout << "father get message :  " << buf << endl;
        sleep(100);
    }
}

int main()
{
    int pipefd[2]{0};

    int n = pipe(pipefd);

    // 如果失败直接返回
    if (n < 0)
    {
        exit(-1);
    }
    // 成功了,打印一下
    std::cout << "pipefd[0]: " << pipefd[0] << endl;
    std::cout << "pipefd[1]: " << pipefd[1] << endl;

    // 开始创建进程
    pid_t id = fork();

    if (0 == id)
    {
        // 关闭进程的读端,让子进程进程 写
        close(pipefd[0]);
        my_write(pipefd[1]);

        // 直接让子进程退出,不执行下面的代码
        exit(0);
    }

    // 关闭父进程的写端,让父进程进程读

    close(pipefd[1]);
    my_read(pipefd[0]);

    int status = 0;
    int rid = waitpid(id, &status, 0); // 等待子进程,阻塞等待

    if (id == rid)
    {
        std::cout << "error code: " << WEXITSTATUS(status) << endl;
        std::cout << "error signal: " << (status & 0x7F) << endl;
    }
    return 0;
}

2.3 对于写端而已,不写了 && 关闭了pipe ,对于读端来说,读端就会把管道里的数据先读完,最后会读到返回值为0

读端本身不会因为写端的关闭而被OS直接关闭,它仍需要由读进程显示地关闭或随着读进程的结束而关闭。

如果负责读的父进程,不结束就会。。。一直等待读(因为没有主动退出读的那个循环,我将my_read函数中的break注释了)

代码语言:javascript
代码运行次数:0
运行
复制
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <iostream>

using namespace std;
// 对于写端而已,不写了 && 关闭了pipe ,对于读端来说,读端就会把管道里的数据先读完,最后会读到返回值为0 

// 关于write函数的写入,在写入字符串的时候,第三个参数的字节数大于要写入的字符串,是就写入字符串就停止了,小于是发生截断
// 在对管道写入的时候默认是追加
void my_write(int wfd)
{
    const char *str = "hello father ,I am child";
    char buf[128];
    // cout << "sizeof(buf) " << sizeof(buf) << endl;
    int cnt = 0;
    pid_t pid = getpid();

    // snprintf(buf, sizeof(buf), "message: %s ,pid: %d , cnt: %d \n", str, pid, cnt);

    while (cnt < 10)
    {
        snprintf(buf, sizeof(buf), "message: %s ,pid: %d , cnt: %d \n", str, pid, cnt); // 多次写入会进行覆盖,第二次写入会覆盖之前的字符串

        // cout << "buf  -->" << buf << endl;
        //  cout << "sizeof(buf) " << sizeof(buf) << endl;

        write(wfd, buf, sizeof(buf));

        cout << "write time : " << cnt << endl;
        cnt++;

        sleep(1);
    }

    close(wfd);
    cout << " wfd closed" << endl;
}

// read函数,就是将管道里的数据拷贝到目标缓冲区
// 从管道读数据写入到字符数组,对于数组数组来说,肯定是从起始位置覆盖式写。
// 如果是从字符数组读内容,写到管道,同一个进程的话,会因为文件偏移发生追加式写/

void my_read(int rfd)
{
    char buf[1024];

    while (1)
    {

        ssize_t n = read(rfd, buf, sizeof(buf) - 1);
        // buf[n] = '\0';
        // cout << "buf  -->" << buf << endl;

        if (n > 0)
        {
            // buf[n] = '\0';
            cout << "father get message :  " << buf << endl;
            // buf[n] = '\0';
        }
        else if (0 == n)
        {
            cout << "chile close write pipe ..." << endl;
            break;
        }
        else
        {
            cout << "read is error" << endl;
        }
        sleep(1);
    }
}

int main()
{
    int pipefd[2]{0};

    int n = pipe(pipefd);

    // 如果失败直接返回
    if (n < 0)
    {
        exit(-1);
    }
    // 成功了,打印一下
    std::cout << "pipefd[0]: " << pipefd[0] << endl;
    std::cout << "pipefd[1]: " << pipefd[1] << endl;

    // 开始创建进程
    pid_t id = fork();

    if (0 == id)
    {
        // 关闭进程的读端,让子进程进程 写
        close(pipefd[0]);
        my_write(pipefd[1]);

        // 直接让子进程退出,不执行下面的代码
        exit(0);
    }

    // 关闭父进程的写端,让父进程进程读

    close(pipefd[1]);
    my_read(pipefd[0]);

    int status = 0;
    int rid = waitpid(id, &status, 0); // 等待子进程,阻塞等待

    if (id == rid)
    {
        std::cout << "error code: " << WEXITSTATUS(status) << endl;
        std::cout << "error signal: " << (status & 0x7F) << endl;
    }
    return 0;
}

2.4 读端不在读 && 将读端关闭,OS会将写入的进程关闭(子进程),(通过信号 -13 SIGPIPE)

代码语言:javascript
代码运行次数:0
运行
复制
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <iostream>

using namespace std;
 // 读端不在读 && 将读端关闭,OS会将写入的进程关闭(子进程),(通过信号 -13 SIGPIPE)

// 关于write函数的写入,在写入字符串的时候,第三个参数的字节数大于要写入的字符串,是就写入字符串就停止了,小于是发生截断
// 在对管道写入的时候默认是追加
void my_write(int wfd)
{
    const char *str = "hello father ,I am child";
    char buf[128];
    // cout << "sizeof(buf) " << sizeof(buf) << endl;
    int cnt = 0;
    pid_t pid = getpid();

    // snprintf(buf, sizeof(buf), "message: %s ,pid: %d , cnt: %d \n", str, pid, cnt);

    while (1)
    {
        snprintf(buf, sizeof(buf), "message: %s ,pid: %d , cnt: %d \n", str, pid, cnt); // 多次写入会进行覆盖,第二次写入会覆盖之前的字符串

        // cout << "buf  -->" << buf << endl;
        //  cout << "sizeof(buf) " << sizeof(buf) << endl;

        write(wfd, buf, sizeof(buf));

        cout << "write time : " << cnt << endl;
        cnt++;

        sleep(1);
    }

   // close(wfd);
    cout << " wfd closed" << endl;
}

// read函数,就是将管道里的数据拷贝到目标缓冲区
// 从管道读数据写入到字符数组,对于数组数组来说,肯定是从起始位置覆盖式写。
// 如果是从字符数组读内容,写到管道,同一个进程的话,会因为文件偏移发生追加式写/

void my_read(int rfd)
{
    char buf[1024];
    int cnt = 5;

    while (cnt)
    {

        ssize_t n = read(rfd, buf, sizeof(buf) - 1);
        // buf[n] = '\0';
        // cout << "buf  -->" << buf << endl;

        if (n > 0)
        {
            // buf[n] = '\0';
            cout << "father get message :  " << buf << endl;
            // buf[n] = '\0';
        }
        else if (0 == n)
        {
            cout << "chile close write pipe ..." << endl;
            break;
        }
        else
        {
            cout << "read is error" << endl;
        }
        sleep(1);

        cnt--;
    }
     close(rfd);
     return ;
}

int main()
{
    int pipefd[2]{0};

    int n = pipe(pipefd);

    // 如果失败直接返回
    if (n < 0)
    {
        exit(-1);
    }
    // 成功了,打印一下
    std::cout << "pipefd[0]: " << pipefd[0] << endl;
    std::cout << "pipefd[1]: " << pipefd[1] << endl;

    // 开始创建进程
    pid_t id = fork();

    if (0 == id)
    {
        // 关闭进程的读端,让子进程进程 写
        close(pipefd[0]);
        my_write(pipefd[1]);

        // 直接让子进程退出,不执行下面的代码
        exit(0);
    }

    // 关闭父进程的写端,让父进程进程读

    close(pipefd[1]);
    my_read(pipefd[0]);

    int status = 0;
    int rid = waitpid(id, &status, 0); // 等待子进程,阻塞等待

    if (id == rid)
    {
        std::cout << "error code: " << WEXITSTATUS(status) << endl;
        std::cout << "error signal: " << (status & 0x7F) << endl;
    }
    return 0;
}

3 管道读写规则

当没有数据可读时:

O_NONBLOCK disable:read调用阻塞,即进程暂停执行,一直等到有数据来到为止。

O_NONBLOCK enable:read调用返回-1,errno值为EAGAIN。

当管道满的时候:

O_NONBLOCK disable: write调用阻塞,直到有进程读走数据O_NONBLOCK enable:调用返回-1,errno值为EAGAIN

如果所有管道写端对应的文件描述符被关闭,则read返回0

如果所有管道读端对应的文件描述符被关闭,则write操作会产生信号SIGPIPE,进而可能导致write进程退出

当要写入的数据量不大于PIPE_BUF时,linux将保证写入的原子性。当要写入的数据量大于PIPE_BUF时,linux将不再保证写入的原子性


4 进程通信的四大特性

进程通信,特别是通过管道进行的通信,确实具有一些显著的特性。以下是对您提出的四大特性的详细解释:

  1. 自带同步和互斥机制
    • 同步:进程间的同步是指多个进程在执行过程中,需要在某些点上协调它们的工作次序,以保证数据的一致性和正确性。管道通信本身并不直接提供复杂的同步机制,但操作系统在内核层面对管道操作进行了一定的同步处理,以确保数据的正确传输。例如,当管道中没有数据时,读操作会阻塞,直到有数据写入;当管道写满时,写操作也会阻塞,直到有数据被读出。这种机制在一定程度上实现了进程间的同步。
    • 互斥:互斥是指多个进程不能同时访问同一临界资源。虽然管道本身不是临界资源,但在使用管道进行通信时,会涉及到对管道读写端的访问。由于管道是半双工的,同一时刻只能有一个进程进行写操作或读操作,这在一定程度上实现了对管道访问的互斥。然而,这种互斥是由管道的通信机制本身决定的,而不是通过额外的互斥锁或信号量来实现的。
  2. 面向字节流
    • 管道通信是面向字节流的,即数据以字节为单位在管道中传输。这意味着,无论发送方发送的数据是什么类型或格式,接收方都会以字节流的形式接收到这些数据,并需要自行解析和处理。这种面向字节流的通信方式使得管道能够传输各种类型的数据,包括文本、图像、音频等。
  3. 管道的生命周期随进程
    • 管道的生命周期与创建它的进程及其子进程紧密相关。当所有打开管道的文件描述符都被关闭时,管道就会被释放。通常,这意味着当创建管道的进程及其所有子进程都退出时,管道就会随之消失。这种生命周期管理方式使得管道成为一种轻量级的、易于管理的进程间通信方式。
  4. 管道只能单向通信,是半双工通信的一种特殊情况
    • 管道是半双工通信的一种特殊情况,即它只能在一个方向上传输数据。具体来说,一个管道有两个端点:读端和写端。在任意时刻,只能有一个进程向管道的写端写入数据,而另一个进程则从管道的读端读取数据。这种单向通信的特性使得管道在某些场景下非常有用,例如父子进程之间的数据传递。然而,如果需要双向通信,则需要创建两个管道,分别用于两个方向上的数据传输。

综上所述,进程通信(特别是通过管道进行的通信)具有自带同步和互斥机制、面向字节流、管道的生命周期随进程以及管道只能单向通信(半双工通信的一种特殊情况)等四大特性。这些特性使得管道成为一种简单、高效且易于管理的进程间通信方式。


5 匿名管道的应用:进程池

进程池(Process Pool)是一种常见的多进程编程技术,旨在提高并发处理能力和资源利用率。

5.1 定义与组成

  • 进程池是由一组预先创建好的空闲进程和管理这些进程的进程所组成的技术应用。
  • 管理进程负责创建资源进程,把工作交给空闲资源进程处理,并回收已经处理完工作的资源进程。

5.2 工作原理

  • 进程池中的进程在应用程序的生命周期内保持活动状态,随时准备处理任务。
  • 管理进程通过有效的任务调度机制,将任务分配给空闲的进程执行。
  • 进程执行完任务后,将结果返回给管理进程或指定的接收者。
  • 管理进程还负责回收空闲资源进程,以优化系统资源的使用。

5.3 优点与特性

  1. 提高并发处理能力:进程池能够充分利用多核处理器的性能,通过并行执行多个进程来提高系统的并发处理能力。
  2. 减少系统开销:进程的创建和销毁是非常耗时的操作。进程池通过预先创建一组进程,避免了频繁的创建和销毁操作,从而减少了系统开销。
  3. 提高响应速度:由于进程池中的进程已经存在并处于就绪状态,当有新任务时可以立即分配给这些进程处理,大大提高了系统的响应速度。
  4. 资源管理:进程池可以限制同时运行的进程数量,避免系统资源被过度占用,保证系统的稳定运行。

5.4 应用场景

进程池适用于需要大量并行处理的任务场景,如:

  • 数值计算:进行大量的数值运算或复杂的数据处理任务时,可以使用进程池来加速计算过程。
  • 图像处理:对大量图像进行批量处理(如缩放、裁剪、滤镜等)时,可以使用进程池来提高处理效率。
  • 网络服务器:在处理大量并发网络请求时,可以使用进程池来管理多个工作进程,以提高服务器的响应速度和吞吐量。

5.5 注意事项

  • 进程间通信:进程池中的进程是独立的,无法直接共享内存和变量。因此,需要通过某种机制(如IPC、信号、管道等)来实现进程间通信。
  • 资源占用:进程池中的进程需要占用一定的系统资源(如内存、CPU时间等)。因此,在使用进程池时需要根据具体的应用场景进行调整和优化,以避免资源过度消耗和系统负载过重。
  • 任务调度:进程池通常配备有任务调度机制,但开发者需要根据具体需求来设计和实现合理的任务分配策略,以提高系统的整体性能和稳定性。

综上所述,进程池是一种高效的多进程编程技术,能够在需要大量并行处理的任务中显著提升系统性能。然而,在使用进程池时也需要根据具体的应用场景进行权衡和选择,以充分发挥其优势并避免潜在的问题。

代码语言:javascript
代码运行次数:0
运行
复制
#pragma once

#include<iostream>
#include<cstdio>
#include<sys/types.h>
#include<sys/wait.h>
#include<unistd.h>
#include<cstring>
#include<vector>
#include<cstdlib>
#include<ctime>
#include"task.hpp"

using namespace std;

enum //匿名枚举,用来将错误码转化为错误信息
{
    UsageError = 1,
    ProcnumError,
    PipeError
};

void Usage(const string& proc)
{
    cout << "Usage:" << proc << "proc_num" << endl;
    return ;
}

class Channel //先描述(类)
{
public:
    Channel(int wfd, int id, string& name)
        :_wfd(wfd),_id(id),_name(name)
    { }

    int Wfd(){ return _wfd; }

    string& Name(){ return _name;}

    int Id(){ return _id; }

    void Print()
    {
        cout << "_wfd: " << _wfd;
        cout << ",_id: " << _id;
        cout << ",_name: " << _name << endl;
    }

    ~Channel(){ }
private:
    int _wfd;
    pid_t _id;
    string _name;
};


class Processpol //进程池
{
public:
    Processpol(int proc_num) 
        :_proc_num(proc_num)
    { }

    vector<int> fds; //用来关闭struct file被多个文件描述符指向
    int CreatProcess(work_t work) //1.创建一组子进程、命令行管道
    {
        for(int i = 0; i < _proc_num; i++)
        {
            //父进程需要对管道进行管理,否则会被覆盖
            int pipefd[2]{0}; 
            int n = pipe(pipefd);
            if(n == -1) return PipeError; //创建管道失败
            
            pid_t id = fork(); 
            if(id == 0) //子进程
            {
                if (!fds.empty()) 
                    for (auto& fd : fds) close(fd);

                close(pipefd[1]); //关闭管道的写端
                dup2(pipefd[0], 0); //将标准输入重定向到管道读端
                work(pipefd[0]);  //执行任务 —— 回调函数
                exit(0); //终止
            }
            //父进程
            close(pipefd[0]); //关闭管道的读端

            string name = "channel-" + to_string(i); //管道名字
            _channels.push_back(Channel(pipefd[1], id, name)); 

            fds.push_back(pipefd[1]);
        }
        return 0;
    }

    //2.1.选择一个子进程、管道,来执行任务 —— 父进程'负载均衡'式的给子进程分配任务
    int NextChannel()
    {
        static int next = 0; //静态,不属于任何对象,全局, 用来定位进程池中的各个进程、管道
        int n = next;
        next++;
        next %= _channels.size(); //不可越界
        return n;
    }

    //2.3.发送任务编号到管道写端,以供子进程读取
    void SendTaskCode(int c_code, int t_code)
    {
        cout << "send code " << t_code << " to " << _channels[c_code].Name() << " proc_id " << _channels[c_code].Id() << endl;
        write(_channels[c_code].Wfd(), &t_code, sizeof(t_code)); //父进程将数据写入管道的写端
        sleep(1);
    }
    
    //3.回收资源
    /*因为子进程会继承父进程的文件描述符表,子进程会将父进程中以w方式打开上一文件的fd,
    使得上一文件被多个指针指向,所以关闭父进程写端,此文件的struct file中引用计数--,并未全部关闭*/
    void RecycleProc()
    {
        for(auto& e : _channels)
        {
            close(e.Wfd()); //关闭当前被打开文件(管道)的写端
            int rid = waitpid(e.Id(), NULL, 0); //回收子进程资源
            if(rid > 0) //等待成功
                cout << "wait process " << e.Id() << "sucess" << endl;
            cout << e.Name() << " close done. process " << e.Id() <<"quit now!" << endl; 
        }
    }

    void Debug() //打印子进程、命令行管道相关信息
    {
        for(auto& e : _channels)
        {
            e.Print();
        }
    }

    ~Processpol(){ }

private:
    vector<Channel> _channels; //再组织
    int _proc_num; //预先创建的子进程数量
};

void CtrlProcessPool(Processpol* ptr, int cnt)
{
    while(cnt--)
    {
        int c_code = ptr->NextChannel();
        int t_code = NextCode();
        ptr->SendTaskCode(c_code, t_code);
    }
}

int main(int argc, char* argv[])
{
    //设置随机数生成的种子(起始值),若无srand,则每次程序启动随机数序列均相同,使用当前时间(时间戳)来初始化种子
    srand((uint64_t)time(nullptr)); //unit64_t为无符号64位整数,long long别名

    //输入错误
    if(argc != 2) 
    {
        Usage(argv[0]);
        return UsageError;
    }

    //预设创建的进程数须>0
    int proc_num = stoi(argv[1]);
    if(proc_num <= 0) 
        return ProcnumError;
    
    Processpol* ptr = new Processpol(proc_num); //创建一个进程池
    //1.创建一组子进程、命令行管道
    ptr->CreatProcess(Work);
    ptr->Debug(); //打印子进程、命令行管道相关信息

    //2.控制子进程
    CtrlProcessPool(ptr, 5);
    cout << "task finish......" << endl;

    //3.回收资源: 子进程先要退出(关闭写端),再回收子进程资源
    ptr->RecycleProc();
    
    return 0;
}
————————————————

                            版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
                        
原文链接:https://blog.csdn.net/m0_74808907/article/details/143231476
代码语言:javascript
代码运行次数:0
运行
复制
#include<cstdio>
#include<iostream>
#include<unistd.h>
#include <sys/types.h>

using namespace std;

//函数指针类型
typedef void(*work_t)(int rfd); 
typedef void(*task_t)(int rfd, int id);

//任务
void PrintLog(int rfd, int id)
{
    cout << "rfd: " << rfd << " process: " << id << " is working: " << " printf log task!" << endl;
}

void ReloadConf(int rfd, int id)
{
    cout << "rfd: " << rfd << " process: " << id << " is working: " << " reload conf task!" << endl;
}

void ConnectMysql(int rfd, int id)
{
    cout<< "rfd: " << rfd << " process: " << id << " is working: " << "Connect Mysql task!" << endl;
}

task_t task[3] = {PrintLog, ReloadConf, ConnectMysql };

int NextCode() //2.2.选择要执行任务的编号
{
    return rand() % 3; //生成一个伪随机数
}

void Work(int rfd) //2.4.子进程接受并处理任务
{
    while(true)
    {
        int code = 0; 
        int n = read(0, &code, sizeof(code)); //子进程从管道的读端读取数据
        if(n > 0)  
        {
            if(code >= 3) continue; 
            task[code](rfd, getpid()); 
        }
        else if(n == 0) //情况3,写端关闭,读取到了结尾
        {
            cout << "read ending!" << endl;
            break;
        }
        else 
        {
            cout << "read error!" << endl;
            break;
        }

        sleep(1);
    }
}

6 命名管道

命名管道(Named Pipes),也被称为命名管线或FIFO(First In First Out,先进先出),是一种简单的进程间通信(IPC)机制。以下是对命名管道的详细介绍:

6.1 基本概念

命名管道是一种特殊的文件类型,它允许在同一台计算机的不同进程之间或在跨越一个网络的不同计算机的不同进程之间,进行可靠的、单向或双向的数据通信。它克服了传统管道没有名字、只能用于具有亲缘关系的进程间通信的限制,通过提供一个路径名与之关联,使得任何可以访问该路径的进程都能够通过命名管道进行通信。

6.2 特点与优势

  1. 跨进程通信:命名管道可以用于任何两个进程间的通信,而不限制这两个进程是否同源,因此使用比管道更灵活方便。
  2. 文件系统可见性:命名管道在文件系统中具有可见性,可以通过路径名进行访问,这使得它适合需要跨多个会话或长期运行的进程通信的场景。
  3. 安全性:在Windows平台上,命名管道充分利用了内建的安全特性(如ACL等),为进程间通信提供了安全保障。
  4. 简单易用:使用命名管道进行跨计算机应用程序的设计非常简单,不需要事先深入掌握底层网络传送协议(如TCP、UDP等)的知识。

6.3 mkfifo() --- 创建命名管道

mkfifo()函数是一个系统调用,用于在UNIX和类UNIX操作系统(如Linux)中创建命名管道(Named Pipe),也称为FIFO(First In First Out)特殊文件。命名管道允许不同进程之间按照先进先出的顺序进行数据交换。

参数

  • pathname:指向一个以null结尾的字符串,该字符串指定了要创建的命名管道的路径和文件名。
  • mode:设置命名管道的权限模式,类似于open()函数中的模式参数。这些权限位将决定哪些用户可以读写管道。

返回值

  • 成功时,mkfifo()返回0。
  • 失败时,返回-1,并设置errno以指示错误类型。

6.4 unlink

unlink()函数是一个系统调用,用于删除一个目录项(通常是一个文件或命名管道)。它并不直接删除文件内容,而是从文件系统的目录结构中删除指定的文件名,从而使其不可访问。如果该文件是被多个进程打开的,那么unlink()调用不会立即删除文件内容,而是等到最后一个打开该文件的进程关闭文件描述符时,文件内容才会被真正删除。这种行为对于实现临时文件和命名管道等资源的清理特别有用。

参数

  • pathname:指向一个以null结尾的字符串,该字符串指定了要删除的文件的路径和文件名。

返回值

  • 成功时,unlink()返回0。
  • 失败时,返回-1,并设置errno以指示错误类型。

6.5 通过命名管道实现 server&client通信

代码语言:javascript
代码运行次数:0
运行
复制
#include<iostream>
#include<cstdio>
#include<sys/types.h>
#include<sys/stat.h>
#include<cstring>
#include<errno.h>
#include<unistd.h>
 #include <fcntl.h>

using namespace std;

#define Mode 0666 //指定命名管道的权限
#define Path "./fifo" //路径名

class Fifo
{
public:
    Fifo(const string& path) 
        :_path(path)
    {
        umask(0);
        int n = mkfifo(Path, Mode); //创建命名管道
        if(n == 0) 
        {
            cout << "mkfifo success!" << endl;
        }
        else
        {
            cout << "mkfifo failed! errno: " << errno << "errdesc: " << strerror(errno) << endl; 
        }
    }

    ~Fifo() 
    {
        int n = unlink(Path); //删除命名管道
        if(n == 0)
        {
            cout << "remove file success!" << endl;
        }
        else
        {
            cout << "remove file failed! errno: " << errno << "errdesc: " << strerror(errno) << endl; 
        }

    }

private:
    string _path;
};
代码语言:javascript
代码运行次数:0
运行
复制
#include"name_pipe.hpp"

int main()
{
    //1.创建命名管道
    Fifo fifo(Path);

    //2.以读的方式打开命名管道
    int fd = open(Path, O_RDONLY);
    if(fd <= 0)
    {
        cout << "open failed! errno: " << errno << "errdesc: " << strerror(errno) << endl;
        return 1;
    }
    cout << "open success!" << endl;

    //3.从命名管道中读取数据
    char buffer[128];
    while(1)
    {
        int n = read(fd, buffer, sizeof(buffer));
        if(n > 0)
        {
            buffer[n] = '\0';
            cout << "client say: " << buffer << endl;
        }
        else if(n == 0)
        {
            cout << "client quit! me to...." << endl;
            break;
        }
        else
        {
           cout << "read failed! errno: " << errno << "errdesc: " << strerror(errno) << endl;
           break;
        }
    }
}

我的博客即将同步至腾讯云开发者社区,邀请大家一同入驻:https://cloud.tencent.com/developer/support-plan?invite_code=cuhg6omf704

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2024-12-15,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1 进程之间通信的引出
  • 2 管道
  • 3 匿名管道
    • 3.1 定义与结构
    • 3.2 特点与限制
    • 3.3 应用场景
    • 3.4 创建与使用
    • 3.5 pipe()接口
    • 3.6 用fork来来理解管道原理
    • 2.3 站在文件描述符角度-深度理解管道
    • 3.7 snprintf()接口
  • 2 管道读写的四种情况
    • 2.1 管道内部没有数据 && 子进程不关闭自己写端,读端(父)就要阻塞等待直到pipe内有数据
    • 2.2 管道内部被写满 && 父进程不关闭自己读端,写端(子)就要阻塞等待直到pipe内有数据
    • 2.3 对于写端而已,不写了 && 关闭了pipe ,对于读端来说,读端就会把管道里的数据先读完,最后会读到返回值为0
    • 2.4 读端不在读 && 将读端关闭,OS会将写入的进程关闭(子进程),(通过信号 -13 SIGPIPE)
  • 3 管道读写规则
  • 4 进程通信的四大特性
  • 5 匿名管道的应用:进程池
    • 5.1 定义与组成
    • 5.2 工作原理
    • 5.3 优点与特性
    • 5.4 应用场景
    • 5.5 注意事项
  • 6 命名管道
    • 6.1 基本概念
    • 6.2 特点与优势
    • 6.3 mkfifo() --- 创建命名管道
    • 6.4 unlink
    • 6.5 通过命名管道实现 server&client通信
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档