欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 财经 > 创投人物 > SpringBoot整合Kafka

SpringBoot整合Kafka

2025/3/12 15:33:51 来源:https://blog.csdn.net/weixin_46032216/article/details/146114009  浏览:    关键词:SpringBoot整合Kafka

SpringBoot整合Kafka

Kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者在网站中的所有动作流数据。

Windows版本下载地址:https://kafka.apache.org/downloads

启动服务器

Kafka服务器的功能相当于RocketMQ中broker,kafka运行还需要一个类似于命名服务器的服务。在kafka安装目录中自带一个类似于命名服务器的工具,叫做zookeeper,他的作用是注册中心,相关知识请自行百度zookeeper,后续我也会补充zookeeper学习心得。

zookeeper-server-start.bat ..\..\config\zookeeper.properties   #启动zookeeper
kafka-server-start.bat ..\..\config\server.properties          #启动Kafka

运行bin目录下的windows目录下的zookeeper-server-start命令即可启动注册中心,默认对外服务端口:2181。

运行bin目录下的windows目录下的kafka-server-start命令即可启动kafka服务器,默认对外服务端口:9092。

创建主题

和之前操作其他MQ产品相似,Kafka也是基于主题操作,操作之前需要先初始化topic。

#创建topic
kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
#查询topic
kafka-topics.bat --zookeeper 127.0.0.1:2181 --list
#删除topic
kafka-topic.bat --delete --zookeeper localhost:2181 --topic test
测试服务器启动状态

Kafka提供有一套测试服务器功能的测试程序,运行bin目录下的windows目录下的命令即可使用。

kafka-console-producer.bat --broker-list localhost:9092 --topic test    #测试生产消费
kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning   #测试消息消费
Spring Boot整合Kafka
1、引入spring boot整合Kafka的依赖
<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId>
</dependency>
2、配置Kafka的服务器地址
spring:kafka:bootstrap-servers: localhost:9092   #配置服务器地址consumer:group-id: order                 #配置消费组
3、使用KafkaTemplate操作Kafka
@Service
public class KafkaMessageServiceImpl implements MessageService{@Autowriedprivate KafkaTemplate<String, String> template;//同步消息生产推送@Overridevoid sendMessage(String id){System.out.println("准备推送订单消息,已纳入队列 :" + id);//定义topic主题testtemplate.send( "test", id);}
}
4、使用KafkaListener监听器监听消息,处理消息消费.
//消息监听器
@Component
class KafkaMessageListener {//Kafka监听,监听对应的topic主题@KafkaListener(topics="test")void onMessage(ConsumerRecord<String, String> record){//消费业务处理System.out.println("已经完成业务消息消费,消费的数据是:" + record.value());}
}

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com

热搜词