目录
前言
1.进程间通信介绍
1.1 进程间通信目的
1.2 进程间通信发展
1.3 进程间通信分类
1.3.1 管道
1.3.2 System V IPC
1.3.3 POSIX IPC
2.管道
2.1 什么是管道
2.2 匿名管道
2.2.1 pipe函数
2.2.2 调用pipe函数的进程
2.2.3 实例代码
2.2.4 用fork来共享管道原理
2.2.5 站在文件描述符角度-深度理解管道
2.2.6 站在内核角度-管道本质
2.2.7 管道读写规则
2.2.8 匿名管道特点
2.2.9 进程池
2.2.9.1 processpool.cc
2.2.9.2 test.hpp
2.2.9.3 效果
2.3 命名管道
2.3.1 创建命名管道
2.3.2 删除管道
2.3.3 用命名管道实现server&client通信
2.3.3.1 namedPipe.hpp
2.3.3.2 server.cc
2.3.3.3 client.cc
2.3.3.4 效果
3.system V共享内存
3.1 共享内存示意图
3.2 共享内存数据结构
3.3 共享内存函数
3.3.1 shmget函数
3.3.2 shmat函数
3.3.3 shmdt函数
3.3.4 shmctl函数
3.4 实例代码
3.4.1 namedPipe.hpp
3.4.2 Shm.hpp
3.4.3 server.cc
3.4.4 client.cc
5.system V消息队列
6.system V信号量
6.1 信号量的操作
6.2 进程互斥
7.OS管理
当前我们使用的环境更换为Ubuntu
vscode远程连接指南:VScode远程连接虚拟机(ubuntu系统)_vscode连接ubuntu-CSDN博客
vscode配置插件:
推荐插件
免配置:
#include <unistd.h>
功能:创建一无名管道
原型
int pipe(int fd[2]);
参数
fd:文件描述符数组,其中fd[0]表示读端, fd[1]表示写端
返回值:成功返回0,失败返回错误代码
例子:从键盘读取数据,写入管道,读取管道,写到屏幕
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
int main()
{
int fds[2];
char buf[100];
int len;
if (pipe(fds) == -1)
perror("make pipe"), exit(1);
// read from stdin
while (fgets(buf, 100, stdin)) {
len = strlen(buf);
// write into pipe
if (write(fds[1], buf, len) != len) {
perror("write to pipe");
break;
}
memset(buf, 0x00, sizeof(buf));
// read from pipe
if ((len = read(fds[0], buf, 100)) == -1) {
perror("read from pipe");
break;
}
// write to stdout
if (write(1, buf, len) != len) {
perror("write to stdout");
break;
}
}
}
#include<iostream>
#include<unistd.h>
#include<cerrno>
#include<cstring>
using namespace std;
int main()
{
//1.创建管道
int pipefd[2];
int n=pipe(pipefd);//输出型参数
if(n!=0)
{
cerr<<"errno:"<<errno<<"errstring:"<<strerror(errno)<<endl;
return 1;
}
//pipefd[0]->0->r(读)
//pipefd[1]->1->w(写)
cout<<"pipefd[0]:"<<pipefd[0]<<" "<<"pipefd[1]:"<<pipefd[1]<<endl;
return 0;
}
所以,看待管道,就如同看待文件一样!管道的使用和文件一致,迎合了“Linux一切皆文件思想”
#include <unistd.h>
#include <stdlib.h>
#include <stdio.h>
#include <errno.h>
#include <string.h>
#include <sys/types.h>
#define ERR_EXIT(m) do { perror(m); exit(EXIT_FAILURE); } while(0)
int main(int argc, char* argv[])
{
int pipefd[2];
if (pipe(pipefd) == -1) ERR_EXIT("pipe error");
pid_t pid;
pid = fork();
if (pid == -1)
ERR_EXIT("fork error");
if (pid == 0) {
close(pipefd[0]);
write(pipefd[1], "hello", 5);
close(pipefd[1]);
exit(EXIT_SUCCESS);
}
close(pipefd[1]);
char buf[10] = { 0 };
read(pipefd[0], buf, 10);
printf("buf=%s\n", buf);
return 0;
}
1. 当没有数据可读时
2. 当管道满的时候
3. 如果所有管道写端对应的文件描述符被关闭,则read返回0 4. 如果所有管道读端对应的文件描述符被关闭,则write操作会产生信号SIGPIPE,进而可能导致write进程退出 5. 当要写入的数据量不大于PIPE_BUF时,linux将保证写入的原子性 6. 当要写入的数据量大于PIPE_BUF时,linux将不再保证写入的原子性
我们用匿名管道实现一个进程池
#include<iostream>
#include<unistd.h>
#include<string>
#include<vector>
#include<sys/wait.h>
#include<sys/types.h>
#include"Test.hpp"
using namespace std;
//master
class Channel
{
public:
Channel(int wfd,pid_t id,const string& name)
:_wfd(wfd)
,_subprocessid(id)
,_name(name)
{}
int GetWfd(){return _wfd;}
pid_t GetProcessId(){return _subprocessid;}
string GetName(){return _name;}
void CloseChannel(){close(_wfd);}
void Wait()
{
pid_t rid=waitpid(_subprocessid,nullptr,0);
if(rid>0)
{
cout<<"wait "<<rid<<" success "<<endl;
}
}
~Channel()
{}
private:
int _wfd;
pid_t _subprocessid;
string _name;
};
//形参类型命名规范
//const &:输出
//& 输入输出
//* 输出型参数
void CreateChannelAndSub(vector<Channel>*Channels ,int num,task_t task)
{
for(int i=0;i<num;i++)
{
//1.创建管道
int pipefd[2]={0};
int n=pipe(pipefd);
if(n<0) exit(1);
//2.创建子进程
pid_t id=fork();
if(id==0)
{
if(!Channels->empty())
{
//第二次之后创建新的管道
for(auto &Channel:*Channels)
{
Channel.CloseChannel();
}
}
//child - read
close(pipefd[1]);
//work(pipefd[0]);
dup2(pipefd[0],0);
task();
close(pipefd[0]);
exit(0);
}
//3.构建一个channel名称
string channel_name="Channel-"+to_string(i);
//father - write
close(pipefd[0]);
//子进程的pid
//父进程关心的管道write
Channels->push_back(Channel(pipefd[1],id,channel_name));
}
}
int NextChannel(int channelnum)
{
static int next=0;
int channel=next;
next++;
next%=channelnum;
return channel;
}
void SendTaskCommand(Channel& Channel,int taskcommand)
{
write(Channel.GetWfd(),&taskcommand,sizeof(taskcommand));
}
void ctrlProcessOnce(vector<Channel>& Channels)
{
sleep(1);
//a.选择一个任务
int taskcommand=SelectTask();
//b.选择一个信道和进程
int channel_index=NextChannel(Channels.size());
//c.发送任务
SendTaskCommand(Channels[channel_index],taskcommand);
cout<<"taskcommand: "<<taskcommand<<"channel: "\
<<Channels[channel_index].GetName()<<"sub process: "\
<<Channels[channel_index].GetProcessId()<<endl;
}
void ctrlProcess(vector<Channel>& Channels,int times=-1)
{
if(times>0)
{
while(times--)
{
ctrlProcessOnce(Channels);
}
}
else{
while(true)
{
ctrlProcessOnce(Channels);
}
}
}
void CleanUpChannels(vector<Channel>& Channels)
{
for(auto &Channel:Channels)
{
Channel.CloseChannel();
Channel.Wait();
}
// for(auto &Channel:Channels)
// {
// Channel.CloseChannel();
// }
// for(auto &Channel:Channels)
// {
// Channel.Wait();
// }
}
// ./processpool 5
int main(int argc,char *argv[])
{
if(argc!=2)
{
cerr<<"Usage: "<<argv[0]<<" processnum "<<endl;
return 1;
}
int num=stoi(argv[1]);
LoadTask();
vector<Channel> Channels;
//1.创建信道和子进程
CreateChannelAndSub(&Channels,num,work);
//2.通过channel控制子进程
ctrlProcess(Channels,10);
//3.回收管道和子进程
//a.关闭所有的写端
//b.回收子进程
CleanUpChannels(Channels);
// for(auto&Channel:Channels)
// {
// cout<<"======================"<<endl;
// cout<<Channel.GetName()<<endl;
// cout<<Channel.GetWfd()<<endl;
// cout<<Channel.GetProcessId()<<endl;
// }
// sleep(100);
return 0;
}
#pragma once
#include<iostream>
#include<ctime>
#include<cstdlib>
#include<sys/types.h>
#include<unistd.h>
#define TaskNum 3
typedef void (*task_t)();//task_t函数指针类型
using namespace std;
task_t tasks[TaskNum];
void Print()
{
cout<<"I am Print Task"<<endl;
}
void Downland()
{
cout<<"I am Downland Task"<<endl;
}
void Flush()
{
cout<<"I am Flush Task"<<endl;
}
void LoadTask()
{
srand(time(nullptr)^getpid());
tasks[0]=Print;
tasks[1]=Downland;
tasks[2]=Flush;
}
void ExcuteTask(int num)
{
if(num<0||num>2) return;
tasks[num]();
}
int SelectTask()
{
return rand()%TaskNum;
}
void work()
{
while(true)
{
int command=0;
int n=read(0,&command,sizeof(command));
if(n==sizeof(int))
{
cout<<"pid is: "<<getpid()<<"handler task "<<endl;
ExcuteTask(command);
}
else if(n==0)
{
cout<<"sub process: "<<getpid()<<" quit "<<endl;
break;
}
}
}
命名管道可以从命令行上创建,命令行方法是使用下面这个命令
$ mkfifo filename
命名管道也可以从程序里创建,相关函数有
int mkfifo(const char *filename,mode_t mode);
int CreateNamedPipe(const std::string &path)
{
int res=mkfifo(path.c_str(),0666);
if(res!=0)
{
perror("mkfifo");
}
return res;
}
int RemoveNamedPipe(const std::string &path)
{
int res=unlink(path.c_str());
if(res!=0)
{
perror("unlink");
}
return res;
}
#pragma once
#include<iostream>
#include<sys/types.h>
#include<sys/stat.h>
#include<string>
#include<cstdio>
#include<cerrno>
#include<unistd.h>
#include<fcntl.h>
const std::string comm_path="./myfifo";
#define DefaultFd -1
#define Creater 1
#define User 2
#define Read O_RDONLY
#define Write O_WRONLY
#define BaseSize 4096
class NamedPipe
{
private:
bool OpenNamedPipe(int mode)
{
_fd=open(_fifo_path.c_str(),mode);
if(_fd<0) return false;
return true;
}
public:
NamedPipe(const std::string &path,int who)
:_fifo_path(path)
,_id(who)
,_fd(DefaultFd)
{
if(_id==Creater)
{
int res=mkfifo(_fifo_path.c_str(),0666);
if(res!=0)
{
perror("mkfifo");
}
std::cout<<"creater create named pipe"<<std::endl;
}
}
bool OpenForRead()
{
return OpenNamedPipe(Read);
}
bool OpenForWrite()
{
return OpenNamedPipe(Write);
}
int ReadNamedPipe(std::string *out)
{
char buffer[BaseSize];
int n=read(_fd,buffer,sizeof(buffer));
if(n>0)
{
buffer[n]=0;
*out=buffer;//输出到out
}
return n;
}
int WriteNamedPipe(const std::string &in)
{
return write(_fd,in.c_str(),in.size());
}
~NamedPipe()
{
if(_id==Creater)
{
//sleep(5);
int res=unlink(_fifo_path.c_str());
if(res!=0)
{
perror("unlink");
}
std::cout<<"creater free named pipe"<<std::endl;
}
if(_fd!=DefaultFd)close(_fd);
}
private:
const std::string _fifo_path;
int _id;
int _fd;
};
// int CreateNamedPipe(const std::string &path)
// {
// int res=mkfifo(path.c_str(),0666);
// if(res!=0)
// {
// perror("mkfifo");
// }
// return res;
// }
// int RemoveNamedPipe(const std::string &path)
// {
// int res=unlink(path.c_str());
// if(res!=0)
// {
// perror("unlink");
// }
// return res;
// }
#include "namedPipe.hpp"
int main()
{
// read
NamedPipe fifo(comm_path, Creater);
//对于读端而言,如果我们打开文件,但是写端还没来
//我会阻塞在open调用中,直到对方打开
//进程同步
if (fifo.OpenForRead())
{
std::cout<<"server open named pipe done"<<std::endl;
sleep(3);
while (true)
{
std::string message;
int n = fifo.ReadNamedPipe(&message);
if (n > 0)
{
std::cout << "client say> " << message << std::endl;
}
else if(n==0)
{
std::cout<<"client quit, server too!"<<std::endl;
break;
}
else
{
std::cout<<"fifo.ReadNamedPipe Error"<<std::endl;
break;
}
}
}
// CreateNamedPipe(comm_path);
// sleep(5);
// RemoveNamedPipe(comm_path);
return 0;
}
#include "namedPipe.hpp"
int main()
{
// write
NamedPipe fifo(comm_path, User);
std::cout<<"client open named pipe done"<<std::endl;
while (true)
{
//std::cout<<"client open named pipe done"<<std::endl;
if (fifo.OpenForWrite())
{
std::cout << "Please Enter> ";
std::string message;
std::getline(std::cin, message);
fifo.WriteNamedPipe(message);
}
}
return 0;
}
共享内存区是最快的IPC形式。一旦这样的内存映射到共享它的进程的地址空间,这些进程间数据传递不再涉及到内核,换句话说是进程不再通过执行进入内核的系统调用来传递彼此的数据
struct shmid_ds {
struct ipc_perm shm_perm; /* operation perms */
int shm_segsz; /* size of segment (bytes) */
__kernel_time_t shm_atime; /* last attach time */
__kernel_time_t shm_dtime; /* last detach time */
__kernel_time_t shm_ctime; /* last change time */
__kernel_ipc_pid_t shm_cpid; /* pid of creator */
__kernel_ipc_pid_t shm_lpid; /* pid of last operator */
unsigned short shm_nattch; /* no. of current attaches */
unsigned short shm_unused; /* compatibility */
void *shm_unused2; /* ditto - used by DIPC */
void *shm_unused3; /* unused */
};
功能:用来创建共享内存
原型
int shmget(key_t key, size_t size, int shmflg);
参数
key:这个共享内存段名字
size:共享内存大小
shmflg:由九个权限标志构成,它们的用法和创建文件时使用的mode模式标志是一样的
返回值:成功返回一个非负整数,即该共享内存段的标识码;失败返回
功能:将共享内存段连接到进程地址空间
原型
void *shmat(int shmid, const void *shmaddr, int shmflg);
参数
shmid: 共享内存标识
shmaddr:指定连接的地址
shmflg:它的两个可能取值是SHM_RND和SHM_RDONLY
返回值:成功返回一个指针,指向共享内存第一个节;失败返回-1
说明:
功能:将共享内存段与当前进程脱离
原型
int shmdt(const void *shmaddr);
参数
shmaddr: 由shmat所返回的指针
返回值:成功返回0;失败返回-1
注意:将共享内存段与当前进程脱离不等于删除共享内存段
功能:用于控制共享内存
原型
int shmctl(int shmid, int cmd, struct shmid_ds *buf);
参数
shmid:由shmget返回的共享内存标识码
cmd:将要采取的动作(有三个可取值)
buf:指向一个保存着共享内存的模式状态和访问权限的数据结构
返回值:成功返回0;失败返回-1
代码部分可自行打开注释测试
#pragma once
#include<iostream>
#include<sys/types.h>
#include<sys/stat.h>
#include<string>
#include<cstdio>
#include<cerrno>
#include<unistd.h>
#include<fcntl.h>
const std::string comm_path="./myfifo";
#define DefaultFd -1
#define Creater 1
#define User 2
#define Read O_RDONLY
#define Write O_WRONLY
#define BaseSize 4096
class NamedPipe
{
private:
bool OpenNamedPipe(int mode)
{
_fd=open(_fifo_path.c_str(),mode);
if(_fd<0) return false;
return true;
}
public:
NamedPipe(const std::string &path,int who)
:_fifo_path(path)
,_id(who)
,_fd(DefaultFd)
{
if(_id==Creater)
{
int res=mkfifo(_fifo_path.c_str(),0666);
if(res!=0)
{
perror("mkfifo");
}
std::cout<<"creater create named pipe"<<std::endl;
}
}
bool OpenForRead()
{
return OpenNamedPipe(Read);
}
bool OpenForWrite()
{
return OpenNamedPipe(Write);
}
int ReadNamedPipe(std::string *out)
{
char buffer[BaseSize];
int n=read(_fd,buffer,sizeof(buffer));
if(n>0)
{
buffer[n]=0;
*out=buffer;//输出到out
}
return n;
}
int WriteNamedPipe(const std::string &in)
{
return write(_fd,in.c_str(),in.size());
}
~NamedPipe()
{
if(_id==Creater)
{
//sleep(5);
int res=unlink(_fifo_path.c_str());
if(res!=0)
{
perror("unlink");
}
std::cout<<"creater free named pipe"<<std::endl;
}
if(_fd!=DefaultFd)close(_fd);
}
private:
const std::string _fifo_path;
int _id;
int _fd;
};
#ifndef __SHM_HPP__
#define __SHM_HPP__
#include <iostream>
#include <sys/ipc.h>
#include <sys/shm.h>
#include <cstdio>
#include <cerrno>
#include <string>
#include <unistd.h>
#include <cstring>
const int gCreater = 1;
const int gUser = 2;
const std::string gpathname = "/home/dc/lesson31pipe/4.shm";
const int gproj_id = 0x66;
const int gShmsize = 4096;
class Shm
{
private:
key_t GetCommKey()
{
key_t k = ftok(_pathname.c_str(), _proj_id);
if (k < 0)
{
perror("ftok");
}
return k;
}
int GetShmHelper(key_t key, int size, int flag)
{
int shmid = shmget(key, size, flag);
if (shmid < 0)
{
perror("shmget");
}
return shmid;
}
std::string RoleToString(int _who)
{
if (_who == gCreater)
return "Creater";
else if (_who == gUser)
return "User";
else
return "None";
}
void *AttachShm()
{
if (_addrshm != nullptr)
DetachShm(_addrshm);
void *shmaddr = shmat(_shmid, nullptr, 0);
if (shmaddr == nullptr)
{
perror("shmat");
}
std::cout << "who: " << RoleToString(_who) << " attach shm..." << std::endl;
return shmaddr;
}
void DetachShm(void *shmaddr)
{
if (shmaddr == nullptr)
return;
shmdt(shmaddr);
std::cout << "who: " << RoleToString(_who) << " detach shm..." << std::endl;
}
public:
Shm(const std::string &pathname, int proj_id, int who)
: _pathname(pathname), _proj_id(proj_id), _who(who), _addrshm(nullptr)
{
_key = GetCommKey();
if (_who == gCreater)
GetShmUseCreate();
else if (_who == gUser)
GetShmForUse();
_addrshm = AttachShm();
std::cout << "shmid: " << _shmid << std::endl;
std::cout << "key: " << ToHex(_key) << std::endl;
}
std::string ToHex(key_t key)
{
char buffer[128];
snprintf(buffer, sizeof(buffer), "0x%x", _key);
return buffer;
}
bool GetShmUseCreate()
{
if (_who == gCreater)
{
_shmid = GetShmHelper(_key, gShmsize, IPC_CREAT | IPC_EXCL | 0666);
// sleep(5);
if (_shmid >= 0)
return true;
std::cout << "shm create done..." << std::endl;
}
return false;
}
bool GetShmForUse()
{
if (_who == gUser)
{
_shmid = GetShmHelper(_key, gShmsize, IPC_CREAT | 0666);
// sleep(5);
if (_shmid >= 0)
return true;
std::cout << "shm get done..." << std::endl;
}
return false;
}
void Zero()
{
if (_addrshm)
{
memset(_addrshm, 0, gShmsize);
}
}
void *Addr()
{
return _addrshm;
}
~Shm()
{
if (_who == gCreater)
{
int res = shmctl(_shmid, IPC_RMID, nullptr);
}
std::cout << "shm remove done..." << std::endl;
}
private:
key_t _key;
int _shmid;
std::string _pathname;
int _proj_id;
int _who;
void *_addrshm;
};
#endif
#include "Shm.hpp"
#include "namedPipe.hpp"
int main()
{
//1.创建共享内存
Shm shm(gpathname, gproj_id, gCreater);
// char *addr = (char *)shm.AttachShm();
// shm.DetachShm(addr);
//shm.Zero();
char *shmaddr=(char*)shm.Addr();
//2.创建管道
// NamedPipe fifo(comm_path,Creater);
// fifo.OpenForRead();
//int cnt=10;
while(true)
{
std::string temp;
//fifo.ReadNamedPipe(&temp);
std::cout<<"shm memory content: "<<shmaddr<<std::endl;
sleep(1);
}
// key_t key=GetCommKey(gpathname,gproj_id);
// std::cout<<"key: "<<ToHex(key)<<std::endl;
// int shmid=ShmGet(key,4096);
// std::cout<<"shmid: "<<shmid<<std::endl;
return 0;
}
#include "Shm.hpp"
#include "namedPipe.hpp"
int main()
{
// 1.创建共享内存
Shm shm(gpathname, gproj_id, gUser);
// char *addr = (char *)shm.AttachShm();
// shm.DetachShm(addr);
shm.Zero();
char *shmaddr = (char *)shm.Addr();
sleep(3);
// 2.打开管道
// NamedPipe fifo(comm_path, User);
// fifo.OpenForWrite();
// 当成string
char ch = 'A';
while (ch <= 'Z')
{
shmaddr[ch - 'A'] = ch;
ch++;
// std::string temp = "wake up";
// std::cout << "add " << ch << " into Shm, "
// << "wakeup reader" << std::endl;
// fifo.WriteNamedPipe(temp);
sleep(2);
//ch++;
}
// key_t key=GetCommKey(gpathname,gproj_id);
// std::cout<<"key: "<<ToHex(key)<<std::endl;
// //int shmid=ShmGet(key,4096);
// //std::cout<<"shmid: "<<shmid<<std::endl;
return 0;
}
注意:共享内存没有进行同步与互斥!
特性方面
5个概念:
特性方面
操作系统如何把共享内存,消息队列,信号量统一管理起来:
板书/20240427_namedpipe_shm.png · whb-helloworld/111 - 码云 - 开源中国 (gitee.com)