欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 财经 > 创投人物 > Linux从0到1——线程池【利用日志Debug】

Linux从0到1——线程池【利用日志Debug】

2025/2/23 13:38:23 来源:https://blog.csdn.net/weixin_73870552/article/details/145434855  浏览:    关键词:Linux从0到1——线程池【利用日志Debug】

Linux从0到1——线程池

  • 1. 简易日志系统
    • 1.1 可变参数列表
    • 1.2 设计日志
    • 1.3 改进
  • 2. 线程池
    • 2.1 需要用到的头文件
    • 2.2 需求分析
    • 2.3 实现细节
    • 2.4 完整代码
  • 3. 改造单例版本
  • 4. 改进思路


1. 简易日志系统


1.1 可变参数列表


1. printf

  • 我们其实一直在使用可变参数列表,printf函数就使用了它。

在这里插入图片描述

  • 第一个参数format是一个字符串,指定可变参数的数据格式。第二个参数...就是可变参数。

2. 自己设计一个可变参数的接口,计算任意整数的和

#include <iostream>
#include <cstdarg> // 包含处理可变参数的宏// 函数声明
int sum(int count, ...);// 函数定义
int sum(int count, ...) 
{int total = 0;va_list args; // 定义一个 va_list 对象,这是一个char *类型(或者void *)的指针// 初始化 va_list 对象va_start(args, count);// 遍历所有参数for (int i = 0; i < count; ++i) {total += va_arg(args, int); // 获取下一个参数并累加}// 清理 va_list 对象va_end(args);return total;
}int main() 
{// 测试函数int result = sum(5, 1, 2, 3, 4, 5);std::cout << "Sum: " << result << std::endl;result = sum(3, 10, 20, 30);std::cout << "Sum: " << result << std::endl;return 0;
}
  • va_list:是一个char*void*的指针类型,不同编译器可能有所差别。
  • va_start:初始化 va_list 对象,使其指向第一个可变参数。第二个参数固定传可变参数的前一个参数。
  • va_arg:从可变参数列表中获取下一个参数。需要指定参数的类型(在这里是 int)。
  • va_end:清理 va_list 对象,确保资源被正确释放(就是将该指针置空)。

在这里插入图片描述


1.2 设计日志


1. 需求分析

  • 首先,这个日志中的信息要有各种等级,来表明各个信息的重要程度,和他们的意义;
  • 日志信息要能向显示器写入,也能向文件写入。向文件写入时,可以向一个文件写入,也可以按信息不同等级,向不同等级文件写入;
  • 每一条日志信息的组成:[等级][写入时间][进程id] 用户自定义信息。

2. 大致思路

  • 设计两个枚举类型,一个来表示信息等级,一个来表示信息输出的方式。
  • log类对外只暴漏一个接口,功能是写入日志信息。

3. 完整的日志类实现Log.hpp

