Linux从0到1——线程池
- 1. 简易日志系统
- 1.1 可变参数列表
- 1.2 设计日志
- 1.3 改进
- 2. 线程池
- 2.1 需要用到的头文件
- 2.2 需求分析
- 2.3 实现细节
- 2.4 完整代码
- 3. 改造单例版本
- 4. 改进思路
1. 简易日志系统
1.1 可变参数列表
1. printf
- 我们其实一直在使用可变参数列表,
printf
函数就使用了它。
- 第一个参数
format
是一个字符串,指定可变参数的数据格式。第二个参数...
就是可变参数。
2. 自己设计一个可变参数的接口,计算任意整数的和
#include <iostream>
#include <cstdarg> // 包含处理可变参数的宏// 函数声明
int sum(int count, ...);// 函数定义
int sum(int count, ...)
{int total = 0;va_list args; // 定义一个 va_list 对象,这是一个char *类型(或者void *)的指针// 初始化 va_list 对象va_start(args, count);// 遍历所有参数for (int i = 0; i < count; ++i) {total += va_arg(args, int); // 获取下一个参数并累加}// 清理 va_list 对象va_end(args);return total;
}int main()
{// 测试函数int result = sum(5, 1, 2, 3, 4, 5);std::cout << "Sum: " << result << std::endl;result = sum(3, 10, 20, 30);std::cout << "Sum: " << result << std::endl;return 0;
}
va_list
:是一个char*
或void*
的指针类型,不同编译器可能有所差别。va_start
:初始化va_list
对象,使其指向第一个可变参数。第二个参数固定传可变参数的前一个参数。va_arg
:从可变参数列表中获取下一个参数。需要指定参数的类型(在这里是int
)。va_end
:清理va_list
对象,确保资源被正确释放(就是将该指针置空)。
1.2 设计日志
1. 需求分析
- 首先,这个日志中的信息要有各种等级,来表明各个信息的重要程度,和他们的意义;
- 日志信息要能向显示器写入,也能向文件写入。向文件写入时,可以向一个文件写入,也可以按信息不同等级,向不同等级文件写入;
- 每一条日志信息的组成:[等级][写入时间][进程id] 用户自定义信息。
2. 大致思路
- 设计两个枚举类型,一个来表示信息等级,一个来表示信息输出的方式。
log
类对外只暴漏一个接口,功能是写入日志信息。
3. 完整的日志类实现Log.hpp
#pragma once#include<iostream>
#include<string>
#include<cstdarg>
#include<ctime>
#include<fstream>
#include<sys/types.h>
#include<sys/stat.h>
#include<fcntl.h>
#include<unistd.h>enum
{Debug = 0,Info, // 正常信息Warning, // 告警Error, // 错误Fatal // 致命错误
};enum
{Screen = 0, // 向显示器打印OneFile, // 向一个文件打印ClassFile // 向多个文件打印
};// 将日志等级转换为string
std::string LevelToString(int level)
{switch(level){case Debug:return "Debug";case Info:return "Info";case Warning:return "Warning";case Error:return "Error";case Fatal:return "Fatal";default:return "Unknown";}
}const int defaultstyle = Screen; // 默认风格是向显示器打印
const std::string default_filename = "log."; // 默认文件名
const std::string logdir = "log"; // 默认日志文件夹class Log
{
public:Log():_style(defaultstyle),_filename(default_filename){mkdir(logdir.c_str(), 0775);}void Enable(int style){_style = style;}std::string TimeStampExLocalTime(){time_t currtime = time(nullptr);struct tm *curr = localtime(&currtime);char time_buffer[128];snprintf(time_buffer, sizeof(time_buffer), "%d-%d-%d %d:%d:%d", \curr->tm_year+1900, curr->tm_mon+1, curr->tm_mday,\curr->tm_hour, curr->tm_min, curr->tm_sec);return time_buffer;}void WriteLogToOneFile(const std::string &logname, const std::string &message){umask(0);int fd = open(logname.c_str(), O_CREAT | O_WRONLY | O_APPEND, 0666);if(fd < 0) return;write(fd, message.c_str(), message.size());close(fd);}void WriteLogToClassFile(const std::string &levelstr, const std::string &message){std::string logname = logdir;logname += "/";logname += _filename;logname += levelstr;WriteLogToOneFile(logname, message);}void WriteLog(const std::string &levelstr, const std::string &message){switch(_style){case Screen:std::cout << message;break;case OneFile:WriteLogToClassFile("all", message);break;case ClassFile:WriteLogToClassFile(levelstr, message);break;default:break;}}void LogMessage(int level, const char* format, ...) // 类C的一个日志接口{char rightbuffer[1024];va_list args; // 这是一个char *类型(或者void *)的指针va_start(args, format); // 让arg指向可变部分vsnprintf(rightbuffer, sizeof(rightbuffer), format, args); // 将可变部分按照指定格式写入到content中va_end(args); // 释放args, args = nullptrchar leftbuffer[1024];std::string levelstr = LevelToString(level);std::string currtime = TimeStampExLocalTime(); // 获取当前时间std::string idstr = std::to_string(getpid());snprintf(leftbuffer, sizeof(leftbuffer), "[%s][%s][%s] ", \levelstr.c_str(), currtime.c_str(), idstr.c_str());std::string loginfo = leftbuffer;loginfo += rightbuffer;WriteLog(levelstr, loginfo);}~Log(){}
private:int _style;std::string _filename;
};
LogMessage
是对外暴漏的接口,用户可以通过该接口,写入格式化的日志信息。第一个参数是信息等级,第二个是信息格式,第三个是可变参数列表。- 日志在使用时需要
Enable
使能,指定日志的输出方式。 - 日志信息统一放在一个的文件夹中,名字默认是
log
。向文件中写入时,文件名的格式是:log.等级
。其中如果指定向一个文件中写入,文件名是log.all
。
4. 测试代码:
int main()
{Log log;log.Enable(OneFile);log.LogMessage(Debug, "hello %d world %lf\n", 10, 3.14);log.LogMessage(Error, "hello %d world %lf\n", 10, 3.14);log.LogMessage(Warning, "hello %d world %lf\n", 10, 3.14);log.LogMessage(Info, "hello %d world %lf\n", 10, 3.14);log.LogMessage(Fatal, "hello %d world %lf\n", 10, 3.14);log.LogMessage(Debug, "hello %d world %lf\n", 10, 3.14);log.LogMessage(Debug, "hello %d world %lf\n", 10, 3.14);return 0;
}
1.3 改进
1. 线程安全问题
- 向文件或显示器写入时,不是线程安全的,可以考虑对
WriteLog
方法上锁。
#include"LockGuard.hpp"...class Log
{
public:Log():_style(defaultstyle),_filename(default_filename){mkdir(logdir.c_str(), 0775);pthread_mutex_init(&_mutex, nullptr);}...void LogMessage(int level, const char* format, ...) // 类C的一个日志接口{...{LockGuard lockguard(&_mutex);WriteLog(levelstr, loginfo);}}~Log(){}
private:int _style;std::string _filename;pthread_mutex_t _mutex;
};
LockGuard.hpp
是我们之前实现过的守护锁。
2. 配置log
- 以下代码可以写在
Log.hpp
头文件中,配置log
只需要修改Config
类的构造函数即可。
// 配置log
Log log;class Config
{
public:Config(){// 在此配置log.Enable(ClassFile);}~Config(){}
};Config config;
2. 线程池
2.1 需要用到的头文件
1. Log.hpp
#pragma once#include<iostream>
#include<string>
#include<cstdarg>
#include<ctime>
#include<fstream>
#include<sys/types.h>
#include<sys/stat.h>
#include<fcntl.h>
#include<unistd.h>
#include"LockGuard.hpp"enum
{Debug = 0,Info, // 正常信息Warning, // 告警Error, // 错误Fatal // 致命错误
};enum
{Screen = 0, // 向显示器打印OneFile, // 向一个文件打印ClassFile // 向多个文件打印
};// 将日志等级转换为string
std::string LevelToString(int level)
{switch(level){case Debug:return "Debug";case Info:return "Info";case Warning:return "Warning";case Error:return "Error";case Fatal:return "Fatal";default:return "Unknown";}
}const int defaultstyle = Screen; // 默认风格是向显示器打印
const std::string default_filename = "log."; // 默认文件名
const std::string logdir = "log"; // 默认日志文件夹class Log
{
public:Log():_style(defaultstyle),_filename(default_filename){mkdir(logdir.c_str(), 0775);pthread_mutex_init(&_mutex, nullptr);}void Enable(int style){_style = style;}std::string TimeStampExLocalTime(){time_t currtime = time(nullptr);struct tm *curr = localtime(&currtime);char time_buffer[128];snprintf(time_buffer, sizeof(time_buffer), "%d-%d-%d %d:%d:%d", \curr->tm_year+1900, curr->tm_mon+1, curr->tm_mday,\curr->tm_hour, curr->tm_min, curr->tm_sec);return time_buffer;}void WriteLogToOneFile(const std::string &logname, const std::string &message){umask(0);int fd = open(logname.c_str(), O_CREAT | O_WRONLY | O_APPEND, 0666);if(fd < 0) return;write(fd, message.c_str(), message.size());close(fd);}void WriteLogToClassFile(const std::string &levelstr, const std::string &message){std::string logname = logdir;logname += "/";logname += _filename;logname += levelstr;WriteLogToOneFile(logname, message);}void WriteLog(const std::string &levelstr, const std::string &message){switch(_style){case Screen:std::cout << message;break;case OneFile:WriteLogToClassFile("all", message);break;case ClassFile:WriteLogToClassFile(levelstr, message);break;default:break;}}void LogMessage(int level, const char* format, ...) // 类C的一个日志接口{char rightbuffer[1024];va_list args; // 这是一个char *类型(或者void *)的指针va_start(args, format); // 让arg指向可变部分vsnprintf(rightbuffer, sizeof(rightbuffer), format, args); // 将可变部分按照指定格式写入到content中va_end(args); // 释放args, args = nullptrchar leftbuffer[1024];std::string levelstr = LevelToString(level);std::string currtime = TimeStampExLocalTime(); // 获取当前时间std::string idstr = std::to_string(getpid());snprintf(leftbuffer, sizeof(leftbuffer), "[%s][%s][%s] ", \levelstr.c_str(), currtime.c_str(), idstr.c_str());std::string loginfo = leftbuffer;loginfo += rightbuffer;{LockGuard lockguard(&_mutex);WriteLog(levelstr, loginfo);}}~Log(){}
private:int _style;std::string _filename;pthread_mutex_t _mutex;
};// 配置log
Log log;class Config
{
public:Config(){// 在此配置log.Enable(ClassFile);}~Config(){}
};Config config;
2. Thread.hpp
#pragma once#include <iostream>
#include <string>
#include <functional>template<class T>
using func_t = std::function<void(T&)>;template<class T>
class Thread
{
public:Thread(const std::string &threadname, func_t<T> func, T &data):_tid(0), _threadname(threadname), _isrunning(false), _func(func), _data(data){}static void *ThreadRoutine(void* args){Thread *ts = static_cast<Thread *>(args);ts->_func(ts->_data);return nullptr;}bool Start(){int n = pthread_create(&_tid, nullptr, ThreadRoutine, this);if(n == 0) {_isrunning = true;return true;}else return false;}bool Join(){if(!_isrunning) return false;int n = pthread_join(_tid, nullptr);if(n == 0){_isrunning = false;return true;}return false;}bool IsRunning(){return _isrunning;}std::string ThreadName(){return _threadname;}~Thread(){}private:pthread_t _tid; // 库级别线程idstd::string _threadname; // 线程名bool _isrunning; // 运行状态func_t<T> _func; // 线程执行的回调方法T _data;
};
3. LockGuard.hpp
#pragma once#include <pthread.h>// 不定义锁,默认认为外部会给我们传入锁对象
class Mutex
{
public:Mutex(pthread_mutex_t *lock):_lock(lock){}void Lock(){pthread_mutex_lock(_lock);}void Unlock(){pthread_mutex_unlock(_lock);}~Mutex(){}private:pthread_mutex_t *_lock;
};class LockGuard
{
public:LockGuard(pthread_mutex_t *lock):_mutex(lock){_mutex.Lock();}~LockGuard(){_mutex.Unlock();}private:Mutex _mutex;
};
4. Task.hpp
#pragma once
#include <iostream>
#include <string>
#include <unistd.h>const int defaultvalue = 0;enum
{ok = 0,div_zero, // 除0错误mod_zero, // 模0错误unknow // 未知错误
};const std::string opers = "+-*/%)(&";class Task
{
public:Task(){}Task(int x, int y, char op): data_x(x), data_y(y), oper(op), result(defaultvalue), code(ok){}void Run(){switch (oper){case '+':result = data_x + data_y;break;case '-':result = data_x - data_y;break;case '*':result = data_x * data_y;break;case '/':{if (data_y == 0)code = div_zero;elseresult = data_x / data_y;}break;case '%':{if (data_y == 0)code = mod_zero;elseresult = data_x % data_y;}break;default:code = unknow;break;}}std::string PrintTask(){std::string s;s = std::to_string(data_x);s += oper;s += std::to_string(data_y);s += "=?";return s;}std::string PrintResult(){std::string s;s = std::to_string(data_x);s += oper;s += std::to_string(data_y);s += "=";s += std::to_string(result);s += " [";s += std::to_string(code);s += "]";return s;}void operator()(){Run();}~Task(){}private:int data_x;int data_y;char oper; // + - * / %int result;int code; // 结果码,0: 结果可信 !0: 结果不可信,1,2,3,4
};
2.2 需求分析
这个线程池要有一个任务队列queue
,和一个线程数组vector
,任务由外部Push
进线程池内部的任务队列中,多个线程并发的从任务队列拿任务,并执行。
并发访问任务队列时,会有线程安全问题,需要控制同步和互斥:
Push
操作一定是互斥的,Pop
操作一定是互斥的;- 如果任务队列为空,则让线程等待,
Push
进去一个任务,就激活一个线程。
每一个线程需要执行的逻辑是,先从任务队列中拿任务,如果没拿到(说明任务队列为空),就先等待;拿到了就立即执行该任务。
2.3 实现细节
1. 线程池的内部成员变量
- 一个任务队列;
- 一个线程数组;
- 一个整数类型
_thread_num
记录线程池中的线程数量; - 一个互斥量和一个条件变量,用来控制访问任务队列时的同步和互斥。
template <class T>
class ThreadPool
{...
private:std::queue<T> _q;std::vector<Thread<ThreadData>> _threads; // ThreadData是线程信息类int _thread_num; // 线程个数pthread_mutex_t _mutex;pthread_cond_t _cond;
};
2. 设计一个线程信息类,存储每个线程的名字(可扩展)
class ThreadData
{
public:ThreadData(const std::string &threadname):_threadname(threadname){}~ThreadData(){}public:std::string _threadname;
};
3. 构造函数和析构函数
- 构造函数需要初始化互斥量和条件变量,并且创建指定个数的线程,不启动;
- 析构函数只需要释放互斥量和条件变量即可。
static const int default_num = 5; // 默认线程池中的线程个数template <class T>
class ThreadPool
{
public:ThreadPool(int thread_num = default_num): _thread_num(thread_num){pthread_mutex_init(&_mutex, nullptr);pthread_cond_init(&_cond, nullptr);// 构建指定个数的线程for (int i = 0; i < _thread_num; i++){// 待优化std::string threadname = "thread-";threadname += std::to_string(i + 1);ThreadData td(threadname);Thread<ThreadData> t(threadname, \std::bind(&ThreadPool<T>::ThreadRun, \this, std::placeholders::_1), td);_threads.push_back(t);log.LogMessage(Info, "%s is created...\n", threadname.c_str());}}~ThreadPool(){pthread_mutex_destroy(&_mutex);pthread_cond_destroy(&_cond);}...private:std::queue<T> _q;std::vector<Thread<ThreadData>> _threads;int _thread_num; // 线程个数pthread_mutex_t _mutex;pthread_cond_t _cond;
};
std::bind(&ThreadPool<T>::ThreadRun, this, std::placeholders::_1)
是一个函数模版对象,其中ThreadRun
为每一个线程要执行的方法,这个方法是一个成员函数。
4. ThreadRun方法
- 向队列中拿任务时,需要上锁;队列中没有任务时,需要让线程等待。
template <class T>
class ThreadPool
{
public:void ThreadWait(const ThreadData &td){log.LogMessage(Debug, "no task, %s is sleeping...\n", td._threadname.c_str());pthread_cond_wait(&_cond, &_mutex);}void ThreadRun(ThreadData &td){while (true){// 取任务T t;{LockGuard lockguard(&_mutex);while (_q.empty()){ThreadWait(td);log.LogMessage(Debug, "thread, %s wake up.\n", td._threadname.c_str());}t = _q.front();_q.pop();}// 处理任务t();log.LogMessage(Info, "%s handler task %s done, result is: %s\n", \td._threadname.c_str(), t.PrintTask().c_str(), t.PrintResult().c_str());}}...private:std::queue<T> _q;std::vector<Thread<ThreadData>> _threads;int _thread_num; // 线程个数pthread_mutex_t _mutex;pthread_cond_t _cond;
};
5. Push方法,向任务队列中放任务
- 访问队列时上锁,
Push
完成后激活一个等待的线程。
template <class T>
class ThreadPool
{
public:void ThreadWakeup(){pthread_cond_signal(&_cond);}void Push(T &in){log.LogMessage(Debug, "other thread push a task, task is: %s\n", in.PrintTask().c_str());LockGuard lockgurad(&_mutex);_q.push(in);ThreadWakeup();}...private:std::queue<T> _q;std::vector<Thread<ThreadData>> _threads;int _thread_num; // 线程个数pthread_mutex_t _mutex;pthread_cond_t _cond;
};
6. 启动线程池,join线程池中的线程
template <class T>
class ThreadPool
{
public:bool Start(){// 启动for (auto &thread : _threads){thread.Start();log.LogMessage(Info, "%s is running ...\n", thread.ThreadName().c_str());}return true;}// for debugvoid Wait(){for (auto &thread : _threads){thread.Join();}}...private:std::queue<T> _q;std::vector<Thread<ThreadData>> _threads;int _thread_num; // 线程个数pthread_mutex_t _mutex;pthread_cond_t _cond;
};
2.4 完整代码
#pragma once#include <iostream>
#include <queue>
#include <pthread.h>
#include <vector>
#include <functional>
#include "Log.hpp"
#include "Thread.hpp"
#include "LockGuard.hpp"static const int default_num = 5; // 默认线程池中的线程个数class ThreadData
{
public:ThreadData(const std::string &threadname):_threadname(threadname){}~ThreadData(){}public:std::string _threadname;
};template <class T>
class ThreadPool
{
public:ThreadPool(int thread_num = default_num): _thread_num(thread_num){pthread_mutex_init(&_mutex, nullptr);pthread_cond_init(&_cond, nullptr);// 构建指定个数的线程for (int i = 0; i < _thread_num; i++){// 待优化std::string threadname = "thread-";threadname += std::to_string(i + 1);ThreadData td(threadname);Thread<ThreadData> t(threadname, \std::bind(&ThreadPool<T>::ThreadRun, \this, std::placeholders::_1), td);_threads.push_back(t);log.LogMessage(Info, "%s is created...\n", threadname.c_str());}}bool Start(){// 启动for (auto &thread : _threads){thread.Start();log.LogMessage(Info, "%s is running ...\n", thread.ThreadName().c_str());}return true;}void ThreadWait(const ThreadData &td){log.LogMessage(Debug, "no task, %s is sleeping...\n", td._threadname.c_str());pthread_cond_wait(&_cond, &_mutex);}void ThreadWakeup(){pthread_cond_signal(&_cond);}void ThreadRun(ThreadData &td){while (true){// 取任务T t;{LockGuard lockguard(&_mutex);while (_q.empty()){ThreadWait(td);log.LogMessage(Debug, "thread, %s wake up.\n", td._threadname.c_str());}t = _q.front();_q.pop();}// 处理任务t();log.LogMessage(Info, "%s handler task %s done, result is: %s\n", \td._threadname.c_str(), t.PrintTask().c_str(), t.PrintResult().c_str());}}void Push(T &in){log.LogMessage(Debug, "other thread push a task, task is: %s\n", in.PrintTask().c_str());LockGuard lockgurad(&_mutex);_q.push(in);ThreadWakeup();}// for debugvoid Wait(){for (auto &thread : _threads){thread.Join();}}~ThreadPool(){pthread_mutex_destroy(&_mutex);pthread_cond_destroy(&_cond);}private:std::queue<T> _q;std::vector<Thread<ThreadData>> _threads;int _thread_num; // 线程个数pthread_mutex_t _mutex;pthread_cond_t _cond;
};
测试代码
#include<iostream>
#include<memory>
#include"ThreadPool.hpp"
#include"Task.hpp"
#include"Log.hpp"int main()
{std::unique_ptr<ThreadPool<Task>> tp(new ThreadPool<Task>());tp->Start();srand((uint64_t)time(nullptr) ^ getpid());while(true){int x = rand() % 100 + 1;usleep(1234);int y = rand() % 200;usleep(1234);char oper = opers[rand() % opers.size()];Task t(x, y, oper);tp->Push(t);sleep(1);}tp->Wait();return 0;
}
3. 改造单例版本
线程池一般在整个程序中只有唯一一份,适合用单例模式实现,下面实现“懒汉”线程池。
第一步:构造函数私有化,并禁掉拷贝构造和赋值
template <class T>
class ThreadPool
{
private:ThreadPool(int thread_num = default_num): _thread_num(thread_num){pthread_mutex_init(&_mutex, nullptr);pthread_cond_init(&_cond, nullptr);// 构建指定个数的线程for (int i = 0; i < _thread_num; i++){// 待优化std::string threadname = "thread-";threadname += std::to_string(i + 1);ThreadData td(threadname);Thread<ThreadData> t(threadname, \std::bind(&ThreadPool<T>::ThreadRun, \this, std::placeholders::_1), td);_threads.push_back(t);log.LogMessage(Info, "%s is created...\n", threadname.c_str());}}ThreadPool(const ThreadPool<T> &tp) = delete;const ThreadPool<T> &operator=(const ThreadPool<T> &tp) = delete;public:...private:...
};
第二步:提供静态指针instance,并提供静态方法GetInstance获取静态指针
template <class T>
class ThreadPool
{
private:// 构造函数私有化,并禁掉拷贝构造和赋值...public:// 有线程安全问题static ThreadPool<T> *GetInstance(){LockGuard lockguard(&sig_lock);if(instance == nullptr){log.LogMessage(Info, "创建单例成功...\n");instance = new ThreadPool<T>();}return instance;}private:// 成员变量...static ThreadPool<T> *instance;static pthread_mutex_t sig_lock;
};template<class T>
ThreadPool<T>* ThreadPool<T>::instance = nullptr;template<class T>
pthread_mutex_t ThreadPool<T>::sig_lock = PTHREAD_MUTEX_INITIALIZER;
特别注意:
GetInstance
函数存在线程安全问题,同时可能有多个线程满足instance == nullptr
,从而错误的调用多次new
方法,所以需要对GetInstance
函数整个上锁;- 这把锁保护的是
instance
,所以需要我们设计一个新的互斥量,并且因为GetInstance
方法是静态的,没有this
指针,所以这个新的互斥量也要是静态的; - 可是如果是上面这种写法,效率是很低的,在
instance
不为空时,任何线程想要调用GetInstance
应该是立即返回的,但是此时却要回回上锁; - 下面是
GetInstance
的改良版,保证了在instance
不为空时,任何线程想要调用GetInstance
会立即返回,不需要上锁。
template <class T>
class ThreadPool
{
private:...public:// 有线程安全问题static ThreadPool<T> *GetInstance(){if(instance == nullptr){LockGuard lockguard(&sig_lock);if(instance == nullptr){log.LogMessage(Info, "创建单例成功...\n");instance = new ThreadPool<T>();}}return instance;}private:...static ThreadPool<T> *instance;static pthread_mutex_t sig_lock;
};template<class T>
ThreadPool<T>* ThreadPool<T>::instance = nullptr;template<class T>
pthread_mutex_t ThreadPool<T>::sig_lock = PTHREAD_MUTEX_INITIALIZER;
完整单例线程池代码
#pragma once#include <iostream>
#include <queue>
#include <pthread.h>
#include <vector>
#include <functional>
#include "Log.hpp"
#include "Thread.hpp"
#include "LockGuard.hpp"static const int default_num = 5; // 默认线程池中的线程个数class ThreadData
{
public:ThreadData(const std::string &threadname):_threadname(threadname){}~ThreadData(){}public:std::string _threadname;
};template <class T>
class ThreadPool
{
private:ThreadPool(int thread_num = default_num): _thread_num(thread_num){pthread_mutex_init(&_mutex, nullptr);pthread_cond_init(&_cond, nullptr);// 构建指定个数的线程for (int i = 0; i < _thread_num; i++){// 待优化std::string threadname = "thread-";threadname += std::to_string(i + 1);ThreadData td(threadname);Thread<ThreadData> t(threadname, \std::bind(&ThreadPool<T>::ThreadRun, \this, std::placeholders::_1), td);_threads.push_back(t);log.LogMessage(Info, "%s is created...\n", threadname.c_str());}}ThreadPool(const ThreadPool<T> &tp) = delete;const ThreadPool<T> &operator=(const ThreadPool<T> &tp) = delete;public:// 有线程安全问题static ThreadPool<T> *GetInstance(){if(instance == nullptr){LockGuard lockguard(&sig_lock);if(instance == nullptr){log.LogMessage(Info, "创建单例成功...\n");instance = new ThreadPool<T>();}}return instance;}bool Start(){// 启动for (auto &thread : _threads){thread.Start();log.LogMessage(Info, "%s is running ...\n", thread.ThreadName().c_str());}return true;}void ThreadWait(const ThreadData &td){log.LogMessage(Debug, "no task, %s is sleeping...\n", td._threadname.c_str());pthread_cond_wait(&_cond, &_mutex);}void ThreadWakeup(){pthread_cond_signal(&_cond);}void ThreadRun(ThreadData &td){while (true){// 取任务T t;{LockGuard lockguard(&_mutex);while (_q.empty()){ThreadWait(td);log.LogMessage(Debug, "thread, %s wake up.\n", td._threadname.c_str());}t = _q.front();_q.pop();}// 处理任务t();log.LogMessage(Info, "%s handler task %s done, result is: %s\n", \td._threadname.c_str(), t.PrintTask().c_str(), t.PrintResult().c_str());}}void Push(T &in){log.LogMessage(Debug, "other thread push a task, task is: %s\n", in.PrintTask().c_str());{LockGuard lockgurad(&_mutex);_q.push(in);ThreadWakeup();}}// for debugvoid Wait(){for (auto &thread : _threads){thread.Join();}}~ThreadPool(){pthread_mutex_destroy(&_mutex);pthread_cond_destroy(&_cond);}private:std::queue<T> _q; // 任务队列std::vector<Thread<ThreadData>> _threads;int _thread_num; // 线程个数pthread_mutex_t _mutex;pthread_cond_t _cond;static ThreadPool<T> *instance;static pthread_mutex_t sig_lock
};template<class T>
ThreadPool<T>* ThreadPool<T>::instance = nullptr;template<class T>
pthread_mutex_t ThreadPool<T>::sig_lock = PTHREAD_MUTEX_INITIALIZER;
测试代码
#include<iostream>
#include<memory>
#include"ThreadPool.hpp"
#include"Task.hpp"
#include"Log.hpp"int main()
{ThreadPool<Task>::GetInstance()->Start();srand((uint64_t)time(nullptr) ^ getpid());while(true){int x = rand() % 100 + 1;usleep(1234);int y = rand() % 200;usleep(1234);char oper = opers[rand() % opers.size()];Task t(x, y, oper);ThreadPool<Task>::GetInstance()->Push(t);sleep(1);}ThreadPool<Task>::GetInstance()->Wait();return 0;
}
4. 改进思路
设计一个动态线程池,可以根据当前的线程池中的线程数量和任务数量,动态创建或销毁线程。
具体思路以伪代码的形式呈现给大家,感兴趣的小伙伴可以自己实现:
template <class T>
class ThreadPool
{
private:...public:...void CheckSelf(){// 1. _task_num > _task_num_high_water && _thread_num < _thread_num_high_water// 创建更多线程,并更新_thread_num// 2. _task_num == _task_num_low_water && _thread_num >= _thread_num_high_water// 把自己退出了,并且更新_thread_num}void ThreadRun(ThreadData &td){while (true){// 取任务CheckSelf();...}}private:...// 扩展int _task_num; // 任务数量// 高低水位线int _thread_num_low_water;int _thread_num_high_water;int _task_num_low_water;int _task_num_high_water;
};