欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 健康 > 美食 > C++11工作窃取式线程池

C++11工作窃取式线程池

2024/11/30 12:49:35 来源:https://blog.csdn.net/m0_68224121/article/details/141231939  浏览:    关键词:C++11工作窃取式线程池

一、概述

        工作窃取式线程池采用了工作窃取算法,具体来说就是当某个线程执行完自己队列中的任务后,会从其他线程的队列中“偷取”任务来执行。这种算法可以提高线程利用率,减少线程之间的竞争,以及减小线程的等待时间。

        在同步队列中设计std::vector<std::list<T>>,使用该容器来存储任务,利用数组加链表,设置vector的大小为bucketsize,即一般为CPU核数,利用链表存放具体任务。当index下标的list容器中任务处理结束,则使用threadIndex生成bucketSize范围内不为index下标的new_index,然后线程可以去处理该list容器中的任务。

        设置m_waitTime,即为当任务缓冲区中满或者空时,生产者和消费者的等待时间(在此利用条件变量->wait_for)当发生超时,则生产者和消费者退出。在线程池中存在异步运行策略,即当添加任务失败时,可以先执行此任务。任务使用function和bind结合的形式进行传递,即using task=std::function<void(void)>,bind绑定任务函数名以及参数,通过function可调用对象包装器执行。

        WorkStealingPool可以设定多个工作线程,每个工作线程都有一个自己的任务队列,每个线程在执行任务时会首先从自己的队列中获取任务,如果自己队列为空,则从其他线程的队列中获取任务。这种设计可以充分发挥多核处理器的并行能力,提高整体的任务处理效率。

二、同步队列的设计

