文章目录
- 0. 引言
- 1. 设计思路
- 2. 代码实现与详解
- 2.1 忙等待机制:`BusyWait` 函数
- 2.2 核心工作函数:`Work`
- 2.3 主函数:`main`
- 3. CPU使用模式分析
- 4. 完整代码
0. 引言
本文深入探讨了如何利用C++与POSIX线程库(pthread
)编写多线程程序,以模拟不同负载下的CPU资源占用情况。
该工具应用在Linux编程: C++程序线程CPU使用率监控与分析小工具
1. 设计思路
本文的代码设计旨在创建一个多线程的工作池(worker pool),每个线程在运行期间根据指定的占用比例模拟CPU密集型工作。以下是代码实现中的几个核心技术要点:
- 线程命名与管理:为每个线程设置唯一名称,有助于在调试和监控时轻松识别不同线程。
- CPU亲和性设置:通过设置线程的CPU亲和性(affinity),确保每个线程绑定到特定的CPU核,避免频繁的上下文切换,从而提升性能。
- 线程调度策略与优先级:采用实时调度策略(
SCHED_FIFO
),并为每个线程分配不同的优先级,以更好地控制线程的执行顺序和响应时间。 - 忙等待与系统调用优化:使用自旋等待(busy-waiting)和系统调用相结合的策略,提高线程对CPU资源的利用率。
2. 代码实现与详解
2.1 忙等待机制:BusyWait
函数
忙等待(busy-waiting)是一种常见的CPU资源占用方法。在本例中,BusyWait
函数实现了一个简易的忙等待循环。
void BusyWait(std::size_t nanosec) {const auto t0 = std::chrono::high_resolution_clock::now();while (std::chrono::duration_cast<std::chrono::nanoseconds>(std::chrono::high_resolution_clock::now() - t0).count() < nanosec) {getpid(); // 简单的系统调用,切换到内核模式sched_yield(); // 让出处理器给其他线程,进行内核交互}
}
函数解析:
getpid()
和sched_yield()
系统调用用于模拟线程的实际工作负载。getpid()
:虽然是一个简单的系统调用,但它迫使线程进入内核模式,增加了内核CPU时间的消耗。sched_yield()
:请求内核调度器将CPU时间片让给其他线程,进一步增加了内核参与调度的次数。
这种设计既确保了线程的高占用率,又避免了在忙等待期间完全占用CPU资源。
2.2 核心工作函数:Work
Work
函数定义了每个线程的核心行为和策略,包括线程命名、CPU亲和性设置、调度策略和优先级设置等。
[[noreturn]] void Work(float percentage, int thread_id) {assert(percentage >= 0.0f && percentage <= 1.0f);constexpr float kPeriod = 1'000'000.0f;// 设置线程名称const std::string thread_name = "worker_" + std::to_string(thread_id);(void)pthread_setname_np(pthread_self(), thread_name.c_str());// 设置CPU亲和性cpu_set_t cpuset;CPU_ZERO(&cpuset);CPU_SET(static_cast<int>(thread_id % std::thread::hardware_concurrency()), &cpuset);(void)pthread_setaffinity_np(pthread_self(), sizeof(cpu_set_t), &cpuset);// 设置调度策略和优先级struct sched_param param;param.sched_priority = sched_get_priority_min(SCHED_FIFO) + thread_id;if (pthread_setschedparam(pthread_self(), SCHED_FIFO, ¶m) != 0) {std::cerr << "Failed to set thread scheduling policy and priority for thread " << thread_id << "\n";}while (true) {BusyWait(static_cast<std::size_t>(kPeriod * percentage));std::this_thread::sleep_for(std::chrono::nanoseconds(static_cast<std::size_t>(kPeriod * (1.0f - percentage))));}
}
关键步骤:
-
线程命名:通过
pthread_setname_np
,为每个线程设置一个唯一的名称(例如worker_0
,worker_1
),便于调试和监控。 -
CPU亲和性设置:通过
pthread_setaffinity_np
将线程绑定到特定的CPU核心(根据thread_id
),避免线程在多个核心之间频繁切换,提高缓存命中率。 -
调度策略和优先级设置:
- 使用
SCHED_FIFO
调度策略,确保线程按照先进先出的顺序执行。 - 使用
pthread_setschedparam
设置线程优先级。优先级由线程ID决定,以模拟不同的调度策略和响应时间。
- 使用
-
工作循环:
- 线程按照指定比例先进行忙等待(模拟CPU密集型任务),然后进入睡眠状态释放CPU资源。
- 这种设计确保了线程在指定时间窗口内合理占用CPU,同时在其余时间内不占用CPU资源。
2.3 主函数:main
主函数负责初始化和启动多个worker线程,并在程序结束时清理所有线程资源。
int main(int argc, char* argv[]) {if (argc < 3) {std::cout << "Args: worker_num occupation_rate.\n";return 0;}const int num = std::stoi(argv[1]);const float percentage = std::stof(argv[2]);if (num < 1) {std::cout << "Error: num of workers less than 1.\n";return 0;}if (percentage < 0.0f || percentage > 1.0f) {std::cout << "Error: occupation rate should be between [0.0, 1.0].\n";return 0;}std::cout << "num of workers: " << num << "\n"<< "occupation rate: " << percentage << "\n";// 创建和启动worker线程std::vector<std::unique_ptr<std::thread>> threads;threads.reserve(num);for (int i = 0; i < num; ++i) {threads.push_back(std::make_unique<std::thread>(worker_app::Work, percentage, i));}// 等待所有线程完成for (auto& td : threads) {if (td->joinable()) {td->join();}}return 0;
}
3. CPU使用模式分析
-
用户态CPU使用(User CPU):
- 在
Work
函数的主循环中,线程主要在BusyWait
函数中消耗CPU时间。此时线程处于用户态(User Mode),不断执行忙等待循环,模拟了一个典型的CPU密集型任务。
- 在
-
内核态CPU使用(Kernel CPU):
BusyWait
函数中的getpid()
和sched_yield()
系统调用会导致线程从用户态切换到内核态,增加了内核CPU的负载。- 尤其是
sched_yield()
,它显式请求内核进行上下文切换,这会导致较高的内核CPU使用率。
4. 完整代码
// g++ -o dummp_worker dummp_worker.cc -O2
#include <pthread.h>
#include <sched.h>
#include <unistd.h> // For getpid() and other system calls
#include <cassert>
#include <chrono>
#include <iostream>
#include <memory>
#include <thread>
#include <vector>namespace worker_app {void BusyWait(std::size_t nanosec) {const auto t0 = std::chrono::high_resolution_clock::now();while (std::chrono::duration_cast<std::chrono::nanoseconds>(std::chrono::high_resolution_clock::now() - t0).count() <nanosec) {// Perform simple system calls during the busy-wait loopgetpid(); // This call is simple but ensures a switch to kernel modesched_yield(); // Yield the processor, another system call to engage the kernel}
}[[noreturn]] void Work(float percentage, int thread_id) {assert(percentage >= 0.0f && percentage <= 1.0f);constexpr float kPeriod = 1'000'000.0f;// Set thread nameconst std::string thread_name = "worker_" + std::to_string(thread_id);(void)pthread_setname_np(pthread_self(), thread_name.c_str());// Set CPU affinity to ensure the thread uses a specific CPU corecpu_set_t cpuset;CPU_ZERO(&cpuset);CPU_SET(static_cast<int>(thread_id % std::thread::hardware_concurrency()), &cpuset);(void)pthread_setaffinity_np(pthread_self(), sizeof(cpu_set_t), &cpuset);// Set thread scheduling policy and prioritystruct sched_param param;param.sched_priority = sched_get_priority_min(SCHED_FIFO) + thread_id; // Vary priority by thread_idif (pthread_setschedparam(pthread_self(), SCHED_FIFO, ¶m) != 0) {std::cerr << "Failed to set thread scheduling policy and priority for thread " << thread_id << "\n";}while (true) {BusyWait(static_cast<std::size_t>(kPeriod * percentage));std::this_thread::sleep_for(std::chrono::nanoseconds(static_cast<std::size_t>(kPeriod * (1.0f - percentage))));}
}} // namespace worker_appint main(int argc, char* argv[]) {if (argc < 3) {std::cout << "Args: worker_num occupation_rate.\n";return 0;}const int num = std::stoi(argv[1]);const float percentage = std::stof(argv[2]);if (num < 1) {std::cout << "Error: num of workers less than 1.\n";return 0;}if (percentage < 0.0f || percentage > 1.0f) {std::cout << "Error: occupation rate should be between [0.0, 1.0].\n";return 0;}std::cout << "num of workers: " << num << "\n"<< "occupation rate: " << percentage << "\n";// Create and start worker threadsstd::vector<std::unique_ptr<std::thread>> threads;threads.reserve(num);for (int i = 0; i < num; ++i) {threads.push_back(std::make_unique<std::thread>(worker_app::Work, percentage, i));}// Join all threadsfor (auto& td : threads) {if (td->joinable()) {td->join();}}return 0;
}