1 进程间通信的前提
进程具有独立性,通信的前提:先得让不同的进程看到同一份资源(某种形式的内存空间),只能由操作系统提供。
实现:数据传输、资源共享、通知事件、进程控制。
2 本地通信
同一台主机,同一个OS,不同进程之间通信。
进程通信的标准:System V IPC 标准,POSIX 标准(可移植操作系统接口标准)。
3 进程间通信分类
管道:匿名管道 pipe、命名管道。
System V IPC:System V 消息队列、System V 共享内存、System V 信号量。
4 管道
who | wc -l
一个组合命令,用于统计当前登录系统的用户数量。
1. who : 列出当前登录系统的用户信息。
2. | : 管道符,将 who 的输出传递给 wc -l 。
3. wc -l : 统计输入的行数。
文件描述符角度理解匿名管道
进程打开磁盘文件,可使用 fopen/open/fstream 等,首先得告诉对应的路径,根据路径确定是在哪一个分区当中,然后再根据路径解析从根目录开始依次解析,找到文件名和 inode 的映射关系,找到文件的 inode,在特定分区内部找到对应的分组,根据 inode 号找到 inode 属性,文件的内容和属性全有了,所以在内核中就可以创建对应的内核级结构struct file,文件所对应的三个关键字段:inode表、文件操作符表、内核级缓冲区。
上图管道(内存级,特定文件的内核级缓冲区)只能单向通信(父➡️子或子➡️父)
1. 如果不关闭呢?资源浪费、fd 泄露、误操作等问题。
2. 先创建管道,在创建子进程。
3. 为何 rw(读写)同时打开?
管道的数据流动是单向的,数据从写端流入,从读端流出。因此,管道的两端必须同时存在,才能实现数据的传递。先创建子进程继承权限,然后父进程和子进程分别关闭不需要的一端。4. 先有简单通信的需求,只需实现单向通信即可,于是根据特性再将其命名为管道。
5. 此管道不需要路径,不需要名字,故被称为匿名管道。
另:标准输出和标准错误是两个不同的文件描述符,例,./myexe >log.txt 重定向的完整写法是 ./myexe 1>log.txt,如果想要1、2都重定向,则有 ./myexe >log.txt 2>&1 。为方便 debug,还可以 ./myexe 1>normal.txt 2>error.txt 。
(1)pipe()
#include <unistd.h>
int pipe(int pipefd[2]);RETURN VALUEOn success, zero is returned. On error, -1 is returned, and errno is set appropriately.
pipefd[2] 是输出型参数
(2)匿名管道的4种情况和5个特性
临界资源(Critical Resource)是指在操作系统中,同一时刻只能被一个进程访问的资源。当多个进程并发执行时,如果它们试图同时访问同一个临界资源,可能会导致数据不一致或系统状态错误。
管道的4种场景:
①管道为空且管道正常,read(系统调用)会阻塞;
②管道为满且管道正常,write(系统调用)会阻塞。
注:管道有上限,Ubuntu 是 64KB 。管道的特性之一是“面向字节流”。
③管道写端关闭且读端继续,读端读到0,表示读到文件结尾。
如果写端关闭了,读端读完管道内数据,再读取的时候就会读到返回值0,表示对端关闭(管道角度),同时也表示读到文件结尾(文件角度)。
④管道读端关闭但写端正常,OS会直接杀掉进行写入的进程。
发送 13)SIGPIPE 信号给目标进程)。
匿名管道特性:
①面向字节流;
②具有血缘关系的进程进行IPC,常见于父子;
③文件的生命周期随进程,管道也是;
④单向数据通信;
⑤管道自带同步互斥等保护机制(对共享资源的保护)。
*
在使用 write(2) 系统调用向管道(pipe)写入数据时,如果写入的数据量小于 PIPE_BUF 字节,那么这个写入操作必须是原子的(atomic)。
On Linux, PIPE_BUF is 4096 bytes.
原子操作是指在执行过程中不会被中断的操作。对于 write(2) 来说,原子性意味着如果多个进程同时向同一个管道写入数据,且每个写入的数据量都小于 PIPE_BUF,那么这些写入操作不会相互干扰,每个进程的写入数据都会完整地写入管道,而不会被其他进程的数据打断或混合。
管道通信的场景:进程池
「Makefile 示例」BIN=procpool
CC=g++
FLAGS=-c -Wall -std=c++11
LDFLAGS=-o
#SRC=$(shell ls *.cc)
SRC=$(wildcard *.cc)
OBJ=$(SRC:.cc=.o)$(BIN):$(OBJ)$(CC) $(LDFLAGS) $@ $^
%.o:%.cc$(CC) $(FLAGS) $<.PHONY:clean
clean:rm -f $(BIN) $(OBJ).PHONY:test
test:@echo $(SRC)@echo $(OBJ)
master —— 管道 —→ worker/slaver (一对多)
Channel.hpp
#ifndef __CHANNEL_HPP__
#define __CHANNEL_HPP__#include <iostream>
#include <string>
#include <unistd.h>
using namespace std;class Channel
{
public:Channel(int wfd, pid_t pid) : _wfd(wfd), _pid(pid){_name = "Channel-" + to_string(wfd) + " : " + to_string(pid);}string name(){return _name;}void Send(int cmd){::write(_wfd, &cmd, sizeof(cmd));}void Close(){::close(_wfd);}int ChildID(){return _pid;}int WFD(){return _wfd;}private:int _wfd;string _name;pid_t _pid;
};
Task.hpp
#pragma once
#include <iostream>
#include <unordered_map>
#include <functional>
using std::cout;
using std::endl;
using task_t = std::function<void()>;void Download()
{cout << "Downloading..., pid = " << getpid() << endl;
}void Log()
{cout << "Logging..., pid = " << getpid() << endl;
}void Sync()
{cout << "Syncing..., pid = " << getpid() << endl;
}
static int num = 0;class Taskmanage
{
public:Taskmanage(){Inserttask(Download);Inserttask(Log);Inserttask(Sync);}void Inserttask(task_t t){tasks[num++] = t;}void Exe(int num){if (tasks.find(num) == tasks.end())return;tasks[num]();}int Select(){return rand() % num;}~Taskmanage() {}private:std::unordered_map<int, task_t> tasks;
};Taskmanage tm;void Worker()
{while (true){int cmd = 0;int n = ::read(0, &cmd, sizeof(cmd));if (n == sizeof(cmd)){tm.Exe(cmd);}else if (n == 0){cout << getpid() << " quit..." << endl;break;}else if (n > 0){}}
}
Procpool.hpp
#include <iostream>
#include <string>
#include <unistd.h>
#include <cstdlib>
#include <vector>
#include <functional>
#include <sys/types.h>
#include <sys/wait.h>
#include "Task.hpp"
#include "Channel.hpp"using namespace std;
using work_t = function<void()>; // typedef function<void()> work_t;enum
{NonErr = 0,UsageErr,PipeErr,ForkErr
};class Procpool
{
public:Procpool(int n, work_t w) : num(n), work(w){}// channels是输出型参数// work_t work: 回调// 创建子进程和子进程的任务是解耦的int Initprocpool(){for (int i = 0; i < num; i++){// 构建管道int pipefds[2] = {0};int n = pipe(pipefds);if (n < 0){return PipeErr;}// 创建进程pid_t id = fork();if (id < 0){return ForkErr;}// 进程间通信if (id == 0){// 子进程// 关闭历史fdcout << "Child " << getpid() << " close fd history: ";for (auto &e : channels){cout << e.WFD() << ", ";e.Close();}cout << " END" << endl;::close(pipefds[1]); // 读,关闭写(1)cout << "Child read fd: " << pipefds[0] << endl;dup2(pipefds[0], 0);work();::exit(0); // 子进程退出}// 父进程::close(pipefds[0]); // 写,关闭读(0)// Channel ch(pipefds[1], id);// channels.push_back(ch);channels.emplace_back(pipefds[1], id);}return NonErr;}void Dispitch(){int pid = 0;// 派发任务int num = 20;while (num--){// 确定任务码int task = tm.Select();// 选择一个子进程channelChannel &cur = channels[pid++];pid %= channels.size();cout << "%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%" << endl;cout << "send " << task << " to " << cur.name() << " | " << num << " left." << endl;cout << "%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%" << endl;// 派发任务cur.Send(task);sleep(1);}}void Cleanprocpool(){// 关一个收一个(需要每次子进程关闭历史写端fd)for (auto &e : channels){e.Close();}for (auto &e : channels){pid_t rid = ::waitpid(e.ChildID(), nullptr, 0);if (rid > 0){cout << "Child " << rid << " wait success" << endl;}}// 逆序关闭// for(int i = channels.size() - 1; i >= 0; i--){// channels[i].Close();// pid_t rid = ::waitpid(channels[i].ChildID(), nullptr, 0);// if (rid > 0)// {// cout << "Child " << rid << " wait success" << endl;// }// }// for (auto &e : channels)// {// e.Close();// }// for (auto &e : channels)// {// pid_t rid = ::waitpid(e.ChildID(), nullptr, 0);// if (rid > 0)// {// cout << "Child " << rid << " wait success" << endl;// }// }}void Debugprint(){for (auto &e : channels){cout << e.name() << endl;}}private:vector<Channel> channels;int num;work_t work;
};
Main.cc
#include "Procpool.hpp"
#include "Task.hpp"void Usage(string proc)
{cout << "Usage: " << proc << "proc_num" << endl;
}int main(int argc, char *argv[])
{if (argc != 2){Usage(argv[0]);return UsageErr;}int num = stoi(argv[1]);vector<Channel> channels;Procpool *pp = new Procpool(num, Worker);// 初始化进程池pp->Initprocpool();// Debugprint(channels);// 派发任务pp->Dispitch();// 退出进程池pp->Cleanprocpool();// 初始化进程池//Initprocpool(num, channels, Worker);// Debugprint(channels);// 派发任务//Dispitch(channels);// 退出进程池//Cleanprocpool(channels);delete pp;return 0;
}
让所有子进程都从标准输入里读取命令 dup2(pipefd[0], 0) 。
任务码通常用于进程间通信(IPC)中,帮助进程识别和处理不同的任务。
派发任务原则:负载均衡(任务量差不多),实现方法:轮询、随机(长维度)、历史任务数。
如何退出?父进程将 master 写端描述符关闭,所有子进程读端读到0后break,父进程wait进行回收。
如果不是两次遍历,而是在一次遍历中先关闭写端,再对该子进程wait的话,会阻塞,因为当初往下创建子进程时,会导致第一个管道的写端不断增多,从前到后创建的每个管道对应的写端数量依次递减。
法一:逆序关闭子进程
法二:子进程关闭历史上master的所有写端(channels 保存的就是历史上master打开的所有写端)