并不是真的路过而已,也不是真的不会想你..............................................................................
文章目录
前言
一、【生产者消费者模型的介绍】
1、【概念引入】
2、【特点—321原则】
3、【优点】
二、【基于阻塞队列的生产者消费者模型】
2.1、【概念介绍】
2.2、【模拟实现】
1、生产者消费者步调一致
2、生产者生产的快,消费者消费的慢
3、生产者生产的慢,消费者消费的快
4、满足某一条件时再唤醒对应的生产者或消费者
2.3、【虚假唤醒】
2.4、【基于计算任务的生产者消费者模型】
2.5、【使用Lock_Guard并实现多线程模型】
1、【使用Lock_Guard】
2、【实现多线程】
三、【再谈POSIX信号量】
四、【基于环形队列的生产消费模型】
4.1、【概念介绍】
4.2、【空间资源和数据资源】
4.3、【生产者和消费者申请和释放资源】
1、生产者申请空间资源,释放数据资源
2、消费者申请数据资源,释放空间资源
3、伪代码
4.3、【两个必须遵守的规则】
1、第一个规则:生产者和消费者不能对同一个位置进行访问。
2、第二个规则:无论是生产者还是消费者,都不应该将对方套一个圈以上。
4.4、【模拟实现】
1、【生产者消费者步调一致】
2、【生产者生产的快,消费者消费的慢】
3、【生产者生产的慢,消费者消费的快】
4.5、【添加计算任务】
4.6、【多线程和LockGuard】
4.7、【信号量保护环形队列的原理】
总结
前言
本片博客介绍了Linux中一个十分重要的知识——生产消费模型,除此之外,你还能了解到信号量,虚假换醒,内容很多,请耐心观看!
一、【生产者消费者模型的介绍】
1、【概念引入】
生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。
生产者和消费者彼此之间不直接通讯,而通过这个容器来通讯,所以生产者生产完数据之后不用等待消费者处理,直接将生产的数据放到这个容器当中,消费者也不用找生产者要数据,而是直接从这个容器里取数据,这个容器就相当于一个缓冲区,平衡了生产者和消费者的处理能力,这个容器实际上就是用来给生产者和消费者解耦的。
2、【特点—321原则】
生产者消费者模型是多线程同步与互斥的一个经典场景,其特点如下:
- 三种关系: 生产者和生产者(互斥关系)、消费者和消费者(互斥关系)、生产者和消费者(互斥关系、同步关系)。
- 两种角色: 生产者和消费者。(通常由进程或线程承担)
- 一个交易场所: 通常指的是内存中的一段缓冲区。(可以自己通过某种方式组织起来)
我们用代码编写生产者消费者模型的时候,本质就是对这三个特点进行维护。
下面有这样一个问题:生产者和生产者、消费者和消费者、生产者和消费者,它们之间为什么会存在互斥关系?
- 介于生产者和消费者之间的容器可能会被多个执行流同时访问,因此我们需要将该临界资源用互斥锁保护起来。
- 其中,所有的生产者和消费者都会竞争式的申请锁,因此生产者和生产者、消费者和消费者、生产者和消费者之间都存在互斥关系。
生产者和消费者之间为什么会存在同步关系?
- 如果让生产者一直生产,那么当生产者生产的数据将容器塞满后,生产者再生产数据就会生产失败。
- 反之,让消费者一直消费,那么当容器当中的数据被消费完后,消费者再进行消费就会消费失败。
- 虽然这样不会造成任何数据不一致的问题,但是这样会引起另一方的饥饿问题,是非常低效的。我们应该让生产者和消费者访问该容器时具有一定的顺序性,比如让生产者先生产,然后再让消费者进行消费。
注意: 互斥关系保证的是数据的正确性,而同步关系是为了让多线程之间协同起来。
3、【优点】
- 解耦。
- 支持并发。
- 支持忙闲不均。
如果我们在主函数中调用某一函数,那么我们必须等该函数体执行完后才继续执行主函数的后续代码,因此函数调用本质上是一种紧耦合。
对应到生产者消费者模型中,函数传参实际上就是生产者生产的过程,而执行函数体实际上就是消费者消费的过程,但生产者只负责生产数据,消费者只负责消费数据,在消费者消费期间生产者可以同时进行生产,因此生产者消费者模型本质是一种松耦合。
二、【基于阻塞队列的生产者消费者模型】
2.1、【概念介绍】
在多线程编程中,阻塞队列(Blocking Queue)是一种常用于实现生产者和消费者模型的数据结构。
其与普通的队列的区别在于:
- 当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中放入了元素。
- 当队列满时,往队列里存放元素的操作会被阻塞,直到有元素从队列中取出。
知识联系: 看到以上阻塞队列的描述,我们很容易想到的就是管道,而阻塞队列最典型的应用场景实际上就是管道的实现。
2.2、【模拟实现】
1、生产者消费者步调一致
为了方便理解,下面我们以单生产者、单消费者为例进行实现。
其中的BlockQueue就是生产者消费者模型当中的交易场所,我们可以用C++STL库当中的queue进行实现,代码如下:
BlockQueue.hpp:
#include <iostream> #include <pthread.h> #include <queue> #include <unistd.h>#define NUM 5template<class T> class BlockQueue { private:bool IsFull(){return _q.size() == _cap;}bool IsEmpty(){return _q.empty();} public:BlockQueue(int cap = NUM): _cap(cap){pthread_mutex_init(&_mutex, nullptr);pthread_cond_init(&_full, nullptr);pthread_cond_init(&_empty, nullptr);}~BlockQueue(){pthread_mutex_destroy(&_mutex);pthread_cond_destroy(&_full);pthread_cond_destroy(&_empty);}//向阻塞队列插入数据(生产者调用)void Push(const T& data){pthread_mutex_lock(&_mutex);while (IsFull())//这里能用if吗{//不能进行生产,直到阻塞队列可以容纳新的数据pthread_cond_wait(&_full, &_mutex);}_q.push(data);pthread_mutex_unlock(&_mutex);pthread_cond_signal(&_empty); //唤醒在empty条件变量下等待的消费者线程}//从阻塞队列获取数据(消费者调用)void Pop(T& data){pthread_mutex_lock(&_mutex);while (IsEmpty())//这里能用if吗{//不能进行消费,直到阻塞队列有新的数据pthread_cond_wait(&_empty, &_mutex);}data = _q.front();_q.pop();pthread_mutex_unlock(&_mutex);pthread_cond_signal(&_full); //唤醒在full条件变量下等待的生产者线程} private:std::queue<T> _q; //阻塞队列int _cap; //阻塞队列最大容器数据个数pthread_mutex_t _mutex;pthread_cond_t _full;//生产者的条件变量pthread_cond_t _empty;//消费者的条件变量 };
相关说明:
- 由于我们实现的是单生产者、单消费者的生产者消费者模型,因此我们不需要维护生产者和生产者之间的关系,也不需要维护消费者和消费者之间的关系,我们只需要维护生产者和消费者之间的同步与互斥关系即可。
- 将BlockingQueue当中存储的数据模板化,方便以后需要时进行复用。
- 这里设置BlockingQueue存储数据的上限为5,当阻塞队列中存储了五组数据时生产者就不能进行生产了,此时生产者就应该被阻塞。
- 阻塞队列是会被生产者和消费者同时访问的临界资源,因此我们需要用一把互斥锁将其保护起来。
- 生产者线程要向阻塞队列当中Push数据,前提是阻塞队列里面有空间,若阻塞队列已经满了,那么此时该生产者线程就需要进行等待,直到阻塞队列中有空间时再将其唤醒。
- 消费者线程要从阻塞队列当中Pop数据,前提是阻塞队列里面有数据,若阻塞队列为空,那么此时该消费者线程就需要进行等待,直到阻塞队列中有新的数据时再将其唤醒。
- 因此在这里我们需要用到两个条件变量,一个条件变量用来描述队列为空,另一个条件变量用来描述队列已满。当阻塞队列满了的时候,要进行生产的生产者线程就应该在full条件变量下进行等待;当阻塞队列为空的时候,要进行消费的消费者线程就应该在empty条件变量下进行等待。
不论是生产者线程还是消费者线程,它们都是先申请到锁进入临界区后再判断是否满足生产或消费条件的,如果对应条件不满足,那么对应线程就会被挂起。但此时该线程是拿着锁的,为了避免死锁问题,在调用pthread_cond_wait函数时就需要传入当前线程手中的互斥锁,此时当该线程被挂起时就会自动释放手中的互斥锁,而当该线程被唤醒时又会自动获取到该互斥锁。
- 当生产者生产完一个数据后,意味着阻塞队列当中至少有一个数据,而此时可能有消费者线程正在empty条件变量下进行等待,因此当生产者生产完数据后需要唤醒在empty条件变量下等待的消费者线程。
- 同样的,当消费者消费完一个数据后,意味着阻塞队列当中至少有一个空间,而此时可能有生产者线程正在full条件变量下进行等待,因此当消费者消费完数据后需要唤醒在full条件变量下等待的生产者线程。
而在主函数中我们就只需要创建一个生产者线程和一个消费者线程,让生产者线程不断生产数据,让消费者线程不断消费数据。
main.cc:
#include "BlockQueue.hpp"void* Producer(void* arg) {BlockQueue<int>* bq = (BlockQueue<int>*)arg;//生产者不断进行生产while (true){sleep(1);int data = rand() % 100 + 1;bq->Push(data); //生产数据std::cout << "Producer: " << data << std::endl;} } void* Consumer(void* arg) {BlockQueue<int>* bq = (BlockQueue<int>*)arg;//消费者不断进行消费while (true){sleep(1);int data = 0;bq->Pop(data); //消费数据std::cout << "Consumer: " << data << std::endl;} } int main() {srand((unsigned int)time(nullptr));pthread_t producer, consumer;BlockQueue<int>* bq = new BlockQueue<int>;//创建生产者线程和消费者线程pthread_create(&producer, nullptr, Producer, bq);pthread_create(&consumer, nullptr, Consumer, bq);//join生产者线程和消费者线程pthread_join(producer, nullptr);pthread_join(consumer, nullptr);delete bq;return 0; }
相关说明:
- 阻塞队列要让生产者线程向队列中Push数据,让消费者线程从队列中Pop数据,因此这个阻塞队列必须要让这两个线程同时看到,所以我们在创建生产者线程和消费者线程时,需要将该阻塞队列作为线程执行函数的参数进行传入。
- 代码中生产者生产数据就是将获取到的随机数Push到阻塞队列,而消费者消费数据就是从阻塞队列Pop数据,为了便于观察,我们可以将生产者生产的数据和消费者消费的数据进行打印输出。
由于代码中生产者是每隔一秒生产一个数据,而消费者是每隔一秒消费一个数据,因此运行代码后我们可以看到生产者和消费者的执行步调是一致的。
小贴士: 以
.hpp
为后缀的文件也是头文件,该头文件同时包含类的定义与实现,调用者只需include该hpp文件即可。因为开源项目一般不需要进行保护,所以在开源项目中用的比较多。2、生产者生产的快,消费者消费的慢
我们可以让生产者不停的进行生产,而消费者每隔一秒进行消费,将mian.cc修改如下:
#include "BlockQueue.hpp"void* Producer(void* arg) {BlockQueue<int>* bq = (BlockQueue<int>*)arg;//生产者不断进行生产while (true){int data = rand() % 100 + 1;bq->Push(data); //生产数据std::cout << "Producer: " << data << std::endl;} } void* Consumer(void* arg) {BlockQueue<int>* bq = (BlockQueue<int>*)arg;while (true){sleep(1);//消费者每隔一秒再进行消费int data = 0;bq->Pop(data); //消费数据std::cout << "Consumer: " << data << std::endl;} }int main() {srand((unsigned int)time(nullptr));pthread_t producer, consumer;BlockQueue<int>* bq = new BlockQueue<int>;//创建生产者线程和消费者线程pthread_create(&producer, nullptr, Producer, bq);pthread_create(&consumer, nullptr, Consumer, bq);//join生产者线程和消费者线程pthread_join(producer, nullptr);pthread_join(consumer, nullptr);delete bq;return 0; }
此时由于生产者生产的很快,运行代码后一瞬间生产者就将阻塞队列打满了,此时生产者想要再进行生产就只能在full条件变量下进行等待,直到消费者消费完一个数据后,生产者才会被唤醒进而继续进行生产,生产者生产完一个数据后又会进行等待,因此后续生产者和消费者的步调又变成一致的了。
3、生产者生产的慢,消费者消费的快
当然,我们也可以让生产者每隔一秒进行生产,而消费者不停的进行消费,修改如下:
main.cc:
#include "BlockQueue.hpp" void* Producer(void* arg) {BlockQueue<int>* bq = (BlockQueue<int>*)arg;//生产者每隔一秒进行生产while (true){sleep(1);int data = rand() % 100 + 1;bq->Push(data); //生产数据std::cout << "Producer: " << data << std::endl;} } void* Consumer(void* arg) {BlockQueue<int>* bq = (BlockQueue<int>*)arg;//消费者不断进行消费while (true){int data = 0;bq->Pop(data); //消费数据std::cout << "Consumer: " << data << std::endl;} }int main() {srand((unsigned int)time(nullptr));pthread_t producer, consumer;BlockQueue<int>* bq = new BlockQueue<int>;//创建生产者线程和消费者线程pthread_create(&producer, nullptr, Producer, bq);pthread_create(&consumer, nullptr, Consumer, bq);//join生产者线程和消费者线程pthread_join(producer, nullptr);pthread_join(consumer, nullptr);delete bq;return 0; }
虽然消费者消费的很快,但一开始阻塞队列中是没有数据的,因此消费者只能在empty条件变量下进行等待,直到生产者生产完一个数据后,消费者才会被唤醒进而进行消费,消费者消费完这一个数据后又会进行等待,因此生产者和消费者的步调就是一致的。
4、满足某一条件时再唤醒对应的生产者或消费者
我们也可以当阻塞队列当中存储的数据大于队列容量的一半时,再唤醒消费者线程进行消费;当阻塞队列当中存储的数据小于队列容器的一半时,再唤醒生产者线程进行生产。
//向阻塞队列插入数据(生产者调用) void Push(const T& data) {pthread_mutex_lock(&_mutex);while (IsFull())//能否使用if{//不能进行生产,直到阻塞队列可以容纳新的数据pthread_cond_wait(&_full, &_mutex);}_q.push(data);if (_q.size() >= _cap / 2)//设置消费条件{pthread_cond_signal(&_empty); //唤醒在empty条件变量下等待的消费者线程}pthread_mutex_unlock(&_mutex); } //从阻塞队列获取数据(消费者调用) void Pop(T& data) {pthread_mutex_lock(&_mutex);while (IsEmpty())//能否使用if{//不能进行消费,直到阻塞队列有新的数据pthread_cond_wait(&_empty, &_mutex);}data = _q.front();_q.pop();if (_q.size() <= _cap / 2)//设置生产条件{pthread_cond_signal(&_full); //唤醒在full条件变量下等待的生产者线程}pthread_mutex_unlock(&_mutex); }
我们仍然让生产者生产的快,消费者消费的慢。
void* Producer(void* arg) {BlockQueue<int>* bq = (BlockQueue<int>*)arg;//生产者不断进行生产while (true){int data = rand() % 100 + 1;bq->Push(data); //生产数据std::cout << "Producer: " << data << std::endl;} } void* Consumer(void* arg) {BlockQueue<int>* bq = (BlockQueue<int>*)arg;//消费者不断进行消费while (true){sleep(1);int data = 0;bq->Pop(data); //消费数据std::cout << "Consumer: " << data << std::endl;} }
运行代码后生产者还是一瞬间将阻塞队列打满后进行等待,但此时不是消费者消费一个数据就唤醒生产者线程,而是当阻塞队列当中的数据小于队列容器的一半时,才会唤醒生产者线程进行生产。
2.3、【虚假唤醒】
还记得我们上面在Push和Pop函数中提的问题吗?也就是在对队列进行为满,为空的操作时我们代码中使用的是while而不是if,那么这里我们能用if吗?
先说结论这里是可以用的,因为这里我们只有单个的生产和消费线程,但是让我们思考一下下面的场景:
那就是当我们有多个生产和消费线程时,对于Pop函数而言,队列如果空了,消费线程就要在pthread_cond_wait(&_empty, &_mutex);这里阻塞,直到生产线程生产数据放到队列,然后通过pthread_cond_signal(&_empty); 唤醒消费线程,但是由于这里是多个消费线程,所以我们一般会使用 pthread_cond_broadcast( )函数用于唤醒等待队列中的全部消费线程,那么这里就会引发一个问题,如果我们的生产线程只生产了一个数据,但是有多个消费线程都被唤醒了,唤醒的所有消费线程首先会竞争锁,一定会有一个消费线程拿到锁,并把生产线程生产的那一个数据消费了,那么此时队列为空,刚刚进行消费的消费线程会将锁释放,并通知生产线程进行生产,但是请注意没有进行消费的那些消费线程也都被唤醒了,也正在等待锁资源,所以这时候如果我们用if,就可能会导致又有一个消费线程拿到了锁,并进行消费,可队列是空的呀,不能消费,这显然是不合理的,而造成这个问题本质原因是因为if只会判断一次队列是否为空,而且是在消费线程醒来之前判断的,所以我们要用while进行重复判断直到队列不为空,才让消费线程进行消费。
下面总结一下到底什么是虚假换醒:
虚假唤醒是一种现象,它只会出现在多线程环境中,指的是在多线程环境下,多个线程等待在同一个条件上,等到条件满足时,所有等待的线程都被唤醒,但由于多个线程执行的顺序不同,后面竞争到锁、获得运行权的线程在运行时条件已经不再满足,线程应该睡眠但是却继续往下运行的一种现象。
用人能听懂的话来介绍一下虚假唤醒:
多线程环境的编程中,我们经常遇到让多个线程等待在一个条件上,等到这个条件成立的时候我们再去唤醒这些线程,让它们接着往下执行代码的场景。假如某一时刻条件成立,所有的线程都被唤醒了,然后去竞争锁,因为同一时刻只会有一个线程能拿到锁,其他的线程都会阻塞到锁上无法往下执行,等到成功争抢到锁的线程消费完条件,释放了锁,后面的线程继续运行,拿到锁时这个条件很可能已经不满足了,这个时候线程应该继续在这个条件上阻塞下去,而不应该继续执行,如果继续执行了,就说发生了虚假唤醒。讲个故事方便理解:
假设这里有三个人:萧炎,美杜莎,药尘,他们三人要玩一个游戏,就是美杜莎只有一个苹果而且要把它放进一个箱子里,萧炎和药尘都要从箱子拿苹果吃,但是他们三个都不知道彼此是那么时候把手放进箱子里,美杜莎把手放进箱子里是放苹果,萧炎和药尘把手放进箱子是为了拿苹果,并且萧炎和药尘只能吃一个苹果,好了为了能吃到苹果,萧炎和药尘只能一直把手放进箱子看看有没有苹果,但是箱子只有一个口,所以一次只能有一个人的手进去,为了防止萧炎和药尘一直把箱子口占着导致美杜莎放不了苹果,这里设置一个铃铛规定三个人无论谁把手放到箱子里,放之后都要摇一下铃铛,对萧炎和药尘而言,美杜莎摇铃铛是告诉他们她把苹果放了进去,他们要去竞争箱子里的口,竞争到的人才能拿到苹果,并且他们在发现箱子里没有苹果之后要去睡觉,而不是一直去看箱子里有没有苹果,直到听到美杜莎摇响铃铛,对美杜莎而言,萧炎和药尘摇铃铛是告诉她箱子里没有苹果,她要去放苹果了,好了到这里游戏可以开始了:
一开始萧炎把手放进箱子的口里发现没有苹果,于是摇了铃铛,药尘重复操作;美杜莎听到铃铛就把苹果放进了箱子并摇响铃铛,叫醒了两个人,萧炎和药尘都听到了铃铛,但是箱子只有一个苹果,也只有一个口能把手放进去,萧炎是主角,所以萧炎先竞争到箱子口,并拿走苹果吃掉了,可是药尘也被叫醒了,他却吃了个寂寞。
可以看出来,等到药尘被叫醒想去拿苹果的时候苹果是没有了的,这时候他应该继续睡的。但是他没有,他醒了,他还想去竞争箱子口然后拿苹果,所以他吃了个寂寞。所以这时候药尘就是虚假换醒。这里的三个人就对应多个线程,箱子对应阻塞队列,苹果对应数据,铃铛对应条件变量,箱子的口对应锁,这个例子与上面的代码几乎没有差别,也就是用while而不用if的原因,因为用while每次萧炎和药尘醒过来之后都会再看看箱子里有没有苹果(唤醒自己的条件是否满足),如果不满足,就会继续睡下去,不会接着往下运行,从而避免了虚假唤醒。
总结一下:
pthread_cond_wait
函数是让当前执行流进行等待的函数,是函数就意味着有可能调用失败,调用失败后该执行流就会继续往后执行。- 其次在多消费者的情况下,当生产者生产了一个数据后如果使用
pthread_cond_broadcast
函数唤醒消费者,就会一次性唤醒多个消费者,但待消费的数据只有一个,此时其他消费者就被虚假唤醒了。- 为了避免出现上述情况,我们就要让线程被唤醒后再次进行判断,确认是否真的满足生产消费条件,因此这里必须要用while进行判断。
- 等待在一个条件上的线程被全部唤醒后会去竞争锁,所以这些线程会一个一个地去消费这个条件,等到后面的线程去消费这个条件时,条件可能已经不满足了,所以每个被唤醒的线程都需要再检查一次条件是否满足。如果不满足,应该继续睡下去;只有满足了才能往下执行。
- 所以判断是否满足生产消费条件时不能用if,而应该用while。
2.4、【基于计算任务的生产者消费者模型】
当然,实际使用生产者消费者模型时可不是简单的让生产者生产一个数字让消费者进行打印而已,我们这样做只是为了测试代码的正确性。
由于我们将BlockingQueue当中存储的数据进行了模板化,此时就可以让BlockingQueue当中存储其他类型的数据。例如,我们想要实现一个基于计算任务的生产者消费者模型,此时我们只需要定义一个Task类,这个类当中需要包含一个Run成员函数,该函数代表着我们想让消费者如何处理拿到的数据。
Task.hpp:
#pragma once #include <iostream> #include <string> #include <unistd.h>const int defaultvalue = 0;//结果的默认值enum//联合体用来表示结果是否可信 {ok = 0,div_zero,//1mod_zero,//2unknow//3 };const std::string opers = "+-*/%)(&";//符号表,用来传递计算符class Task { public:Task(){}//无参构造,方便消费者消费Task(int x, int y, char op): data_x(x), data_y(y), oper(op), result(defaultvalue), code(ok){}void Run(){switch (oper){case '+':result = data_x + data_y;break;case '-':result = data_x - data_y;break;case '*':result = data_x * data_y;break;case '/':{if (data_y == 0)code = div_zero;elseresult = data_x / data_y;}break;case '%':{if (data_y == 0)code = mod_zero;elseresult = data_x % data_y;}break;default:code = unknow;break;}}void operator()()//重载括号使得t.Run()==t(){Run();// sleep(2);}std::string PrintTask()//将任务打印出来{std::string s;s = std::to_string(data_x);s += oper;s += std::to_string(data_y);s += "=?";return s;}std::string PrintResult()//将结果打印出来{std::string s;s = std::to_string(data_x);s += oper;s += std::to_string(data_y);s += "=";s += std::to_string(result);s += " [";s += std::to_string(code);s += "]";return s;}~Task(){}private:int data_x;int data_y;char oper; // + - * / %int result;int code; // 结果码,0: 结果可信 !0: 结果不可信,1,2,3,4 };
BlockQueue.hpp:
#pragma once#include <iostream> #include <pthread.h> #include <queue> #include <unistd.h>#define NUM 5template<class T> class BlockQueue { private:bool IsFull(){return _q.size() == _cap;}bool IsEmpty(){return _q.empty();} public:BlockQueue(int cap = NUM): _cap(cap){pthread_mutex_init(&_mutex, nullptr);pthread_cond_init(&_full, nullptr);pthread_cond_init(&_empty, nullptr);}~BlockQueue(){pthread_mutex_destroy(&_mutex);pthread_cond_destroy(&_full);pthread_cond_destroy(&_empty);}//向阻塞队列插入数据(生产者调用)void Push(const T& data){pthread_mutex_lock(&_mutex);while (IsFull())//这里能用if吗?{//不能进行生产,直到阻塞队列可以容纳新的数据pthread_cond_wait(&_full, &_mutex);}_q.push(data);pthread_mutex_unlock(&_mutex);pthread_cond_signal(&_empty); //唤醒在empty条件变量下等待的消费者线程}//从阻塞队列获取数据(消费者调用)void Pop(T& data){pthread_mutex_lock(&_mutex);while (IsEmpty())//这里能用if吗?{//不能进行消费,直到阻塞队列有新的数据pthread_cond_wait(&_empty, &_mutex);}data = _q.front();_q.pop();pthread_mutex_unlock(&_mutex);pthread_cond_signal(&_full); //唤醒在full条件变量下等待的生产者线程} private:std::queue<T> _q; //阻塞队列int _cap; //阻塞队列最大容器数据个数pthread_mutex_t _mutex;pthread_cond_t _full;//生产者的条件变量pthread_cond_t _empty;//消费者的条件变量 };
此时生产者放入阻塞队列的数据就是一个Task对象,而消费者从阻塞队列拿到Task对象后,就可以用该对象调用Run成员函数进行数据处理。
main.cc:
#include "BlockQueue.hpp" #include "Task.hpp" void* Producer(void* arg) {BlockQueue<Task>* bq = (BlockQueue<Task>*)arg;while (true){//获取任务处理值int x = rand() % 10;int y = rand() % 10;char oper = opers[rand() % (opers.size())];//获取计算符Task t(x, y, oper);//构造任务对象std::cout << "producer task:" << t.PrintTask() << std::endl;bq->Push(t); //生产数据sleep(1);} } void* Consumer(void* arg) {BlockQueue<Task> *bq = static_cast<BlockQueue<Task> *>(arg);//消费者不断进行消费while (true){// sleep(1);Task t;//使用无参构造,方便接收队列中的任务// 1. 消费数据 bq->pop(data);bq->Pop(t);// 2. 进行处理// t.Run();t(); std::cout << "consumer data: " << t.PrintResult() << std::endl;// 注意:消费者没有sleep!!!} }int main() {srand((unsigned int)time(nullptr));pthread_t producer, consumer;BlockQueue<Task>* bq = new BlockQueue<Task>;//创建生产者线程和消费者线程pthread_create(&producer, nullptr, Producer, bq);pthread_create(&consumer, nullptr, Consumer, bq);//join生产者线程和消费者线程pthread_join(producer, nullptr);pthread_join(consumer, nullptr);delete bq;return 0; }
运行代码,当阻塞队列被生产者打满后消费者被唤醒,此时消费者在消费数据时执行的就是计算任务,当阻塞队列当中的数据被消费到低于一定阈值后又会唤醒生产者进行生产。
2.5、【使用Lock_Guard并实现多线程模型】
1、【使用Lock_Guard】
LockGuard本质上就类似于智能指针,是一种RAII思想,我们使用LockGuard来创建锁,让他创建时直接加锁,出了作用域析构自动解锁。
Lock_Guard.hpp:
#pragma once#include <pthread.h>// 不定义锁,默认认为外部会给我们传入锁对象 class Mutex { public:Mutex(pthread_mutex_t *lock):_lock(lock){}void Lock(){pthread_mutex_lock(_lock);}void Unlock(){pthread_mutex_unlock(_lock);}~Mutex(){}private:pthread_mutex_t *_lock; };class LockGuard { public:LockGuard(pthread_mutex_t *lock): _mutex(lock){_mutex.Lock();}~LockGuard(){_mutex.Unlock();} private:Mutex _mutex; };
Lock_Guard是一种自动锁,不需要我们再手动加锁和释放锁,对Block_Queue.hpp修改如下:
#pragma once#include <iostream> #include <pthread.h> #include <queue> #include <unistd.h> #include "Lock_Guard.hpp" #define NUM 5template<class T> class BlockQueue { private:bool IsFull(){return _q.size() == _cap;}bool IsEmpty(){return _q.empty();} public:BlockQueue(int cap = NUM): _cap(cap){pthread_mutex_init(&_mutex, nullptr);pthread_cond_init(&_full, nullptr);pthread_cond_init(&_empty, nullptr);}~BlockQueue(){pthread_mutex_destroy(&_mutex);pthread_cond_destroy(&_full);pthread_cond_destroy(&_empty);}//向阻塞队列插入数据(生产者调用)void Push(const T& data){LockGuard lockguard(&_mutex);//使用lockguard函数调用完,该对象会调用析构函数解锁//pthread_mutex_lock(&_mutex);while (IsFull())//这里能用if吗?{//不能进行生产,直到阻塞队列可以容纳新的数据pthread_cond_wait(&_full, &_mutex);}_q.push(data);//pthread_mutex_unlock(&_mutex);pthread_cond_signal(&_empty); //唤醒在empty条件变量下等待的消费者线程}//从阻塞队列获取数据(消费者调用)void Pop(T& data){//pthread_mutex_lock(&_mutex);// LockGuard lockguard(&_mutex);/使用lockguard函数调用完,该对象会调用析构函数解锁while (IsEmpty())//这里能用if吗?{//不能进行消费,直到阻塞队列有新的数据pthread_cond_wait(&_empty, &_mutex);}data = _q.front();_q.pop();//pthread_mutex_unlock(&_mutex);pthread_cond_signal(&_full); //唤醒在full条件变量下等待的生产者线程} private:std::queue<T> _q; //阻塞队列int _cap; //阻塞队列最大容器数据个数pthread_mutex_t _mutex;pthread_cond_t _full;//生产者的条件变量pthread_cond_t _empty;//消费者的条件变量 };
2、【实现多线程】
多线程版很简单,因为我们的代码本来就是互斥的,因为在生产和消费都进行了上锁。因此直接创建线程就好了。
因此我们只需设计一个新类,对BlockQueue,线程名进行封装,就可以创建多线程了。
代码如下:
template<class T> class ThreadData { public:ThreadData(BlockQueue<T>* bq):_bq(bq){}BlockQueue<T>* _bq;std::string _name; };
又提供了一个类叫做线程数据,方便查看创建的每个线程。这里也对Block_Queue.hpp和main.cc的代码改了一下,最终代码如下:
Block_Queue.hpp:
#pragma once#include <iostream> #include <queue> #include <pthread.h> #include "LockGuard.hpp"const int defaultcap = 5; // 默认容量template<class T> class BlockQueue { public:BlockQueue(int cap = defaultcap):_capacity(cap){pthread_mutex_init(&_mutex, nullptr);pthread_cond_init(&_p_cond, nullptr);pthread_cond_init(&_c_cond, nullptr);}bool IsFull(){return _q.size() == _capacity;}bool IsEmpty(){return _q.size() == 0;}void Push(const T &in) // 生产者的{LockGuard lockguard(&_mutex);// pthread_mutex_lock(&_mutex); while(IsFull()) // 使用while写出来的代码,具有较强的鲁棒、健壮性{// 阻塞等待pthread_cond_wait(&_p_cond, &_mutex); // }_q.push(in);// if(_q.size() > _productor_water_line) pthread_cond_signal(&_c_cond);pthread_cond_signal(&_c_cond);// pthread_mutex_unlock(&_mutex);}void Pop(T *out) // 消费者的{LockGuard lockguard(&_mutex);// pthread_mutex_lock(&_mutex);while(IsEmpty()){// 阻塞等待pthread_cond_wait(&_c_cond, &_mutex);}*out = _q.front();_q.pop();// if(_q.size() < _consumer_water_line) pthread_cond_signal(&_p_cond);pthread_cond_signal(&_p_cond);// pthread_mutex_unlock(&_mutex);}~BlockQueue(){pthread_mutex_destroy(&_mutex);pthread_cond_destroy(&_p_cond);pthread_cond_destroy(&_c_cond);} private:std::queue<T> _q;int _capacity; // _q.size() == _capacity, 满了,不能在生产,_q.size() == 0, 空,不能消费了pthread_mutex_t _mutex;pthread_cond_t _p_cond; // 给生产者的pthread_cond_t _c_cond; // 给消费者的 };
Task.hpp:
#pragma once #include <iostream> #include <string> #include <unistd.h>const int defaultvalue = 0;//结果的默认值enum//联合体用来表示结果是否可信 {ok = 0,div_zero,//1mod_zero,//2unknow//3 };const std::string opers = "+-*/%)(&";//符号表,用来传递计算符class Task { public:Task(){}//无参构造,方便消费者消费Task(int x, int y, char op): data_x(x), data_y(y), oper(op), result(defaultvalue), code(ok){}void Run(){switch (oper){case '+':result = data_x + data_y;break;case '-':result = data_x - data_y;break;case '*':result = data_x * data_y;break;case '/':{if (data_y == 0)code = div_zero;elseresult = data_x / data_y;}break;case '%':{if (data_y == 0)code = mod_zero;elseresult = data_x % data_y;}break;default:code = unknow;break;}}void operator()()//重载括号使得t.Run()==t(){Run();// sleep(2);}std::string PrintTask()//将任务打印出来{std::string s;s = std::to_string(data_x);s += oper;s += std::to_string(data_y);s += "=?";return s;}std::string PrintResult()//将结果打印出来{std::string s;s = std::to_string(data_x);s += oper;s += std::to_string(data_y);s += "=";s += std::to_string(result);s += " [";s += std::to_string(code);s += "]";return s;}~Task(){}private:int data_x;int data_y;char oper; // + - * / %int result;int code; // 结果码,0: 结果可信 !0: 结果不可信,1,2,3,4 };
LockGuard.hpp:
#pragma once#include <pthread.h>// 不定义锁,默认认为外部会给我们传入锁对象 class Mutex { public:Mutex(pthread_mutex_t *lock):_lock(lock){}void Lock(){pthread_mutex_lock(_lock);}void Unlock(){pthread_mutex_unlock(_lock);}~Mutex(){}private:pthread_mutex_t *_lock; };class LockGuard { public:LockGuard(pthread_mutex_t *lock): _mutex(lock){_mutex.Lock();}~LockGuard(){_mutex.Unlock();} private:Mutex _mutex; };
main.cc:
#include "BlockQueue.hpp" #include "Task.hpp" #include <pthread.h> #include <ctime> #include <sys/types.h> #include <unistd.h> #include<sstream> template<class T> class ThreadData { public:ThreadData(BlockQueue<T>* bq):_bq(bq){}BlockQueue<T>* _bq;std::string _name; };void *consumer(void *args) {ThreadData<Task> *td = (ThreadData<Task>*)args;while (true){// sleep(1);Task t;// 1. 消费数据 bq->pop(&data);td->_bq->Pop(&t);// 2. 进行处理// t.Run();t(); std::cout << "consumer data: " << t.PrintResult() << ", " << td->_name << std::endl;// 注意:消费者没有sleep!!!}return nullptr; }void *productor(void *args) {ThreadData<Task> *td = (ThreadData<Task>*)args;while (true){int data1 = rand() % 10; // [1, 10] int data2 = rand() % 10; // [1, 10] char oper = opers[rand() % (opers.size())];Task t(data1, data2, oper);std::cout << "productor task: " << t.PrintTask()<<td->_name << std::endl;// 2. 进行生产// bq->Push(data);td->_bq->Push(t);sleep(1);}return nullptr; } int main() {srand((uint16_t)time(nullptr) ^ getpid() ^ pthread_self()); // 只是为了形成更随机的数据BlockQueue<Task> *bq = new BlockQueue<Task>();pthread_t c[3], p[2]; // 消费者和生产者//创建3个消费线程for(int i=1;i<4;i++){ThreadData<Task> *td = new ThreadData<Task>(bq);std::stringstream ss;ss<<"cthread-"<<i;td->_name=ss.str();pthread_create(&c[i-1], nullptr, consumer, td);}//创建两个生产线程for(int i=1;i<3;i++){ThreadData<Task> *td = new ThreadData<Task>(bq);std::stringstream ss;ss<<"pthread-"<<i;td->_name=ss.str();pthread_create(&p[i-1], nullptr, productor, td);}//进行等待for(int i=0;i<3;i++){pthread_join(c[i], nullptr);}pthread_join(p[0], nullptr);pthread_join(p[1], nullptr);delete bq;return 0; }
Makefile:
testblockqueue:main.ccg++ -o $@ $^ -lpthread .PHONY:clean clean:rm testblockqueue
结果:
三、【再谈POSIX信号量】
3.1、【信号量的原理及概念】
1、【原理】
- 我们将可能会被多个执行流同时访问的资源叫做临界资源,临界资源需要进行保护否则会出现数据不一致等问题。
- 当我们仅用一个互斥锁对临界资源进行保护时,相当于我们将这块临界资源看作一个整体,同一时刻只允许一个执行流对这块临界资源进行访问。
- 但实际我们可以将这块临界资源再分割为多个区域,当多个执行流需要访问临界资源时,如果这些执行流访问的是临界资源的不同区域,那么我们可以让这些执行流同时访问临界资源的不同区域,此时不会出现数据不一致等问题。
2、【概念】
信号量(信号灯)本质是一个计数器,是描述临界资源中资源数目的计数器,信号量能够更细粒度的对临界资源进行管理。
每个执行流在进入临界区之前都应该先申请信号量,申请成功就有了操作特定的临界资源的权限,当操作完毕后就应该释放信号量。
信号量的PV操作:
- P操作:我们将申请信号量称为P操作,申请信号量的本质就是申请获得临界资源中某块资源的使用权限,当申请成功时临界资源中资源的数目应该减一,因此P操作的本质就是让计数器减一。
- V操作:我们将释放信号量称为V操作,释放信号量的本质就是归还临界资源中某块资源的使用权限,当释放成功时临界资源中资源的数目就应该加一,因此V操作的本质就是让计数器加一。
PV操作必须是原子操作:
多个执行流为了访问临界资源会竞争式的申请信号量,因此信号量是会被多个执行流同时访问的,也就是说信号量本质也是临界资源。
但信号量本质就是用于保护临界资源的,我们不可能再用信号量去保护信号量,所以信号量的PV操作必须是原子操作。
注意: 内存当中变量的
++
、--
操作并不是原子操作,因此信号量不可能只是简单的对一个全局变量进行++
、--
操作。申请信号量失败会被挂起等待:
当执行流在申请信号量时,可能此时信号量的值为0,也就是说信号量描述的临界资源已经全部被申请了,此时该执行流就应该在该信号量的等待队列当中进行等待,直到有信号量被释放时再被唤醒。
注意: 信号量的本质是计数器,但不意味着只有计数器,信号量还包括一个等待队列。
3.2、【信号量函数】
1、【初始化信号量】
初始化信号量的函数叫做sem_init,该函数的函数原型如下:
int sem_init(sem_t *sem, int pshared, unsigned int value);
参数说明:
- sem:需要初始化的信号量。
- pshared:传入0值表示线程间共享,传入非零值表示进程间共享。
- value:信号量的初始值(计数器的初始值)。
返回值说明:
- 初始化信号量成功返回0,失败返回-1。
注意: POSIX信号量和System V信号量作用相同,都是用于同步操作,达到无冲突的访问共享资源目的,但POSIX信号量可以用于线程间同步。
2、【销毁信号量】
销毁信号量的函数叫做sem_destroy,该函数的函数原型如下:
int sem_destroy(sem_t *sem);
参数说明:
- sem:需要销毁的信号量。
返回值说明:
- 销毁信号量成功返回0,失败返回-1。
3、【等待信号量(申请信号量)】
等待信号量的函数叫做sem_wait,该函数的函数原型如下:
int sem_wait(sem_t *sem);
参数说明:
- sem:需要等待的信号量。
返回值说明:
- 等待信号量成功返回0,信号量的值减一。
- 等待信号量失败返回-1,信号量的值保持不变。
4、【发布信号量(释放信号量)】
发布信号量的函数叫做sem_post,该函数的函数原型如下:
int sem_post(sem_t *sem);
参数说明:
- sem:需要发布的信号量。
返回值说明:
- 发布信号量成功返回0,信号量的值加一。
- 发布信号量失败返回-1,信号量的值保持不变。
3.3、【二元信号量模拟实现互斥功能】
信号量本质是一个计数器,如果将信号量的初始值设置为1,那么此时该信号量叫做二元信号量。
信号量的初始值为1,说明信号量所描述的临界资源只有一份,此时信号量的作用基本等价于互斥锁。
例如,下面我们实现一个多线程抢票系统,其中我们用二元信号量模拟实现多线程互斥。
我们在主线程当中创建四个新线程,让这四个新线程执行抢票逻辑,并且每次抢完票后打印输出此时剩余的票数,其中我们用全局变量tickets记录当前剩余的票数,此时tickets是会被多个执行流同时访问的临界资源,在下面的代码中我们并没有对tickets进行任何保护操作。
#include <iostream> #include <string> #include <unistd.h> #include <pthread.h>int tickets = 2000; void* TicketGrabbing(void* arg) {std::string name = (char*)arg;while (true){if (tickets > 0){usleep(1000);std::cout << name << " get a ticket, tickets left: " << --tickets << std::endl;}else{break;}}std::cout << name << " quit..." << std::endl;pthread_exit((void*)0); }int main() {pthread_t tid1, tid2, tid3, tid4;pthread_create(&tid1, nullptr, TicketGrabbing, (void*)"thread 1");pthread_create(&tid2, nullptr, TicketGrabbing, (void*)"thread 2");pthread_create(&tid3, nullptr, TicketGrabbing, (void*)"thread 3");pthread_create(&tid4, nullptr, TicketGrabbing, (void*)"thread 4");pthread_join(tid1, nullptr);pthread_join(tid2, nullptr);pthread_join(tid3, nullptr);pthread_join(tid4, nullptr);return 0; }
这里会出现线程安全问题我们之前已经接触过了,运行代码后可以看到,线程打印输出剩余票数时出现了票数剩余为负数的情况,这是不符合我们预期的。
下面我们在抢票逻辑当中加入二元信号量,让每个线程在访问全局变量tickets之前先申请信号量,访问完毕后再释放信号量,此时二元信号量就达到了互斥的效果。
##include <iostream> #include <string> #include <unistd.h> #include <pthread.h> #include <semaphore.h>//使用信号量需要包含该头文件 //这里使用自定义的是因为我们可以对信号量的PV操作 class Sem { public:Sem(int num){sem_init(&_sem, 0, num);}~Sem(){sem_destroy(&_sem);}void P(){sem_wait(&_sem);//--}void V(){sem_post(&_sem);//++} private:sem_t _sem; };Sem sem(1); //二元信号量 int tickets = 2000; void* TicketGrabbing(void* arg) {std::string name = (char*)arg;while (true){sem.P();//多线程进入之前,只有一个线程获得进入机会,信号量为1只有一块资源,可以理解为信号量掌控着资源的使用权if (tickets > 0){usleep(1000);std::cout << name << " get a ticket, tickets left: " << --tickets << std::endl;//抢票sem.V();//有机会的线程执行完归还资源使用权}else{sem.V();//票抢完了也要归还break;}}std::cout << name << " quit..." << std::endl;pthread_exit((void*)0); }int main() {pthread_t tid1, tid2, tid3, tid4;pthread_create(&tid1, nullptr, TicketGrabbing, (void*)"thread 1");pthread_create(&tid2, nullptr, TicketGrabbing, (void*)"thread 2");pthread_create(&tid3, nullptr, TicketGrabbing, (void*)"thread 3");pthread_create(&tid4, nullptr, TicketGrabbing, (void*)"thread 4");pthread_join(tid1, nullptr);pthread_join(tid2, nullptr);pthread_join(tid3, nullptr);pthread_join(tid4, nullptr);return 0; }
运行代码后就不会出现剩余票数为负的情况了,因为此时同一时刻只会有一个执行流对全局变量tickets进行访问,不会出现数据不一致的问题。
四、【基于环形队列的生产消费模型】
4.1、【概念介绍】
现在我们使用上面的信号量接口,来写一个基于环形队列的生产者消费者模型。
说是一个环形队列,其实本质上就是一个数组,从头放到尾部,如果当n大于了数组的长度,我们就让n%=v.size(),这样n又会回到索引0的位置。就像环一样滚动起来了。
现在有了环形队列,既然是生产者消费者模型,我们就得让生产者去生产数据并往环形队列里面放入,消费者从环形队列中拿取数据。如果消费者不进行消费,生产者最多能往队列里面生产 size 份数据。也就是消费者被生产者刚好超了一圈。就比如下图,生产者生产了一圈,消费者一直没有消费,此时生产者就不能再生产了,因为这会造成数据覆盖,必须让消费者去消费之后再生产。
同理,消费者也最多赶上生产者,就不能再消费了,因为数据已经被消费者消费完了,生产者还没来得及生产。
所以总结就是,为空,应该让生产者去生产,为满,应该让消费者去消费,但是为空或者为满只是占了实际生产消费情况的少部分,更多的情况并没有那么极端。因此我们只需要偶尔进行维持就可以了,这样就能让生产者和消费者同时进行操作了。
4.2、【空间资源和数据资源】
生产者关注的是空间资源,消费者关注的是数据资源,所以对于生产者和消费者来说,它们关注的资源是不同的:
- 生产者关注的是环形队列当中是否有空间(blank),只要有空间生产者就可以进行生产。
- 消费者关注的是环形队列当中是否有数据(data),只要有数据消费者就可以进行消费。
现在我们用信号量来描述环形队列当中的空间资源(blank_sem)和数据资源(data_sem),在我们初始信号量时给它们设置的初始值是不同的:
- blank_sem的初始值我们应该设置为环形队列的容量size,因为刚开始时环形队列当中全是空间。
- data_sem的初始值我们应该设置为0,因为刚开始时环形队列当中没有数据。
4.3、【生产者和消费者申请和释放资源】
1、生产者申请空间资源,释放数据资源
对于生产者来说,生产者每次生产数据前都需要先申请blank_sem:
- 如果blank_sem的值不为0,则信号量申请成功,此时生产者可以进行生产操作。
- 如果blank_sem的值为0,则信号量申请失败,此时生产者需要在blank_sem的等待队列下进行阻塞等待,直到环形队列当中有新的空间后再被唤醒。
当生产者生产完数据后,应该释放data_sem:
- 虽然生产者在进行生产前是对blank_sem进行的P操作,但是当生产者生产完数据,应该对data_sem进行V操作而不是blank_sem。
- 生产者在生产数据前申请到的是
blank位置
,当生产者生产完数据后,该位置当中存储的是生产者生产的数据,在该数据被消费者消费之前,该位置不再是blank位置
,而应该是data位置
。- 当生产者生产完数据后,意味着环形队列当中多了一个
data位置
,因此我们应该对data_sem进行V操作。2、消费者申请数据资源,释放空间资源
对于消费者来说,消费者每次消费数据前都需要先申请data_sem:
- 如果data_sem的值不为0,则信号量申请成功,此时消费者可以进行消费操作。
- 如果data_sem的值为0,则信号量申请失败,此时消费者需要在data_sem的等待队列下进行阻塞等待,直到环形队列当中有新的数据后再被唤醒。
当消费者消费完数据后,应该释放blank_sem:
- 虽然消费者在进行消费前是对data_sem进行的P操作,但是当消费者消费完数据,应该对blank_sem进行V操作而不是data_sem。
- 消费者在消费数据前申请到的是
data位置
,当消费者消费完数据后,该位置当中的数据已经被消费过了,再次被消费就没有意义了,为了让生产者后续可以在该位置生产新的数据,我们应该将该位置算作blank位置
,而不是data位置
。- 当消费者消费完数据后,意味着环形队列当中多了一个
blank位置
,因此我们应该对blank_sem进行V操作。3、伪代码
4.3、【两个必须遵守的规则】
在基于环形队列的生产者和消费者模型当中,生产者和消费者必须遵守如下两个规则。
1、第一个规则:生产者和消费者不能对同一个位置进行访问。
生产者和消费者在访问环形队列时:
- 如果生产者和消费者访问的是环形队列当中的同一个位置,那么此时生产者和消费者就相当于同时对这一块临界资源进行了访问,这当然是不允许的。
- 而如果生产者和消费者访问的是环形队列当中的不同位置,那么此时生产者和消费者是可以同时进行生产和消费的,此时不会出现数据不一致等问题。
2、第二个规则:无论是生产者还是消费者,都不应该将对方套一个圈以上。
- 生产者从消费者的位置开始一直按顺时针方向进行生产,如果生产者生产的速度比消费者消费的速度快,那么当生产者绕着消费者生产了一圈数据后再次遇到消费者,此时生产者就不应该再继续生产了,因为再生产就会覆盖还未被消费者消费的数据。
- 同理,消费者从生产者的位置开始一直按顺时针方向进行消费,如果消费者消费的速度比生产者生产的速度快,那么当消费者绕着生产者消费了一圈数据后再次遇到生产者,此时消费者就不应该再继续消费了,因为再消费就会消费到缓冲区中保存的废弃数据。
4.4、【模拟实现】
1、【生产者消费者步调一致】
其中的RingQueue就是生产者消费者模型当中的交易场所,我们可以用C++STL库当中的vector进行实现。
RingQueue.hpp:
#pragma once#include <iostream> #include <unistd.h> #include <pthread.h> #include <semaphore.h> #include <vector>#define NUM 8template<class T> class RingQueue { private://P操作void P(sem_t& s){sem_wait(&s);}//V操作void V(sem_t& s){sem_post(&s);} public:RingQueue(int cap = NUM): _cap(cap), _p_pos(0), _c_pos(0){_q.resize(_cap);sem_init(&_blank_sem, 0, _cap); //blank_sem初始值设置为环形队列的容量sem_init(&_data_sem, 0, 0); //data_sem初始值设置为0}~RingQueue(){sem_destroy(&_blank_sem);sem_destroy(&_data_sem);}//向环形队列插入数据(生产者调用)void Push(const T& data){P(_blank_sem); //生产者关注空间资源,本质对空间信号量--_q[_p_pos] = data;V(_data_sem); //生产,对数据信号量++//更新下一次生产的位置_p_pos++;_p_pos %= _cap;}//从环形队列获取数据(消费者调用)void Pop(T& data){P(_data_sem); //消费者关注数据资源,对数据信号量--data = _q[_c_pos];V(_blank_sem);//对空间信号量++//更新下一次消费的位置_c_pos++;_c_pos %= _cap;} private:std::vector<T> _q; //环形队列int _cap; //环形队列的容量上限int _p_pos; //生产位置int _c_pos; //消费位置sem_t _blank_sem; //描述空间资源sem_t _data_sem; //描述数据资源 };
相关说明:
- 当不设置环形队列的大小时,我们默认将环形队列的容量上限设置为8。
- 代码中的RingQueue是用vector实现的,生产者每次生产的数据放到vector下标为p_pos的位置,消费者每次消费的数据来源于vector下标为c_pos的位置。
- 生产者每次生产数据后p_pos都会进行++,标记下一次生产数据的存放位置,++后的下标会与环形队列的容量进行取模运算,实现“环形”的效果。
- 消费者每次消费数据后c_pos都会进行++,标记下一次消费数据的来源位置,++后的下标会与环形队列的容量进行取模运算,实现“环形”的效果。
- p_pos只会由生产者线程进行更新,c_pos只会由消费者线程进行更新,对这两个变量访问时不需要进行保护,因此代码中将p_pos和c_pos的更新放到了V操作之后,就是为了尽量减少临界区的代码。
为了方便理解,我们这里实现单生产者、单消费者的生产者消费者模型。于是在主函数我们就只需要创建一个生产者线程和一个消费者线程,生产者线程不断生产数据放入环形队列,消费者线程不断从环形队列里取出数据进行消费。
main.cc:
#include "RingQueue.hpp"void* Producer(void* arg) {RingQueue<int>* rq = (RingQueue<int>*)arg;while (true){sleep(1);int data = rand() % 100 + 1;rq->Push(data);std::cout << "Producer: " << data << std::endl;} } void* Consumer(void* arg) {RingQueue<int>* rq = (RingQueue<int>*)arg;while (true){sleep(1);int data = 0;rq->Pop(data);std::cout << "Consumer: " << data << std::endl;} } int main() {srand((unsigned int)time(nullptr));pthread_t producer, consumer;//创建单个生产和消费线程RingQueue<int>* rq = new RingQueue<int>;pthread_create(&producer, nullptr, Producer, rq);pthread_create(&consumer, nullptr, Consumer, rq);pthread_join(producer, nullptr);pthread_join(consumer, nullptr);delete rq;return 0; }
相关说明:
- 环形队列要让生产者线程向队列中Push数据,让消费者线程从队列中Pop数据,因此这个环形队列必须要让这两个线程同时看到,所以我们在创建生产者线程和消费者线程时,需要将环形队列作为线程执行例程的参数进行传入。
- 代码中生产者生产数据就是将获取到的随机数Push到环形队列,而消费者就是从环形队列Pop数据,为了便于观察,我们可以将生产者生产的数据和消费者消费的数据进行打印输出。
结果:
2、【生产者生产的快,消费者消费的慢】
我们可以让生产者不停的进行生产,而消费者每隔一秒进行消费。
void* Producer(void* arg) {RingQueue<int>* rq = (RingQueue<int>*)arg;while (true){int data = rand() % 100 + 1;rq->Push(data);std::cout << "Producer: " << data << std::endl;} } void* Consumer(void* arg) {RingQueue<int>* rq = (RingQueue<int>*)arg;while (true){sleep(1);int data = 0;rq->Pop(data);std::cout << "Consumer: " << data << std::endl;} }
此时由于生产者生产的很快,运行代码后一瞬间生产者就将环形队列打满了,此时生产者想要再进行生产,但空间资源已经为0了,于是生产者只能在blank_sem的等待队列下进行阻塞等待,直到由消费者消费完一个数据后对blank_sem进行了V操作,生产者才会被唤醒进而继续进行生产。
但由于生产者的生产速度很快,生产者生产完一个数据后又会进行等待,因此后续生产者和消费者的步调又变成一致的了。
3、【生产者生产的慢,消费者消费的快】
当然我们也可以让生产者每隔一秒进行生产,而消费者不停的进行消费。
void* Producer(void* arg) {RingQueue<int>* rq = (RingQueue<int>*)arg;while (true){sleep(1);int data = rand() % 100 + 1;rq->Push(data);std::cout << "Producer: " << data << std::endl;} } void* Consumer(void* arg) {RingQueue<int>* rq = (RingQueue<int>*)arg;while (true){int data = 0;rq->Pop(data);std::cout << "Consumer: " << data << std::endl;} }
虽然消费者消费的很快,但一开始环形队列当中的数据资源为0,因此消费者只能在data_sem的等待队列下进行阻塞等待,直到生产者生产完一个数据后对data_sem进行了V操作,消费者才会被唤醒进而进行消费。
但由于消费者的消费速度很快,消费者消费完一个数据后又会进行等待,因此后续生产者和消费者的步调又变成一致的了。
4.5、【添加计算任务】
我们这里也可以像上面那样使用Task.hpp来对我们的生产消费模型进行封装。
#include "RingQueue.hpp" #include "Task.hpp" void *Producer(void *args) {// sleep(5);RingQueue<Task> *rq = static_cast<RingQueue<Task> *>(args);while (true){int data1 = rand() % 10; // [1, 10] int data2 = rand() % 10; // [1, 10] char oper = opers[rand() % (opers.size())];Task t(data1, data2, oper);std::cout << "productor task: " << t.PrintTask() << std::endl;rq->Push(t);} }void *Consumer(void *args) {RingQueue<Task> *rq = static_cast<RingQueue<Task> *>(args);while (true){// sleep(1);Task t;rq->Pop(t);t();std::cout << "consumer done, data is : " << t.PrintResult() << std::endl;} } int main() {srand((unsigned int)time(nullptr));pthread_t producer, consumer;//创建单个生产和消费线程RingQueue<Task>* rq = new RingQueue<Task>;pthread_create(&producer, nullptr, Producer, rq);pthread_create(&consumer, nullptr, Consumer, rq);pthread_join(producer, nullptr);pthread_join(consumer, nullptr);delete rq;return 0; }
结果:
4.6、【多线程和LockGuard】
现在我们想让有多个消费者和多个生产者一起去操作。虽然我们确实预先申请了信号量,但是环形队列的实现,生产者有一个位置,消费者有一个位置。所以可能会出现多个生产者都获得信号量,但是只能在同一个位置进行生产,消费者也一样,所以我们要实现生产者与生产者之间,消费者与消费者之间的互斥。
也就是说必须要保持同一位置下只能有一个生产者和一个消费者访问,不能出现第二个生产者和消费者,因此需要添加两把互斥锁来保证,消费者的互斥和生产者的互斥。
两把锁的目的是让生产者和消费者一起进行,如果只有一把锁就同一时间只能有一个进行。
我们代码部分也比较简单啊,定义锁,初始化锁,加锁与解锁,销毁锁就可以了。 最后也可以用我们的LockGuard,如下:
RingQueue.hpp:
#pragma once#include <iostream> #include <unistd.h> #include <pthread.h> #include <semaphore.h> #include <vector>#define NUM 8template<class T> class RingQueue { private://P操作void P(sem_t& s){sem_wait(&s);}//V操作void V(sem_t& s){sem_post(&s);} public:RingQueue(int cap = NUM): _cap(cap), _p_pos(0), _c_pos(0){_q.resize(_cap);sem_init(&_blank_sem, 0, _cap); //blank_sem初始值设置为环形队列的容量sem_init(&_data_sem, 0, 0); //data_sem初始值设置为0pthread_mutex_init(&_p_mutex, nullptr);pthread_mutex_init(&_c_mutex, nullptr);}~RingQueue(){sem_destroy(&_blank_sem);sem_destroy(&_data_sem);pthread_mutex_destroy(&_p_mutex);pthread_mutex_destroy(&_c_mutex);}//向环形队列插入数据(生产者调用)void Push(const T& data){P(_blank_sem); //生产者关注空间资源,本质对空间信号量--// 生产// 这里有个问题:先加锁1,还是先申请信号量?pthread_mutex_lock(&_p_mutex);_q[_p_pos] = data;pthread_mutex_unlock(&_p_mutex);V(_data_sem); //生产,对数据信号量++//更新下一次生产的位置_p_pos++;_p_pos %= _cap;}//从环形队列获取数据(消费者调用)void Pop(T& data){P(_data_sem); //消费者关注数据资源,对数据信号量--pthread_mutex_lock(&_c_mutex);data = _q[_c_pos];pthread_mutex_unlock(&_c_mutex);V(_blank_sem);//对空间信号量++//更新下一次消费的位置_c_pos++;_c_pos %= _cap;} private:std::vector<T> _q; //环形队列int _cap; //环形队列的容量上限int _p_pos; //生产位置int _c_pos; //消费位置sem_t _blank_sem; //描述空间资源sem_t _data_sem; //描述数据资源pthread_mutex_t _p_mutex;pthread_mutex_t _c_mutex; };
main.cc:
#include "RingQueue.hpp" #include "Task.hpp" void *Producer(void *args) {// sleep(5);RingQueue<Task> *rq = static_cast<RingQueue<Task> *>(args);while (true){int data1 = rand() % 10; // [1, 10] // 将来深刻理解生产消费,就要从这里入手,TODOusleep(rand() % 123);int data2 = rand() % 10; // [1, 10] // 将来深刻理解生产消费,就要从这里入手,TODOusleep(rand() % 123);char oper = opers[rand() % (opers.size())];Task t(data1, data2, oper);std::cout << "productor task: " << t.PrintTask() << std::endl;rq->Push(t);} }void *Consumer(void *args) {RingQueue<Task> *rq = static_cast<RingQueue<Task> *>(args);while (true){// sleep(1);Task t;rq->Pop(t);t();std::cout << "consumer done, data is : " << t.PrintResult() << std::endl;} } int main() {srand((unsigned int)time(nullptr));//pthread_t producer, consumer;//创建单个生产和消费线程RingQueue<Task>* rq = new RingQueue<Task>;pthread_t p[2],c[2];//创建多线程pthread_create(&p[0],nullptr,Producer,rq);pthread_create(&c[0],nullptr,Consumer,rq);pthread_create(&p[1],nullptr,Producer,rq);pthread_create(&c[1],nullptr,Consumer,rq);pthread_join(p[0],nullptr);pthread_join(c[0],nullptr);pthread_join(p[1],nullptr);pthread_join(c[1],nullptr);// pthread_create(&producer, nullptr, Producer, rq);// pthread_create(&consumer, nullptr, Consumer, rq);// pthread_join(producer, nullptr);// pthread_join(consumer, nullptr);delete rq;return 0; }
结果:
这里有个问题:
先说结论,两个都可以,但是更推荐使用先申请信号量,因为信号量是用来决定资源的可用数目的,如果我们先加锁,那么如果没有信号量我们什么也做不了,就好像我们去看电影,先到电影院我们再买票一样,如果票卖完了,我们就是白去了,如果先申请信号量,即使后续我们没有竞争到锁,也不碍事,因为我们提前预定了我们的资源,就好像我们看电影,我们应该先买票,预定好座位以后再去。
这里使用LockGuard只需要我们用局部域即可具体如下:
4.7、【信号量保护环形队列的原理】
在blank_sem和data_sem两个信号量的保护后,该环形队列中不可能会出现数据不一致的问题。
因为只有当生产者和消费者指向同一个位置并访问时,才会导致数据不一致的问题,而此时生产者和消费者在对环形队列进行写入或读取数据时,只有两种情况会指向同一个位置:
- 环形队列为空时。
- 环形队列为满时。
但是在这两种情况下,生产者和消费者不会同时对环形队列进行访问:
- 当环形队列为空的时,消费者一定不能进行消费,因为此时数据资源为0。
- 当环形队列为满的时,生产者一定不能进行生产,因为此时空间资源为0。
也就是说,当环形队列为空和满时,我们已经通过信号量保证了生产者和消费者的串行化过程。而除了这两种情况之外,生产者和消费者指向的都不是同一个位置,因此该环形队列当中不可能会出现数据不一致的问题。并且大部分情况下生产者和消费者指向并不是同一个位置,因此大部分情况下该环形队列可以让生产者和消费者并发的执行。
总结
本篇博客到这里也就结束了,感谢你的观看!
..................................................爱情好像流沙,心里的牵挂,不愿放下,oh baby让我这样吧
————《流沙》