#pragma once#include<iostream>
#include<string>
#include<cstdarg>
#include<ctime>
#include<fstream>
#include<sys/types.h>
#include<sys/stat.h>
#include<fcntl.h>
#include<unistd.h>enum
{Debug = 0,Info,       // 正常信息Warning,    // 告警Error,      // 错误Fatal       // 致命错误
};enum
{Screen = 0,    // 向显示器打印OneFile,        // 向一个文件打印ClassFile       // 向多个文件打印
};// 将日志等级转换为string
std::string LevelToString(int level)
{switch(level){case Debug:return "Debug";case Info:return "Info";case Warning:return "Warning";case Error:return "Error";case Fatal:return "Fatal";default:return "Unknown";}
}const int defaultstyle = Screen;                // 默认风格是向显示器打印
const std::string default_filename = "log."; // 默认文件名
const std::string logdir = "log";              // 默认日志文件夹class Log
{
public:Log():_style(defaultstyle),_filename(default_filename){mkdir(logdir.c_str(), 0775);}void Enable(int style){_style = style;}std::string TimeStampExLocalTime(){time_t currtime = time(nullptr);struct tm *curr = localtime(&currtime);char time_buffer[128];snprintf(time_buffer, sizeof(time_buffer), "%d-%d-%d %d:%d:%d", \curr->tm_year+1900, curr->tm_mon+1, curr->tm_mday,\curr->tm_hour, curr->tm_min, curr->tm_sec);return time_buffer;}void WriteLogToOneFile(const std::string &logname, const std::string &message){umask(0);int fd = open(logname.c_str(), O_CREAT | O_WRONLY | O_APPEND, 0666);if(fd < 0) return;write(fd, message.c_str(), message.size());close(fd);}void WriteLogToClassFile(const std::string &levelstr, const std::string &message){std::string logname = logdir;logname += "/";logname += _filename;logname += levelstr;WriteLogToOneFile(logname, message);}void WriteLog(const std::string &levelstr, const std::string &message){switch(_style){case Screen:std::cout << message;break;case OneFile:WriteLogToClassFile("all", message);break;case ClassFile:WriteLogToClassFile(levelstr, message);break;default:break;}}void LogMessage(int level, const char* format, ...) // 类C的一个日志接口{char rightbuffer[1024];va_list args;    // 这是一个char *类型(或者void *)的指针va_start(args, format);     // 让arg指向可变部分vsnprintf(rightbuffer, sizeof(rightbuffer), format, args);  // 将可变部分按照指定格式写入到content中va_end(args);   // 释放args, args = nullptrchar leftbuffer[1024];std::string levelstr = LevelToString(level);std::string currtime = TimeStampExLocalTime();      // 获取当前时间std::string idstr = std::to_string(getpid());snprintf(leftbuffer, sizeof(leftbuffer), "[%s][%s][%s] ", \levelstr.c_str(), currtime.c_str(), idstr.c_str());std::string loginfo = leftbuffer;loginfo += rightbuffer;WriteLog(levelstr, loginfo);}~Log(){}
private:int _style;std::string _filename;
};
  • LogMessage是对外暴漏的接口,用户可以通过该接口,写入格式化的日志信息。第一个参数是信息等级,第二个是信息格式,第三个是可变参数列表。
  • 日志在使用时需要Enable使能,指定日志的输出方式。
  • 日志信息统一放在一个的文件夹中,名字默认是log。向文件中写入时,文件名的格式是:log.等级。其中如果指定向一个文件中写入,文件名是log.all

4. 测试代码:

int main()
{Log log;log.Enable(OneFile);log.LogMessage(Debug, "hello %d world %lf\n", 10, 3.14);log.LogMessage(Error, "hello %d world %lf\n", 10, 3.14);log.LogMessage(Warning, "hello %d world %lf\n", 10, 3.14);log.LogMessage(Info, "hello %d world %lf\n", 10, 3.14);log.LogMessage(Fatal, "hello %d world %lf\n", 10, 3.14);log.LogMessage(Debug, "hello %d world %lf\n", 10, 3.14);log.LogMessage(Debug, "hello %d world %lf\n", 10, 3.14);return 0;
}

在这里插入图片描述


1.3 改进


1. 线程安全问题

  • 向文件或显示器写入时,不是线程安全的,可以考虑对WriteLog方法上锁。
#include"LockGuard.hpp"...class Log
{
public:Log():_style(defaultstyle),_filename(default_filename){mkdir(logdir.c_str(), 0775);pthread_mutex_init(&_mutex, nullptr);}...void LogMessage(int level, const char* format, ...) // 类C的一个日志接口{...{LockGuard lockguard(&_mutex);WriteLog(levelstr, loginfo);}}~Log(){}
private:int _style;std::string _filename;pthread_mutex_t _mutex;
};
  • LockGuard.hpp是我们之前实现过的守护锁。

2. 配置log

  • 以下代码可以写在Log.hpp头文件中,配置log只需要修改Config类的构造函数即可。
