欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 财经 > 产业 > 匿名管道在进程池中的应用案例

匿名管道在进程池中的应用案例

2024/10/25 12:19:56 来源:https://blog.csdn.net/Crazy_Duck_Weng/article/details/141469087  浏览:    关键词:匿名管道在进程池中的应用案例

目录

  • 1. 管理多个管道文件
  • 2. 初始化创建出来的管道
  • 3. 子进程的工作
  • 4. 模拟任务模块
  • 5. 父进程分发任务
  • 6. 清理释放子进程
  • 7. 优化

本案例是基于 【进程间通信(一)】【管道通信(下)】 中对于管道的应用场景做的一个代码案例,可以先观看管道通信相关的文章,以得到该案例更好的观感体验。

1. 管理多个管道文件

父进程创建了一批子进程,每个子进程与父进程要实现进程通信,就要创建一个管道文件,作为双方通信的资源地。操作系统需要对系统中存在的大量进程、打开的文件等内核结构进行管理,因此我们创建出来的管道,也要进行管理。

const int processNum = 10;// 管理通信信道 --- 先描述
class channel
{
public:channel(int cmdfd, pid_t slaverId, const std::string processName): _cmdfd(cmdfd), _slaverId(slaverId), _processName(processName){}int _cmdfd;               // 发送任务的文件描述符pid_t _slaverId;          // 子进程 pidstd::string _processName; // 子进程名字
};// 管理通信信道 --- 后组织// father -> w, child -> rstd::vector<channel> channels;

2. 初始化创建出来的管道

使用循环结构 + fork() 创建多个子进程。父进程对管道做写入,子进程读取,以此来实现父进程通过管道通信对子进程派发任务的案例。

因此创建完子进程后,子进程要把写端关闭,只保留管道的读端。同样的,父进程则关闭对管道的读端。

