欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 新闻 > 社会 > kafak推送消息。

kafak推送消息。

2024/11/30 3:49:38 来源:https://blog.csdn.net/qq_42478982/article/details/141594533  浏览:    关键词:kafak推送消息。

1、引入依赖

maven

<dependency>  <groupId>org.springframework.kafka</groupId>  <artifactId>spring-kafka</artifactId>  <version>2.8.0</version>  
</dependency>

gradle

dependencies {compile "org.springframework.kafka:spring-kafka
}

2、添加配置

在application.properties或application.yml中配置Kafka

spring:  kafka:  bootstrap-servers: localhost:9092 # Kafka集群的地址,格式为host:port,多个地址用逗号分隔  consumer:  group-id: myGroup # 消费者群组ID,用于标识一组消费者实例  auto-offset-reset: earliest # 当Kafka中没有初始偏移量或当前偏移量不再存在时(例如,数据被删除),从何处开始读取:earliest表示从最早的记录开始,latest表示从最新的记录开始  key-deserializer: org.apache.kafka.common.serialization.StringDeserializer # 键的反序列化器,用于将字节转换为Java对象  value-deserializer: org.apache.kafka.common.serialization.StringDeserializer # 值的反序列化器,用于将字节转换为Java对象  producer:  key-serializer: org.apache.kafka.common.serialization.StringSerializer # 键的序列化器,用于将Java对象转换为字节  value-serializer: org.apache.kafka.common.serialization.StringSerializer # 值的序列化器,用于将Java对象转换为字节

3、代码

1、方法

异步调用

import org.springframework.beans.factory.annotation.Autowired;  
import org.springframework.kafka.core.KafkaTemplate;  
import org.springframework.kafka.support.SendResult;  
import org.springframework.util.concurrent.ListenableFuture;  
import org.springframework.util.concurrent.ListenableFutureCallback;  public class KafkaMessageSender {  @Autowired  private KafkaTemplate<String, String> kafkaTemplate;  public void sendMessage(String topic, String message) {  // 同步发送消息并等待响应  ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, message);  try {  SendResult<String, String> result = future.get(); // 阻塞等待发送结果  System.out.println("Sent message=[" + message + "] with topic=[" + topic + "]");  } catch (Exception e) {  e.printStackTrace();  }  }  public void sendMessageWithCallback(String topic, String message) {  // 异步发送消息并注册回调  ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, message);  future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {  @Override  public void onSuccess(SendResult<String, String> result) {  System.out.println("Sent message=[" + message + "] with topic=[" + topic + "]");  }  @Override  public void onFailure(Throwable ex) {  System.err.println("Failed to send message=[" + message + "] with topic=[" + topic + "]: " + ex.getMessage());  }  });  }  // 其他方法...  
}

调用方法

public class SomeService {  @Autowired  private KafkaMessageSender kafkaMessageSender;  public void someMethod() {  String topic = "some-topic";  String message = "Hello, Kafka!";  kafkaMessageSender.sendMessageWithCallback(topic, message);  }  
}

同步调用

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;@Service
public class MessageProducer {private final KafkaTemplate<String, String> kafkaTemplate;public MessageProducer(KafkaTemplate<String, String> kafkaTemplate) {this.kafkaTemplate = kafkaTemplate;}public void sendMessage(String topic, String message) {try {kafkaTemplate.send(topic, message).get();System.out.println("Message sent successfully.");} catch (InterruptedException | ExecutionException e) {System.err.println("Failed to send message: " + e.getMessage());Thread.currentThread().interrupt();}}
}
2、send

1、发送简单的消息

ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, message);

2、发送带有键的消息

ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, key, message);

3、发送带有分区号的消息
``java
ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, partition, key, message);

4、发送带有时间戳的消息
```java
ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, partition, key, message, timestamp);

5、发送带有自定义头信息的消息

Map<String, Object> headers = new HashMap<>();  
headers.put("myHeaderKey", "myHeaderValue");  
ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, partition, key, message, timestamp, headers);

6、使用ProducerRecord发送消息

ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, message);  
ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(record);

7、使用Message发送消息(Spring Kafka 2.2及更高版本)

Message<String> message = MessageBuilder.withPayload(payload).setHeader(KafkaHeaders.TOPIC, topic).build();  
ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(message);

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com