信号量
【C++并发编程】(八)信号量
QT中的信号量类是QSemaphore
,用法与C++标准中的std::counting_semaphore
类似。不同的是, QSemaphore
无法指定最大计数。为了限定最大计数,可以采用两个QSemaphore
信号量。下面使用一个生成者-消费者例子展示QSemaphore
的用法:
https://github.com/BinaryAI-1024/QtStudy/tree/master/thread/semaphore
// main.cpp
#include <QCoreApplication>
#include <QThread>
#include "producer.h"
#include "consumer.h"// 定义全局变量
const int BufferSize = 5;
QSemaphore emptySlots(BufferSize); // 管理缓冲区的空闲空间
QSemaphore fullSlots(0); // 管理缓冲区的已用空间
int buffer[BufferSize]; int main(int argc, char *argv[]) {QCoreApplication a(argc, argv);// 创建线程QThread producerThread, consumerThread;// 创建生产者和消费者对象Producer producer;Consumer consumer;// 将对象移动到线程producer.moveToThread(&producerThread);consumer.moveToThread(&consumerThread);// 连接线程启动信号QObject::connect(&producerThread, &QThread::started, &producer, &Producer::produce);QObject::connect(&consumerThread, &QThread::started, &consumer, &Consumer::consume);// 启动线程producerThread.start();consumerThread.start();return a.exec();
}
// consumer.h
#ifndef CONSUMER_H
#define CONSUMER_H#include <QObject>
#include <QSemaphore>// main.cpp中的全局变量
extern QSemaphore emptySlots;
extern QSemaphore fullSlots;
extern int buffer[];
extern const int BufferSize; // 添加 BufferSize 声明class Consumer : public QObject {Q_OBJECT
public:explicit Consumer(QObject *parent = nullptr);public slots:void consume();
};#endif // CONSUMER_H
//consumer.cpp
#include "consumer.h"
#include <QDebug>
#include <QThread>Consumer::Consumer(QObject *parent) : QObject(parent) {}void Consumer::consume() {for (int i = 0; i < 10; ++i) {fullSlots.acquire(); // 等待直到有可消费的数据,如果有fullSlots计数-1int data = buffer[i % BufferSize]; // 现在 BufferSize 可用qDebug() << "Consumed:" << data << "by thread" << QThread::currentThread();emptySlots.release(); // 释放空槽位emptySlots计数+1QThread::msleep(1000);}
}
//producer.h
#ifndef PRODUCER_H
#define PRODUCER_H#include <QObject>
#include <QSemaphore>// main.cpp中的全局变量
extern QSemaphore emptySlots;
extern QSemaphore fullSlots;
extern int buffer[];
extern const int BufferSize;class Producer : public QObject {Q_OBJECT
public:explicit Producer(QObject *parent = nullptr);public slots:void produce();
};#endif // PRODUCER_H
//producer.cpp
#include "producer.h"
#include <QDebug>
#include <QThread>Producer::Producer(QObject *parent) : QObject(parent) {}void Producer::produce() {for (int i = 0; i < 10; ++i) {emptySlots.acquire(); // 等待,直到缓冲区有空位,如果有emptySlots计数-1buffer[i % BufferSize] = i;qDebug() << "Produced:" << i << "by thread" << QThread::currentThread();fullSlots.release(); // 生产了一个数据:fullSlots计数+1QThread::msleep(500);}
}
emptySlots.acquire()
确保生产者生产的数据不会超过缓冲区大小。
fullSlots.acquire()
确保消费者不会读取空数据。
emptySlots.release()
和 fullSlots.release()
保持生产消费的平衡。
emptySlots
和fullSlots
的计数有一个加一就有一个减一,保证了emptySlots
计数+fullSlots
计数=BuffferSize
。这确保了生产者生产的数据不会超过缓冲区的大小。
结果:
Produced: 0 by thread QThread(0x77fe98)
Consumed: 0 by thread QThread(0x77fe90)
Produced: 1 by thread QThread(0x77fe98)
Produced: 2 by thread QThread(0x77fe98)
Consumed: 1 by thread QThread(0x77fe90)
Produced: 3 by thread QThread(0x77fe98)
Consumed: 2 by thread QThread(0x77fe90)
Produced: 4 by thread QThread(0x77fe98)
Produced: 5 by thread QThread(0x77fe98)
Consumed: 3 by thread QThread(0x77fe90)
Produced: 6 by thread QThread(0x77fe98)
Produced: 7 by thread QThread(0x77fe98)
Consumed: 4 by thread QThread(0x77fe90)
Produced: 8 by thread QThread(0x77fe98)
Produced: 9 by thread QThread(0x77fe98)
Consumed: 5 by thread QThread(0x77fe90)
Consumed: 6 by thread QThread(0x77fe90)
Consumed: 7 by thread QThread(0x77fe90)
Consumed: 8 by thread QThread(0x77fe90)
Consumed: 9 by thread QThread(0x77fe90)
程序中设置了消费者取走数据的速度比生产者慢,因此可以看到 buffer 被逐步填满,之后再被逐步消费的情况。