kafka 发送事件的几种方式
package com.wanfeng.producer;import com.wanfeng.model.GirlFriend;
import jakarta.annotation.Resource;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.kafka.support.SendResult;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;/*** 作者:晚枫* 时间:2024/9/1 8:57*/
@Component
public class EventProducer {@Resourceprivate KafkaTemplate<String, Object> kafkaTemplate;public void sendEvent() {// 参数一:kafka 主题名字// 参数二:需要发送的事件kafkaTemplate.send("hello", "喜欢欣宝");}public void sendEvent2() {Message<String> message = MessageBuilder.withPayload("超级喜欢欣宝")// 在 header 中放 topic 的名字.setHeader(KafkaHeaders.TOPIC, "hello").build();kafkaTemplate.send(message);}public void sendEvent3() {// 可以在头部带一些自定义信息Headers headers = new RecordHeaders();headers.add("生日", "20010424".getBytes(StandardCharsets.UTF_8));// String topic, Integer partition, Long timestamp, K key, V value, Iterable<Header> headersProducerRecord<String, Object> message = new ProducerRecord<>("hello", 0, System.currentTimeMillis(), "姓名", "爱欣宝", headers);kafkaTemplate.send(message);}public void sendEvent4() {// String topic, Integer partition, Long timestamp, K key, V datakafkaTemplate.send("hello", 0, System.currentTimeMillis(), "name", "爱欣宝");}public void sendEvent5() {// Integer partition, Long timestamp, K key, V datakafkaTemplate.sendDefault(0, System.currentTimeMillis(), "address", "广东");}public void sendEvent6() {CompletableFuture<SendResult<String, Object>> sendResultCompletableFuture = kafkaTemplate.sendDefault(0, System.currentTimeMillis(), "name", "欣宝宝");try {// 阻塞等待的方式拿结果SendResult<String, Object> stringStringSendResult = sendResultCompletableFuture.get();if (stringStringSendResult.getRecordMetadata() != null) {System.out.println("消息发送成功:" + stringStringSendResult.getRecordMetadata().toString());}System.out.println("producerRecord:" + stringStringSendResult.getProducerRecord());} catch (Exception e) {throw new RuntimeException(e);}}public void sendEvent7() {CompletableFuture<SendResult<String, Object>> sendResultCompletableFuture = kafkaTemplate.sendDefault(0, System.currentTimeMillis(), "name", "欣宝宝");// 非阻塞方式拿结果sendResultCompletableFuture.thenAccept(sendResult -> {if (sendResult.getRecordMetadata() != null) {System.out.println("消息发送成功:" + sendResult.getRecordMetadata().toString());}System.out.println("producerRecord:" + sendResult.getProducerRecord());});}public void sendEvent8() {GirlFriend myGirlFriend = GirlFriend.builder().name("欣宝宝").birthday("2001-04-24").build();kafkaTemplate.sendDefault(0, System.currentTimeMillis(), "girlFriend", myGirlFriend);}
}
在发送对象类型数据的时候,需要更换序列化方式,因为生产者的值默认使用字符串序列化方式,当我们发送对象类型数据的时候就会报错,所以我们需要更换序列化方式,在 application.yml 配置文件中配置即可
spring:application:# 应用名称name: kafka-01-basekafka:# kafka 连接地址bootstrap-servers: ip:port# consumer:# 让消费者从最早的事件开始读取# auto-offset-reset: earliesttemplate:# 使用模版配置默认 topicdefault-topic: helloproducer:# 生产者 value 的序列化方式value-serializer: org.springframework.kafka.support.serializer.JsonSerializer