欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 财经 > 产业 > Linux之线程同步和生产者消费者模型

Linux之线程同步和生产者消费者模型

2025/4/3 10:23:14 来源:https://blog.csdn.net/ZZY5707/article/details/143251695  浏览:    关键词:Linux之线程同步和生产者消费者模型

引入线程同步 

先举个生活的例子, 现在假设有一个自习室, 一次只能进一个人, 现在我第一个拿到钥匙(申请锁)并进入自习室(临界区)自习, 此时如果我中途上厕所(线程切换), 由于锁还在我手上(锁保存在进程上下文中), 其它正在等待自习室位置的人是拿不到钥匙的. 这是线程互斥的概念.

 现在, 假设我自习完成了(临界区访问完毕), 但是转念一想我下次来自习室又要排很长的队, 于是在放回钥匙的瞬间, 我又拿起钥匙进入了自习室, 但是自习1min后我什么都没干又出来了, 又去拿起钥匙, 以此往复长时间占用自习室却什么都没干, 但是我并不违反自习室的使用规则. 

上面的情况就出现了线程的饥饿问题: 多线程运行, 同一份资源其它线程长时间无法拥有, 会出现线程的饥饿问题

解决饥饿问题: 在临界资源安全使用的前提下(互斥), 让多线程执行具备一定的顺序性(同步).

互斥能保证资源的安全, 同步能够较为高效地使用资源 

于是自习室管理者发现这个漏洞后, 重新修改规则:

1. 从自习室出来, 归还钥匙过后,不能再立即申请, 而应该在外面排队

2. 外面的人申请失败的, 也要排队

介绍完生产消费模型, 再来用代码(条件变量)解决饥饿问题. 


生产者消费者模型介绍

现在假设有一家只买泡面的超市, 超市只有一个展架. 超市有多个品牌的泡面供应商(生产者), 也有不同口味偏好的消费者:

 生产者消费者模式本质是: 用来进行执行流之间的数据传递(通信)的  

生产者消费者模型有3种关系, 2个角色, 1个交易场所, 如图所示, 我们清楚地看到:

  • 2个角色分别是 生产者(1或n) 消费者(1或n) --> 线程或进程 ;
  • 1个交易场所超市-->内存空间

下面介绍3种关系:

生产者与消费者的关系 

1. 生产者之间的关系是互斥的, 因为一个生产者在一个位置生产商品, 其它生产者就不能在同一个位置生产了.

2. 消费者之间的关系也是互斥的, 因为此时是一个货架, 可能你拿了我就拿不到了.

3. 生产者与消费者之间的关系是互斥和同步的

互斥: 消费者必须等生产者把商品放到货架上才能拿. 消费者拿商品的时候生产者也不能放.

同步: 必须按照生产者生产后, 消费者才能进行消费的顺序调度. 因为不能让消费者频繁对着空数据去访问, 白白浪费锁等系统资源.

为什么用生产者消费者模式?(好处)

生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。这个阻塞队列就是用来给生产者和消费者解耦的。

解释: 

单进程代码中, 代码的执行是串行的, 如果有一个函数调用运行速度很慢, 那么只能等待这个函数执行完程序才能继续向下执行.

生产者消费者模型就可以进行多执行流之间的解耦, 支持"忙闲不均". 

此时内存空间相当于一块缓存, 生产者生产得很快, 可以一直向内存中放入数据(除非放满缓冲区, 否则线程不需要阻塞). 而消费者执行得很慢, 也没关系, 只要缓存里有数据消费者就往缓存中取数据即可, 实现了生产者与消费者的并发执行, 提高了处理数据的效率.


条件变量

什么是条件变量?

还记得之前自习室里提到的饥饿问题吗? 由于我的竞争能力很强, 可以反复地去抢占自习室却不做什么事情, 管理者修改了相应的规则.

生产者消费者模型中也是存在这样的饥饿问题的, 具体体现在3个关系中的: "生产者与消费者之间的关系是互斥和同步的",  其中同步的原因就是因为假如生产者还来不及向内存空间中放入数据, 而消费者频繁地去访问空的公共资源, 由于线程互斥对临街资源加锁, 导致生产者无法进行生产, 白白浪费了公共资源.

两个例子实际的做法应该是:

