前言
在学习完线程互斥和同步后,尤其是在学习同步时一定会遇到个问题,为什么条件变量的等待接口为什么要传递一个锁进去?那么本章就会来解决这个问题,不过前提是我们得了解生产消费模型。本章我们还会介绍信号量的使用。
生产消费模型
生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。这个阻塞队列就是用来给生产者和消费者解耦的。
-
为什么生产者和消费者之间存在同步关系?
如果让生产者一直生产数据,当容器被数据塞满后,生产者再进行生产就会导致生产失败。
如果让消费者一直拿取数据,当容器数据被搬空后,消费者再进行消费就会导致消费失败。
这就导致了上文所说的「饥饿问题」,为了解决此问题,应该让生产者和消费者之间在访问容器时具有一定的顺序性。- 当容器的管理者发现容器内数据量下降到标准线之下时,就让消费者停止消费,通知生产者生产数据。
- 当容器中的数据量被填充到标准线之上时,就让生产者停止生产,通知消费者过来消费。
优点:
1. 解耦生产与消费逻辑
- 核心优势:生产者只负责生成数据,消费者只负责处理数据,两者无需直接通信,降低了代码的耦合度。
- 应用场景:
- 日志系统中,生产者(日志生成模块)和消费者(日志写入文件模块)通过队列解耦。
- 消息队列(如 Kafka、RabbitMQ)中,服务间通过队列异步通信。
2. 平衡处理速度差异
- 缓冲作用:通过缓冲区缓解生产者和消费者的速度不匹配问题。
- 生产者快于消费者:缓冲区积累数据,避免生产者因消费者来不及处理而阻塞。
- 消费者快于生产者:缓冲区为空时,消费者等待新数据,避免忙等待(Busy Waiting)。
- 实际案例:
- 视频流处理中,生产者(视频采集)可能因网络波动产生数据速度不稳定,缓冲区可平滑波动。
- 高并发下单系统,订单生成(生产者)和库存扣减(消费者)通过队列削峰填谷。
3. 支持并发与并行
- 并行处理:
- 生产者线程和消费者线程可以同时运行,提高系统吞吐量。
- 可扩展为多生产者、多消费者模式,充分利用多核 CPU 资源。
- 示例:
- Web 服务器中,生产者(接收请求的线程)将请求放入队列,多个消费者线程(处理请求的线程)并行处理。
4. 流量控制与背压(Back Pressure)
- 避免资源耗尽:通过限制缓冲区大小,防止生产者过度生产导致内存溢出(OOM)。
- 当缓冲区满时,生产者被阻塞或丢弃数据(根据策略)。
- 示例:
- 实时数据处理系统中,若下游消费者处理能力不足,可通过有界队列限制上游生产速度。
5. 提高资源利用率
- 减少空闲等待:
- 消费者线程在队列为空时休眠(通过条件变量),不占用 CPU。
- 生产者线程在队列满时休眠,避免忙等待。
- 对比单线程模型:
- 单线程顺序执行时,生成和处理交替进行,无法并行;而生产者-消费者模型允许两者并发执行。
6. 模块化与可扩展性
- 灵活扩展:
- 生产者或消费者可独立增减(如动态调整消费者线程数量)。
- 缓冲区的实现可以替换(如内存队列、磁盘队列、分布式队列)。
- 示例:
- 分布式系统中,生产者(客户端)和消费者(服务端)通过消息队列(如 Redis、Kafka)解耦,支持横向扩展。
7. 容错与异步处理
- 失败重试:若消费者处理失败,数据仍保留在队列中,可重试或转移给其他消费者。
- 异步化:生产者无需等待消费者处理完成即可继续工作,适合高延迟场景(如网络请求)。
基于阻塞队列的生产消费模型
在多线程编程中阻塞队列(Blocking Queue)是一种常用于实现生产者和消费者模型的数据结构。其与普通的队列区别在于,当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中被放入了元素;当队列满时,往队列里存放元素的操作也会被阻塞,直到有元素被从队列中取出(以上的操作都是基于不同的线程来说的,线程在对阻塞队列进程操作时会被阻塞)
代码演示:
咱们先实现一个单生产单消费的示例,并且阻塞队列存放的是一个int类型的整数。那么我们就可以做到生产者生产一个,消费者消费一个。
BlockQueue.hpp
#include <iostream>
#include <string>
#include <queue>
#include <ctime>
#include <unistd.h>
#include <pthread.h>const static int default_max_cap = 5;template<class T>
class BlockQueue
{
private:bool IsEmpty(){return _block_queue.empty();}bool IsFull(){return _block_queue.size() == _max_cap;}public:BlockQueue(int max_cap = default_max_cap): _max_cap(max_cap){// 初始化 锁 + 条件变量pthread_mutex_init(&_mutex, nullptr);pthread_cond_init(&_c_cond, nullptr);pthread_cond_init(&_p_cond, nullptr);}// 生产者放数据void Push(T& in){// 只允许一个线程进入 放 数据pthread_mutex_lock(&_mutex); // 加锁while (IsFull()){// 阻塞等待 消费者消费pthread_cond_wait(&_p_cond, &_mutex);}_block_queue.push(in);pthread_cond_signal(&_c_cond); // 唤醒消费者线程,继续消费pthread_mutex_unlock(&_mutex); // 解锁}// 消费者拿数据void Pull(T& out){// 只允许一个线程进入 拿 数据pthread_mutex_lock(&_mutex); // 加锁while (IsEmpty()){// 阻塞等待 生产者生产pthread_cond_wait(&_c_cond, &_mutex);}out = _block_queue.front();_block_queue.pop();pthread_cond_signal(&_p_cond); // 唤醒生产者线程,继续生产pthread_mutex_unlock(&_mutex); // 解锁}~BlockQueue(){pthread_mutex_destroy(&_mutex);pthread_cond_destroy(&_c_cond);pthread_cond_destroy(&_p_cond);}private:std::queue<T> _block_queue; // 用queue模拟int _max_cap; // 最大容量pthread_mutex_t _mutex; // 互斥锁pthread_cond_t _c_cond; // 消费者的条件变量pthread_cond_t _p_cond; // 生产者的条件变量
};
因为我们本质还是要使用STL中的queue容器,但是我们还需要使用条件变量和互斥锁来实现生产消费的互斥与同步机制,因此我们可以将它们封装起来,统一初始化和销毁。
这里我们还需要注意一点,在条件变量下等待应当是在临界区等待,而使用条件变量接口pthread_cond_wait会传递一个互斥锁进来,而要解释也很好解释。
就是先把锁释放,然后让其他线程也解锁进来这个条件变量下排队等待,因为其他线程也都会拿锁 + 进来判断然后遇到条件变量 + 释放锁,这样就能保证所有线程在条件变量下排队等待,直到条件变量被释放了,如果是单独唤醒一个线程,那么这个线程还得去拿锁,然后再回来执行“等待条件变量”之后的代码。如果是全部线程被唤醒,那么众多线程之间还需要相互竞争去拿锁。
需要在条件变量下无非就两种情况:
- 阻塞队列里面为空,那么就得让消费者去等待,生产者赶紧生产
- 阻塞队列里面为满,那么就得让生产者去等待,消费者赶紧消费
当生产者消费完之后,记得要唤醒生产者。同理生产者在生产完后,还得换新消费者。
🔺同时还存在一个问题,假设你有两个消费者,一个生产者,并且你的阻塞队列最大容量就是1,你只能保证生产一个消费一个!!!
如果阻塞队列只有一个数据,然后你使用broadcast唤醒了两个消费者线程,然后他们去拿数据,那么有一个线程就会拿到锁然后去拿数据,此时的队列就为空的,然后另外一个线程在等前一个线程释放锁后,拿着锁进来查看,就会发现队列为空,但是他依然会去拿数据,这就造成了bug。这就是为什么我们的代码不用if而是while
test.cc:
#include "BlockQueue.hpp"void* Consumer(void* args)
{BlockQueue<int> *bq = (BlockQueue<int>*) args;while(true){int data = 0;bq->Pull(data);std::cout << "Consumer [pull]-> " << data << std::endl;sleep(1);}
}void *Productor(void *args)
{BlockQueue<int> *bq = (BlockQueue<int> *)args;while (true){int data = rand();bq->Push(data);std::cout << "Productor [push]-> " << data << std::endl;sleep(1);}
}void test01()
{BlockQueue<int> *bq = new BlockQueue<int>;pthread_t c_tid, p_tid;pthread_create(&p_tid, nullptr, Productor, (void *)bq); // 创建 生产者线程sleep(2);pthread_create(&c_tid, nullptr, Consumer, (void *)bq); // 创建 消费者线程pthread_join(p_tid, nullptr);pthread_join(c_tid, nullptr);
}int main()
{srand(time(nullptr) ^ getpid());test01(); // 单生产单消费return 0;
}
这样就能保证生产者生产一个消费者消费一个。
当然我们不仅仅可以用int整形来当数据,也可以是一个一个的任务,比如函数来进行数据的传输和获取。
信号量:
当时我在讲解共享内存时,有简单聊过信号量这一概念,可以简单去回顾了解下《简单聊聊System V下的IPC + 内核是如何管理该IPC》。
信号量(Semaphore)是一种用于控制对共享资源的访问的同步机制,我们通过电影院的例子了解到了信号量其实就是一个计数器,而申请信号量也就是一种预定机制!
引入:
我们前面举的那么多例子,不管是测试条件变量还是基于阻塞队列的生产消费模型,我们都是保证,同一时刻只允许一个线程进入临界资源中,所以我们使用互斥锁的方式来实现。
这样做没问题,因为多个线程要访问的是临界资源的同一区域,那么就非常有必要使用互斥锁和条件变量来保证线程之间的互斥与同步。
但是我在想,是否存在一种情况:不同的线程可以访问临界资源的不同区域?
如果存在的话,我们是允许它们同时进行临界资源的访问,这样就大大提高了效率。
比如有10个元素的数组作为临界资源,那么我们创造10个线程,然后10个线程个字访问一块空间,那么我们不就可以提高效率了吗。因此,我们使用信号量来解决。
信号量:信号量的本质是一个计数器,通常用来表示临界资源中,资源数的多少。申请信号量实际上就是对临界资源的预定机制。信号量主要用于同步和互斥。
每个执行流在进入临界区之前都应该先申请信号量,申请成功就有了访问临界资源的权限,当访问完毕后就应该释放信号量。
- 信号量的PV操作:
-
P操作:我们将申请信号量称为P操作。申请信号量的本质就是申请获得临界资源中某块资源的访问权限,当申请成功时临界资源中资源的数目应该减一,因此P操作的本质就是让信号量减一。
-
V操作:我们将释放信号量称为V操作。释放信号量的本质就是归还临界资源中某块资源的访问权限,当释放成功时临界资源中资源的数目就应该加一,因此V操作的本质就是让信号量加一。
函数接口:
初始化信号量:
#include <semaphore.h>int sem_init(sem_t *sem, /* sem 表示需要初始化的信号量 */int pshared, /* 设置 sem 的共享方式,为 0 在线程间共享,非 0 在进程间共享 */unsigned int value); /* 信号量 (计数器) 的初始值 */
销毁信号量:
#include <semaphore.h>int sem_destroy(sem_t *sem); // sem 表示要销毁的信号量
申请信号量 (等待信号量):
申请信号量 (等待信号量) 就是 PV 操作中的 P 操作。
注:申请信号量的 P 操作应该在线程争锁之前进行,只有在确定有资源的情况下才让线程去争锁。
#include <semaphore.h>int sem_wait(sem_t *sem); // sem 表示线程需要申请的信号量
- 调用该函数时,如果申请信号量成功,则让信号量的值 - 1;
- 如果申请信号量失败,则将调用该函数的线程在 sem 信号量的等待队列处挂起等待.
释放信号量 (发布信号量):
释放信号量 (发布信号量) 就是 PV 操作中的 V 操作。
注:释放信号量的 V 操作应该在线程解锁之后进行。
#include <semaphore.h>int sem_post(sem_t *sem); // sem 表示线程需要释放的信号量
- 调用该函数时,如果释放信号量成功,则让信号量的值 + 1。
基于环形队列的生产消费模型
环形队列知识点呢,我曾经在学习数据结构的部分也将结果,其实本质我们是用一个数组来模拟实现的,之前我也有讲解过:好题分享(2023.11.19——2023.11.25)
那我们话说回来,我们不考虑其他的数据结构的策略,故意留一个空位置的那种,我们就算将来这个环形队列能被写满!
如何让多线程同时进行生产和消费?
针对这点,我们有几个前提需要讲解清楚:
- 不能让生产者把消费者套一个圈(不能让生产者超过消费者一圈)
- 不能让消费者超过生产者
- 只有在队列为空 或 为满时,生产者和消费者在同一位置。
因此在实现“同时”这个概念,我们需要保证上面这三点,如何保证呢?
通过信号量来保证
大部分情况其实是,当队列不为空,也不为满时。那么生产者和消费者可以访问队列不同的位置,以实现并发。这样就可以提高效率了。(这点正好验证了我上面多线程访问临界资源的想法)
既然信号量是个计数器,这个计数器又是专门记录临界资源的数目的,那我们就可以利用这个计数器来保证多线程访问临界资源的同步机制了
1、对于生产者,在意的是队列中的空间资源,只要有空间生产者就可以进行生产。空间资源定义成一个生产者需要的信号量(space_sem),在初始化时,它的初始值我们应该设置为环形队列的容量,因为刚开始时环形队列当中全是空间。
2、对于消费者,在意的是队列中的数据资源,只要有数据消费者就可以进行消费。数据资源定义成一个消费者需要的信号量(data_sem),在初始化时,它的初始值我们应该设置为0,因为刚开始时环形队列当中没有数据。
当生产者线程生产数据时,它需要先申请信号量,即对space_sem进行P操作,然后生产数据,放入到队列中。生产完成后,这时,队列中就多出了一个数据资源,需要对data_sem进行V操作。
当消费者线程消费数据时,它也需要先申请信号量,即对data_sem进行P操作,然后消费数据。消费完成后,队列中就会多出一个空间资源,需要对space_sem进行V操作。
代码演示:
RingQueue.hpp:
#pragma once #include <iostream>
#include <string>
#include <vector>
#include <ctime>
#include <unistd.h>
#include <pthread.h>
#include <semaphore.h>const static int g_max_cap = 5;template <class T>
class RingQueue
{
private:void P(sem_t &s){sem_wait(&s);}void V(sem_t &s){sem_post(&s);}public:RingQueue(int max_cap = g_max_cap): _ring_queue(max_cap), _max_cap(max_cap), _p_step(0), _c_step(0){pthread_mutex_init(&_p_mutex, nullptr);pthread_mutex_init(&_c_mutex, nullptr);sem_init(&_data_sem, 0, 0);sem_init(&_space_sem, 0, max_cap);}void Push(T& in){P(_space_sem);pthread_mutex_lock(&_p_mutex);_ring_queue[_p_step++] = in;_p_step %= _max_cap;pthread_mutex_unlock(&_p_mutex);V(_data_sem);}void Pull(T& out){P(_data_sem);pthread_mutex_lock(&_c_mutex);out = _ring_queue[_c_step++];_c_step %= _max_cap;pthread_mutex_unlock(&_c_mutex);V(_space_sem);}~RingQueue(){sem_destroy(&_data_sem);sem_destroy(&_space_sem);pthread_mutex_destroy(&_c_mutex);pthread_mutex_destroy(&_p_mutex);}private:std::vector<T> _ring_queue;int _max_cap;pthread_mutex_t _p_mutex;pthread_mutex_t _c_mutex;int _p_step;int _c_step;sem_t _data_sem; // 消费者关心sem_t _space_sem; // 生产者关心
};
test.cc:
#include "RingQueue.hpp"void* Productor(void* args)
{RingQueue<int> *rq = (RingQueue<int> *)args;while(true){int data = rand() % 11;rq->Push(data);std::cout << "Productor push -> " << data << std::endl;sleep(1);}
}void* Consumer(void* args)
{RingQueue<int> *rq = (RingQueue<int> *)args;while (true){int data = 0;rq->Pull(data);std::cout << "Consumer Pull -> " << data << std::endl;sleep(1);}
}void test01()
{RingQueue<int> *rq = new RingQueue<int>;pthread_t c, p;pthread_create(&p, nullptr, Productor, (void *)rq);pthread_create(&c, nullptr, Consumer, (void *)rq);pthread_join(c, nullptr);pthread_join(p, nullptr);
}int main()
{srand(time(nullptr) ^ getpid());test01(); // 单生产单消费return 0;
}