使用原线程池时,如果 push 和 pop 的对象过大,拷贝带来的时延会过高,因此需要优化线程池。
采用 std::move() + unique_ptr 的方法,能极大地减少时延,
实际上就是避免了多次拷贝,直接传递指针。
代码实现
ThreadPool
#ifndef _THREAD_TOOL_H_
#define _THREAD_TOOL_H_

#include<pthread.h>

#include<cstdlib>
#include<ctime>
#include<deque>
#include<iostream>
#include<memory>
#include<queue>
#include<utility>
#include<vector>

#include<Tools/Misc.h>
#define NUM 5

// Kept for backward compatibility: code that includes this header may rely on
// unqualified standard names (for example `vector`). New code should spell
// out `std::` instead of depending on this directive.
using namespace std;

/**
 * @brief Fixed-size pool of pthread workers that run queued tasks.
 *
 * Tasks are handed over as std::unique_ptr<T>, so pushing and popping only
 * moves a pointer and never copies the task object itself.
 *
 * @tparam T Task type. It must provide a `run()` member function, which a
 *           worker calls exactly once for every task it dequeues.
 *
 * Lifetime: the destructor sets a stop flag, wakes every worker, lets them
 * finish the tasks that are still queued, and joins them before the mutex and
 * condition variable are destroyed. It therefore blocks until the queue has
 * been drained (when workers were started with Thread_pool_init()).
 */
template<class T>
class ThreadPool
{
public:
    /**
     * @brief Initialises the mutex and condition variable; starts no thread.
     * @param cap Number of workers that Thread_pool_init() will create.
     */
    ThreadPool(int cap = NUM) : thread_num(cap)
    {
        pthread_mutex_init(&_lock, nullptr);
        pthread_cond_init(&_cond, nullptr);
    }

    // The pool owns pthread primitives and running threads that hold `this`.
    ThreadPool(const ThreadPool&) = delete;
    ThreadPool& operator=(const ThreadPool&) = delete;

    /// Acquires the queue mutex.
    void LockQueue()
    {
        pthread_mutex_lock(&_lock);
    }

    /// Releases the queue mutex.
    void UnlockQueue()
    {
        pthread_mutex_unlock(&_lock);
    }

    /// Waits on the condition variable; the queue mutex must be held.
    void Wait()
    {
        pthread_cond_wait(&_cond, &_lock);
    }

    /// Wakes one thread waiting on the condition variable.
    void Wake()
    {
        pthread_cond_signal(&_cond);
    }

    /// @return true when no task is queued. Call with the queue mutex held.
    bool isQueueEmpty() const
    {
        return _task_queue.empty();
    }

    /**
     * @brief Worker thread entry point.
     * @param arg Pointer to the owning ThreadPool.
     * @return Always nullptr.
     *
     * Loops: wait for a task, pop it under the lock, run it outside the lock.
     * Returns once the pool is stopping and the queue is empty.
     */
    static void* Rountine(void* arg)
    {
        ThreadPool* self = static_cast<ThreadPool*>(arg);
        while (true)
        {
            self->LockQueue();
            // Loop on the predicate to guard against spurious wakeups.
            while (self->isQueueEmpty() && !self->_stopping)
            {
                self->Wait();
            }
            if (self->isQueueEmpty())
            {
                // Only reachable when stopping: nothing left to do.
                self->UnlockQueue();
                break;
            }
            std::unique_ptr<T> t;
            self->Pop(t);
            self->UnlockQueue();

            // Run the task without holding the lock; skip null entries.
            if (t)
            {
                t->run();
            }
        }
        return nullptr;
    }

    /**
     * @brief Starts `thread_num` worker threads running Rountine().
     *
     * Only threads that were created successfully are remembered for joining
     * in the destructor. A non-positive `thread_num` starts nothing.
     */
    void Thread_pool_init()
    {
        for (int i = 0; i < thread_num; i++)
        {
            pthread_t tid;
            if (pthread_create(&tid, nullptr, Rountine, this) == 0)
            {
                _workers.push_back(tid);
            }
        }
    }

