欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 文旅 > 美景 > 消息中间件-Kafka3-kafkaJavaClient小例

消息中间件-Kafka3-kafkaJavaClient小例

2025/2/24 7:48:35 来源:https://blog.csdn.net/siyueguoji/article/details/144267495  浏览:    关键词:消息中间件-Kafka3-kafkaJavaClient小例

消息中间件-Kafka3-kafkaJavaClient小例

  • Kafak Java Client
   private static final String KAFKA_TOPIC = "kafak-test";private static String bootstrapServers = "localhost:9092";private static AdminClient client = null;static {Properties config = new Properties();config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);client = AdminClient.create(config);}

在pom.xml 添加kafka client依赖
在这里插入图片描述

  • 客户端创建主题
@Test
public  void createTopic() {try {NewTopic topic = new NewTopic(KAFKA_TOPIC, 1, (short) 1);// 提交创建topic请求client.createTopics(Collections.singleton(topic)).all().get();System.out.println("Topic created successfully");}catch (Exception e) {e.printStackTrace();}finally {if (client != null) client.close();}
}
  • 客户端获取主题
@Test
public  void fetchTopics() {try {ListTopicsResult result = client.listTopics();KafkaFuture<Set<String>> set = result.names();System.out.println(set.get());}catch (Exception e) {e.printStackTrace();}finally {if (client != null) client.close();}}
  • 客户端生产者发送消息
@Test
public  void produceMsg() {Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092"); // Kafka集群地址props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");Producer<String, String> producer = null;try {// 创建生产者实例producer = new KafkaProducer<>(props);// 发送消息producer.send(new ProducerRecord<>(KAFKA_TOPIC, "MSG-1005","Hello, 1005!"), (metadata, exception) ->{if (exception == null) {System.out.println("消息发送成功,主题:" + metadata.topic() + ", 分区:" + metadata.partition());}else {exception.printStackTrace();}});}catch (Exception e) {e.printStackTrace();}finally {// 关闭生成者if (producer != null) producer.close();}
}
  • 客户端消费者消费消息
@Test
public  void consumeMsg() {// 配置消费者Properties properties = new Properties();properties.put("bootstrap.servers", "localhost:9092");properties.put("group.id", "kafka-consumer-group-001");properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");KafkaConsumer<String, String> consumer = null;try {// 创建消费者实例consumer = new KafkaConsumer<>(properties);// 订阅主题consumer.subscribe(Arrays.asList(KAFKA_TOPIC));// 轮询消费消息while(true) {ConsumerRecords<String, String> records = consumer.poll(100); // 每100ms执行一次for (ConsumerRecord record : records) {System.out.printf("Offset: %d, Key: %s, Value: %s\n", record.offset(), record.key(), record.value());}}}catch (Exception e) {e.printStackTrace();}finally {if (consumer != null) consumer.close();}
}

后续将使用这些测试小例来调试Kafka源码,当然也可以执行Kafka自带的可执行脚本与kafka交互,进行源码分析,只是通过java代码的方式更加直观。

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com

热搜词