欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 健康 > 美食 > Springboot集成Kafka

Springboot集成Kafka

2024/10/24 2:02:16 来源:https://blog.csdn.net/weixin_44825912/article/details/142960184  浏览:    关键词:Springboot集成Kafka

一、添加依赖

        我们使用spring本身支持的spring-kafka依赖,但是需要注意版本问题,不同的springboot版本支持不同的kafka版本,避免因版本不同带来困扰!参考下图:

        或者访问官网查看版本对应关系:Spring for Apache Kafka

        本教程springboot版本使用的是2.6.13,对应kafka版本为2.8.x以上。

<properties><java.version>1.8</java.version><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding><spring-boot.version>2.6.13</spring-boot.version>
</properties>
<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!-- kafka依赖 --><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>2.8.10</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency>
</dependencies>

二、配置application.yml

        application.yml文件添加kafka配置,kafka配置有很多,此教程只使用了部分,后续有时间会对其他配置再做研究。

# 应用服务web访问端口
server:port: 8088spring:#kafka配置kafka:bootstrap-servers: 192.168.219.200:9092producer:# 发生错误后,消息重发的次数retries: 0# 键的序列化方式key-serializer: org.apache.kafka.common.serialization.StringSerializer# 值的序列化方式value-serializer: org.apache.kafka.common.serialization.StringSerializerconsumer:group-id: test-consumer-group# 键的反序列化方式key-deserializer: org.apache.kafka.common.serialization.StringDeserializer# 值的反序列化方式value-deserializer: org.apache.kafka.common.serialization.StringDeserializerlistener:# 当消费者监听的topic不存在时,保证项目能够启动。missing-topics-fatal: false

三、代码实现

生产者

package com.studykafka.springkafka.demos.kafka;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;/*** Kafka生产者** @author xiafan* @Date 2023-09-01 13:48*/
@Component
public class KafkaProducer {private static final Logger log= LoggerFactory.getLogger(KafkaProducer.class);@Autowiredprivate KafkaTemplate<String, Object> kafkaTemplate;/*** 生产者简单发送消息* @param topic* @param msg*/public void send(String topic,String msg){log.info("topic为:{},发送消息内容:{}", topic, msg);kafkaTemplate.send(topic,msg);}
}

消费者

package com.studykafka.springkafka.demos.kafka;import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;/*** kafka消费者* * @author xiafan* @Date 2023-09-01 13:48*/
@Component
public class KafkaConsumer {private static final Logger log= LoggerFactory.getLogger(KafkaConsumer.class);/*** 消费者监听消息** @param record*/@KafkaListener(topics = {"demo"})public void onMessage(ConsumerRecord<?, ?> record){//消费的哪个topic、partition的消息,打印出消息内容log.info("简单消费topic为:{},分区partition为:{},内容为:{}", record.topic(),record.partition(), record.value());}
}

测试类

package com.studykafka.springkafka.demos.web;import com.studykafka.springkafka.demos.kafka.KafkaProducer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;/*** kafka测试类** @author xiafan* @Date 2023-09-01 14:12*/
@RequestMapping("/kafka")
@RestController
public class KafkaController {@Autowiredprivate KafkaProducer kafkaProducer;@GetMapping("/sendMsg")public void sendMsg(@RequestParam String message){kafkaProducer.send("demo", message);}
}

        输入http://ip:port/kafka/sendMsg,观察控制台打印信息,消费者输出的信息为:

        从上结果可以看到 -》topic:demo,partition:8,已经消费了消息。

我们从kafka-ui界面也可以看到分区8增加了一条消息:

        为什么会有10个分区呢?还记得在上一章节Linux安装kafka中,设置了num.partitions=10,所以在发送消息时,kafka会自动创建10个分区,并将消息负载均衡到分区8消费了。

        而offset是当前partition中的数据个数的偏移量,从0开始,Next Offset是下一次消息的偏移量。

再次调用一次,发现消息被分区0消费了(负载均衡)。

        到此为止,已经完成了springboot和kafka的集成!

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com