前言
在分布式系统中,我们经常需要用到消息队列来进行异步操作,其中涉及到消息消费幂等,即只消费一次,不重复消费(这里吐槽一下腾讯游戏扣除信用分通知消息推送,我举报了对面一次,一直给我推送对面扣除信用分的通知,连续好几天一直推送,这就是腾讯游戏扣除信用分的消息被重复消费了),话不多说,上干货
介绍
Redis 位图(Bitmap)占用内存较少的原因主要是因为它是一种紧凑的数据结构,使用每个位(bit)来表示一个布尔值,而不是使用通常的字节(byte)或更大单位来表示状态信息。以下是一些详细解释:
位图的内存效率
紧凑存储:位图使用单个位(bit)来表示状态,每个 bit 只有两种状态:0 或 1。这意味着你可以在一个字节(byte,8 bits)中存储 8 个布尔值,而传统的布尔值存储方式通常会占用一个字节来存储一个布尔值。这就使得位图在表示大量布尔值时更加紧凑和高效。
连续存储:位图将所有的位紧凑地存储在连续的内存块中,这不仅减少了内存碎片,还提高了存取效率。
具体示例
假设我们有一个 Kafka 主题,每个分区有 100,000 条消息。如果我们使用常规布尔数组来记录每条消息的消费状态:
常规布尔数组:每个布尔值占用 1 字节。
位图:每个布尔值占用 1 位。
内存使用对比
常规布尔数组:100,000 条消息 * 1 字节/消息 = 100,000 字节 ≈ 97.66 KB
位图:100,000 条消息 * 1 位/消息 = 100,000 位 / 8 位/字节 = 12,500 字节 ≈ 12.21 KB
通过这个对比可以看出,位图相比常规的布尔数组在内存使用上更加高效
实际使用
代码
RedisBitmapComponent.java
@Component
public class RedisBitmapComponent {@ResourceRedisTemplate redisTemplate;private static final long EXPIRE_TIME_IN_MINUTES = 60; // 1 hourpublic boolean isProcessed(String topic, int partition, long offset) {String key = getBitmapKey(topic, partition);ValueOperations<String, Boolean> operations = redisTemplate.opsForValue();return operations.getBit(key, offset);}public void markAsProcessed(String topic, int partition, long offset) {String key = getBitmapKey(topic, partition);ValueOperations<String, Boolean> operations = redisTemplate.opsForValue();operations.setBit(key, offset, true);redisTemplate.expire(key, EXPIRE_TIME_IN_MINUTES, TimeUnit.MINUTES);}private static String getBitmapKey(String topic, int partition) {return topic + ":" + partition + ":bitmap";}
消费者 NoticeListener.java
@Slf4j
@Component
public class NoticeListener {@AutowiredRedisMessageDistributor redisMessageDistributor;@ResourceRedisBitmapComponent redisBitmapComponent;/*** @param record* @param ack* @author nash.li* @description kafka监听通知,通过sse推送账户通知* @date 2024-06-12 13:51:55**/@KafkaListener(topics = "${kafka.topic.account-notification}",groupId = "${kafka.group.id.default}")public void listenToNotice(ConsumerRecord<String, String> record, Acknowledgment ack) {try {log.info("-------->{}: {}", record.key(), record.value());String key = record.key();String topic = record.topic();int partition = record.partition();long offset = record.offset();//kafka消息幂等,判断消息是否已处理if (!redisBitmapComponent.isProcessed(topic, partition, offset)) {SseEmitter emitter = SseEmitterHolder.getSseEmitter(key);if (emitter != null) {MessageDO messageDO = new MessageDO();messageDO.setSessionKeys(Collections.singletonList(key));messageDO.setMessageText(record.value());redisMessageDistributor.distribute(messageDO);} else {log.info("key not found, skip");}redisBitmapComponent.markAsProcessed(topic, partition, offset);}ack.acknowledge();} catch (Exception ex) {log.error("kafka.topic.account-notification监听异常,异常信息:{}", ex.getLocalizedMessage());}}