📢博客主页:https://blog.csdn.net/2301_779549673
📢博客仓库:https://gitee.com/JohnKingW/linux_test/tree/master/lesson
📢欢迎点赞 👍 收藏 ⭐留言 📝 如有错误敬请指正!
📢本文由 JohnKi 原创,首发于 CSDN🙉
📢未来很长,值得我们全力奔赴更美好的生活✨
文章目录
- 📢前言
- 🏳️🌈一、条件变量 Cond
- 1.1 条件变量的本质
- 1.2 核心作用
- 🏳️🌈二、条件变量封装类
- 2.1 类定义
- 2.2 构造函数
- 2.3 Wait 方法
- 2.4 Notify 和 NotifyAll
- 2.5 析构函数
- 2.6 整体代码
- 2.7 示例用法
- 🏳️🌈三、POSIX 信号量
- 3.1 Sem类核心功能
- 3.2 设计意义
- 3.3 Sem类封装
- 🏳️🌈四、环形队列的逻辑及实现
- 4.1 核心设计
- 4.2 成员变量
- 4.3 核心接口
- 4.3.1 构造函数
- 4.3.2Equeue(生产者接口)
- 4.3.3 Pop(消费者接口)
- 4.4 同步机制
- 4.4.1 信号量控制
- 4.4.2 互斥锁保护
- 4.5 整体代码
- 4.6 测试代码
- 👥总结
📢前言
紧接上回 BlockQueue生产消费模型 的实现,这篇文章,笔者来介绍一下 条件变量 的意义和作用,然后进行 封装,并实现 BlockQueue 的加强版 环形阻塞队列 RingBuffer
🏳️🌈一、条件变量 Cond
1.1 条件变量的本质
条件变量 是 线程间的通信机制,用于在 特定条件不满足时挂起线程,并在条件满足时 唤醒线程继续执行。它必须与 **互斥锁(Mutex)** 配合使用,确保操作的原子性。
1.2 核心作用
-
避免忙等待(Busy Waiting)
问题:没有条件变量时,线程需反复检查条件是否满足(while(条件不满足)),浪费CPU资源。
解决:条件变量让线程在条件不满足时主动休眠,直到被其他线程唤醒。 -
实现线程间协作
生产者-消费者模型:生产者通知消费者“数据已准备好”,消费者通知生产者“缓冲区有空位”。
任务队列:工作线程在队列为空时休眠,主线程添加任务后唤醒工作线程。 -
保证操作的原子性
通过 互斥锁 + 条件变量
的组合,确保线程在检查条件和进入等待状态的整个过程是原子的,避免竞态条件。
🏳️🌈二、条件变量封装类
头文件和命名空间:
- 包含 和 <pthread.h>,后者提供 POSIX 线程操作。
- 使用
LockModule
命名空间中的Mutex
类,表示互斥锁。
2.1 类定义
class Cond {
public:Cond();void Wait(Mutex &mutex);void Notify();void NotifyAll();~Cond();
private:pthread_cond_t _cond;
};
封装了 pthread_cond_t
,提供初始化、等待、通知和销毁功能
2.2 构造函数
Cond() {int n = ::pthread_cond_init(&_cond, nullptr);(void)n; // 忽略返回值
}
- 调用
pthread_cond_init
初始化条件变量。 - 返回值
n
被忽略,实际应用中应检查错误(如返回非零表示失败)。
2.3 Wait 方法
void Wait(Mutex &mutex) {int n = ::pthread_cond_wait(&_cond, mutex.LockPtr());
}
作用:使当前线程等待条件变量,并释放关联的互斥锁。
参数:Mutex
对象的引用,调用其 LockPtr()
获取底层 pthread_mutex_t*
。
流程:
- 调用线程必须已锁定
mutex
。 pthread_cond_wait
自动释放mutex
并阻塞,直到被唤醒。- 被唤醒后重新获取
mutex
,继续执行。
2.4 Notify 和 NotifyAll
void Notify() {::pthread_cond_signal(&_cond); // 唤醒一个等待线程
}
void NotifyAll() {::pthread_cond_broadcast(&_cond); // 唤醒所有等待线程
}
区别:
Notify()
唤醒至少一个等待线程。NotifyAll()
唤醒所有等待线程。- 应在持有相同互斥锁时调用,以避免竞态条件
2.5 析构函数
~Cond() {::pthread_cond_destroy(&_cond); // 销毁条件变量
}
确保没有线程等待时销毁,否则行为未定义
2.6 整体代码
Mutex.hpp
#pragma once
#include <iostream>
#include <pthread.h> // POSIX线程库头文件namespace LockModule
{// 互斥锁封装类(不可拷贝构造/赋值)class Mutex{public:// 禁止拷贝(保护系统锁资源)Mutex(const Mutex&) = delete;const Mutex& operator = (const Mutex&) = delete;// 构造函数:初始化POSIX互斥锁Mutex(){// 初始化互斥锁属性为默认值int n = ::pthread_mutex_init(&_lock, nullptr);(void)n; // 实际开发建议处理错误码}// 析构函数:销毁锁资源~Mutex(){// 确保锁已处于未锁定状态int n = ::pthread_mutex_destroy(&_lock);(void)n; // 生产环境应检查返回值}// 加锁操作(阻塞直至获取锁)void Lock(){// 可能返回EDEADLK(死锁检测)等错误码int n = ::pthread_mutex_lock(&_lock);(void)n; // 简化处理,实际建议抛异常或记录日志}// 解锁操作(必须由锁持有者调用)void Unlock(){// 未持有锁时解锁将返回EPERMint n = ::pthread_mutex_unlock(&_lock);(void)n; }// 获取底层锁指针(可用于自定义条件变量)pthread_mutex_t *LockPtr(){return &_lock;}private:pthread_mutex_t _lock; // 底层锁对象};// RAII锁守卫(自动管理锁生命周期)class LockGuard{public:// 构造时加锁(必须传入已初始化的Mutex引用)LockGuard(Mutex &mtx):_mtx(mtx){_mtx.Lock(); // 进入临界区}// 析构时自动解锁(异常安全保证)~LockGuard(){_mtx.Unlock(); // 离开作用域自动释放}private:Mutex &_mtx; // 引用方式持有,避免拷贝导致未定义行为};
}
Cond.hpp
#pragma once#include <iostream>
#include <pthread.h>#include "Mutex.hpp"namespace CondModule{using namespace LockModule;class Cond{public:Cond(){int n = ::pthread_cond_init(&_cond, nullptr);(void)n;}void Wait(Mutex& mutex){// pthread_mutex_lock() 的作用是 锁定互斥锁,直到成功为止。// 第一个参数 cond 是一个条件变量的标识符,第二个参数 mutex 是一个互斥锁的标识符。// pthread_cond_wait() 用来等待条件变量 cond 被 pthread_cond_signal() 或 pthread_cond_broadcast() 唤醒。// 它将阻塞调用线程,直到被唤醒或被中断。// 返回值是 0 表示成功,其他值表示出错。// 出错时,errno 被设置。// 成功时,调用线程获得互斥锁 mutex,直到被 pthread_cond_signal() 或 pthread_cond_broadcast() 唤醒。int n = ::pthread_cond_wait(&_cond, mutex.LockPtr());(void)n;}void Notify(){// pthread_cond_signal() 用来唤醒一个正在等待条件变量的线程。// 第一个参数 cond 是一个条件变量的标识符。// 返回值是 0 表示成功,其他值表示出错。// 出错时,errno 被设置。// 成功时,一个正在等待条件变量的线程被唤醒。int n = ::pthread_cond_signal(&_cond);(void)n;}void NotifyAll(){// pthread_cond_broadcast() 用来唤醒所有正在等待条件变量的线程。// 第一个参数 cond 是一个条件变量的标识符。// 返回值是 0 表示成功,其他值表示出错。// 出错时,errno 被设置。// 成功时,所有正在等待条件变量的线程被唤醒。int n = ::pthread_cond_broadcast(&_cond);(void)n;}~Cond(){int n = ::pthread_cond_destroy(&_cond);(void)n;}private:pthread_cond_t _cond;};
}
2.7 示例用法
Mutex mutex;
Cond cond;
std::queue<int> buffer;// 生产者线程
void producer() {mutex.Lock();while (buffer.full()) {cond.Wait(mutex); // 等待缓冲区非满}buffer.push(1);cond.Notify(); // 通知消费者mutex.Unlock();
}// 消费者线程
void consumer() {mutex.Lock();while (buffer.empty()) {cond.Wait(mutex); // 等待缓冲区非空}buffer.pop();cond.Notify(); // 通知生产者mutex.Unlock();
}
🏳️🌈三、POSIX 信号量
POSIX信号量 和 SystemV信号量 作用相同,都是用于同步操作,达到无冲突的访问共享资源目的。但POSIX可以用于线程间同步。
因此我们可以封装一个 Sem类 来统筹管理 POSIX信号量
扮演 线程同步协调者 的角色,通过 信号量机制 精准控制生产与消费的节奏,确保缓冲区的线程安全与高效运作。其设计不仅简化了复杂的同步逻辑,还通过 封装 和 *RAII机制 *提升了代码的可靠性和可维护性,是多线程编程中资源同步的典范实现。
3.1 Sem类核心功能
用于简化 POSIX 信号量的使用。其主要功能如下:
- 初始化:创建并初始化信号量。
- P 操作(P()):申请资源(信号量减 1),若资源不足则阻塞。
- V 操作(V()):释放资源(信号量加 1),唤醒等待线程。
- 销毁:清理信号量资源。
3.2 设计意义
- 简化信号量操作
- 封装底层 API:将
sem_init
、sem_wait
、sem_post
、sem_destroy
封装为类方法,隐藏实现细节。 - 统一接口:通过
P()
和V()
提供直观的语义(“申请”和“释放”)。
- 提高代码可维护性
RAII 机制:构造函数初始化资源,析构函数自动释放,避免资源泄漏。
{Sem sem(5); // 信号量初始化sem.P(); // 使用信号量
} // 作用域结束自动调用 ~Sem()
- 支持多种同步场景
通过调整初始值value
,可适配不同场景:
- 互斥锁(Mutex):value = 1(二元信号量)。
- 资源池:value = N(表示最多允许 N 个线程同时访问资源)。
- 生产者-消费者模型:通过两个信号量分别控制缓冲区空间和数据数量。
3.3 Sem类封装
#pragma once
#include <semaphore.h>namespace SemModule{int defaultsemval = 1;class Sem{public:Sem(int value = defaultsemval) : _init_value(value){sem_init(&sem, 0, value);}void P(){::sem_wait(&sem);}void V(){::sem_post(&sem);}~Sem(){::sem_destroy(&sem);}private:sem_t sem;int _init_value;};
}
🏳️🌈四、环形队列的逻辑及实现
4.1 核心设计
这是一个基于 信号量(Semaphore)
和 互斥锁(Mutex)
的线程安全环形队列(Ring Buffer
),用于实现 生产者-消费者模型。通过以下机制确保线程安全:
信号量控制:空间信号量(_spacesem
)控制可用空间,数据信号量(_datasem
)控制可消费数据。
互斥锁保护:生产者和消费者各自拥有独立的锁(_p_lock
和 _c_lock
),支持多生产者和多消费者并发操作。
4.2 成员变量
4.3 核心接口
4.3.1 构造函数
RingBuffer(int cap): _ring(cap), // 初始化缓冲区容量_cap(cap), // 记录总容量_p_step(0), // 生产者起始位置_c_step(0), // 消费者起始位置_datasem(0), // 初始无可消费数据_spacesem(cap) {} // 初始空间=总容量
作用:初始化缓冲区及相关同步机制
4.3.2Equeue(生产者接口)
void Equeue(const T &in) {_spacesem.P(); // 等待可用空间(信号量-1){LockGuard lockguard(_p_lock); // 加生产者锁_ring[_p_step] = in; // 写入数据_p_step = (_p_step + 1) % _cap; // 环形移动}_datasem.V(); // 增加可消费数据(信号量+1)
}
流程:
- 申请空间:通过
_spacesem.P()
等待缓冲区有空位。 - 生产数据:在锁保护下写入数据并更新生产者索引。
- 通知消费者:通过
_datasem.V()
增加可消费数据数量。
4.3.3 Pop(消费者接口)
void Pop(T *out) {_datasem.P(); // 等待可消费数据(信号量-1){LockGuard lockguard(_c_lock); // 加消费者锁*out = _ring[_c_step]; // 读取数据_c_step = (_c_step + 1) % _cap; // 环形移动}_spacesem.V(); // 增加可用空间(信号量+1)
}
流程:
- 申请数据:通过
_datasem.P()
等待缓冲区有数据。 - 消费数据:在锁保护下读取数据并更新消费者索引。
- 通知生产者:通过
_spacesem.V()
增加可用空间数量。
4.4 同步机制
4.4.1 信号量控制
- 生产者阻塞条件:
_spacesem
为0时(缓冲区满)。 - 消费者阻塞条件:
_datasem
为0时(缓冲区空)。
4.4.2 互斥锁保护
4.5 整体代码
#include "Cond.hpp"
#include "Mutex.hpp"
#include "Sem.hpp"#include <vector>
#include <pthread.h>namespace RingBufferModule{using namespace LockModule;using namespace CondModule;using namespace SemModule;template<typename T>class RingBuffer{public:RingBuffer(int cap): _ring(cap), // 初始化缓冲区容量_cap(cap), // 记录总容量_p_step(0), // 生产者起始位置_c_step(0), // 消费者起始位置_datasem(0), // 初始无可消费数据_spacesem(cap) // 初始空间=总容量{}// 生产者接口void Equeue(const T &in) {_spacesem.P(); // 等待可用空间(信号量-1){LockGuard lockguard(_p_lock); // 加生产者锁_ring[_p_step] = in; // 写入数据_p_step = (_p_step + 1) % _cap; // 环形移动}_datasem.V(); // 增加可消费数据(信号量+1)}// 消费者接口void Pop(T *out) {_datasem.P(); // 等待可消费数据(信号量-1){LockGuard lockguard(_c_lock); // 加消费者锁*out = _ring[_c_step]; // 读取数据_c_step = (_c_step + 1) % _cap; // 环形移动}_spacesem.V(); // 增加可用空间(信号量+1)}// 析构函数~RingBuffer(){}private:std::vector<T> _ring; // 环,临界资源int _cap; // 环的容量 int _p_step; // 生产者指针位置int _c_step; // 消费者指针位置Mutex _p_lock; // 生产者锁Mutex _c_lock; // 消费者锁Sem _datasem; // 数据信号量Sem _spacesem; // 空间信号量};
}
4.6 测试代码
#include "RingBuffer.hpp"
#include <pthread.h>
#include <unistd.h>
#include <ctime>using namespace RingBufferModule;void *Consumer(void *args)
{RingBuffer<int> *ring_buffer = static_cast<RingBuffer<int> *>(args);while(true){sleep(1);// sleep(1);// 1. 消费数据int data;ring_buffer->Pop(&data);// 2. 处理:花时间std::cout << "消费了一个数据: " << data << std::endl;}
}void *Productor(void *args)
{RingBuffer<int> *ring_buffer = static_cast<RingBuffer<int> *>(args);int data = 0;while (true){// 1. 获取数据:花时间// sleep(1);// 2. 生产数据ring_buffer->Equeue(data);std::cout << "生产了一个数据: " << data << std::endl;data++;}
}int main()
{RingBuffer<int> *ring_buffer = new RingBuffer<int>(5); // 共享资源 -> 临界资源// 单生产,单消费pthread_t c1, p1, c2,c3,p2;pthread_create(&c1, nullptr, Consumer, ring_buffer);pthread_create(&c2, nullptr, Consumer, ring_buffer);pthread_create(&c3, nullptr, Consumer, ring_buffer);pthread_create(&p1, nullptr, Productor, ring_buffer);pthread_create(&p2, nullptr, Productor, ring_buffer);pthread_join(c1, nullptr);pthread_join(c2, nullptr);pthread_join(c3, nullptr);pthread_join(p1, nullptr);pthread_join(p2, nullptr);delete ring_buffer;return 0;
}
👥总结
本篇博文对 【Linux】条件变量封装类及环形队列的实现 做了一个较为详细的介绍,不知道对你有没有帮助呢
觉得博主写得还不错的三连支持下吧!会继续努力的~