进程之间能不能直接进行数据交换?---- 不能,因为进程之间具有独立性
但是,进程之间的数据交换是必须要有的,为什么?因为以下目的必须被实现
进程之间的通信是什么?
进程之间的通信要怎么实现?
具体做法:
匿名管道(Anonymous Pipe)是进程间通信(IPC)的一种机制,它主要用于本地父子进程之间的通信。
pipe()
函数来创建一个匿名管道。该函数返回一个文件描述符数组,其中fd[0]
用于读取数据,fd[1]
用于写入数据。CreatePipe()
函数来创建匿名管道。该函数在创建匿名管道的同时返回两个句柄:管道读句柄和管道写句柄。综上所述,匿名管道是一种简单、高效的进程间通信方式,特别适用于本地父子进程之间的单向数据传输。然而,由于其限制性和局限性,它并不适用于所有场景。在选择进程间通信方式时,需要根据具体的应用场景和需求来选择最合适的通信方式。
pipe() 是一个在 Unix/Linux 系统中用于创建管道的系统调用,允许两个进程之间进行简单的进程间通信(IPC)。它的基本功能是提供一个单向的数据流,支持一个进程将数据写入管道,而另一个进程从管道中读取数据。
参数
返回值
snprintf
是 C 和 C++ 中用于格式化和存储字符串的函数,它具有安全特性,可以防止缓冲区溢出。它将格式化的字符串写入字符数组(缓冲区),类似于printf
,但通过额外的参数来控制输出缓冲区的大小。
参数
char *str
:指向将要存储格式化字符串的缓冲区的指针。size_t size
:要写入缓冲区的最大字符数,包括空字符。const char *format
:格式字符串,用于指定如何格式化后续参数。...
:要格式化的值,与格式字符串中的格式说明符对应。
返回值
size
。#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <iostream>
using namespace std;
void my_write(int wfd)
{
const char *str = "hello father ,I am child";
char buf[1024];
int cnt = 0;
pid_t pid = getpid();
while (1)
{
snprintf(buf, sizeof(buf), "message: %s ,pid: %d , cnt: %d \n", str, pid, cnt);
write(wfd, buf, sizeof(buf));
cnt++;
sleep(100);
}
close(wfd);
}
void my_read(int rfd)
{
char buf[1024];
while(1)
{
ssize_t n = read(rfd, buf, sizeof(buf)-1);
cout << "father get message : " << buf << endl;
sleep(1);
}
}
int main()
{
int pipefd[2]{0};
int n = pipe(pipefd);
// 如果失败直接返回
if (n < 0)
{
exit(-1);
}
// 成功了,打印一下
std::cout << "pipefd[0]: " << pipefd[0] << endl;
std::cout << "pipefd[1]: " << pipefd[1] << endl;
// 开始创建进程
pid_t id = fork();
if (0 == id)
{
// 关闭进程的读端,让子进程进程 写
close(pipefd[0]);
my_write(pipefd[1]);
// 直接让子进程退出,不执行下面的代码
exit(0);
}
// 关闭父进程的写端,让父进程进程读
close(pipefd[1]);
my_read(pipefd[0]);
int status = 0;
int rid = waitpid(id, &status, 0); // 等待子进程,阻塞等待
if (id == rid)
{
std::cout << "error code: " << WEXITSTATUS(status) << endl;
std::cout << "error signal: " << (status & 0x7F) << endl;
}
return 0;
}
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <iostream>
using namespace std;
void my_write(int wfd)
{
const char *str = "hello father ,I am child";
char buf[1024];
int cnt = 0;
pid_t pid = getpid();
while (1)
{
snprintf(buf, sizeof(buf), "message: %s ,pid: %d , cnt: %d \n", str, pid, cnt);
write(wfd, buf, sizeof(buf));
cnt++;
sleep(1);
}
close(wfd);
}
void my_read(int rfd)
{
char buf[1024];
while(1)
{
ssize_t n = read(rfd, buf, sizeof(buf)-1);
cout << "father get message : " << buf << endl;
sleep(100);
}
}
int main()
{
int pipefd[2]{0};
int n = pipe(pipefd);
// 如果失败直接返回
if (n < 0)
{
exit(-1);
}
// 成功了,打印一下
std::cout << "pipefd[0]: " << pipefd[0] << endl;
std::cout << "pipefd[1]: " << pipefd[1] << endl;
// 开始创建进程
pid_t id = fork();
if (0 == id)
{
// 关闭进程的读端,让子进程进程 写
close(pipefd[0]);
my_write(pipefd[1]);
// 直接让子进程退出,不执行下面的代码
exit(0);
}
// 关闭父进程的写端,让父进程进程读
close(pipefd[1]);
my_read(pipefd[0]);
int status = 0;
int rid = waitpid(id, &status, 0); // 等待子进程,阻塞等待
if (id == rid)
{
std::cout << "error code: " << WEXITSTATUS(status) << endl;
std::cout << "error signal: " << (status & 0x7F) << endl;
}
return 0;
}
读端本身不会因为写端的关闭而被OS直接关闭,它仍需要由读进程显示地关闭或随着读进程的结束而关闭。
如果负责读的父进程,不结束就会。。。一直等待读(因为没有主动退出读的那个循环,我将my_read函数中的break注释了)
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <iostream>
using namespace std;
// 对于写端而已,不写了 && 关闭了pipe ,对于读端来说,读端就会把管道里的数据先读完,最后会读到返回值为0
// 关于write函数的写入,在写入字符串的时候,第三个参数的字节数大于要写入的字符串,是就写入字符串就停止了,小于是发生截断
// 在对管道写入的时候默认是追加
void my_write(int wfd)
{
const char *str = "hello father ,I am child";
char buf[128];
// cout << "sizeof(buf) " << sizeof(buf) << endl;
int cnt = 0;
pid_t pid = getpid();
// snprintf(buf, sizeof(buf), "message: %s ,pid: %d , cnt: %d \n", str, pid, cnt);
while (cnt < 10)
{
snprintf(buf, sizeof(buf), "message: %s ,pid: %d , cnt: %d \n", str, pid, cnt); // 多次写入会进行覆盖,第二次写入会覆盖之前的字符串
// cout << "buf -->" << buf << endl;
// cout << "sizeof(buf) " << sizeof(buf) << endl;
write(wfd, buf, sizeof(buf));
cout << "write time : " << cnt << endl;
cnt++;
sleep(1);
}
close(wfd);
cout << " wfd closed" << endl;
}
// read函数,就是将管道里的数据拷贝到目标缓冲区
// 从管道读数据写入到字符数组,对于数组数组来说,肯定是从起始位置覆盖式写。
// 如果是从字符数组读内容,写到管道,同一个进程的话,会因为文件偏移发生追加式写/
void my_read(int rfd)
{
char buf[1024];
while (1)
{
ssize_t n = read(rfd, buf, sizeof(buf) - 1);
// buf[n] = '\0';
// cout << "buf -->" << buf << endl;
if (n > 0)
{
// buf[n] = '\0';
cout << "father get message : " << buf << endl;
// buf[n] = '\0';
}
else if (0 == n)
{
cout << "chile close write pipe ..." << endl;
break;
}
else
{
cout << "read is error" << endl;
}
sleep(1);
}
}
int main()
{
int pipefd[2]{0};
int n = pipe(pipefd);
// 如果失败直接返回
if (n < 0)
{
exit(-1);
}
// 成功了,打印一下
std::cout << "pipefd[0]: " << pipefd[0] << endl;
std::cout << "pipefd[1]: " << pipefd[1] << endl;
// 开始创建进程
pid_t id = fork();
if (0 == id)
{
// 关闭进程的读端,让子进程进程 写
close(pipefd[0]);
my_write(pipefd[1]);
// 直接让子进程退出,不执行下面的代码
exit(0);
}
// 关闭父进程的写端,让父进程进程读
close(pipefd[1]);
my_read(pipefd[0]);
int status = 0;
int rid = waitpid(id, &status, 0); // 等待子进程,阻塞等待
if (id == rid)
{
std::cout << "error code: " << WEXITSTATUS(status) << endl;
std::cout << "error signal: " << (status & 0x7F) << endl;
}
return 0;
}
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <iostream>
using namespace std;
// 读端不在读 && 将读端关闭,OS会将写入的进程关闭(子进程),(通过信号 -13 SIGPIPE)
// 关于write函数的写入,在写入字符串的时候,第三个参数的字节数大于要写入的字符串,是就写入字符串就停止了,小于是发生截断
// 在对管道写入的时候默认是追加
void my_write(int wfd)
{
const char *str = "hello father ,I am child";
char buf[128];
// cout << "sizeof(buf) " << sizeof(buf) << endl;
int cnt = 0;
pid_t pid = getpid();
// snprintf(buf, sizeof(buf), "message: %s ,pid: %d , cnt: %d \n", str, pid, cnt);
while (1)
{
snprintf(buf, sizeof(buf), "message: %s ,pid: %d , cnt: %d \n", str, pid, cnt); // 多次写入会进行覆盖,第二次写入会覆盖之前的字符串
// cout << "buf -->" << buf << endl;
// cout << "sizeof(buf) " << sizeof(buf) << endl;
write(wfd, buf, sizeof(buf));
cout << "write time : " << cnt << endl;
cnt++;
sleep(1);
}
// close(wfd);
cout << " wfd closed" << endl;
}
// read函数,就是将管道里的数据拷贝到目标缓冲区
// 从管道读数据写入到字符数组,对于数组数组来说,肯定是从起始位置覆盖式写。
// 如果是从字符数组读内容,写到管道,同一个进程的话,会因为文件偏移发生追加式写/
void my_read(int rfd)
{
char buf[1024];
int cnt = 5;
while (cnt)
{
ssize_t n = read(rfd, buf, sizeof(buf) - 1);
// buf[n] = '\0';
// cout << "buf -->" << buf << endl;
if (n > 0)
{
// buf[n] = '\0';
cout << "father get message : " << buf << endl;
// buf[n] = '\0';
}
else if (0 == n)
{
cout << "chile close write pipe ..." << endl;
break;
}
else
{
cout << "read is error" << endl;
}
sleep(1);
cnt--;
}
close(rfd);
return ;
}
int main()
{
int pipefd[2]{0};
int n = pipe(pipefd);
// 如果失败直接返回
if (n < 0)
{
exit(-1);
}
// 成功了,打印一下
std::cout << "pipefd[0]: " << pipefd[0] << endl;
std::cout << "pipefd[1]: " << pipefd[1] << endl;
// 开始创建进程
pid_t id = fork();
if (0 == id)
{
// 关闭进程的读端,让子进程进程 写
close(pipefd[0]);
my_write(pipefd[1]);
// 直接让子进程退出,不执行下面的代码
exit(0);
}
// 关闭父进程的写端,让父进程进程读
close(pipefd[1]);
my_read(pipefd[0]);
int status = 0;
int rid = waitpid(id, &status, 0); // 等待子进程,阻塞等待
if (id == rid)
{
std::cout << "error code: " << WEXITSTATUS(status) << endl;
std::cout << "error signal: " << (status & 0x7F) << endl;
}
return 0;
}
当没有数据可读时:
O_NONBLOCK disable:read调用阻塞,即进程暂停执行,一直等到有数据来到为止。
O_NONBLOCK enable:read调用返回-1,errno值为EAGAIN。
当管道满的时候:
O_NONBLOCK disable: write调用阻塞,直到有进程读走数据O_NONBLOCK enable:调用返回-1,errno值为EAGAIN
如果所有管道写端对应的文件描述符被关闭,则read返回0
如果所有管道读端对应的文件描述符被关闭,则write操作会产生信号SIGPIPE,进而可能导致write进程退出
当要写入的数据量不大于PIPE_BUF时,linux将保证写入的原子性。当要写入的数据量大于PIPE_BUF时,linux将不再保证写入的原子性
进程通信,特别是通过管道进行的通信,确实具有一些显著的特性。以下是对您提出的四大特性的详细解释:
综上所述,进程通信(特别是通过管道进行的通信)具有自带同步和互斥机制、面向字节流、管道的生命周期随进程以及管道只能单向通信(半双工通信的一种特殊情况)等四大特性。这些特性使得管道成为一种简单、高效且易于管理的进程间通信方式。
进程池(Process Pool)是一种常见的多进程编程技术,旨在提高并发处理能力和资源利用率。
进程池适用于需要大量并行处理的任务场景,如:
综上所述,进程池是一种高效的多进程编程技术,能够在需要大量并行处理的任务中显著提升系统性能。然而,在使用进程池时也需要根据具体的应用场景进行权衡和选择,以充分发挥其优势并避免潜在的问题。
#pragma once
#include<iostream>
#include<cstdio>
#include<sys/types.h>
#include<sys/wait.h>
#include<unistd.h>
#include<cstring>
#include<vector>
#include<cstdlib>
#include<ctime>
#include"task.hpp"
using namespace std;
enum //匿名枚举,用来将错误码转化为错误信息
{
UsageError = 1,
ProcnumError,
PipeError
};
void Usage(const string& proc)
{
cout << "Usage:" << proc << "proc_num" << endl;
return ;
}
class Channel //先描述(类)
{
public:
Channel(int wfd, int id, string& name)
:_wfd(wfd),_id(id),_name(name)
{ }
int Wfd(){ return _wfd; }
string& Name(){ return _name;}
int Id(){ return _id; }
void Print()
{
cout << "_wfd: " << _wfd;
cout << ",_id: " << _id;
cout << ",_name: " << _name << endl;
}
~Channel(){ }
private:
int _wfd;
pid_t _id;
string _name;
};
class Processpol //进程池
{
public:
Processpol(int proc_num)
:_proc_num(proc_num)
{ }
vector<int> fds; //用来关闭struct file被多个文件描述符指向
int CreatProcess(work_t work) //1.创建一组子进程、命令行管道
{
for(int i = 0; i < _proc_num; i++)
{
//父进程需要对管道进行管理,否则会被覆盖
int pipefd[2]{0};
int n = pipe(pipefd);
if(n == -1) return PipeError; //创建管道失败
pid_t id = fork();
if(id == 0) //子进程
{
if (!fds.empty())
for (auto& fd : fds) close(fd);
close(pipefd[1]); //关闭管道的写端
dup2(pipefd[0], 0); //将标准输入重定向到管道读端
work(pipefd[0]); //执行任务 —— 回调函数
exit(0); //终止
}
//父进程
close(pipefd[0]); //关闭管道的读端
string name = "channel-" + to_string(i); //管道名字
_channels.push_back(Channel(pipefd[1], id, name));
fds.push_back(pipefd[1]);
}
return 0;
}
//2.1.选择一个子进程、管道,来执行任务 —— 父进程'负载均衡'式的给子进程分配任务
int NextChannel()
{
static int next = 0; //静态,不属于任何对象,全局, 用来定位进程池中的各个进程、管道
int n = next;
next++;
next %= _channels.size(); //不可越界
return n;
}
//2.3.发送任务编号到管道写端,以供子进程读取
void SendTaskCode(int c_code, int t_code)
{
cout << "send code " << t_code << " to " << _channels[c_code].Name() << " proc_id " << _channels[c_code].Id() << endl;
write(_channels[c_code].Wfd(), &t_code, sizeof(t_code)); //父进程将数据写入管道的写端
sleep(1);
}
//3.回收资源
/*因为子进程会继承父进程的文件描述符表,子进程会将父进程中以w方式打开上一文件的fd,
使得上一文件被多个指针指向,所以关闭父进程写端,此文件的struct file中引用计数--,并未全部关闭*/
void RecycleProc()
{
for(auto& e : _channels)
{
close(e.Wfd()); //关闭当前被打开文件(管道)的写端
int rid = waitpid(e.Id(), NULL, 0); //回收子进程资源
if(rid > 0) //等待成功
cout << "wait process " << e.Id() << "sucess" << endl;
cout << e.Name() << " close done. process " << e.Id() <<"quit now!" << endl;
}
}
void Debug() //打印子进程、命令行管道相关信息
{
for(auto& e : _channels)
{
e.Print();
}
}
~Processpol(){ }
private:
vector<Channel> _channels; //再组织
int _proc_num; //预先创建的子进程数量
};
void CtrlProcessPool(Processpol* ptr, int cnt)
{
while(cnt--)
{
int c_code = ptr->NextChannel();
int t_code = NextCode();
ptr->SendTaskCode(c_code, t_code);
}
}
int main(int argc, char* argv[])
{
//设置随机数生成的种子(起始值),若无srand,则每次程序启动随机数序列均相同,使用当前时间(时间戳)来初始化种子
srand((uint64_t)time(nullptr)); //unit64_t为无符号64位整数,long long别名
//输入错误
if(argc != 2)
{
Usage(argv[0]);
return UsageError;
}
//预设创建的进程数须>0
int proc_num = stoi(argv[1]);
if(proc_num <= 0)
return ProcnumError;
Processpol* ptr = new Processpol(proc_num); //创建一个进程池
//1.创建一组子进程、命令行管道
ptr->CreatProcess(Work);
ptr->Debug(); //打印子进程、命令行管道相关信息
//2.控制子进程
CtrlProcessPool(ptr, 5);
cout << "task finish......" << endl;
//3.回收资源: 子进程先要退出(关闭写端),再回收子进程资源
ptr->RecycleProc();
return 0;
}
————————————————
版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
原文链接:https://blog.csdn.net/m0_74808907/article/details/143231476
#include<cstdio>
#include<iostream>
#include<unistd.h>
#include <sys/types.h>
using namespace std;
//函数指针类型
typedef void(*work_t)(int rfd);
typedef void(*task_t)(int rfd, int id);
//任务
void PrintLog(int rfd, int id)
{
cout << "rfd: " << rfd << " process: " << id << " is working: " << " printf log task!" << endl;
}
void ReloadConf(int rfd, int id)
{
cout << "rfd: " << rfd << " process: " << id << " is working: " << " reload conf task!" << endl;
}
void ConnectMysql(int rfd, int id)
{
cout<< "rfd: " << rfd << " process: " << id << " is working: " << "Connect Mysql task!" << endl;
}
task_t task[3] = {PrintLog, ReloadConf, ConnectMysql };
int NextCode() //2.2.选择要执行任务的编号
{
return rand() % 3; //生成一个伪随机数
}
void Work(int rfd) //2.4.子进程接受并处理任务
{
while(true)
{
int code = 0;
int n = read(0, &code, sizeof(code)); //子进程从管道的读端读取数据
if(n > 0)
{
if(code >= 3) continue;
task[code](rfd, getpid());
}
else if(n == 0) //情况3,写端关闭,读取到了结尾
{
cout << "read ending!" << endl;
break;
}
else
{
cout << "read error!" << endl;
break;
}
sleep(1);
}
}
命名管道(Named Pipes),也被称为命名管线或FIFO(First In First Out,先进先出),是一种简单的进程间通信(IPC)机制。以下是对命名管道的详细介绍:
命名管道是一种特殊的文件类型,它允许在同一台计算机的不同进程之间或在跨越一个网络的不同计算机的不同进程之间,进行可靠的、单向或双向的数据通信。它克服了传统管道没有名字、只能用于具有亲缘关系的进程间通信的限制,通过提供一个路径名与之关联,使得任何可以访问该路径的进程都能够通过命名管道进行通信。
mkfifo()
函数是一个系统调用,用于在UNIX和类UNIX操作系统(如Linux)中创建命名管道(Named Pipe),也称为FIFO(First In First Out)特殊文件。命名管道允许不同进程之间按照先进先出的顺序进行数据交换。
参数
mode
:设置命名管道的权限模式,类似于open()
函数中的模式参数。这些权限位将决定哪些用户可以读写管道。返回值
mkfifo()
返回0。errno
以指示错误类型。unlink()
函数是一个系统调用,用于删除一个目录项(通常是一个文件或命名管道)。它并不直接删除文件内容,而是从文件系统的目录结构中删除指定的文件名,从而使其不可访问。如果该文件是被多个进程打开的,那么unlink()
调用不会立即删除文件内容,而是等到最后一个打开该文件的进程关闭文件描述符时,文件内容才会被真正删除。这种行为对于实现临时文件和命名管道等资源的清理特别有用。
参数
pathname
:指向一个以null结尾的字符串,该字符串指定了要删除的文件的路径和文件名。返回值
unlink()
返回0。errno
以指示错误类型。#include<iostream>
#include<cstdio>
#include<sys/types.h>
#include<sys/stat.h>
#include<cstring>
#include<errno.h>
#include<unistd.h>
#include <fcntl.h>
using namespace std;
#define Mode 0666 //指定命名管道的权限
#define Path "./fifo" //路径名
class Fifo
{
public:
Fifo(const string& path)
:_path(path)
{
umask(0);
int n = mkfifo(Path, Mode); //创建命名管道
if(n == 0)
{
cout << "mkfifo success!" << endl;
}
else
{
cout << "mkfifo failed! errno: " << errno << "errdesc: " << strerror(errno) << endl;
}
}
~Fifo()
{
int n = unlink(Path); //删除命名管道
if(n == 0)
{
cout << "remove file success!" << endl;
}
else
{
cout << "remove file failed! errno: " << errno << "errdesc: " << strerror(errno) << endl;
}
}
private:
string _path;
};
#include"name_pipe.hpp"
int main()
{
//1.创建命名管道
Fifo fifo(Path);
//2.以读的方式打开命名管道
int fd = open(Path, O_RDONLY);
if(fd <= 0)
{
cout << "open failed! errno: " << errno << "errdesc: " << strerror(errno) << endl;
return 1;
}
cout << "open success!" << endl;
//3.从命名管道中读取数据
char buffer[128];
while(1)
{
int n = read(fd, buffer, sizeof(buffer));
if(n > 0)
{
buffer[n] = '\0';
cout << "client say: " << buffer << endl;
}
else if(n == 0)
{
cout << "client quit! me to...." << endl;
break;
}
else
{
cout << "read failed! errno: " << errno << "errdesc: " << strerror(errno) << endl;
break;
}
}
}
我的博客即将同步至腾讯云开发者社区,邀请大家一同入驻:https://cloud.tencent.com/developer/support-plan?invite_code=cuhg6omf704