    /**
     * @brief Enqueues a task and wakes one worker.
     * @param in Task to run; ownership is moved into the queue.
     */
    void push(std::unique_ptr<T>&& in)
    {
        LockQueue();
        _task_queue.push_back(std::move(in)); // move the task pointer into the queue
        Wake();
        UnlockQueue();
    }

    /**
     * @brief Moves the front task out of the queue.
     * @param out Receives the task pointer; reset to null if the queue is empty.
     *
     * Performs no locking itself; call with the queue mutex held.
     */
    void Pop(std::unique_ptr<T>& out)
    {
        if (_task_queue.empty())
        {
            out.reset();
            return;
        }
        out = std::move(_task_queue.front());
        _task_queue.pop_front();
    }

    /**
     * @brief Stops the workers, waits for them, then releases the primitives.
     *
     * Workers must be joined first: destroying a mutex or condition variable
     * that another thread is still using is undefined behaviour.
     */
    ~ThreadPool()
    {
        LockQueue();
        _stopping = true;
        pthread_cond_broadcast(&_cond); // wake every waiting worker
        UnlockQueue();

        for (pthread_t tid : _workers)
        {
            pthread_join(tid, nullptr);
        }

        pthread_mutex_destroy(&_lock);
        pthread_cond_destroy(&_cond);
    }

private:
    pthread_mutex_t _lock;
    pthread_cond_t _cond;
    deque<std::unique_ptr<T>> _task_queue; // owning pointers to pending tasks
    int thread_num;
    std::vector<pthread_t> _workers;       // successfully created worker threads
    bool _stopping = false;                // guarded by _lock
};

#endif
task 任务
#ifndef __TASK__H_
#define __TASK__H_#include <iostream>
#include <functional>
#include <thread>
#include <chrono>#include <iostream>
#include <utility>

/**
 * @brief A unit of work: a plain function pointer plus the data it receives.
 *
 * @tparam T Payload type. The callback takes it by value, so every call to
 *           run() passes a copy of the stored payload.
 *
 * No copy/move operations or destructor are declared, so the compiler
 * generates all of them. In particular the class is movable: moving a Task
 * moves its payload instead of deep-copying it. A moved-from Task keeps its
 * function pointer while its payload is left in T's moved-from state.
 */
template<class T>
class Task
{
public:
    /// Creates an empty task: no callback and a value-initialised payload.
    Task() = default;

    /**
     * @brief Creates a task that calls `func` with `data`.
     * @param func Callback invoked by run(); may be null, making run() a no-op.
     * @param data Payload; taken by value and moved into the task.
     */
    Task(void (*func)(T), T data) : executeFunc(func), _data(std::move(data)) {}

    /// Calls the callback with a copy of the payload, if a callback is set.
    void run()
    {
        if (executeFunc)
        {
            executeFunc(_data);
        }
    }

public:
    void (*executeFunc)(T) = nullptr; // callback; null means "nothing to run"
    T _data{};                        // payload handed to the callback
};

#endif
main.cc
#include "task.h"
#include "ThreadPool.hpp"
#include <stdio.h>
#include <random>#define TASK_NUM 10static void DoJob(vector<int> data)
{for(int i =0 ;i< data.size(); i++){printf("data = %d ", data[i]);}printf("\n");
}int main()
{uint16_t num_threads = std::thread::hardware_concurrency();printf("thread_nums %d\n", num_threads);std::unique_ptr<ThreadPool<Task<vector<int>>>> tp(new ThreadPool<Task<vector<int>>>(num_threads));tp->Thread_pool_init();std::vector<int> v1;std::random_device rd;std::mt19937 gen(rd());// 定义范围int min = 10;int max = 50;while(true){// 定义均匀分布std::uniform_int_distribution<> dis(min, max);// 生成随机数int random_number = dis(gen);v1.push_back(random_number);if(v1.size() == TASK_NUM){auto task = std::make_unique<Task<std::vector<int>>>(DoJob, std::move(v1));tp->push(std::move(task));}}return 0;
}
此线程池可以极大地减少拷贝、降低时延,并根据当前硬件的核数开启相应数量的线程。