欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 文旅 > 游戏 > 使用redis中的bitmap实现kafka消息消费幂等

使用redis中的bitmap实现kafka消息消费幂等

2024/10/24 4:31:53 来源:https://blog.csdn.net/u013531166/article/details/139826978  浏览:    关键词:使用redis中的bitmap实现kafka消息消费幂等

前言

在分布式系统中,我们经常需要用到消息队列来进行异步操作,其中涉及到消息消费幂等,即只消费一次,不重复消费(这里吐槽一下腾讯游戏扣除信用分通知消息推送,我举报了对面一次,一直给我推送对面扣除信用分的通知,连续好几天一直推送,这就是腾讯游戏扣除信用分的消息被重复消费了),话不多说,上干货

介绍

Redis 位图(Bitmap)占用内存较少的原因主要是因为它是一种紧凑的数据结构,使用每个位(bit)来表示一个布尔值,而不是使用通常的字节(byte)或更大单位来表示状态信息。以下是一些详细解释:

位图的内存效率

紧凑存储:位图使用单个位(bit)来表示状态,每个 bit 只有两种状态:0 或 1。这意味着你可以在一个字节(byte,8 bits)中存储 8 个布尔值,而传统的布尔值存储方式通常会占用一个字节来存储一个布尔值。这就使得位图在表示大量布尔值时更加紧凑和高效。

连续存储:位图将所有的位紧凑地存储在连续的内存块中,这不仅减少了内存碎片,还提高了存取效率。

具体示例

假设我们有一个 Kafka 主题,每个分区有 100,000 条消息。如果我们使用常规布尔数组来记录每条消息的消费状态:

常规布尔数组:每个布尔值占用 1 字节。
位图:每个布尔值占用 1 位。
内存使用对比
常规布尔数组:100,000 条消息 * 1 字节/消息 = 100,000 字节 ≈ 97.66 KB
位图:100,000 条消息 * 1 位/消息 = 100,000 位 / 8 位/字节 = 12,500 字节 ≈ 12.21 KB
通过这个对比可以看出,位图相比常规的布尔数组在内存使用上更加高效

实际使用

代码

RedisBitmapComponent.java

@Component
public class RedisBitmapComponent {@ResourceRedisTemplate redisTemplate;private static final long EXPIRE_TIME_IN_MINUTES = 60; // 1 hourpublic boolean isProcessed(String topic, int partition, long offset) {String key = getBitmapKey(topic, partition);ValueOperations<String, Boolean> operations = redisTemplate.opsForValue();return operations.getBit(key, offset);}public void markAsProcessed(String topic, int partition, long offset) {String key = getBitmapKey(topic, partition);ValueOperations<String, Boolean> operations = redisTemplate.opsForValue();operations.setBit(key, offset, true);redisTemplate.expire(key, EXPIRE_TIME_IN_MINUTES, TimeUnit.MINUTES);}private static String getBitmapKey(String topic, int partition) {return topic + ":" + partition + ":bitmap";}

消费者 NoticeListener.java

@Slf4j
@Component
public class NoticeListener {@AutowiredRedisMessageDistributor redisMessageDistributor;@ResourceRedisBitmapComponent redisBitmapComponent;/*** @param record* @param ack* @author nash.li* @description kafka监听通知,通过sse推送账户通知* @date 2024-06-12 13:51:55**/@KafkaListener(topics = "${kafka.topic.account-notification}",groupId = "${kafka.group.id.default}")public void listenToNotice(ConsumerRecord<String, String> record, Acknowledgment ack) {try {log.info("-------->{}: {}", record.key(), record.value());String key = record.key();String topic = record.topic();int partition = record.partition();long offset = record.offset();//kafka消息幂等,判断消息是否已处理if (!redisBitmapComponent.isProcessed(topic, partition, offset)) {SseEmitter emitter = SseEmitterHolder.getSseEmitter(key);if (emitter != null) {MessageDO messageDO = new MessageDO();messageDO.setSessionKeys(Collections.singletonList(key));messageDO.setMessageText(record.value());redisMessageDistributor.distribute(messageDO);} else {log.info("key not found, skip");}redisBitmapComponent.markAsProcessed(topic, partition, offset);}ack.acknowledge();} catch (Exception ex) {log.error("kafka.topic.account-notification监听异常,异常信息:{}", ex.getLocalizedMessage());}}

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com