文章目录
- 1. Fork - Join模式
- 2. Producer - Consumer模式
- 3. Readers - Writers模式
- 4. Work Thread模式
- 5. Actor模式
- 6、 Pipeline模式概述
- 应用场景
- C++实现示例
- 代码解释
1. Fork - Join模式
- 原理:将一个大任务分解为多个子任务,这些子任务在不同的线程中并行执行,当所有子任务完成后,再将它们的结果合并得到最终结果。
- 优点:充分利用多核处理器的并行能力,提高计算效率;任务分解和结果合并的过程清晰,便于理解和实现分治算法等。
- 缺点:需要考虑子任务的划分合理性以及线程间的同步问题,以确保结果的正确性。
- 适用场景:适用于能有效分解为多个独立子任务的计算密集型任务,如大规模数据处理、科学计算中的矩阵运算等。
#include <iostream>
#include <vector>
#include <thread>
#include <numeric>// 计算数组某区间的和
void sum_subarray(const std::vector<int>& arr, int start, int end, int& result) {result = std::accumulate(arr.begin() + start, arr.begin() + end, 0);
}int main() {std::vector<int> arr(1000);std::iota(arr.begin(), arr.end(), 1);int mid = arr.size() / 2;int left_sum = 0, right_sum = 0;// Fork: 创建两个线程分别计算左右子数组的和std::thread left_thread(sum_subarray, std::ref(arr), 0, mid, std::ref(left_sum));std::thread right_thread(sum_subarray, std::ref(arr), mid, arr.size(), std::ref(right_sum));// Join: 等待两个线程完成left_thread.join();right_thread.join();int total_sum = left_sum + right_sum;std::cout << "Total sum: " << total_sum << std::endl;return 0;
}
2. Producer - Consumer模式
- 原理:生产者线程负责生产数据并将其放入缓冲区,消费者线程从缓冲区取出数据进行处理。通过缓冲区实现生产者和消费者的解耦,使它们可以以不同的速度运行。
- 优点:提高了程序的并发度和整体性能,生产者和消费者可以独立优化和扩展,代码可读性和维护性较好。
- 缺点:需要合理设计缓冲区大小和同步机制,避免缓冲区溢出或不足,同时要处理好生产者和消费者之间的同步与互斥问题。
- 适用场景:广泛应用于各种数据处理场景,如文件读取与处理、网络数据的接收与解析、消息队列系统等。
#include <iostream>
#include <queue>
#include <thread>
#include <mutex>
#include <condition_variable>std::queue<int> task_queue;
std::mutex mtx;
std::condition_variable cv;
bool is_running = true;// 生产者函数
void producer() {for (int i = 0; i < 10; ++i) {std::this_thread::sleep_for(std::chrono::milliseconds(100));{std::unique_lock<std::mutex> lock(mtx);task_queue.push(i);std::cout << "Produced: " << i << std::endl;}cv.notify_one();}{std::unique_lock<std::mutex> lock(mtx);is_running = false;}cv.notify_all();
}// 消费者函数
void consumer() {while (true) {std::unique_lock<std::mutex> lock(mtx);cv.wait(lock, [] { return!task_queue.empty() ||!is_running; });if (task_queue.empty() &&!is_running) {break;}int task = task_queue.front();task_queue.pop();std::cout << "Consumed: " << task << std::endl;}
}int main() {std::thread producer_thread(producer);std::thread consumer_thread(consumer);producer_thread.join();consumer_thread.join();return 0;
}
3. Readers - Writers模式
- 原理:允许多个读者线程同时访问共享资源,但当有写者线程访问时,所有读者和其他写者都需等待。写者线程具有更高的优先级,以确保数据的一致性。
- 优点:能有效提高对共享资源的访问效率,在有大量读操作和少量写操作的情况下,能充分利用多核处理器的并行性。
- 缺点:实现较为复杂,需要精细地控制读写线程的同步与互斥,以避免数据不一致和死锁等问题。
- 适用场景:常用于数据库系统、文件系统等需要频繁读写共享数据的场景,其中读操作远远多于写操作。
#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>std::mutex rw_mutex;
std::condition_variable rw_cv;
int readers = 0;
bool writer_active = false;// 读者函数
void reader(int id) {{std::unique_lock<std::mutex> lock(rw_mutex);rw_cv.wait(lock, [] { return!writer_active; });++readers;}std::cout << "Reader " << id << " is reading." << std::endl;std::this_thread::sleep_for(std::chrono::milliseconds(100));{std::unique_lock<std::mutex> lock(rw_mutex);--readers;if (readers == 0) {rw_cv.notify_one();}}
}// 写者函数
void writer(int id) {{std::unique_lock<std::mutex> lock(rw_mutex);rw_cv.wait(lock, [] { return!writer_active && readers == 0; });writer_active = true;}std::cout << "Writer " << id << " is writing." << std::endl;std::this_thread::sleep_for(std::chrono::milliseconds(100));{std::unique_lock<std::mutex> lock(rw_mutex);writer_active = false;rw_cv.notify_all();}
}int main() {std::thread r1(reader, 1);std::thread r2(reader, 2);std::thread w1(writer, 1);std::thread r3(reader, 3);r1.join();r2.join();w1.join();r3.join();return 0;
}
4. Work Thread模式
- 原理:有一个任务队列,多个工作线程从队列中获取任务并执行。任务队列可以是优先级队列或普通队列,工作线程根据一定的规则从队列中取出任务进行处理。
- 优点:实现相对简单,易于扩展工作线程的数量来应对不同的负载需求,能有效利用线程资源,避免线程的频繁创建和销毁。
- 缺点:任务队列的管理和线程的调度需要一定的开销,可能会出现任务饥饿现象,即某些低优先级任务长时间得不到执行。
- 适用场景:适用于处理大量异步任务的场景,如网络服务器中的请求处理、分布式系统中的任务调度等。
#include <iostream>
#include <queue>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <functional>
#include <vector>std::queue<std::function<void()>> work_queue;
std::mutex queue_mutex;
std::condition_variable queue_cv;
bool is_working = true;// 工作线程函数
void worker() {while (true) {std::function<void()> task;{std::unique_lock<std::mutex> lock(queue_mutex);queue_cv.wait(lock, [] { return!work_queue.empty() ||!is_working; });if (work_queue.empty() &&!is_working) {break;}task = std::move(work_queue.front());work_queue.pop();}task();}
}int main() {std::vector<std::thread> workers;for (int i = 0; i < 3; ++i) {workers.emplace_back(worker);}// 添加任务到队列{std::unique_lock<std::mutex> lock(queue_mutex);for (int i = 0; i < 5; ++i) {work_queue.emplace([i] {std::cout << "Task " << i << " is being executed." << std::endl;std::this_thread::sleep_for(std::chrono::milliseconds(100));});}}queue_cv.notify_all();// 等待所有任务完成{std::unique_lock<std::mutex> lock(queue_mutex);is_working = false;}queue_cv.notify_all();for (auto& t : workers) {t.join();}return 0;
}
5. Actor模式
- 原理:将整个处理过程分为多个阶段,每个阶段由一个或多个Actor(参与者)负责。数据在这些Actor之间像流水线一样依次传递,每个Actor处理完数据后将其传递给下一个Actor,直到最终处理完成。
- 优点:提高了系统的吞吐量和响应性,各个阶段可以并行执行,而且易于维护和扩展,每个Actor可以独立开发和测试。
- 缺点:需要精心设计流水线的各个阶段和数据传递方式,以确保数据的正确流转和系统的稳定性。
- 适用场景:适用于处理流程较为复杂且可以明确划分为多个阶段的任务,如视频编码解码、图像处理流水线、网络数据包的处理等。
#include <iostream>
#include <thread>
#include <queue>
#include <mutex>
#include <condition_variable>// 第一个阶段 Actor
class FirstActor {
public:std::queue<int> input_queue;std::mutex mtx;std::condition_variable cv;bool is_finished = false;void run() {for (int i = 0; i < 5; ++i) {std::this_thread::sleep_for(std::chrono::milliseconds(100));{std::unique_lock<std::mutex> lock(mtx);input_queue.push(i);}cv.notify_one();}{std::unique_lock<std::mutex> lock(mtx);is_finished = true;}cv.notify_all();}
};// 第二个阶段 Actor
class SecondActor {
public:FirstActor& first_actor;std::queue<int> output_queue;std::mutex mtx;std::condition_variable cv;bool is_finished = false;SecondActor(FirstActor& fa) : first_actor(fa) {}void run() {while (true) {int data;{std::unique_lock<std::mutex> lock(first_actor.mtx);first_actor.cv.wait(lock, [this] { return!first_actor.input_queue.empty() || first_actor.is_finished; });if (first_actor.input_queue.empty() && first_actor.is_finished) {break;}data = first_actor.input_queue.front();first_actor.input_queue.pop();}data *= 2;{std::unique_lock<std::mutex> lock(mtx);output_queue.push(data);}cv.notify_one();}{std::unique_lock<std::mutex> lock(mtx);is_finished = true;}cv.notify_all();}
};// 第三个阶段 Actor
class ThirdActor {
public:SecondActor& second_actor;ThirdActor(SecondActor& sa) : second_actor(sa) {}void run() {while (true) {int data;{std::unique_lock<std::mutex> lock(second_actor.mtx);second_actor.cv.wait(lock, [this] { return!second_actor.output_queue.empty() || second_actor.is_finished; });if (second_actor.output_queue.empty() && second_actor.is_finished) {break;}data = second_actor.output_queue.front();second_actor.output_queue.pop();}std::cout << "Final result: " << data << std::endl;}}
};int main() {FirstActor first_actor;SecondActor second_actor(first_actor);ThirdActor third_actor(second_actor);std::thread t1(&FirstActor::run, &first_actor);std::thread t2(&SecondActor::run, &second_actor);std::thread t3(&ThirdActor::run, &third_actor);t1.join();t2.join();t3.join();return 0;
}
6、 Pipeline模式概述
Pipeline(流水线)模式是一种将一个复杂任务分解为多个独立子任务,并让这些子任务像流水线一样依次执行的设计模式。每个子任务负责处理一部分工作,处理完后将结果传递给下一个子任务,最终完成整个复杂任务。这种模式可以提高系统的并发性能和可维护性,因为每个子任务可以独立开发、测试和优化,而且不同的子任务可以并行执行。
应用场景
- 数据处理流程:例如在图像或视频处理中,一个完整的处理流程可能包括图像读取、降噪、增强、裁剪等多个步骤,每个步骤可以作为一个独立的子任务在流水线上执行。
- 网络请求处理:在网络服务器中,一个请求可能需要经过接收、解析、验证、业务逻辑处理、响应生成等多个阶段,使用流水线模式可以高效地处理大量请求。
C++实现示例
以下是一个简单的C++示例,模拟一个简单的数据处理流水线,包含三个子任务:数据生成、数据处理和数据输出。
#include <iostream>
#include <queue>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <vector>// 第一个阶段:数据生成
class DataGenerator {
public:std::queue<int> outputQueue;std::mutex mtx;std::condition_variable cv;bool isFinished = false;void run() {for (int i = 0; i < 10; ++i) {std::this_thread::sleep_for(std::chrono::milliseconds(100));{std::unique_lock<std::mutex> lock(mtx);outputQueue.push(i);}cv.notify_one();}{std::unique_lock<std::mutex> lock(mtx);isFinished = true;}cv.notify_all();}
};// 第二个阶段:数据处理
class DataProcessor {
public:DataGenerator& generator;std::queue<int> outputQueue;std::mutex mtx;std::condition_variable cv;bool isFinished = false;DataProcessor(DataGenerator& gen) : generator(gen) {}void run() {while (true) {int data;{std::unique_lock<std::mutex> lock(generator.mtx);generator.cv.wait(lock, [this] { return!generator.outputQueue.empty() || generator.isFinished; });if (generator.outputQueue.empty() && generator.isFinished) {break;}data = generator.outputQueue.front();generator.outputQueue.pop();}// 简单的数据处理,这里将数据加倍data *= 2;{std::unique_lock<std::mutex> lock(mtx);outputQueue.push(data);}cv.notify_one();}{std::unique_lock<std::mutex> lock(mtx);isFinished = true;}cv.notify_all();}
};// 第三个阶段:数据输出
class DataOutputter {
public:DataProcessor& processor;DataOutputter(DataProcessor& proc) : processor(proc) {}void run() {while (true) {int data;{std::unique_lock<std::mutex> lock(processor.mtx);processor.cv.wait(lock, [this] { return!processor.outputQueue.empty() || processor.isFinished; });if (processor.outputQueue.empty() && processor.isFinished) {break;}data = processor.outputQueue.front();processor.outputQueue.pop();}std::cout << "Processed data: " << data << std::endl;}}
};int main() {DataGenerator generator;DataProcessor processor(generator);DataOutputter outputter(processor);std::thread generatorThread(&DataGenerator::run, &generator);std::thread processorThread(&DataProcessor::run, &processor);std::thread outputterThread(&DataOutputter::run, &outputter);generatorThread.join();processorThread.join();outputterThread.join();return 0;
}
代码解释
- DataGenerator类:负责生成数据,将生成的数据放入输出队列
outputQueue
,并通过条件变量cv
通知下一个阶段有新数据可用。 - DataProcessor类:从
DataGenerator
的输出队列中获取数据,对数据进行处理(这里简单地将数据加倍),然后将处理后的数据放入自己的输出队列,并通知下一个阶段。 - DataOutputter类:从
DataProcessor
的输出队列中获取数据,并将其输出到控制台。 - main函数:创建三个线程分别运行三个阶段的任务,并等待所有线程执行完毕。
通过这种方式,数据在三个阶段之间依次传递,形成了一个简单的流水线。每个阶段可以独立运行,提高了系统的并发性能。