1. 引入 jar
<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId>
</dependency>
2. yml 配置
spring:kafka:bootstrap-servers: localhost:9092,localhost:9093,localhost:9094 producer:retries: 0 acks: 1 batch-size: 16384 buffer-memory: 33554432 key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.apache.kafka.common.serialization.StringSerializer consumer:enable-auto-commit: false auto-commit-interval: 100 auto-offset-reset: earliestmax-poll-records: 500 key-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializergroup-id: ${APP_NAME}listener:ack-mode: manual_immediate poll-timeout: 500ms
3. 实现代码
生产者:
@Slf4j
@Component
public class KafkaProducer {private final ExecutorService executorService = Executors.newFixedThreadPool(10);@Resourceprivate KafkaTemplate<String, String> kafkaTemplate;public void send(String topic, String msg) {executorService.submit(() -> kafkaTemplate.send(topic, msg).addCallback(result -> {if (result != null && result.getRecordMetadata() != null) {log.info("消息发送成功,offset = {}", result.getRecordMetadata().offset());} else {log.warn("消息发送完成,但结果或其元数据为空");}}, throwable -> log.error("消息发送失败,原因 = {}", throwable.getMessage())));}@PreDestroypublic void shutdown() {executorService.shutdown();try {if (!executorService.awaitTermination(500, TimeUnit.MILLISECONDS)) {executorService.shutdownNow();}} catch (InterruptedException e) {executorService.shutdownNow();Thread.currentThread().interrupt();}}
}
消费者:
@Component
public class KafkaConsumer {@KafkaListener(topics = "myTopic",groupId = "xxx", properties = "max.poll.records:5", concurrency = "3") public void listen(ConsumerRecord<?, ?> record) { ack.acknowledge();}
}