首先有安装好的 kafka 环境,点我查看安装教程
环境安装
pip install kafka-python -i https://pypi.tuna.tsinghua.edu.cn/simple/
生产者
import json
import traceback
from kafka import KafkaProducer
from kafka.errors import kafka_errorsdef producer_demo():producer = KafkaProducer(bootstrap_servers=['localhost:9092'], key_serializer=lambda k: json.dumps(k).encode(),value_serializer=lambda v: json.dumps(v).encode())future = producer.send('mykafka',key='creater', # 同一个key值,会被送至同一个分区value="{'creater':'zhangsan', 'date':'2023-04-04'}",partition=0) # 向分区1发送消息future.get(timeout=100)if __name__ == "__main__":producer_demo()
消费者
import json
import time
import traceback
from kafka import KafkaConsumer
from kafka.errors import kafka_errorsdef consumer_demo():consumer = KafkaConsumer('mykafka',bootstrap_servers="127.0.0.1:9092",auto_offset_reset='earliest')for message in consumer:print(message)print(json.loads(message.value))# print(a)if __name__ == "__main__":while True:consumer_demo()time.sleep(1)