1. (具有一定的顺序)针对自习室竞争能力强的"我"(消费者), 设置规则:

  • 一个人短时间内只允许使用一次自习室, 从自习室出来, 归还钥匙过后, 不能再立即申请, 而应该在外面排队
  • 外面的人申请失败的, 也要排队, 申请失败也是一种使用

2. (不做无效的锁申请)消费者线程发现共享容器为空时, 就不应当去竞争锁访问资源, 而是阻塞等待, 直到生产者线程将数据生成到容器中.

总结:

多线程互斥访问临界资源时, 为了让线程按一定顺序(同步)访问, 通常会将线程放到条件变量的阻塞队列中, 当其他线程满足唤醒条件(比如生产者生产出资源), 就唤醒该条件变量阻塞队列中的一个/或多个线程去访问临界资源.

条件变量的接口

 先写一段不加锁也没有条件变量的多线程代码, 创建三个线程只打印自己的线程名称:

#include <iostream>
#include <string>
#include <pthread.h>
#include <unistd.h>void* ThreadRoutine(void* arg)
{std::string name = static_cast<const char*>(arg);while(true){std::cout << "I am a new thread : " << name << std::endl;sleep(1);}
}int main()
{pthread_t t1, t2, t3;pthread_create(&t1, nullptr, ThreadRoutine, (void*)"thread-1");pthread_create(&t2, nullptr, ThreadRoutine, (void*)"thread-2");pthread_create(&t3, nullptr, ThreadRoutine, (void*)"thread-3");while(true){sleep(1);}pthread_join(t1, nullptr);pthread_join(t2, nullptr);pthread_join(t3, nullptr);return 0;
}

1.  打印出来的结果是乱序, 因为没有设置条件变量进行同步

2. 输出结果间互相干扰, 因为当前所有线程都使用 cout 向显示器打印, 说明所有线程都是访问同一个设备文件, 所以显示器也是共享资源, 所以可以对使用显示器的临界区加锁(但是实际打印其实并不重要)

void* ThreadRoutine(void* arg)
{std::string name = static_cast<const char*>(arg);while(true){pthread_mutex_lock(&Mutex);std::cout << "I am a new thread : " << name << std::endl;pthread_mutex_unlock(&Mutex);sleep(1);}
}

乱序问题还没有解决, 先来认识条件变量的接口:

一. 初始化条件变量

条件变量同样是一个类(pthread_cond_t),由POSIX线程库维护, 使用的是POSIX标准. 它也可以构造对象 pthread_cond_t cond, cond就是条件变量的对象

条件变量的初始化和之前使用的互斥锁十分类似, 实际上它们本身也是强关联的:

1. 局部条件变量

  • 创建局部条件变量必须使用此函数: 

int pthread_cond_init(pthread_cond_t *restrict cond, const pthread_condattr_t *restrict attr);

头文件:pthread.h

功能:初始化条件变量

参数:pthread_cond_t *restrict cond表示需要被初始化的条件变量的地址, const pthread_condattr_t *restrict attr表示条件变量的属性, 一般都为nullptr.

返回值:取消成功返回0, 取消失败返回错误码。

  • 配套销毁条件变量:

 int pthread_cond_destroy(pthread_cond_t *cond); 

头文件:pthread.h

功能:销毁互斥条件变量

参数:pthread_cond_t *cond 表示需要被销毁的条件变量的地址

返回值:销毁成功返回0, 失败返回错误码

2. 创建全局条件变量, 直接声明即可:

pthread_cond_t cond = PTHREAD_COND_INITIALIZER;

二. 创建好了条件变量, 需要将线程加入到该条件变量的等待队列

int pthread_cond_wait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex);

头文件:pthread.h

功能:将调用该接口的线程放入传入的条件变量等待队列中

参数:pthread_cond_t *restrict cond 是条件变量地址, pthread_mutex_t *restrict mutex互斥锁的地址(为什么传锁以后会解释)

返回值:放入等待队列成功返回0, 失败返回错误码.

三. 唤醒条件变量等待队列中的线程

int pthread_cond_signal(pthread_cond_t *cond);

头文件: pthread.h

功能:由另一个线程(通常是主线程)唤醒指定条件变量等待队列中的一个线程; 如果线程没有在等待该条件变量的, 信号会被忽略

参数:pthread_cond_t *cond表示需要唤醒的线程所在的等待队列的条件变量地址

返回值:唤醒成功返回0, 失败返回错误码

int pthread_cond_broadcast(pthread_cond_t *cond);

