本博文源于笔者对kafka的学习,先遵循着对kafka的简单学习,然后go操作一下kafka,包括发送消息、消费消息、列出所有topic,与创建topic。内容都已经由笔者亲自测试过。
文章目录
- 1.kafka的学习
- 1.1 启动kafka与zookeeper
- 1.2 创建topic
- 1.3 生产消息
- 1.4 消费之前的消息
- 1.5 指定偏移量消费
- 1.4 消费最新的信息
- 2 go操作
- 2.1 发送消息
- 2.2 消费消息
- 2.3 列出所有topic
- 2.4 创建topic
- 参考文档
1.kafka的学习
1.1 启动kafka与zookeeper
kafka与zookeeper是相关联的
bin/zookeeper-server-start.sh config/zookeeper.properties
与
bin/kafka-server-start.sh config/server.properties
1.2 创建topic
bin/kafka-topics.sh --create --topic hello --bootstrap-server 主机名:9092
1.3 生产消息
bin/kafka-console-producer.sh --broker-list 主机名:9092 --topic hello
运行后可以发送多条,ctrl+c退出
1.4 消费之前的消息
bin/kafka-console-consumer.sh --bootstrap-server 主机名:9092 --from-beginning --topic hello
1.5 指定偏移量消费
bin/kafka-console-consumer.sh --bootstrap-server 主机名:9092 --partition 0 --offset 1 --topic hello
1.4 消费最新的信息
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic hello
2 go操作
2.1 发送消息
// Kafka 配置
const (KafkaBroker = "u8sMaster:9092" // 替换为你的 Kafka Broker 地址KafkaTopic = "k8s-version" // Kafka 主题
)func main() {sendMesgKafka()
}func sendMesgKafka() {w := kafka.NewWriter(kafka.WriterConfig{Brokers: []string{KafkaBroker},Topic: KafkaTopic,Balancer: &kafka.LeastBytes{},})err := w.WriteMessages(context.Background(),kafka.Message{Key: []byte("Key-A"),Value: []byte("one!"),},kafka.Message{Key: []byte("Key-B"),Value: []byte("two!"),},kafka.Message{Key: []byte("Key-C"),Value: []byte("three!"),},)if err != nil {log.Fatal("failed to write messages:", err)}if err := w.Close(); err != nil {log.Fatal("failed to close writer:", err)}fmt.Println("Message sent successfully")}
2.2 消费消息
// to consume messages
topic := "test"
partition := 0conn, err := kafka.DialLeader(context.Background(), "tcp", "u8sMaster:9092", topic, partition)
if err != nil {log.Fatal("failed to dial leader:", err)
}conn.SetReadDeadline(time.Now().Add(10*time.Second))
batch := conn.ReadBatch(10e3, 1e6) // fetch 10KB min, 1MB maxb := make([]byte, 10e3) // 10KB max per message
for {n, err := batch.Read(b)if err != nil {break}fmt.Println(string(b[:n]))
}if err := batch.Close(); err != nil {log.Fatal("failed to close batch:", err)
}if err := conn.Close(); err != nil {log.Fatal("failed to close connection:", err)
}
2.3 列出所有topic
func main() {conn, err := kafka.Dial("tcp", "u8sMaster:9092")if err != nil {panic(err.Error())}defer conn.Close()partitions, err := conn.ReadPartitions()if err != nil {panic(err.Error())}m := map[string]struct{}{}for _, p := range partitions {m[p.Topic] = struct{}{}}for k := range m {fmt.Println(k)}
}
2.4 创建topic
func main() {conn, err := kafka.DialLeader(context.Background(), "tcp", "u9sMaster:9092", "topic2", 0)if err != nil {panic(err.Error())}
}
精准地创建topic
func main() {conn, err := kafka.Dial("tcp", "u8sMaster:9092")if err != nil {panic(err.Error())}defer conn.Close()controller, err := conn.Controller()if err != nil {panic(err.Error())}var connLeader *kafka.ConnconnLeader, err = kafka.Dial("tcp", net.JoinHostPort(controller.Host, strconv.Itoa(controller.Port)))if err != nil {panic(err.Error())}defer connLeader.Close()
}
这里省略了kafka集群的配置,未来有机会补充
参考文档
参考文档一