结构
分为消费者 Croducer 主题 Topic 生产者 Producer
其中Topic包括
Topic中包括了分区和offset偏移量的概念;
一个简单的Kafka发消息收消息java代码:
消费者:
@Component
public class EventConsumer {@Resourceprivate KafkaTemplate<String,String> kafkaTemplate;// 采用监听得方式接收@KafkaListener(topics = {"test"},groupId = "hello")public void onEvent(String event){System.out.println("读取到了时间消息: " + event);}
}
生产者
@Component
public class EventProducer {@Resourceprivate KafkaTemplate<String,String> kafkaTemplate;// 消息发送public void sendEvent(String message){kafkaTemplate.send("test",message);}
}
消费者偏移量的设置
发送消息的一些方法
// 消息发送
public void sendEvent(String message){kafkaTemplate.send("test",message);
}// 发送message对象
public void sendEvent2(String message){// 创建message对象Message<String> kafka = MessageBuilder.withPayload(message)// 设置topic.setHeader(KafkaHeaders.TOPIC,"test").build();kafkaTemplate.send(kafka);
}public void sendEvent3(String message){// 消费者可以从Headers中取到信息.Headers headers = new RecordHeaders();headers.add("orderId","123".getBytes(StandardCharsets.UTF_8));ProducerRecord<String,String> record = new ProducerRecord<>("test",0,System.currentTimeMillis(),"k1",message,headers);kafkaTemplate.send(record);
}