欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 文旅 > 美景 > 【redis】发布订阅

【redis】发布订阅

2025/3/15 7:48:57 来源:https://blog.csdn.net/u022812849/article/details/146238099  浏览:    关键词:【redis】发布订阅

Redis的发布订阅(Pub/Sub)是一种基于消息多播的通信机制,它允许消息的**发布者(Publisher)向特定频道发送消息,而订阅者(Subscriber)**通过订阅频道或模式来接收消息。

其核心特点如下:

  1. 轻量级:无需额外组件,直接通过Redis服务实现

  2. 实时性:消息即时推送,无轮询延迟

  3. 广播模式:一个消息可被多个订阅者同时接收

  4. 无状态性:不存储历史消息,订阅者只能接收订阅后的消息

发布订阅命令的使用

有关发布订阅的命令可以通过help @pubsub命令来查看。有关命令的使用可以通过help 命令来查看,例如help publish

基础命令速查表

命令作用示例
SUBSCRIBE订阅一个或多个频道SUBSCRIBE news sports
PSUBSCRIBE使用模式匹配订阅频道PSUBSCRIBE sensor.*
PUBLISH向指定频道发送消息PUBLISH news "Hello"
UNSUBSCRIBE退订指定频道UNSUBSCRIBE news
PUNSUBSCRIBE退订模式订阅PUNSUBSCRIBE sensor.*
PUBSUB CHANNELS查看活跃频道列表PUBSUB CHANNELS "sensor.*"

操作示例

# 订阅者A(终端1)
127.0.0.1:6379> subscribe notifications
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "notifications"
3) (integer) 1# 订阅者B(终端2) 
127.0.0.1:6379> psubscribe system.*
Reading messages... (press Ctrl-C to quit)
1) "psubscribe"
2) "system.*"
3) (integer) 1# 发布消息(终端3)
127.0.0.1:6379> publish notifications "Service will be upgraded soon"
(integer) 1127.0.0.1:6379> publish system.alert "CPU usage exceeds 90%"
(integer) 1# 订阅者A收到:
1) "message"
2) "notifications"
3) "Service will be upgraded soon"# 订阅者B收到: 
1) "pmessage"
2) "system.*"
3) "system.alert"
4) "CPU usage exceeds 90%"

发布订阅的使用场景与优缺点

适用场景

  1. 实时通知系统:用户在线状态更新,即时聊天消息推送

  2. 事件驱动架构:缓存失效广播,分布式配置更新

  3. 轻量级监控:服务器状态报警,业务指标异常通知

优点

  • 极低延迟(平均<1ms)

  • 支持百万级TPS消息吞吐

  • 模式匹配订阅实现灵活路由

  • 零外部依赖(仅需Redis服务)

缺点

消息不可靠性:不保证送达,离线订阅者会丢失消息

无持久化机制:重启后所有订阅关系丢失

客户端阻塞:订阅操作会占用连接线程(需异步处理)

替代方案建议:需要可靠消息时,使用Redis Streams(支持消息持久化、消费者组)或RabbitMQ/Kafka

在Java中使用RedisTemplate实现

配置RedisTemplate

package com.morris.redis.demo.pubsub;import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;/*** 对redis的键值进行序列化*/
@Configuration
public class RedisConfig {@Beanpublic RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {RedisTemplate<String, Object> template = new RedisTemplate<>();template.setConnectionFactory(factory);// 使用 String 序列化 keytemplate.setKeySerializer(new StringRedisSerializer());// 使用 JSON 序列化 value(需要额外依赖 jackson)template.setValueSerializer(new GenericJackson2JsonRedisSerializer());// 对于 Hash 结构同理template.setHashKeySerializer(new StringRedisSerializer());template.setHashValueSerializer(new GenericJackson2JsonRedisSerializer());return template;}
}

实现消息发布者

package com.morris.redis.demo.pubsub;import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;import javax.annotation.Resource;/*** 消息发布者*/
@Service
public class MessagePublisher {@Resourceprivate RedisTemplate<String, Object> redisTemplate;public void sendNotification(String channel, String message) {redisTemplate.convertAndSend(channel, message);}
}

实现消息订阅者

package com.morris.redis.demo.pubsub;import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;import javax.annotation.Resource;/*** 消息订阅者*/
@Component
public class MessageSubscriber implements MessageListener {@Resourceprivate RedisTemplate redisTemplate;@Overridepublic void onMessage(Message message, byte[] pattern) {String channel = new String(message.getChannel());String body = (String) redisTemplate.getValueSerializer().deserialize(message.getBody());System.out.printf("收到频道[%s]的消息: %s\n", channel, body);}
}

配置订阅监听

package com.morris.redis.demo.pubsub;import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;/*** 配置redis消息订阅监听器*/
@Configuration
@Slf4j
public class RedisPubSubConfig {@Beanpublic RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory factory, MessageSubscriber messageSubscriber) {RedisMessageListenerContainer container = new RedisMessageListenerContainer();container.setConnectionFactory(factory);// 订阅具体频道container.addMessageListener(messageSubscriber, new ChannelTopic("notifications"));// 订阅模式匹配container.addMessageListener(messageSubscriber, new PatternTopic("system.*"));// 异常处理container.setErrorHandler((e) -> {log.error("[listen message] error ", e);});return container;}
}

使用示例

package com.morris.redis.demo.pubsub;import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;import javax.annotation.Resource;/*** 使用接口发布消息*/
@RestController
@RequestMapping("/pubsub")
public class PubSubDemoController {@Resourceprivate MessagePublisher publisher;// 发布告警@GetMapping("/alert")public String sendAlert(@RequestParam String message) {publisher.sendNotification("system.alert", message);return "警报已发送";}// 发布通知@GetMapping("/notify")public String sendNotify(@RequestParam String message) {publisher.sendNotification("notifications", message);return "通知已发送";}
}

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com

热搜词