废话不多说，直接上代码：
1.pom
<!-- https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka -->
<!-- Spring for Apache Kafka: supplies KafkaTemplate and @KafkaListener used by the Java classes in this walkthrough. -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>3.1.1</version>
</dependency>
2.配置
# Spring Boot application configuration for the Kafka demo.
spring:
  application:
    name: mqdemo
  kafka:
    # Comma-separated list of broker host:port pairs used for the initial connection.
    bootstrap-servers: 10.228.48.28:39092,10.228.48.19:39092,10.228.48.21:39092
    consumer:
      # Default consumer group for every @KafkaListener that does not declare its own groupId.
      group-id: my-group
      # With no committed offset for the group, start reading from the beginning of the partition.
      auto-offset-reset: earliest
    producer:
      # Keys and values are sent as plain strings, matching KafkaTemplate<String, String>.
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
3.service
package com.example.demo.service;import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;/*** @author wangjn* @Description* @createTime 2024-06-13 14:02:00*/
/**
 * Service wrapper around a {@link KafkaTemplate} that publishes string messages and
 * prints every record received on {@code my-topic} to standard output.
 */
@Service
public class KafkaService {

    /** Template used for all outgoing sends; supplied through the constructor. */
    private final KafkaTemplate<String, String> kafkaTemplate;

    /**
     * Creates the service.
     *
     * @param kafkaTemplate template used to publish messages
     */
    public KafkaService(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    /**
     * Hands {@code message} to the template for delivery to {@code topic}.
     * The value returned by the template's {@code send} call is not inspected here.
     *
     * @param topic   destination topic name
     * @param message payload to publish
     */
    public void sendMessage(String topic, String message) {
        kafkaTemplate.send(topic, message);
    }

    /**
     * Listener for {@code my-topic}; writes each payload to standard output.
     *
     * @param message payload of the received record
     */
    @KafkaListener(topics = "my-topic")
    public void listen(String message) {
        final String line = "Received message: " + message;
        System.out.println(line);
    }
}
4.controller
package com.example.demo.web;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;/*** @author wangjn* @Description* @createTime 2024-06-13 14:08:00*/
/**
 * REST controller that publishes request payloads to Kafka and logs every record
 * received on the same topic.
 */
@RestController
public class KafkaController {

    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaController.class);

    /** Topic this controller both publishes to and listens on. */
    private static final String TOPIC = "my-topic";

    // The annotation and the modifier must be separate tokens; fused together they form an
    // unknown annotation and the template is never injected.
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    /**
     * Publishes {@code message} to {@code my-topic}.
     *
     * <p>The send is asynchronous: the response is returned as soon as the record has been
     * handed to the template, so it does not confirm broker acknowledgement. Delivery
     * failures are reported through the log once the send future completes.
     *
     * @param message payload taken from the {@code message} request parameter
     * @return a fixed confirmation string
     */
    @PostMapping("/sendToKafka")
    public String sendMessageToKafka(@RequestParam("message") String message) {
        kafkaTemplate.send(TOPIC, message).whenComplete((result, ex) -> {
            if (ex != null) {
                // Without this callback a failed send would vanish silently.
                LOGGER.error("Failed to send message to Kafka topic {}", TOPIC, ex);
            }
        });
        return "Message sent to Kafka successfully!";
    }

    /**
     * Placeholder endpoint; it does not read from Kafka and only returns a fixed string.
     * Actual consumption happens in {@link #listenToKafka(String)}.
     *
     * @return a fixed status string
     */
    @GetMapping("/consumeFromKafka")
    public String simulateConsumeFromKafka() {
        return "Checking for messages...";
    }

    /**
     * Listener for {@code my-topic}; logs each payload at INFO level.
     *
     * <p>NOTE(review): no {@code groupId} is set here, so this listener presumably shares the
     * configured default consumer group with any other listener on the same topic, in which
     * case each record reaches only one of them. Confirm whether that is intended.
     *
     * @param message payload of the received record
     */
    @KafkaListener(topics = TOPIC)
    public void listenToKafka(String message) {
        LOGGER.info("Message received from Kafka: {}", message);
    }
}