1、引入依赖
maven
<!-- Spring for Apache Kafka -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.8.0</version>
</dependency>
gradle
dependencies {
    // Spring for Apache Kafka, same version as the Maven snippet.
    // "implementation" replaces the "compile" configuration, which was removed in Gradle 7.
    implementation "org.springframework.kafka:spring-kafka:2.8.0"
}
2、添加配置
在application.properties或application.yml中配置Kafka
spring:
  kafka:
    # Address of the Kafka cluster in host:port form; separate multiple addresses with commas
    bootstrap-servers: localhost:9092
    consumer:
      # Consumer group ID, identifying a set of consumer instances
      group-id: myGroup
      # Where to start reading when Kafka has no initial offset or the current offset no longer
      # exists (e.g. the data was deleted): earliest = oldest record, latest = newest record
      auto-offset-reset: earliest
      # Key deserializer: converts bytes into a Java object
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # Value deserializer: converts bytes into a Java object
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    producer:
      # Key serializer: converts a Java object into bytes
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      # Value serializer: converts a Java object into bytes
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
3、代码
1、方法
异步调用
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback; public class KafkaMessageSender { @Autowired private KafkaTemplate<String, String> kafkaTemplate; public void sendMessage(String topic, String message) { // 同步发送消息并等待响应 ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, message); try { SendResult<String, String> result = future.get(); // 阻塞等待发送结果 System.out.println("Sent message=[" + message + "] with topic=[" + topic + "]"); } catch (Exception e) { e.printStackTrace(); } } public void sendMessageWithCallback(String topic, String message) { // 异步发送消息并注册回调 ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, message); future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() { @Override public void onSuccess(SendResult<String, String> result) { System.out.println("Sent message=[" + message + "] with topic=[" + topic + "]"); } @Override public void onFailure(Throwable ex) { System.err.println("Failed to send message=[" + message + "] with topic=[" + topic + "]: " + ex.getMessage()); } }); } // 其他方法...
}
调用方法
/**
 * Example caller that publishes a fixed greeting through {@link KafkaMessageSender}.
 */
public class SomeService {

    @Autowired
    private KafkaMessageSender kafkaMessageSender;

    /** Sends "Hello, Kafka!" to the "some-topic" topic via the callback-based sender method. */
    public void someMethod() {
        kafkaMessageSender.sendMessageWithCallback("some-topic", "Hello, Kafka!");
    }
}
同步调用
import org.springframework.kafka.core.KafkaTemplate;
import java.util.concurrent.ExecutionException;
import org.springframework.stereotype.Service;

/**
 * Sends String messages synchronously: each call blocks until the send has completed.
 */
@Service
public class MessageProducer {

    private final KafkaTemplate<String, String> kafkaTemplate;

    /**
     * @param kafkaTemplate the template used to send messages
     */
    public MessageProducer(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    /**
     * Sends a message and waits for the result.
     * Failures are reported on standard error rather than thrown.
     *
     * @param topic   the topic to send to
     * @param message the message payload
     */
    public void sendMessage(String topic, String message) {
        try {
            kafkaTemplate.send(topic, message).get();
            System.out.println("Message sent successfully.");
        } catch (InterruptedException e) {
            System.err.println("Failed to send message: " + e.getMessage());
            // Only an actual interruption should restore the interrupt flag.
            Thread.currentThread().interrupt();
        } catch (ExecutionException e) {
            // The send itself failed; the thread was not interrupted, so the flag is left alone.
            System.err.println("Failed to send message: " + e.getMessage());
        }
    }
}
2、send
1、发送简单的消息
// Send a message to the topic; the returned future carries the SendResult.
ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, message);
2、发送带有键的消息
// Send a message together with a key.
ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, key, message);
3、发送带有分区号的消息
// Send a keyed message with an explicit partition number.
ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, partition, key, message);
4、发送带有时间戳的消息
// The KafkaTemplate overload takes the timestamp before the key:
// send(topic, partition, timestamp, key, data).
ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, partition, timestamp, key, message);
5、发送带有自定义头信息的消息
// KafkaTemplate has no send(...) overload that accepts a header map, so the headers are
// attached to a ProducerRecord, which is then sent. Kafka header values are raw bytes.
ProducerRecord<String, String> headerRecord = new ProducerRecord<>(topic, partition, timestamp, key, message);
headerRecord.headers().add("myHeaderKey", "myHeaderValue".getBytes(java.nio.charset.StandardCharsets.UTF_8));
ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(headerRecord);
6、使用ProducerRecord发送消息
// Build the record explicitly (topic, key, value) and hand it to the template.
ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, message);
ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(record);
7、使用Message发送消息(Spring Kafka 2.2及更高版本)
// Build a Message whose destination topic is carried in the KafkaHeaders.TOPIC header,
// then send it; no topic argument is passed to send().
Message<String> message = MessageBuilder.withPayload(payload).setHeader(KafkaHeaders.TOPIC, topic).build();
ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(message);