欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 文旅 > 艺术 > 百度自动驾驶apollo源码解读12:线程池

百度自动驾驶apollo源码解读12:线程池

2024/10/24 21:25:49 来源:https://blog.csdn.net/hfut31415926/article/details/140565229  浏览:    关键词:百度自动驾驶apollo源码解读12:线程池

apollo项目里面有个线程池,源码链接:https://github.com/ApolloAuto/apollo/blob/master/cyber/base/thread_pool.h

仅用一个头文件去实现,此处贴出来源码吧。

1. 源码实现

#ifndef CYBER_BASE_THREAD_POOL_H_
#define CYBER_BASE_THREAD_POOL_H_#include <atomic>
#include <functional>
#include <future>
#include <memory>
#include <queue>
#include <stdexcept>
#include <thread>
#include <utility>
#include <vector>#include "cyber/base/bounded_queue.h"/*
五大池:内存池、连接池、线程池、进程池、协程池
线程池存在的意义:
传统多线程方案中我们采用的服务器模型则是一旦接受到请求之后,即创建一个新的线程,由该线程执行任务。
任务执行完毕后,线程退出,这就是是“即时创建,即时销毁”的策略。尽管与创建进程相比,创建线程的时间
已经大大的缩短,但是如果提交给线程的任务是执行时间较短,而且执行次数极其频繁,那么服务器将处于不停
的创建线程,销毁线程的状态。
此线程池有点:使用比较灵活
此线程池缺陷:接口较少,没有一些访问当先线程池转态、强制停止所有任务等接口
尚不明白的地方:task_queue_的类型是std::function<void()>,这个void是函数的返回值吧,那这样
是不是次线程池可以执行的函数体必须是void返回值类型,经过测试,并不是这样。
*/namespace apollo {
namespace cyber {
namespace base {class ThreadPool {public:explicit ThreadPool(std::size_t thread_num, std::size_t max_task_num = 1000);template <typename F, typename... Args>auto Enqueue(F&& f, Args&&... args)-> std::future<typename std::result_of<F(Args...)>::type>;~ThreadPool();private:std::vector<std::thread> workers_;BoundedQueue<std::function<void()>> task_queue_;std::atomic_bool stop_;
};inline ThreadPool::ThreadPool(std::size_t threads, std::size_t max_task_num): stop_(false) {if (!task_queue_.Init(max_task_num, new BlockWaitStrategy())) {throw std::runtime_error("Task queue init failed.");}workers_.reserve(threads);for (size_t i = 0; i < threads; ++i) {workers_.emplace_back([this] {while (!stop_) {std::function<void()> task;if (task_queue_.WaitDequeue(&task)) {task();}}});}
}// before using the return value, you should check value.valid()
template <typename F, typename... Args>
auto ThreadPool::Enqueue(F&& f, Args&&... args)-> std::future<typename std::result_of<F(Args...)>::type> {using return_type = typename std::result_of<F(Args...)>::type;auto task = std::make_shared<std::packaged_task<return_type()>>(std::bind(std::forward<F>(f), std::forward<Args>(args)...));std::future<return_type> res = task->get_future();// don't allow enqueueing after stopping the poolif (stop_) {return std::future<return_type>();}task_queue_.Enqueue([task]() { (*task)(); });return res;
};// the destructor joins all threads
inline ThreadPool::~ThreadPool() {if (stop_.exchange(true)) {return;}task_queue_.BreakAllWait();for (std::thread& worker : workers_) {worker.join();}
}}  // namespace base
}  // namespace cyber
}  // namespace apollo#endif  // CYBER_BASE_THREAD_POOL_H_

2. 写一个简单的测试例子

#include <atomic>
#include <iostream>#include "cyber/base/thread_pool.h"using namespace apollo::cyber::base;std::atomic<int> g_int;int main(int argc, char* argv[]) {ThreadPool* tp = new ThreadPool(10, 10000);for (size_t i = 0; i < 10000; i++) {tp->Enqueue<>([&](int a) -> bool {g_int.fetch_add(1);std::this_thread::sleep_for(std::chrono::milliseconds(1));return true;}, 1000);}// 等待所有线程运行结束std::this_thread::sleep_for(std::chrono::seconds(10));std::cout << g_int << std::endl;return 0;
}


另外。在激光雷达里面看到了一个线程池的代码,默认项目里面没有,是激光雷达厂商提供的吧。

3. 激光雷达里面看到了一个线程池的代码

3.1 utility.h头文件

//
// Created by ljy on 2023/3/16.
//#ifndef INC_20230307_UTILITY_H
#define INC_20230307_UTILITY_H
#pragma once#include <atomic>
#include <condition_variable>
#include <functional>
#include <future>
#include <memory>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <vector>
#define MAX_THREAD_NUM 4class ThreadPool {
private:inline ThreadPool();public:typedef std::shared_ptr<ThreadPool> Ptr;ThreadPool(ThreadPool &) = delete;ThreadPool &operator=(const ThreadPool &) = delete;~ThreadPool();public:static Ptr getInstance();int idlCount();template <class F, class... Args>inline auto commit(F &&f, Args &&... args)-> std::future<decltype(f(args...))> {if (stoped.load())throw std::runtime_error("Commit on ThreadPool is stopped.");using RetType = decltype(f(args...));auto task = std::make_shared<std::packaged_task<RetType()>>(std::bind(std::forward<F>(f), std::forward<Args>(args)...));  // wtf !std::future<RetType> future = task->get_future();{std::lock_guard<std::mutex> lock{m_lock};tasks.emplace([task]() { (*task)(); });}cv_task.notify_one();return future;}private:using Task = std::function<void()>;std::vector<std::thread> pool;std::queue<Task> tasks;std::mutex m_lock;std::condition_variable cv_task;std::atomic<bool> stoped;std::atomic<int> idl_thr_num;static Ptr instance_ptr;static std::mutex instance_mutex;
};#endif //INC_20230307_UTILITY_H

3.2 utility.cpp实现文件

//
// Created by ljy on 2023/3/16.
//#include "utility.h"ThreadPool::Ptr ThreadPool::instance_ptr = nullptr;
std::mutex ThreadPool::instance_mutex;ThreadPool::ThreadPool() : stoped{false} {idl_thr_num = MAX_THREAD_NUM;for (int i = 0; i < idl_thr_num; ++i) {pool.emplace_back([this] {while (!this->stoped) {std::function<void()> task;{std::unique_lock<std::mutex> lock{this->m_lock};this->cv_task.wait(lock, [this] {return this->stoped.load() || !this->tasks.empty();});if (this->stoped && this->tasks.empty()) return;task = std::move(this->tasks.front());this->tasks.pop();}idl_thr_num--;task();idl_thr_num++;}});}
}ThreadPool::Ptr ThreadPool::getInstance() {if (instance_ptr == nullptr) {std::lock_guard<std::mutex> lk(instance_mutex);if (instance_ptr == nullptr) {instance_ptr = std::shared_ptr<ThreadPool>(new ThreadPool);}}return instance_ptr;
}ThreadPool::~ThreadPool() {stoped.store(true);cv_task.notify_all();for (std::thread &thread: pool) {thread.join();}
}int ThreadPool::idlCount() { return idl_thr_num; }

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com