#ifndef SYNCQUEUE_HPP
#define SYNCQUEUE_HPP
#include<vector>
#include<list>
#include<mutex>
#include<condition_variable>
#include<iostream>
using namespace std;template<class T>
class SyncQueue
{
private:std::vector<std::list<T>> m_taskQueues;//任务队列size_t m_bucketSize;//桶的大小,vector的sizesize_t m_maxSize;//任务队列的大小mutable std::mutex m_mutex;std::condition_variable m_notEmpty;//对应于消费者std::condition_variable m_notFull;//对应于生产者size_t m_waitTime;//任务队列满等待时间bool m_needStop;//同步队列停止标志bool IsFull(const int index)const{bool full = m_taskQueues[index].size() >= m_maxSize;if (full){clog << "m_queue已经满了,需要等待..." << endl;}return full;}bool IsEmpty(const int index)const{bool empty = m_taskQueues[index].empty();if (empty){clog << "m_queue已经空了,需要等待..." << endl;}return empty;}template<class F>int Add(F&& task, const int index){std::unique_lock<std::mutex> locker(m_mutex);bool waitret = m_notFull.wait_for(locker, std::chrono::seconds(m_waitTime),[=] {return m_needStop || !IsFull(index); });if (!waitret){return 1;}if (m_needStop){return 2;}m_taskQueues[index].push_back(std::forward<F>(task));m_notEmpty.notify_all();return 0;}
public:SyncQueue(int bucketsize, int maxsize = 200, size_t timeout = 1) :m_bucketSize(bucketsize), m_maxSize(maxsize),m_needStop(false), m_waitTime(timeout){m_taskQueues.resize(m_bucketSize);}~SyncQueue() { }int Put(const T& task, const int index){return Add(task, index);}int Put(T&& task, const int index){return Add(std::forward<T>(task), index);}int Take(std::list<T>& list, const int index){std::unique_lock<std::mutex> locker(m_mutex);bool waitret = m_notEmpty.wait_for(locker, std::chrono::seconds(m_waitTime),[this, index]()->bool{return m_needStop||!IsEmpty(index)});if (!waitret){return 1;}if (m_needStop){return 2;}list = std::move(m_taskQueues[index]);m_notFull.notify_all;return 0;}int Take(T& task, const int index){std::unique_lock<std::mutex> locker(m_mutex);bool waitret = m_notEmpty.wait_for(locker, std::chrono::seconds(m_waitTime),[this, index]()->bool {return m_needStop || !IsEmpty(index); });if (!waitret){return 1;}if (m_needStop){return 2;}task = m_taskQueues[index].front();m_taskQueues[index].pop_front();m_notFull.notify_all();return 0;}void Stop(){std::unique_lock<std::mutex> locker(m_mutex);for (int i = 0; i < m_bucketSize; ++i){while (!m_needStop && !IsEmpty(i)){m_notFull.wait(locker);//生产者阻塞}}m_needStop = true;m_notEmpty.notify_all();m_notFull.notify_all();}size_t Size()const{std::unique_lock<std::mutex> locker(m_mutex);size_t sum = 0;for (auto& xlist : m_taskQueues){sum += xlist.size();}return sum;}
};#endif

三、线程池设计

#ifndef WORKSTEALINGPOOL_HPP
#define WORKSTEALINGPOOL_HPP
#include "SyncQueue.hpp"
#include<functional>
#include<future>
#include<memory>
#include<vector>
using namespace std;class WorkStealingPool
{
public:using Task = std::function<void(void)>;
private:size_t m_numThreads;//线程数量SyncQueue<Task> m_queue;std::vector<std::shared_ptr<std::thread>> m_threadgroup;//线程组std::atomic_bool m_running;std::once_flag m_flag;void Start(int numthreads){m_running = true;for (int i = 0; i < numthreads; ++i){m_threadgroup.push_back(std::make_shared<std::thread>(&WorkStealingPool::RunInThread, this, i));}}void RunInThread(const int index){while (m_running){std::list<Task> tasklist;if (m_queue.Take(tasklist, index) == 0){for (auto& task : tasklist){task();}}else{int i = threadIndex();if (i != index && m_queue.Take(tasklist, i) == 0){clog << "偷取任务成功..." << endl;for (auto& task : tasklist){clog << "偷取任务成功..." << endl;for (auto& task : tasklist){task();}}}}}}void StopThreadGroup(){m_queue.Stop();m_running = false;for (auto& tha : m_threadgroup){if (tha && tha->joinable()){tha->join();}}m_threadgroup.clear();}int threadIndex(){static int num = 0;return ++num % m_numThreads;}
public:WorkStealingPool(const int qusize = 100, const int numthreads = 8) :m_numThreads(numthreads),m_queue(m_numThreads, qusize), m_running(false){Start(m_numThreads);}~WorkStealingPool(){Stop();}void Stop(){std::call_once(m_flag, [this]()->void {StopThreadGroup(); });}template<class Func, class... Args>auto submit(Func&& func, Args&&... args){using RetType = decltype(func(args...));auto task = std::make_shared < std::packaged_task<RetType(void)>(std::bind(std::forward<Func>(func),std::forward<Args>(args)...));std::future<RetType> result = task->get_future();if (m_queue.Put([task](void)->void {(*task)(); }, threadIndex()) != 0){(*task)();}return result;}
};#endif

四、适用场景

1、任务分解型应用:当一个任务需要被分解成多个子任务进行并行处理时,工作窃取式线程池可以自动管理任务的分配和调度,充分利用多核处理器的并行能力,提高任务处理效率。

2、递归型任务:对于递归型的任务,workstealingpool能够适应任务的动态变化,根据需要创建和调度子任务,以实现更高效的递归执行。

3、高吞吐量任务:WorkStealingPool的工作窃取算法可以减小线程之间的竞争,并且能够在任务队列为空时从其他线程窃取任务,从而减少线程的等待时间,提高整体的任务处理吞吐量。适用于高吞吐量的任务场景。

4、CPU密集型任务:对于需要大量的CPU计算而没有I.O阻塞的任务,使用工作窃取式线程池可以更好地充分利用CPU核心,并且可以根据需要增加或减小线程数量,以适应任务的计算量。

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com