欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 文旅 > 文化 > 基于发布-订阅模型的音视频流分发框架

基于发布-订阅模型的音视频流分发框架

2024/10/24 9:24:11 来源:https://blog.csdn.net/qq_42161913/article/details/141868571  浏览:    关键词:基于发布-订阅模型的音视频流分发框架

有时需要同时网络推流和把流封装为某格式,或做一些其它操作。这就需要一个分发流的机制,把同一路流分发给多个使用者去操作,下面实现了一个简易的线程安全的音视频流分发框架。代码如下:

avStreamHub.h

#ifndef STREAMHUB_H
#define STREAMHUB_H#include <stddef.h>
#include <pthread.h>typedef void (*AVStreamCallback)(void *data, size_t size, void *arg);typedef struct Subscriber
{AVStreamCallback callback;void *arg;
} Subscriber;typedef struct AVStreamHub
{Subscriber *subscribers; // 存储订阅者的数组size_t size;             // 当前订阅者数量size_t capacity;         // 动态数组容量pthread_mutex_t lock;    // 互斥锁
} AVStreamHub;/*** @brief 初始化AVStreamHub* @param hub 指向AVStreamHub结构体的指针* @param initial_capacity 初始分配的订阅者容量* @return 0 成功,-1 失败*/
int avStreamHub_init(AVStreamHub *hub, size_t initial_capacity);/*** @brief 添加订阅者* @param hub 指向AVStreamHub结构体的指针* @param callback 订阅者的回调函数* @param arg 回调携带的参数* @return 0 成功,< 0 失败*/
int avStreamHub_addSub(AVStreamHub *hub, AVStreamCallback callback, void *arg);/*** @brief 删除订阅者* @param hub 指向AVStreamHub结构体的指针* @param callback 要移除的订阅者的回调函数* @return 无*/
void avStreamHub_removeSub(AVStreamHub *hub, AVStreamCallback callback);/*** @brief 将数据流分发给所有已注册的订阅者* @param hub 指向AVStreamHub结构体的指针* @param data 指向要分发的数据的指针* @param size 数据的大小* @return 无*/
void avStreamHub_publish(AVStreamHub *hub, void *data, size_t size);/*** @brief 销毁AVStreamHub* @param hub 指向AVStreamHub结构体的指针* @return 无*/
void avStreamHub_destroy(AVStreamHub *hub);#endif // STREAMHUB_H

avStreamHub.c

#include "avStreamHub.h"
#include <stdlib.h>
#include <string.h>#define INITIAL_CAPACITY 2int avStreamHub_init(AVStreamHub *hub, size_t initial_capacity)
{hub->size = 0;hub->capacity = 0;if (initial_capacity == 0){initial_capacity = INITIAL_CAPACITY;}hub->subscribers = (Subscriber *)malloc(initial_capacity * sizeof(Subscriber));if (!hub->subscribers){return -1;}memset(hub->subscribers, 0, initial_capacity * sizeof(Subscriber));hub->capacity = initial_capacity;pthread_mutex_init(&hub->lock, NULL);return 0;
}// 添加订阅者
int avStreamHub_addSub(AVStreamHub *hub, AVStreamCallback callback, void *arg)
{pthread_mutex_lock(&hub->lock);// 检查该回调是否已存在for (size_t i = 0; i < hub->size; i++){if (hub->subscribers[i].callback == callback){pthread_mutex_unlock(&hub->lock);return -1; // 回调已存在}}// 如果容量不足则扩展容量if (hub->size == hub->capacity){hub->subscribers = (Subscriber *)realloc(hub->subscribers,hub->capacity * 2 * sizeof(Subscriber));if (!hub->subscribers){pthread_mutex_unlock(&hub->lock);return -2;}hub->capacity *= 2;}// 添加新订阅者hub->subscribers[hub->size].callback = callback;hub->subscribers[hub->size].arg = arg;hub->size++;pthread_mutex_unlock(&hub->lock);return 0;
}// 删除订阅者
void avStreamHub_removeSub(AVStreamHub *hub, AVStreamCallback callback)
{pthread_mutex_lock(&hub->lock);for (size_t i = 0; i < hub->size; i++){if (hub->subscribers[i].callback == callback){size_t j = i;for (; j < hub->size - 1; j++){hub->subscribers[j] = hub->subscribers[j + 1];}hub->subscribers[j].callback = NULL;hub->subscribers[j].arg = NULL;hub->size--;break;}}pthread_mutex_unlock(&hub->lock);
}// 分发数据
void avStreamHub_publish(AVStreamHub *hub, void *data, size_t size)
{pthread_mutex_lock(&hub->lock);for (size_t i = 0; i < hub->size; i++){hub->subscribers[i].callback(data, size, hub->subscribers[i].arg);}pthread_mutex_unlock(&hub->lock);
}// 销毁AVStreamHub
void avStreamHub_destroy(AVStreamHub *hub)
{free(hub->subscribers);hub->subscribers = NULL;hub->size = 0;hub->capacity = 0;pthread_mutex_destroy(&hub->lock);
}

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com