头文件:pthread.h

功能:由另一个线程(通常是主线程)唤醒指定条件变量等待队列中的所有线程

参数:pthread_cond_t *cond 表示需要唤醒的线程所在的等待队列的条件变量地址

返回值:唤醒成功返回0,失败返回错误码

代码修改:

#include <iostream>
#include <string>
#include <pthread.h>
#include <unistd.h>pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t cond PTHREAD_COND_INITIALIZER;void* ThreadRoutine(void* arg)
{std::string name = static_cast<const char*>(arg);while(true){pthread_mutex_lock(&mutex);pthread_cond_wait(&cond, &mutex);//条件变量std::cout << "I am a new thread : " << name << std::endl;pthread_mutex_unlock(&mutex);sleep(1);}
}int main()
{pthread_t t1, t2, t3;pthread_create(&t1, nullptr, ThreadRoutine, (void*)"thread-1");pthread_create(&t2, nullptr, ThreadRoutine, (void*)"thread-2");pthread_create(&t3, nullptr, ThreadRoutine, (void*)"thread-3");while(true){pthread_cond_signal(&cond);//唤醒线程sleep(1);}pthread_join(t1, nullptr);pthread_join(t2, nullptr);pthread_join(t3, nullptr);return 0;
}

在加锁访问临界资源前, 设置条件变量, 让线程全都阻塞在条件变量下:

主线程每隔一秒唤醒一个线程: 

运行结果, 每隔一秒有一个线程被唤醒, 执行打印工作: 

这个例子更应该关注"线程在条件变量下阻塞, 等待唤醒", 线程按照1 2 3的顺序执行是不确定的, 先创建的线程也未必先入队列阻塞.

 pthread_cond_wait(&cond, &mutex);为什么需要互斥锁?

//1. 线程在进行等待的时候, 会自动的释放锁

            //2. 线程是在临界区被唤醒的,  所以当线程在pthread_cond_wait返回的时候重新申请锁

            //3. 当线程被唤醒的时候, 重新申请锁本质也是要参与锁的竞争的


 基于BlockingQueue的生产者消费者模型

先来实现一个基本的单生产者单消费者模型

1. 代码的主逻辑是创建两个线程, 分别充当生产者线程消费者线程, 所有线程要看到同一份公共资源, 所有要把阻塞队列作为参数传递.

2. 生产消费的过程其实就是向阻塞队列PushPop 数据:

main.cpp  

#include "BlockQueue.hpp"
#include "Task.hpp"
#include <iostream>
#include <unistd.h>using std::cout;
using std::endl;void* consumer(void* arg)
{BlockQueue<int>* pbq = static_cast<BlockQueue<int>*>(arg);while(true){//sleep(1);//消费int data = 0;pbq->Pop(&data);std::cout << "consumer data: " << data << endl;//消费者不断消耗数据}
}void* producer(void* arg)
{BlockQueue<int>* pbq = static_cast<BlockQueue<int>*>(arg);while(true){//1. 拿到数据int data = rand() % 10;//深刻理解生产消费模型, 要从这里入手, TODO//2. 生产pbq->Push(data);std::cout << "producer data: " << data << std::endl;sleep(1);//生产者每隔一秒生产数据}
}int main()
{srand(time(nullptr) ^ getpid() ^ pthread_self());BlockQueue<int> bq;pthread_t con, pro;//生产者和消费者pthread_create(&con, nullptr, consumer, &bq);pthread_create(&pro, nullptr, producer, &bq);// 等待线程执行pthread_join(con, nullptr);pthread_join(pro, nullptr);return 0;
}

为了满足生产者消费者模型的3个关系, 重点在于阻塞队列中对于 临界资源的保护(互斥) 资源的访问顺序(同步).

1. 我们借助标准库容器的queue来实现, 由于会自动扩容, 所以自定义一个_cpacity 设定为队列长度

2. 需要锁和条件变量负责控制生产和消费的互斥和同步.

互斥很容易保证, 在Push和Pop访问临界资源(阻塞队列)时加锁即可.

同步:

  • 对于生产者, 操作是Push, 如果队列满就不要再生产数据, 在生产者的条件变量之下等待队列满足生产条件(可以是有空位就生产, 也可以自己设置"生产者水位线") 再生产.
  • 对于消费者, 操作是Pop, 如果队列空就不要再消费数据, 在消费者的条件变量之下等待队列满足消费条件(可以是有数据就生产, 也可以自己设置"消费者水位线") 再生产. 
  • 对于两者而言, 生产完和消费完数据之后, 记得去检查生产与消费条件, 唤醒对方的条件变量

BlockQueue.hpp

#include "LockGuard.hpp"
#include <pthread.h>
#include <queue>template<class T>
class BlockQueue
{
public:BlockQueue(int capaicty = 10):_capacity(capaicty),_consumer_waterline(_capacity/3 *2)//消费者消费了1/3才唤醒, _producer_waterline(_capacity/3)//生产了1/3才唤醒{pthread_mutex_init(&_mutex, nullptr);pthread_cond_init(&_p_cond, nullptr);pthread_cond_init(&_c_cond, nullptr);}~BlockQueue(){pthread_mutex_destroy(&_mutex);pthread_cond_destroy(&_p_cond);pthread_cond_destroy(&_c_cond);}void Pop(T* pdata){pthread_mutex_lock(&_mutex);if(isEmpty())//if有问题{pthread_cond_wait(&_c_cond, &_mutex);}*pdata = _q.front();_q.pop();//水位线控制if(_q.size() < _consumer_waterline)pthread_cond_signal(&_p_cond);pthread_mutex_unlock(&_mutex);}void Push(const T& data){pthread_mutex_lock(&_mutex);if(isFull()){pthread_cond_wait(&_p_cond, &_mutex);}_q.push(data);//水位线控制if(_q.size() > _producer_waterline)pthread_cond_signal(&_c_cond);pthread_mutex_unlock(&_mutex);}bool isEmpty() const{return _q.size() == 0;}bool isFull() const{return _q.size() == _capacity;}
private:std::queue<T> _q;pthread_mutex_t _mutex;pthread_cond_t _p_cond;//生产者条件变量pthread_cond_t _c_cond;//消费者条件变量int _consumer_waterline = _capacity/3 *2;//消费剩2/3才唤醒int _producer_waterline = _capacity/3;//生产1/3才唤醒int _capacity;
};

可以看到一共三个线程: 主线程, 生产者和消费者线程 :

上面完成了最基本的基于阻塞队列的单生产单消费模型, 但是有几个细节:

1. 进一步理解生产者消费者模型, 比如Push和Pop的过程中, 我们是使用 if 去判断 是否需要基于条件变量进行阻塞等待, 但是如果是多生产者消费者线程, 我们一次性唤醒多个在条件变量下等待的线程, 这些线程会去竞争锁资源, 竞争失败的线程会去等待锁, 结果就是所有被唤醒的线程都脱离了条件变量的控制, 转而去使用或等待锁资源, 等待锁的线程总会拿到锁, 于是会继续向下执行, 假如生产者生产数据很慢, 第一个抢到锁的消费者消费完数据, 生产者没来得及生产, 消费者就会对空的队列进行访问, 造成错误.

 这种情况叫作: 多线程下伪唤醒, 即对应的条件并不满足, 但是线程却被唤醒. 申请锁失败或函数本身执行失败. 为了解决, 我们要把 if 换为 while 判断: while(isEmpty())

2. 我们可以用之前封装的 Lockguard 去加锁:

以Pop为例: 

void Pop(T* pdata){LockGuard lg(&_mutex);//pthread_mutex_lock(&_mutex);while(isEmpty()){pthread_cond_wait(&_c_cond, &_mutex);//1. 进一步理解, 多线程下伪唤醒, 对应的条件并不满足, 但是线程却被唤醒. 申请锁失败或函数本身执行失败}*pdata = _q.front();_q.pop();//水位线控制if(_q.size() < _consumer_waterline)pthread_cond_signal(&_p_cond);//pthread_mutex_unlock(&_mutex);}

3. 生产者消费者模型里交换的不仅可以是基本数据, 也可以是类对象, 比如生产者可以给消费者发放"任务". 现在模拟一个任务, 生产者生产操作数, 分配给消费者任务: 运算得到结果.

#include <iostream>#define EPSILON 0.00001
enum CODE
{OK = 0,DIV_ZERO,MOD_ZERO,UNKNOWN
};class Task
{
public:Task(){}Task(double dataX, double dataY, char oper): _dataX(dataX), _dataY(dataY), _oper(oper), _result(0), _code(OK){}void Run(){switch(_oper){case '+':_result = _dataX + _dataY;break;case '-':_result = _dataX - _dataY;break;case '*':_result = _dataX * _dataY;break;case '/':{if(_dataY >= -EPSILON && _dataY <= EPSILON){_code = DIV_ZERO;break;}else{_result =  _dataX / _dataY;break;}}case '%':{if(_dataY >= -EPSILON && _dataY <= EPSILON){_code = MOD_ZERO;break;}else{_result =  (int)_dataX % (int)_dataY;break;}}default:_code = UNKNOWN;break;}}void PrintTask(){std::cout << _dataX << _oper << _dataY << "=?" << std::endl;}void PrintTaskResult(){std::cout << _dataX << _oper << _dataY << "=" << _result << "[" << _code << "]" <<std::endl;}
private:double _dataX; // 待处理数据1double _dataY; // 待处理数据2double _result;char _oper; // 运算符CODE _code; // 退出码
};

 修改了消费者和生产者的逻辑, 具体如注释所言:

#include "BlockQueue.hpp"
#include "Task.hpp"
#include <iostream>
#include <unistd.h>using std::cout;
using std::endl;
const char* opers = "+-*/%()&!";//故意设置一些错误任务void* consumer(void* arg)
{BlockQueue<Task>* pbq = static_cast<BlockQueue<Task>*>(arg);while(true){sleep(2);//1. 取数据Task t;pbq->Pop(&t);//2. 执行任务t.Run();//3. 打印任务完成结果std::cout << "consumer Task result: ";t.PrintTaskResult();}
}void* producer(void* arg)
{// BlockQueue<int>* pbq = static_cast<BlockQueue<int>*>(arg);BlockQueue<Task>* pbq = static_cast<BlockQueue<Task>*>(arg);while(true){//1. 拿数据int dataX = rand() % 10;usleep(100);int dataY = rand() % 10;usleep(100);char oper = opers[rand() % sizeof(opers)];Task t(dataX, dataY, oper);//2. 生产pbq->Push(t);//3. 打印任务std::cout << "producer Task: ";t.PrintTask();sleep(1);}
}int main()
{srand(time(nullptr) ^ getpid() ^ pthread_self());BlockQueue<Task> bq;pthread_t con, pro;//生产者和消费者pthread_create(&con, nullptr, consumer, &bq);pthread_create(&pro, nullptr, producer, &bq);// 等待线程执行pthread_join(con, nullptr);pthread_join(pro, nullptr);return 0;
}

 4. 生产者消费者模型为什么高效?

 其实生产者生产数据的过程和消费者消费数据的过程本身就是互斥的, 是串行执行的. 生产快消费满, 阻塞队列满生产者依然要等待消费者; 反之依然. 从我们对生产者消费者模型的定义来看, 这个过程本身就是互斥与同步的, 哪里高效?

高效体现在生产者拿数据消费者处理数据上. 拿数据和处理数据本身也有时间消耗, cp的高效并不体现在同步和互斥, 高效体现在: 生产消费者场景下, 可以使获取数据和处理数据的是并发的.

生产者消费者的工作不仅仅在于push和pop数据, 而在于拿数据和处理数据本身.

对于生产者来说, 我生成完一个任务, 不需要等待消费者给我结果, 我继续回去拿数据用于生成即可; 此时消费者可能在处理数据 也可能在取数据, 但是生产者都不关心. 所以生产者消费者模型的高效体现在获取数据和处理数据是并发的.

由此可以得到多生产多消费的意义: 多个生产者可以同时从外部获取数据, 多个消费者可以同时处理数据, 提高了获取数据和处理数据的并发度

5. 代码改为多生产者多消费者:

代码其实基本不需要修改, 因为代码本身就已经保证了PP, CC, PC之间都是互斥的, 只需要创建多个线程即可.

安排了2个消费者和一个生产者:

int main()
{srand(time(nullptr) ^ getpid() ^ pthread_self());BlockQueue<Task> bq;pthread_t con[2], pro[3];//生产者和消费者for(auto& c: con)pthread_create(&c, nullptr, consumer, &bq);for(auto& p: pro)pthread_create(&p, nullptr, producer, &bq);// 等待线程执行for(auto& c: con)pthread_join(c, nullptr);for(auto& p: pro)pthread_join(p, nullptr);return 0;
}

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com

热搜词