欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 科技 > 能源 > kafka 消费者线程安全问题详细探讨

kafka 消费者线程安全问题详细探讨

2025/3/12 22:08:28 来源:https://blog.csdn.net/happycao123/article/details/142467217  浏览:    关键词:kafka 消费者线程安全问题详细探讨

内容概要

图片

主要内容

常见错误案例

下面这段代码大概逻辑

  • 初始化时 实例化KafkaConsumer, 开启线程拉取消息并且处理

  • 资源释放回调 停止线程、调用kafkaConsumer.close进行资源释放

表面上没有问题,但实际上可能出现线程安全问题,因为poll 和 close 两个操作可能同时执行,因此存在线程安全问题, 如何修改,读者自己思考下。

    @PostConstructpublic void consumer(){kafkaConsumer = new KafkaConsumer(getConfig());kafkaConsumer.subscribe(Arrays.asList("test_partition_num"));new Thread(new Runnable() {@Overridepublic void run() {while(running){ConsumerRecords<String,String> records = kafkaConsumer.poll(Duration.ofMillis(1000));records.forEach(record->{System.out.println(" partition =" + record.partition()  +" offset  = " + record.offset() + " value = " + record.value());});}}}).start();}@PreDestroypublic void close(){running = false;if(kafkaConsumer != null){kafkaConsumer.close();}}

消费者非线程安全代码解读

kafka生成者是线程安全的,但消费者是非线程安全的。KafkaConsumer

  • 相关操作前

    • 调用acquire()方法,校验线程安全问题,如果发现其他线程也在操作,则直接抛出异常。

  • 操作完成后

    • 调用release()清除痕迹

acquire()相对于加锁,release()相当于释放锁。

参看poll 方法实现,一目了然。

    private void acquire() {long threadId = Thread.currentThread().getId();if (threadId != this.currentThread.get() && !this.currentThread.compareAndSet(-1L, threadId)) {throw new ConcurrentModificationException("KafkaConsumer is not safe for multi-threaded access");} else {this.refcount.incrementAndGet();}}private void release() {if (this.refcount.decrementAndGet() == 0) {this.currentThread.set(-1L);}}

图片

poll源码

如何实现消费者多线程消费消息呢

思路1

每次实例化一个 KafkaConsumer

这种方式实现简单,但每次都需要建立TCP 链接


思路2

相关操作方法 加上  synchronized,获取使用Lock 加锁保证线程安全

这种方式性能较差

思路3

拉取消息使用一个线程, 消息处理使用多线程

因为通常拉取消息比较快,消息处理比较耗时,由于消息处理不涉及KafkaConsumer 相关API 操作,因此不存在线程安全问题。这种方式建议消息位移设置自动提交,否则编程复杂度较高。

示例代码

ConsumerRecords<String,String> records = kafkaConsumer.poll(Duration.ofMillis(1000));executorService.execute(()->{//处理消息records.forEach(record->{System.out.println(" partition =" + record.partition()  +" offset  = " + record.offset() + " value = " + record.value());});
});

旁敲侧击 举一反三

面试题回顾 Dubbo 线程模型

通常我们线程分为两类

  • IO 线程:负责网络通信的读写操作,接收和发送请求与响应。

  • 业务线程:处理具体的业务逻辑,避免因业务处理耗时过长而阻塞 IO 线程。


Dubbo 线程模型有几种你还记得否?该如何选择?

  • AllDispatcher:所有消息都派发到线程池,包括请求、响应、连接事件、断开事件、心跳等。

  • DirectDispatcher:所有消息都不派发到线程池,全部在 IO 线程上直接执行。

  • MessageOnlyDispatcher:只有请求和响应消息派发到线程池,其它连接断开、心跳等消息直接在 IO 线程上执行。

  • ExecutionDispatcher:只把请求消息派发到线程池,响应和其它连接、断开、心跳等消息直接在 IO 线程上执行。

其实选择的依据 业务处理的快慢,如果业务处理很快则建议让业务处理逻辑放到 IO线程中执行,这样避免线程上下文切换影响性能。反之则处理逻辑需要放到具体的业务线程中执行。

一般来说业务执行需要查询数据库,绝大数场景建议使用默认的 AllDispatcher 

是不是又和我一起温故知新了,加油吧 少年 !!!

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com

热搜词