// version-1(有点小bug,后面优化)
void InitProcessPool(std::vector<channel> *channels)
{for (int i = 0; i < processNum; ++i){int pipefd[2];      int n = pipe(pipefd);       // 提前创建管道assert(n == 0);(void)n;pid_t id = fork();  // child processif (id == 0){close(pipefd[1]);       // 关闭子进程对管道的写端dup2(pipefd[0], 0);     // 子进程对管道的读端做重定向到标准输入,弱化管道的概念,read 直接读 0 fd即可close(pipefd[0]);       // 这一步可做可不做slaver();     std::cout << "child process(" << getpid() << ") quit!\n";      exit(0);            }// father processclose(pipefd[0]);       // 关闭父进程对管道的读端// 添加 channel 字段std::string name = "process-" + std::to_string(i);channels->push_back(channel(pipefd[1], id, name));}
}

3. 子进程的工作

因为在初始化中,我们对所有子进程的读端做了重定向,因此在子进程的操作中,弱化了管道的概念,不需要传参指明文件描述符,直接对重定向后的 fd 做读取即可。这里人为规定好,子进程一次读4 bytes。

void slaver()
{while (true){int cmdCode = 0;// 规定通信协议:一次读 4 bytesint n = read(0, &cmdCode, sizeof(int));     // 因为对管道的读端做了重定向到标准输入,因此都是从 0 fd中读取if (n == sizeof(int)){std::cout << "[slaver process-" << getpid() << "] get a command, cmdcode: " << cmdCode << std::endl;if (cmdCode >= 0 && cmdCode < tasks.size()) tasks[cmdCode]();}if (n == 0) break;       // 读到为结尾的本质是父进程不再往管道写入数据了,因此子进程可以退出了,没有必要继续读取了。}
}

4. 模拟任务模块

#pragma once#include <iostream>
#include <vector>typedef void(*task_t)();void task1()
{std::cout << "task1" << "\n";
}
void task2()
{std::cout << "task2" << "\n";
}
void task3()
{std::cout << "task3" << "\n";
}
void task4()
{std::cout << "task4" << "\n";
}void LoadTask(std::vector<task_t> *tasks)
{tasks->push_back(task1);tasks->push_back(task2);tasks->push_back(task3);tasks->push_back(task4);
}

5. 父进程分发任务

分发任务需要解决几个问题

  • 选择哪个任务 ---- 随机数做选择,后续可以优化为用户交互来选择任务。
  • 选择哪个子进程 ----- 可以是随机数,也可以轮询分配,只要保证负载均衡即可(即不要一直在某几个子进程执行即可,要尽可能的 “公平” )
  • 最后发送任务,即与子进程进行通信,写入文件的系统接口是 write,管道文件也是文件,因此直接 write 写入数据即可。这里人为规定好,父进程一次写 4 bytes。
void Menu()
{std::cout << " -------------------------------------" << std::endl;std::cout << "|   1. task1             2. task2     |" << std::endl;std::cout << "|   3. task3             4. task4     |" << std::endl;std::cout << "|             0. exit                 |" << std::endl;std::cout << " ------------------------------------- " << std::endl;
}void ContralSlaver(const std::vector<channel> &channels)
{int which = 0;      while (true){Menu();int input = 0;std::cout << "Please Enter@ ";std::cin >> input;if(input <= 0 || input > tasks.size()) break;// 1. 选择任务// int cmdCode = rand() % tasks.size();int cmdCode = input - 1;// 2. 选择进程(这一步需要保证负载平衡,可以采用随机数或者轮询的策略)// int processId = rand() % channels.size();std::cout << "[father process]# " << "cmdcode: " << cmdCode << " is already send to [" << channels[which]._slaverId << "] process name: "<< channels[which]._processName << std::endl;// 3. 发送任务// 规定通信协议:一次写 4 byteswrite(channels[which]._cmdfd, &cmdCode, sizeof(cmdCode));++which;which %= channels.size();       // 在选择进程执行任务时,采用轮询的策略// sleep(1);}
}

6. 清理释放子进程

void QuitProcess(const std::vector<channel> &channels)
{// for (const auto &c : channels)// {// 	   close(c._cmdfd);// 	   waitpid(c._slaverId, nullptr, 0);// }for (const auto &c : channels) close(c._cmdfd);sleep(5);for (const auto &c : channels) waitpid(c._slaverId, nullptr, 0);
}

这个功能模块着重需要注意,如果以上面的代码为基准的话,那么这里不能在一个 for 循环内执行关闭 fd 和 waitpid 的操作。在上面初始化的模块中,是有点问题存在的。子进程创建时,会继承父进程的很多东西,包括父进程的 pcb 中的部分字段,其中就包括文件描述符表,因此父进程每创建一个子进程,就要创建一个管道文件,就会多打开一个文件描述符,然后指向新建的管道。而后续 fork 创建子进程时,子进程直接就继承了父进程这张文件描述符啊。也就是说,越到后面,子进程继承下来的文件描述符表中的写端就越来越多。

举个例子,假如创建10个子进程,每创建一个子进程的同时会创建一个管道文件,那么父进程就有一个 fd 指向该管道文件的写端。第 1 个子进程创建时,父进程有了指向管道写端的第一个 fd;第二个子进程创建时,继承父进程的文件描述符表,同时创建管道文件,因此第二个子进程有两个 fd 写端打开着,一个指向上一个创建的管道文件的写端,一个指向与该子进程相关的管道文件的写端。。。。以此类推,到创建第10个子进程时,该子进程的文件描述符表中,会多了 9 个 fd 指向与别的子进程相关的管道的写端。

所以在 close 和 waitpid 时,如果直接一个循环解决,那么父进程等待子进程时就会一直陷入阻塞状态。因为关闭了指向第一个管道的写端,还有剩下 9 个子进程指向该管道的写端!只要还要文件描述符指向该写端,那么子进程就无法退出!子进程无法退出,父进程就等不到子进程,就要一直阻塞!

弄清楚了原理,我们自然就能够知道最简单粗暴的解决方式。要把该管道的所有写端全部关闭完了,这个管道的读端才可以不需要继续做读取,相关的进程才能够退出。而最后创建的那个管道的写端,是只有一个子进程指向它的(即最后创建的子进程),因此倒序遍历 channel 就能够解决该问题。

还是举个例子方便大家理解,第一个子进程要退出,需要关闭第一个管道文件的写端,而第一个管道的写端,所有子进程都有,因为剩下的九个进程继承了父进程的文件描述符,它们的表中都有 fd 指向这个管道的写端。而第二个子进程要退出,需要关闭第二个管道的写端,那么就需要关闭后续8个进程的写端。所以倒过来退出子进程的话,关闭倒数第二个管道文件的写端(倒数第一个可以直接关闭,没有其它进程的 fd_array[ ] 指向它了),并且倒数第二个子进程退出后,该子进程指向其它的管道的写端,自然就被关闭了,这样就能够关闭倒数第三个管道文件的写端,然后退出子进程,就这样以此类推。。。

因此除了先把全部的写端关闭了,还可以这样写:

void QuitProcess(const std::vector<channel> &channels)
{// version1 for(int i = channels.size() - 1; i >= 0; --i){close(channels[i]._cmdfd);waitpid(channels[i]._slaverId, nullptr, 0);}
}

7. 优化

在初始化模块时就解决这个问题,记录所有的管道写端 fd,然后每创建出一个子进程,将其继承下来的所有不属于该进程的 fd 全部关闭,这样后续 QuitProcess 清理释放子进程时,就可以一个循环解决,也不需要倒叙了。

void InitProcessPool(std::vector<channel> *channels)
{// version 2: 确保每一个子进程都只有一个写端std::vector<int> oldfds;    // 记录父进程打开的所有fd,即管道写端for (int i = 0; i < processNum; ++i){...pid_t id = fork();  if (id == 0){// 关闭之前所有管道的写端for(auto fd : oldfds) close(fd);  ...}...oldfds.push_back(pipefd[1]);}
}void QuitProcess(const std::vector<channel> &channels)
{// version2for (const auto &c : channels){close(c._cmdfd);waitpid(c._slaverId, nullptr, 0);}
}

如果感觉该篇文章给你带来了收获,可以 点赞👍 + 收藏⭐️ + 关注➕ 支持一下!

感谢各位观看!

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com