// 配置log
Log log;class Config
{
public:Config(){// 在此配置log.Enable(ClassFile);}~Config(){}
};Config config;

2. 线程池


2.1 需要用到的头文件


1. Log.hpp

#pragma once#include<iostream>
#include<string>
#include<cstdarg>
#include<ctime>
#include<fstream>
#include<sys/types.h>
#include<sys/stat.h>
#include<fcntl.h>
#include<unistd.h>
#include"LockGuard.hpp"enum
{Debug = 0,Info,       // 正常信息Warning,    // 告警Error,      // 错误Fatal       // 致命错误
};enum
{Screen = 0,    // 向显示器打印OneFile,        // 向一个文件打印ClassFile       // 向多个文件打印
};// 将日志等级转换为string
std::string LevelToString(int level)
{switch(level){case Debug:return "Debug";case Info:return "Info";case Warning:return "Warning";case Error:return "Error";case Fatal:return "Fatal";default:return "Unknown";}
}const int defaultstyle = Screen;                // 默认风格是向显示器打印
const std::string default_filename = "log."; // 默认文件名
const std::string logdir = "log";              // 默认日志文件夹class Log
{
public:Log():_style(defaultstyle),_filename(default_filename){mkdir(logdir.c_str(), 0775);pthread_mutex_init(&_mutex, nullptr);}void Enable(int style){_style = style;}std::string TimeStampExLocalTime(){time_t currtime = time(nullptr);struct tm *curr = localtime(&currtime);char time_buffer[128];snprintf(time_buffer, sizeof(time_buffer), "%d-%d-%d %d:%d:%d", \curr->tm_year+1900, curr->tm_mon+1, curr->tm_mday,\curr->tm_hour, curr->tm_min, curr->tm_sec);return time_buffer;}void WriteLogToOneFile(const std::string &logname, const std::string &message){umask(0);int fd = open(logname.c_str(), O_CREAT | O_WRONLY | O_APPEND, 0666);if(fd < 0) return;write(fd, message.c_str(), message.size());close(fd);}void WriteLogToClassFile(const std::string &levelstr, const std::string &message){std::string logname = logdir;logname += "/";logname += _filename;logname += levelstr;WriteLogToOneFile(logname, message);}void WriteLog(const std::string &levelstr, const std::string &message){switch(_style){case Screen:std::cout << message;break;case OneFile:WriteLogToClassFile("all", message);break;case ClassFile:WriteLogToClassFile(levelstr, message);break;default:break;}}void LogMessage(int level, const char* format, ...) // 类C的一个日志接口{char rightbuffer[1024];va_list args;    // 这是一个char *类型(或者void *)的指针va_start(args, format);     // 让arg指向可变部分vsnprintf(rightbuffer, sizeof(rightbuffer), format, args);  // 将可变部分按照指定格式写入到content中va_end(args);   // 释放args, args = nullptrchar leftbuffer[1024];std::string levelstr = LevelToString(level);std::string currtime = TimeStampExLocalTime();      // 获取当前时间std::string idstr = std::to_string(getpid());snprintf(leftbuffer, sizeof(leftbuffer), "[%s][%s][%s] ", \levelstr.c_str(), currtime.c_str(), idstr.c_str());std::string loginfo = leftbuffer;loginfo += rightbuffer;{LockGuard lockguard(&_mutex);WriteLog(levelstr, loginfo);}}~Log(){}
private:int _style;std::string _filename;pthread_mutex_t _mutex;
};// 配置log
Log log;class Config
{
public:Config(){// 在此配置log.Enable(ClassFile);}~Config(){}
};Config config;

2. Thread.hpp

#pragma once#include <iostream>
#include <string>
#include <functional>template<class T>
using func_t = std::function<void(T&)>;template<class T>
class Thread
{
public:Thread(const std::string &threadname, func_t<T> func, T &data):_tid(0), _threadname(threadname), _isrunning(false), _func(func), _data(data){}static void *ThreadRoutine(void* args){Thread *ts = static_cast<Thread *>(args);ts->_func(ts->_data);return nullptr;}bool Start(){int n = pthread_create(&_tid, nullptr, ThreadRoutine, this);if(n == 0) {_isrunning = true;return true;}else return false;}bool Join(){if(!_isrunning) return false;int n = pthread_join(_tid, nullptr);if(n == 0){_isrunning = false;return true;}return false;}bool IsRunning(){return _isrunning;}std::string ThreadName(){return _threadname;}~Thread(){}private:pthread_t _tid;             // 库级别线程idstd::string _threadname;    // 线程名bool _isrunning;             // 运行状态func_t<T> _func;               // 线程执行的回调方法T _data;
};

3. LockGuard.hpp

#pragma once#include <pthread.h>// 不定义锁,默认认为外部会给我们传入锁对象
class Mutex
{
public:Mutex(pthread_mutex_t *lock):_lock(lock){}void Lock(){pthread_mutex_lock(_lock);}void Unlock(){pthread_mutex_unlock(_lock);}~Mutex(){}private:pthread_mutex_t *_lock;
};class LockGuard
{
public:LockGuard(pthread_mutex_t *lock):_mutex(lock){_mutex.Lock();}~LockGuard(){_mutex.Unlock();}private:Mutex _mutex;
};

4. Task.hpp

#pragma once
#include <iostream>
#include <string>
#include <unistd.h>const int defaultvalue = 0;enum
{ok = 0,div_zero, // 除0错误mod_zero, // 模0错误unknow    // 未知错误
};const std::string opers = "+-*/%)(&";class Task
{
public:Task(){}Task(int x, int y, char op): data_x(x), data_y(y), oper(op), result(defaultvalue), code(ok){}void Run(){switch (oper){case '+':result = data_x + data_y;break;case '-':result = data_x - data_y;break;case '*':result = data_x * data_y;break;case '/':{if (data_y == 0)code = div_zero;elseresult = data_x / data_y;}break;case '%':{if (data_y == 0)code = mod_zero;elseresult = data_x % data_y;}break;default:code = unknow;break;}}std::string PrintTask(){std::string s;s = std::to_string(data_x);s += oper;s += std::to_string(data_y);s += "=?";return s;}std::string PrintResult(){std::string s;s = std::to_string(data_x);s += oper;s += std::to_string(data_y);s += "=";s += std::to_string(result);s += " [";s += std::to_string(code);s += "]";return s;}void operator()(){Run();}~Task(){}private:int data_x;int data_y;char oper; // + - * / %int result;int code; // 结果码,0: 结果可信 !0: 结果不可信,1,2,3,4
};

2.2 需求分析


在这里插入图片描述

这个线程池要有一个任务队列queue,和一个线程数组vector,任务由外部Push进线程池内部的任务队列中,多个线程并发的从任务队列拿任务,并执行。

并发访问任务队列时,会有线程安全问题,需要控制同步和互斥:

  • Push操作一定是互斥的,Pop操作一定是互斥的;
  • 如果任务队列为空,则让线程等待,Push进去一个任务,就激活一个线程。

每一个线程需要执行的逻辑是,先从任务队列中拿任务,如果没拿到(说明任务队列为空),就先等待;拿到了就立即执行该任务。


2.3 实现细节


1. 线程池的内部成员变量

  • 一个任务队列;
  • 一个线程数组;
  • 一个整数类型_thread_num记录线程池中的线程数量;
  • 一个互斥量和一个条件变量,用来控制访问任务队列时的同步和互斥。
template <class T>
class ThreadPool
{...
private:std::queue<T> _q;std::vector<Thread<ThreadData>> _threads;	// ThreadData是线程信息类int _thread_num; // 线程个数pthread_mutex_t _mutex;pthread_cond_t _cond;
};

2. 设计一个线程信息类,存储每个线程的名字(可扩展)

class ThreadData
{
public:ThreadData(const std::string &threadname):_threadname(threadname){}~ThreadData(){}public:std::string _threadname;
};

3. 构造函数和析构函数

  • 构造函数需要初始化互斥量和条件变量,并且创建指定个数的线程,不启动;
  • 析构函数只需要释放互斥量和条件变量即可。
static const int default_num = 5; // 默认线程池中的线程个数template <class T>
class ThreadPool
{
public:ThreadPool(int thread_num = default_num): _thread_num(thread_num){pthread_mutex_init(&_mutex, nullptr);pthread_cond_init(&_cond, nullptr);// 构建指定个数的线程for (int i = 0; i < _thread_num; i++){// 待优化std::string threadname = "thread-";threadname += std::to_string(i + 1);ThreadData td(threadname);Thread<ThreadData> t(threadname, \std::bind(&ThreadPool<T>::ThreadRun, \this, std::placeholders::_1), td);_threads.push_back(t);log.LogMessage(Info, "%s is created...\n", threadname.c_str());}}~ThreadPool(){pthread_mutex_destroy(&_mutex);pthread_cond_destroy(&_cond);}...private:std::queue<T> _q;std::vector<Thread<ThreadData>> _threads;int _thread_num; // 线程个数pthread_mutex_t _mutex;pthread_cond_t _cond;
};
  • std::bind(&ThreadPool<T>::ThreadRun, this, std::placeholders::_1)是一个函数模版对象,其中ThreadRun为每一个线程要执行的方法,这个方法是一个成员函数。

4. ThreadRun方法

  • 向队列中拿任务时,需要上锁;队列中没有任务时,需要让线程等待。
template <class T>
class ThreadPool
{
public:void ThreadWait(const ThreadData &td){log.LogMessage(Debug, "no task, %s is sleeping...\n", td._threadname.c_str());pthread_cond_wait(&_cond, &_mutex);}void ThreadRun(ThreadData &td){while (true){// 取任务T t;{LockGuard lockguard(&_mutex);while (_q.empty()){ThreadWait(td);log.LogMessage(Debug, "thread, %s wake up.\n", td._threadname.c_str());}t = _q.front();_q.pop();}// 处理任务t();log.LogMessage(Info, "%s handler task %s done, result is: %s\n", \td._threadname.c_str(), t.PrintTask().c_str(), t.PrintResult().c_str());}}...private:std::queue<T> _q;std::vector<Thread<ThreadData>> _threads;int _thread_num; // 线程个数pthread_mutex_t _mutex;pthread_cond_t _cond;
};

5. Push方法,向任务队列中放任务

  • 访问队列时上锁,Push完成后激活一个等待的线程。
template <class T>
class ThreadPool
{
public:void ThreadWakeup(){pthread_cond_signal(&_cond);}void Push(T &in){log.LogMessage(Debug, "other thread push a task, task is: %s\n", in.PrintTask().c_str());LockGuard lockgurad(&_mutex);_q.push(in);ThreadWakeup();}...private:std::queue<T> _q;std::vector<Thread<ThreadData>> _threads;int _thread_num; // 线程个数pthread_mutex_t _mutex;pthread_cond_t _cond;
};

6. 启动线程池,join线程池中的线程

template <class T>
class ThreadPool
{
public:bool Start(){// 启动for (auto &thread : _threads){thread.Start();log.LogMessage(Info, "%s is running ...\n", thread.ThreadName().c_str());}return true;}// for debugvoid Wait(){for (auto &thread : _threads){thread.Join();}}...private:std::queue<T> _q;std::vector<Thread<ThreadData>> _threads;int _thread_num; // 线程个数pthread_mutex_t _mutex;pthread_cond_t _cond;
};

2.4 完整代码


#pragma once#include <iostream>
#include <queue>
#include <pthread.h>
#include <vector>
#include <functional>
#include "Log.hpp"
#include "Thread.hpp"
#include "LockGuard.hpp"static const int default_num = 5; // 默认线程池中的线程个数class ThreadData
{
public:ThreadData(const std::string &threadname):_threadname(threadname){}~ThreadData(){}public:std::string _threadname;
};template <class T>
class ThreadPool
{
public:ThreadPool(int thread_num = default_num): _thread_num(thread_num){pthread_mutex_init(&_mutex, nullptr);pthread_cond_init(&_cond, nullptr);// 构建指定个数的线程for (int i = 0; i < _thread_num; i++){// 待优化std::string threadname = "thread-";threadname += std::to_string(i + 1);ThreadData td(threadname);Thread<ThreadData> t(threadname, \std::bind(&ThreadPool<T>::ThreadRun, \this, std::placeholders::_1), td);_threads.push_back(t);log.LogMessage(Info, "%s is created...\n", threadname.c_str());}}bool Start(){// 启动for (auto &thread : _threads){thread.Start();log.LogMessage(Info, "%s is running ...\n", thread.ThreadName().c_str());}return true;}void ThreadWait(const ThreadData &td){log.LogMessage(Debug, "no task, %s is sleeping...\n", td._threadname.c_str());pthread_cond_wait(&_cond, &_mutex);}void ThreadWakeup(){pthread_cond_signal(&_cond);}void ThreadRun(ThreadData &td){while (true){// 取任务T t;{LockGuard lockguard(&_mutex);while (_q.empty()){ThreadWait(td);log.LogMessage(Debug, "thread, %s wake up.\n", td._threadname.c_str());}t = _q.front();_q.pop();}// 处理任务t();log.LogMessage(Info, "%s handler task %s done, result is: %s\n", \td._threadname.c_str(), t.PrintTask().c_str(), t.PrintResult().c_str());}}void Push(T &in){log.LogMessage(Debug, "other thread push a task, task is: %s\n", in.PrintTask().c_str());LockGuard lockgurad(&_mutex);_q.push(in);ThreadWakeup();}// for debugvoid Wait(){for (auto &thread : _threads){thread.Join();}}~ThreadPool(){pthread_mutex_destroy(&_mutex);pthread_cond_destroy(&_cond);}private:std::queue<T> _q;std::vector<Thread<ThreadData>> _threads;int _thread_num; // 线程个数pthread_mutex_t _mutex;pthread_cond_t _cond;
};

测试代码

#include<iostream>
#include<memory>
#include"ThreadPool.hpp"
#include"Task.hpp"
#include"Log.hpp"int main()
{std::unique_ptr<ThreadPool<Task>> tp(new ThreadPool<Task>());tp->Start();srand((uint64_t)time(nullptr) ^ getpid());while(true){int x = rand() % 100 + 1;usleep(1234);int y = rand() % 200;usleep(1234);char oper = opers[rand() % opers.size()];Task t(x, y, oper);tp->Push(t);sleep(1);}tp->Wait();return 0;
}

3. 改造单例版本


线程池一般在整个程序中只有唯一一份,适合用单例模式实现,下面实现“懒汉”线程池。

第一步:构造函数私有化,并禁掉拷贝构造和赋值

template <class T>
class ThreadPool
{
private:ThreadPool(int thread_num = default_num): _thread_num(thread_num){pthread_mutex_init(&_mutex, nullptr);pthread_cond_init(&_cond, nullptr);// 构建指定个数的线程for (int i = 0; i < _thread_num; i++){// 待优化std::string threadname = "thread-";threadname += std::to_string(i + 1);ThreadData td(threadname);Thread<ThreadData> t(threadname, \std::bind(&ThreadPool<T>::ThreadRun, \this, std::placeholders::_1), td);_threads.push_back(t);log.LogMessage(Info, "%s is created...\n", threadname.c_str());}}ThreadPool(const ThreadPool<T> &tp) = delete;const ThreadPool<T> &operator=(const ThreadPool<T> &tp) = delete;public:...private:...
};

第二步:提供静态指针instance,并提供静态方法GetInstance获取静态指针

template <class T>
class ThreadPool
{
private:// 构造函数私有化,并禁掉拷贝构造和赋值...public:// 有线程安全问题static ThreadPool<T> *GetInstance(){LockGuard lockguard(&sig_lock);if(instance == nullptr){log.LogMessage(Info, "创建单例成功...\n");instance = new ThreadPool<T>();}return instance;}private:// 成员变量...static ThreadPool<T> *instance;static pthread_mutex_t sig_lock;
};template<class T>
ThreadPool<T>* ThreadPool<T>::instance = nullptr;template<class T>
pthread_mutex_t ThreadPool<T>::sig_lock = PTHREAD_MUTEX_INITIALIZER;

特别注意:

  • GetInstance函数存在线程安全问题,同时可能有多个线程满足instance == nullptr,从而错误的调用多次new方法,所以需要对GetInstance函数整个上锁;
  • 这把锁保护的是instance,所以需要我们设计一个新的互斥量,并且因为GetInstance方法是静态的,没有this指针,所以这个新的互斥量也要是静态的;
  • 可是如果是上面这种写法,效率是很低的,在instance不为空时,任何线程想要调用GetInstance应该是立即返回的,但是此时却要回回上锁;
  • 下面是GetInstance的改良版,保证了在instance不为空时,任何线程想要调用GetInstance会立即返回,不需要上锁。
template <class T>
class ThreadPool
{
private:...public:// 有线程安全问题static ThreadPool<T> *GetInstance(){if(instance == nullptr){LockGuard lockguard(&sig_lock);if(instance == nullptr){log.LogMessage(Info, "创建单例成功...\n");instance = new ThreadPool<T>();}}return instance;}private:...static ThreadPool<T> *instance;static pthread_mutex_t sig_lock;
};template<class T>
ThreadPool<T>* ThreadPool<T>::instance = nullptr;template<class T>
pthread_mutex_t ThreadPool<T>::sig_lock = PTHREAD_MUTEX_INITIALIZER;

完整单例线程池代码

#pragma once#include <iostream>
#include <queue>
#include <pthread.h>
#include <vector>
#include <functional>
#include "Log.hpp"
#include "Thread.hpp"
#include "LockGuard.hpp"static const int default_num = 5; // 默认线程池中的线程个数class ThreadData
{
public:ThreadData(const std::string &threadname):_threadname(threadname){}~ThreadData(){}public:std::string _threadname;
};template <class T>
class ThreadPool
{
private:ThreadPool(int thread_num = default_num): _thread_num(thread_num){pthread_mutex_init(&_mutex, nullptr);pthread_cond_init(&_cond, nullptr);// 构建指定个数的线程for (int i = 0; i < _thread_num; i++){// 待优化std::string threadname = "thread-";threadname += std::to_string(i + 1);ThreadData td(threadname);Thread<ThreadData> t(threadname, \std::bind(&ThreadPool<T>::ThreadRun, \this, std::placeholders::_1), td);_threads.push_back(t);log.LogMessage(Info, "%s is created...\n", threadname.c_str());}}ThreadPool(const ThreadPool<T> &tp) = delete;const ThreadPool<T> &operator=(const ThreadPool<T> &tp) = delete;public:// 有线程安全问题static ThreadPool<T> *GetInstance(){if(instance == nullptr){LockGuard lockguard(&sig_lock);if(instance == nullptr){log.LogMessage(Info, "创建单例成功...\n");instance = new ThreadPool<T>();}}return instance;}bool Start(){// 启动for (auto &thread : _threads){thread.Start();log.LogMessage(Info, "%s is running ...\n", thread.ThreadName().c_str());}return true;}void ThreadWait(const ThreadData &td){log.LogMessage(Debug, "no task, %s is sleeping...\n", td._threadname.c_str());pthread_cond_wait(&_cond, &_mutex);}void ThreadWakeup(){pthread_cond_signal(&_cond);}void ThreadRun(ThreadData &td){while (true){// 取任务T t;{LockGuard lockguard(&_mutex);while (_q.empty()){ThreadWait(td);log.LogMessage(Debug, "thread, %s wake up.\n", td._threadname.c_str());}t = _q.front();_q.pop();}// 处理任务t();log.LogMessage(Info, "%s handler task %s done, result is: %s\n", \td._threadname.c_str(), t.PrintTask().c_str(), t.PrintResult().c_str());}}void Push(T &in){log.LogMessage(Debug, "other thread push a task, task is: %s\n", in.PrintTask().c_str());{LockGuard lockgurad(&_mutex);_q.push(in);ThreadWakeup();}}// for debugvoid Wait(){for (auto &thread : _threads){thread.Join();}}~ThreadPool(){pthread_mutex_destroy(&_mutex);pthread_cond_destroy(&_cond);}private:std::queue<T> _q;   // 任务队列std::vector<Thread<ThreadData>> _threads;int _thread_num; // 线程个数pthread_mutex_t _mutex;pthread_cond_t _cond;static ThreadPool<T> *instance;static pthread_mutex_t sig_lock
};template<class T>
ThreadPool<T>* ThreadPool<T>::instance = nullptr;template<class T>
pthread_mutex_t ThreadPool<T>::sig_lock = PTHREAD_MUTEX_INITIALIZER;

测试代码

#include<iostream>
#include<memory>
#include"ThreadPool.hpp"
#include"Task.hpp"
#include"Log.hpp"int main()
{ThreadPool<Task>::GetInstance()->Start();srand((uint64_t)time(nullptr) ^ getpid());while(true){int x = rand() % 100 + 1;usleep(1234);int y = rand() % 200;usleep(1234);char oper = opers[rand() % opers.size()];Task t(x, y, oper);ThreadPool<Task>::GetInstance()->Push(t);sleep(1);}ThreadPool<Task>::GetInstance()->Wait();return 0;
}

4. 改进思路


设计一个动态线程池,可以根据当前的线程池中的线程数量和任务数量,动态创建或销毁线程。

具体思路以伪代码的形式呈现给大家,感兴趣的小伙伴可以自己实现:

template <class T>
class ThreadPool
{
private:...public:...void CheckSelf(){// 1. _task_num > _task_num_high_water && _thread_num < _thread_num_high_water// 创建更多线程,并更新_thread_num// 2. _task_num == _task_num_low_water && _thread_num >= _thread_num_high_water// 把自己退出了,并且更新_thread_num}void ThreadRun(ThreadData &td){while (true){// 取任务CheckSelf();...}}private:...// 扩展int _task_num;	// 任务数量// 高低水位线int _thread_num_low_water;int _thread_num_high_water;int _task_num_low_water;int _task_num_high_water;
};

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com

热搜词