在操作系统中,内核空间是所有进程共用的,而每个进程都拥有自己的用户空间。这种结构类似于一个公共图书馆,每个进程都有自己的阅读室”(用户空间),但它们都可以通过特定的走廊”(系统调用)访问这个图书馆(内核空间)。
进程之间能够通信,正是因为它们可以通过这个共享的内核空间进行交互。尽管每个进程的用户空间是相互独立的,但它们与内核空间之间并不是完全隔离的,而是通过系统调用这个走廊”进行沟通。
在Linux系统中,采用了两级保护机制,其中0级为内核使用,3级为用户程序使用。每个进程都拥有自己的私有用户空间(0~3G),这个空间对其他进程是不可见的。而最高的1GB字节虚拟内核空间为所有进程和内核所共享。
内核空间中存储的是内核代码和数据,而用户空间中存储的是用户程序的代码和数据。无论是内核空间还是用户空间,它们都处于虚拟空间中。尽管内核空间在每个虚拟空间中都占据了最高的1GB字节,但映射到物理内存时,总是从最低地址(0x00000000)开始。
通过一幅图来解释进程间通信的原理,可以看到,尽管进程之间有空间隔离,但它们都与内核相连,可以通过特殊的系统调用与内核进行沟通,从而实现与其他进程的通信。这就像不同的房间虽然相互独立,但都通过管道与一个中央控制室相连。
在这个比喻中,进程就是各个房间,而内核就是中央控制室。进程虽然不能直接访问其他进程的用户空间,但可以通过系统调用与内核进行交互,内核再将信息传递给其他进程,从而实现进程间通信。
例如,当一个进程需要向另一个进程发送数据时,它可以通过系统调用将数据写入内核空间的特定区域,内核再通知目标进程从该区域读取数据。
进程间通信机制由存在于内核空间的通信中枢和存在于用户空间的通信接口组成,两者关系紧密。通信中枢就如同邮局或基站,为通信提供核心机制;通信接口则像信纸或手机,为用户提供使用通信机制的方法。
为了更直观地理解进程间通信机制的结构,我们可以通过以下图示来展示:
用户通过通信接口让通信中枢建立通信信道或传递通信信息。例如,在使用共享内存进行进程间通信时,用户通过特定的系统调用接口(通信接口)请求内核空间的通信中枢为其分配一块共享内存区域,并建立起不同进程对该区域的访问路径。
通信中枢建立好通信信道后,通信双方之后的通信不需要通信中枢的协助。这就如同两个房间之间打开了一扇门,双方可以直接通过这扇门进行交流,而不需要中间人的帮忙。
但是,由于通信信息的传递不需要通信中枢的协助,通信双方需要进程间同步,以保证数据读写的一致性。否则,就可能出现数据踩踏或者读到垃圾数据的情况。比如,多个进程同时对共享内存进行读写操作时,需要通过信号量等机制来确保在同一时间只有一个进程能够进行写操作,避免数据冲突。
通信中枢建立好通信信道后,每次通信还都需要通信中枢的协助。这种方式就像一个中间人在两个房间之间传递信息,每次传递都需要经过中间人。
消息传递式又分为有边界消息和无边界消息。无边界消息是字节流,发过来是一个一个的字节,要靠进程自己设计如何区分消息的边界。有边界消息的发送和接收都是以消息为基本单位,类似于一封封完整的信件,接收方可以明确地知道每个消息的开始和结束位置。
按照通信双方的关系,可分为对称型通信和非对称型通信:
进程间通信机制一般要实现三类接口:
如何建立通信信道,谁去建立通信信道。对于对称型通信来说,谁去建立通信信道无所谓,有一个人去建立就可以了,后者直接加入通信信道。对于非对称型通信,一般是由服务端、消费者建立通信信道,客户端、生产者则加入这个通信信道。不同的进程间通信机制,有不同的接口来创建信道。例如,在使用共享内存时,可以通过特定的系统调用(如 shmget)来创建共享内存区域,建立通信信道。
后者如何找到并加入这个通信信道。一般情况是,双方通过提前约定好的信道名称找到信道句柄,通过信道句柄加入通信信道。但是有的是通过继承把信道句柄传递给对方,有的是通过其它进程间通信机制传递信道句柄,有的则是通过信道名称直接找到信道,不需要信道句柄。
如何使用通信信道。一旦通信信道建立并加入成功,进程就需要知道如何正确地使用通信信道进行数据的读写操作。例如,在使用管道进行通信时,进程需要明确知道哪个文件描述符是用于读,哪个是用于写,以及在读写过程中的各种规则和特殊情况的处理。
⑴匿名管道:匿名管道通常用于临时的、简单的数据传输,仅用于有亲缘关系的进程。当使用 fork 函数创建子进程时,子进程会继承父进程的文件描述符表。父进程通过 pipe 函数自动以读写的方式打开同一个管道文件,并将文件描述符返回给一个数组。其中,数组的一个元素存储以读的方式打开管道文件所返回的文件描述符,另一个元素存储以写的方式打开管道文件所返回的文件描述符。
站在文件描述符角度深度理解管道,子进程拷贝父进程后,就不需要再以读或者写的方式打开管道文件了。确保管道通信的单向性,父子进程要分别关闭读端和写端。例如,如果希望数据从父进程流向子进程,就关闭父进程的读端,子进程的写端;如果希望数据从子进程流向父进程,就关闭父进程的写端,子进程的读端。
#include<iostream>
#include<cassert>
#include<unistd.h>
#include<string.h>
#include<sys/types.h>
#include<sys/wait.h>
#include<cstdlib>
using namespace std;
#define MAX 1024
int main()
{
int pipefd[2]={0};
int n=pipe(pipefd);
assert(n==0);
cout<<"pipefd[0]:"<<pipefd[0]<<" pipefd[1]:"<<pipefd[1]<<endl;
//创建子进程
pid_t pid=fork();
//判断创建是否失败
if(pid<0){
perror("fork");
exit(1);
}
//子进程
else if(pid==0){
//关闭读端
close(pipefd[0]);
int cnt = 10;
while(cnt)
{
char message[MAX];
snprintf(message,sizeof(message),"I am child process, message %d\n",cnt);
write(pipefd[1],message,strlen(message));
cnt--;
sleep(1);
}
exit(0);
}
//父进程
close(pipefd[1]);//关闭写端
char buffer[MAX];
while(true){
sleep(1);
//从文件描述符对应的管道里面读取数据
size_t n=read(pipefd[0],buffer,sizeof(buffer)-1);
if(n>0){
buffer[n]=0;
std::cout << getpid() << " -> " << "child sends: " << buffer << " to me" << std::endl;
}else if(n==0){
std::cout << "father return val(n):" << n << std::endl;
std::cout << "child quit, me too!!!" << std::endl;
sleep(1);
break;
}
}
std::cout << "finish reading..." << std::endl;
// 写端已经退出,读完后关闭读端
close(pipefd[0]);
pid_t rid = waitpid(pid, nullptr, 0);
if (rid == pid) {
std::cout << "wait success" << std::endl;
}
return 0;
}
⑵有名管道FIFO:有名管道允许不相关的进程通过文件系统中的一个路径名进行通信。创建有名管道可以使用 mkfifo 函数,判断有名管道是否已存在,若尚未创建,则以相应的权限创建。以下是创建有名管道和使用有名管道进行通信的代码示例:
发送端:
#include"name_fifo.hpp"
#include<unistd.h>
#include<stdlib.h>
#include<fcntl.h>
#include<limits.h>
#include<sys/types.h>
#include<sys/stat.h>
#include<stdio.h>
#include<string.h>
#include<errno.h>
#define MYFIFO "/tmp/myfifo"/* 有名管道文件名*/
#define MAX_BUFFER_SIZE PIPE_BUF/*常量PIPE_BUF 定义在于limits.h中*/
int main() {
char buff[MAX_BUFFER_SIZE];
int fd;
int nread;
/* 判断有名管道是否已存在,若尚未创建,则以相应的权限创建*/
if (access(MYFIFO, F_OK) == -1) {
if ((mkfifo(MYFIFO, 0666) < 0) && (errno!= EEXIST)) {
printf("Cannot create fifo file\n");
exit(1);
}
/* 以只读阻塞方式打开有名管道 */
fd = open(MYFIFO, O_RDONLY);
if (fd == -1) {
printf("Open fifo file error\n");
exit(1);
}
while (1) {
memset(buff, 0, sizeof(buff));
if ((nread = read(fd, buff, MAX_BUFFER_SIZE)) > 0) {
printf("Read '%s' from FIFO\n", buff);
}
close(fd);
exit(0);
}
}
}
接收端:
#include"name_fifo.hpp"
#include<unistd.h>
#include<stdlib.h>
#include<fcntl.h>
#include<limits.h>
#include<sys/types.h>
#include<sys/stat.h>
#include<stdio.h>
#include<string.h>
#include<errno.h>
#define MYFIFO "/tmp/myfifo"/* 有名管道文件名*/
#define MAX_BUFFER_SIZE PIPE_BUF/*常量PIPE_BUF 定义在于limits.h中*/
int main() {
char buff[MAX_BUFFER_SIZE];
int fd;
int nread;
/* 判断有名管道是否已存在,若尚未创建,则以相应的权限创建*/
if (access(MYFIFO, F_OK) == -1) {
if ((mkfifo(MYFIFO, 0666) < 0) && (errno!= EEXIST)) {
printf("Cannot create fifo file\n");
exit(1);
}
/* 以只读阻塞方式打开有名管道 */
fd = open(MYFIFO, O_RDONLY);
if (fd == -1) {
printf("Open fifo file error\n");
exit(1);
}
while (1) {
memset(buff, 0, sizeof(buff));
if ((nread = read(fd, buff, MAX_BUFFER_SIZE)) > 0) {
printf("Read '%s' from FIFO\n", buff);
}
close(fd);
exit(0);
}
}
}
信号是一种软件中断,是操作系统用来通知进程某个事件已经发生的一种方式。可以使用 signal 函数注册信号处理函数。以下是注册信号处理函数的代码示例:
#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#include <unistd.h>
#include <signal.h>
// 信号处理函数
void sighandler(int signo) {
printf("signo==[%d]\n", signo);
}
int main() {
// 注册信号处理函数
signal(SIGINT, sighandler);
while (1) {
sleep(10);
}
return 0;
}
文件是一种持久化存储机制,可用于进程间通信。写进程将数据写入文件,读进程从文件中读取数据。以下是写进程和读进程的代码示例:
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
int main() {
// 写进程
FILE *fp = fopen("data.txt", "w");
if (fp == NULL) {
perror("Error opening file for writing");
return 1;
}
char *data = "Hello from write process!";
fputs(data, fp);
fclose(fp);
// 读进程
fp = fopen("data.txt", "r");
if (fp == NULL) {
perror("Error opening file for reading");
return 1;
}
char buffer[100];
fgets(buffer, sizeof(buffer), fp);
printf("Read from file: %s\n", buffer);
fclose(fp);
return 0;
}
信号量是一种计数器,用于控制对共享资源的访问。可以使用信号量控制对共享内存的访问。以下是使用信号量控制对共享内存访问的代码示例:
#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/shm.h>
#include <sys/sem.h>
union semun {
int val;
struct semid_ds *buf;
unsigned short *arry;
};
int set_semvalue(int sem_id) {
union semun sem_union;
sem_union.val = 1;
if (semctl(sem_id, 0, SETVAL, sem_union) == -1)
return 0;
return 1;
}
void del_semvalue(int sem_id) {
union semun sem_union;
if (semctl(sem_id, 0, IPC_RMID, sem_union) == -1)
fprintf(stderr, "Failed to delete semaphore\n");
}
int semaphore_p(int sem_id) {
struct sembuf sem_b;
sem_b.sem_num = 0;
sem_b.sem_op = -1;
sem_b.sem_flg = SEM_UNDO;
if (semop(sem_id, &sem_b, 1) == -1) {
fprintf(stderr, "semaphore_p failed\n");
return 0;
}
return 1;
}
int semaphore_v(int sem_id) {
struct sembuf sem_b;
sem_b.sem_num = 0;
sem_b.sem_op = 1;
sem_b.sem_flg = SEM_UNDO;
if (semop(sem_id, &sem_b, 1) == -1) {
fprintf(stderr, "semaphore_v failed\n");
return 0;
}
return 1;
}
int main() {
int shmid, sem_id;
void *shm = NULL;
struct shared_use_st *shared;
// 创建共享内存
shmid = shmget((key_t)1234, sizeof(struct shared_use_st), 0666 | IPC_CREAT);
if (shmid == -1) {
fprintf(stderr, "shmget failed\n");
exit(EXIT_FAILURE);
}
// 将共享内存连接到当前进程的地址空间
shm = shmat(shmid, 0, 0);
if (shm == (void *)-1) {
fprintf(stderr, "shmat failed\n");
exit(EXIT_FAILURE);
}
// 新建信号量
sem_id = semget((key_t)1234, 1, 0666 | IPC_CREAT);
// 信号量初始化
if (!set_semvalue(sem_id)) {
fprintf(stderr, "init failed.\n");
exit(EXIT_FAILURE);
}
// 操作共享内存
shared = (struct shared_use_st *) shm;
// 写入数据
strcpy(shared->text, "Data to be shared");
semaphore_v(sem_id);
// 读取数据
semaphore_p(sem_id);
printf("Read from shared memory: %s\n", shared->text);
// 删除信号量
del_semvalue(sem_id);
// 把共享内存从当前进程中分离
if (shmdt(shm) == -1) {
fprintf(stderr, "shmdt failed\n");
exit(EXIT_FAILURE);
}
// 删除共享内存
if (shmctl(shmid, IPC_RMID, 0) == -1) {
fprintf(stderr, "shmctl(IPC_RMID) failed");
exit(EXIT_FAILURE);
}
exit(EXIT_SUCCESS);
}
共享内存允许多个进程访问同一内存区域,是一种高效的 IPC 机制。可以使用 shmget、shmat、shmdt 和 shmctl 函数来创建共享内存、写入数据和读取数据。以下是创建共享内存、写入数据和读取数据的代码示例:
#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/shm.h>
#define TEXT_SZ 2048
struct shared_use_st {
char text[TEXT_SZ];
};
int main() {
int shmid;
void *shm = NULL;
struct shared_use_st *shared;
// 创建共享内存
shmid = shmget((key_t)1234, sizeof(struct shared_use_st), 0666 | IPC_CREAT);
if (shmid == -1) {
fprintf(stderr, "shmget failed\n");
exit(EXIT_FAILURE);
}
// 将共享内存连接到当前进程的地址空间
shm = shmat(shmid, 0, 0);
if (shm == (void *)-1) {
fprintf(stderr, "shmat failed\n");
exit(EXIT_FAILURE);
}
// 设置共享内存
shared = (struct shared_use_st *) shm;
// 写入数据
strcpy(shared->text, "Data to be shared");
// 读取数据
printf("Read from shared memory: %s\n", shared->text);
// 把共享内存从当前进程中分离
if (shmdt(shm) == -1) {
fprintf(stderr, "shmdt failed\n");
exit(EXIT_FAILURE);
}
// 删除共享内存
if (shmctl(shmid, IPC_RMID, 0) == -1) {
fprintf(stderr, "shmctl(IPC_RMID) failed");
exit(EXIT_FAILURE);
}
exit(EXIT_SUCCESS);
}
消息队列允许进程发送和接收消息。可以使用 msgget、msgsnd 和 msgrcv 函数来创建消息队列、发送消息和接收消息。以下是创建消息队列、发送消息和接收消息的代码示例:
#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#define MSG_SIZE 100
struct msgbuf {
long mtype;
char mtext[MSG_SIZE];
};
int main() {
int msgid;
key_t key = ftok(".", 'm');
// 创建消息队列
msgid = msgget(key, 0666 | IPC_CREAT);
if (msgid == -1) {
perror("msgget");
exit(1);
}
// 发送消息
struct msgbuf sendbuf;
sendbuf.mtype = 1;
strcpy(sendbuf.mtext, "Hello from message queue!");
if (msgsnd(msgid, &sendbuf, strlen(sendbuf.mtext) + 1, 0) == -1) {
perror("msgsnd");
exit(1);
}
// 接收消息
struct msgbuf recvbuf;
if (msgrcv(msgid, &recvbuf, MSG_SIZE, 1, 0) == -1) {
perror("msgrcv");
exit(1);
}
printf("Received message: %s\n", recvbuf.mtext);
// 删除消息队列
if (msgctl(msgid, IPC_RMID, NULL) == -1) {
perror("msgctl");
exit(1);
}
return 0;
}
套接字是一种网络通信机制,也可用于本地 IPC。可以使用 socket、bind、listen、accept 和 connect 函数来实现服务端和客户端的通信。以下是服务端和客户端的代码示例:
服务端:
#include <iostream>
#include <string.h>
#include <unistd.h> // for close()
#include <arpa/inet.h> // for sockaddr_in, inet_addr
#include <sys/types.h>
#include <sys/socket.h>
#define PORT 8080
#define BUFFER_SIZE 1024
int main() {
int server_fd, new_socket;
struct sockaddr_in address;
int opt = 1;
int addrlen = sizeof(address);
char buffer[BUFFER_SIZE] = {0};
// 创建 socket 文件描述符
if ((server_fd = socket(AF_INET, SOCK_STREAM, 0)) == 0) {
perror("socket failed");
exit(EXIT_FAILURE);
}
// 将套接字绑定到端口
if (setsockopt(server_fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt))) {
perror("setsockopt");
exit(EXIT_FAILURE);
}
address.sin_family = AF_INET;
address.sin_addr.s_addr = INADDR_ANY; // 接受任何地址
address.sin_port = htons(PORT); // 转换为网络字节序
if (bind(server_fd, (struct sockaddr *)&address, sizeof(address))<0) {
perror("bind failed");
exit(EXIT_FAILURE);
}
// 开始监听
if (listen(server_fd, 3) < 0) {
perror("listen");
exit(EXIT_FAILURE);
}
std::cout << "Server is listening on port " << PORT << std::endl;
// 接受客户端连接
if ((new_socket = accept(server_fd, (struct sockaddr *)&address,
(socklen_t*)&addrlen))<0) {
perror("accept");
exit(EXIT_FAILURE);
}
// 接收数据
read(new_socket , buffer, BUFFER_SIZE);
std::cout << "Message from client: " << buffer << std::endl;
const char *hello = "Hello from server";
send(new_socket , hello , strlen(hello) , 0 );
std::cout << "Hello message sent" << std::endl;
close(new_socket);
close(server_fd);
return 0;
}
客户端代码:
#include <iostream>
#include <string.h>
#include <unistd.h> // for close()
#include <arpa/inet.h> // for sockaddr_in, inet_addr
#define PORT 8080
int main() {
int sock = 0;
struct sockaddr_in serv_addr;
char *hello = "Hello from client";
char buffer[1024] = {0};
// 创建套接字文件描述符
if ((sock = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
std::cerr << "\n Socket creation error \n";
return -1;
}
serv_addr.sin_family = AF_INET;
serv_addr.sin_port = htons(PORT);
// 将 IPv4 地址从文本转换为二进制形式
if(inet_pton(AF_INET, "127.0.0.1", &serv_addr.sin_addr)<=0) {
std::cerr << "\nInvalid address/ Address not supported \n";
return -1;
}
// 发起连接请求
if (connect(sock, (struct sockaddr *)&serv_addr, sizeof(serv_addr)) < 0) {
std::cerr << "\nConnection Failed \n";
return -1;
}
send(sock , hello , strlen(hello) , 0 );
std::cout << "Hello message sent from client" << std::endl;
read(sock , buffer, sizeof(buffer));
std::cout << "Message from server: " << buffer << std::endl;
close(sock);
return 0;
}
编译与运行
首先,编译服务端和客户端代码:g++ server.cpp -o server g++ client.cpp -o client 在一个终端中运行服务端:./server 在另一个终端中运行客户端:./client
文件处理与转换:管道常被用于将一个进程的输出作为另一个进程的输入,从而实现数据的传输与处理。比如在 Shell 脚本中,使用 ls | grep "txt" 命令,通过管道将 ls 命令列出的文件列表传输给 grep 命令,筛选出文件名包含 "txt" 的文件2。
数据库操作:多个进程可能需要共享数据库连接或对数据库进行协同操作。例如,一个进程负责向数据库写入数据,另一个进程负责从数据库读取数据并进行分析,它们之间可以通过共享内存或消息队列来传递数据库操作的指令和数据。
多媒体处理:在多媒体应用中,不同的进程可能负责音频、视频的采集、编码、解码、播放等不同环节。进程间通过共享内存或管道等方式传输音频视频数据,实现多媒体数据的流畅处理。
打印机等设备共享:多个进程可能需要同时访问打印机、扫描仪等外部设备。通过信号量来控制对设备的访问权限,确保同一时刻只有一个进程能够使用设备,避免冲突134。
文件锁机制:当多个进程需要对同一个文件进行读写操作时,使用信号量或文件锁来实现互斥访问,防止数据损坏或不一致。例如,一个进程正在写入文件时,其他进程需要等待,直到写入操作完成。
进程状态通知:父进程创建子进程后,子进程的终止、暂停等状态变化需要及时通知父进程。信号机制常用于这种场景,子进程可以通过发送特定信号告知父进程其状态14。
系统事件通知:当系统发生某些事件,如磁盘空间不足、网络连接变化等,内核会向相关进程发送信号,进程接收到信号后可以采取相应的处理措施。
分布式计算:在分布式系统中,不同的计算机节点上的进程需要协同工作来完成复杂的计算任务。消息队列可用于在节点间传递任务指令和中间结果,实现任务的分发和结果的汇总。
多线程编程:在同一个进程中的多个线程之间也需要进行通信和协作。虽然线程共享进程的地址空间,但也需要通过信号量、互斥量等机制来实现同步和互斥,确保线程安全地访问共享资源 。
客户端 / 服务器模型:服务器进程和多个客户端进程之间需要进行通信。套接字是实现这种通信的常用方式,它不仅可以用于本地进程间通信,还支持网络通信,使得客户端可以通过网络连接到服务器。
微服务架构:在微服务架构中,不同的微服务进程之间需要进行高效的通信和协作。可以根据具体需求选择合适的进程间通信方式,如消息队列用于异步通信、HTTP 接口基于套接字实现服务间的调用等。