
(Python) Structured Streaming: Real-Time Image Processing from a Kafka Source

2025/3/17 · Source: https://blog.csdn.net/qq_56691739/article/details/142413302

Producer.py

import os
import os.path as osp

import cv2
from kafka import KafkaProducer

# Kafka server address (run `ifconfig` in a terminal to find the host IP)
bootstrap_servers = ['xxx.xxx.xxx.xxx:9092']

# Create the Kafka producer
producer = KafkaProducer(bootstrap_servers=bootstrap_servers)

# Directory containing the test images
image_path = 'your_path'
files_and_folders = os.listdir(image_path)

for i in files_and_folders:
    try:
        # Read the image
        image_data = cv2.imread(osp.join(image_path, i))
        if image_data is None:
            continue  # skip files that fail to load as images
        # Encode the image into a JPEG byte stream
        _, buffer = cv2.imencode('.jpg', image_data)
        byte_data = buffer.tobytes()
        # Send the byte stream to Kafka
        producer.send('image_test', byte_data)
    except Exception as e:
        print(f"Error processing image {i}: {e}")
        continue  # keep going with the next file even if one fails

# Wait until all messages have been sent
producer.flush()

A few images are read from a local folder to simulate input: each file is loaded with OpenCV, encoded into a byte stream, and sent to Kafka.
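The encode-to-bytes step can be sanity-checked without Kafka at all: an image buffer must survive the bytes round trip, which is exactly what the Consumer relies on. A minimal numpy-only sketch (the real pipeline additionally wraps the bytes in JPEG framing via `cv2.imencode`/`cv2.imdecode`):

```python
import numpy as np

# A tiny fake 2x2 BGR "image"
image = np.arange(12, dtype=np.uint8).reshape(2, 2, 3)

# Producer side: flatten the pixel buffer into a byte payload
payload = image.tobytes()

# Consumer side: rebuild the array from the raw bytes
restored = np.frombuffer(payload, dtype=np.uint8).reshape(2, 2, 3)

assert (restored == image).all()
```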

Consumer.py

import os

import cv2
import matplotlib.pyplot as plt
import numpy as np
from pyspark.sql import SparkSession

# Initialize the SparkSession
os.environ["SPARK_HOME"] = r"/usr/local/spark"
os.environ["PYSPARK_PYTHON"] = r"/usr/local/anaconda3/bin/python3"
os.environ["PYSPARK_DRIVER_PYTHON"] = r"/usr/local/anaconda3/bin/python3"
os.environ["JAVA_HOME"] = r"/usr/lib/jvm/java-8-openjdk-amd64"

spark = SparkSession.builder \
    .appName("KafkaSparkStreaming") \
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.3") \
    .getOrCreate()
sc = spark.sparkContext

save_path = "save_path"

# Kafka server address and topic
bootstrap_servers = 'xxx.xxx.xxx.xxx:9092'
kafka_topic = 'image_test'

# Create the Kafka source
kafka_df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", bootstrap_servers) \
    .option("subscribe", kafka_topic) \
    .load()


def binary_to_image(binary_data):
    """Convert binary data back into an image."""
    # Interpret the raw bytes as a numpy array
    image_array = np.frombuffer(binary_data, np.uint8)
    # Decode the image with OpenCV
    image = cv2.imdecode(image_array, cv2.IMREAD_COLOR)
    return image


def plt_show_image(image_list, batch_id):
    for i, image in enumerate(image_list):
        # OpenCV images are BGR; convert to RGB for matplotlib.
        # Single-channel (grayscale) results are shown with a gray colormap.
        if image.ndim == 3:
            plt.imshow(cv2.cvtColor(image, cv2.COLOR_BGR2RGB))
        else:
            plt.imshow(image, cmap='gray')
        plt.title(f'Image {i + 1} in Batch {batch_id}')
        plt.axis('off')  # hide the axes
        plt.show()


def transform_to_gray(sc, img_list):
    """Convert the original images to grayscale.
    :param sc: SparkContext
    :param img_list: list of BGR images
    :return: list of grayscale images
    """
    list_rdd = sc.parallelize(img_list)
    result = list_rdd.map(lambda img: cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)).collect()
    return result


def process_batch(batch_df, batch_id):
    print(f"batch ID: {batch_id}")
    # Convert the binary payloads back into images
    image_list = batch_df.rdd.map(lambda row: binary_to_image(row['value'])).collect()
    # Convert to grayscale
    result = transform_to_gray(sc, image_list)
    # Display the images
    plt_show_image(result, batch_id)


query = kafka_df.select("value") \
    .writeStream \
    .foreachBatch(process_batch) \
    .outputMode("append") \
    .start()
query.awaitTermination()

If your environment is already configured, the temporary `os.environ[]` settings can be omitted.
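One way to keep the script portable is to set those variables only when they are missing, so an already-configured environment is left untouched. A small sketch, assuming the same hypothetical paths as above:

```python
import os

# Hypothetical defaults; adjust to your installation. setdefault only
# writes a value when the variable is not already set.
spark_env_defaults = {
    "SPARK_HOME": "/usr/local/spark",
    "PYSPARK_PYTHON": "/usr/local/anaconda3/bin/python3",
    "PYSPARK_DRIVER_PYTHON": "/usr/local/anaconda3/bin/python3",
    "JAVA_HOME": "/usr/lib/jvm/java-8-openjdk-amd64",
}
for name, value in spark_env_defaults.items():
    os.environ.setdefault(name, value)
```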

I initially tried displaying the images directly with cv2, but only the first processed batch would appear, and it interfered with re-running the Producer and receiving new data, so I switched to matplotlib for display. Because the Producer reads images with OpenCV, they arrive in BGR channel order, while matplotlib expects RGB, so a conversion step is needed first.
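The channel swap itself is just a reversal of the last array axis; a numpy-only sketch equivalent to `cv2.cvtColor(img, cv2.COLOR_BGR2RGB)`:

```python
import numpy as np

def bgr_to_rgb(image):
    """Reverse the channel axis: BGR -> RGB (same result as cv2.COLOR_BGR2RGB)."""
    return image[..., ::-1]

# One blue pixel in BGR order (B=255, G=0, R=0)
bgr = np.array([[[255, 0, 0]]], dtype=np.uint8)
rgb = bgr_to_rgb(bgr)
assert rgb[0, 0].tolist() == [0, 0, 255]  # in RGB order: R=0, G=0, B=255
```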

This example uses a simple grayscale conversion for testing.
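`cv2.COLOR_BGR2GRAY` computes a weighted luminance using the BT.601 weights (0.299 R + 0.587 G + 0.114 B). A numpy sketch of the same formula, assuming uint8 BGR input:

```python
import numpy as np

def bgr_to_gray(image):
    """Weighted luminance with BT.601 weights, matching cv2.COLOR_BGR2GRAY."""
    b = image[..., 0].astype(np.float64)
    g = image[..., 1].astype(np.float64)
    r = image[..., 2].astype(np.float64)
    gray = 0.114 * b + 0.587 * g + 0.299 * r
    return np.clip(np.round(gray), 0, 255).astype(np.uint8)

# Pure white maps to 255, pure black to 0
white = np.array([[[255, 255, 255]]], dtype=np.uint8)
black = np.array([[[0, 0, 0]]], dtype=np.uint8)
assert int(bgr_to_gray(white)[0, 0]) == 255
assert int(bgr_to_gray(black)[0, 0]) == 0
```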

Original image and result:

The folder contains two images; only one is shown here.

Run the Producer again:

The two images in both micro-batches were received and processed successfully.

Here are some more image-processing methods to try:

import cv2
import numpy as np
from skimage import morphology


def process_clahe(image):
    b, g, r = cv2.split(image)
    clahe = cv2.createCLAHE(clipLimit=2, tileGridSize=(8, 8))
    clahe_b = clahe.apply(b)
    clahe_g = clahe.apply(g)
    clahe_r = clahe.apply(r)
    filtered_image = cv2.merge((clahe_b, clahe_g, clahe_r))
    return filtered_image


def process_hole(image):
    gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
    # Binarize
    ret, thresh1 = cv2.threshold(gray, 127, 255, cv2.THRESH_BINARY)
    # Convert to boolean
    thresh1 = thresh1 > 1
    # Remove small outside specks
    stage1 = morphology.remove_small_objects(thresh1, min_size=256, connectivity=2)
    # Fill small holes
    stage2 = morphology.remove_small_holes(stage1, area_threshold=5000, connectivity=1)
    stage2 = stage2.astype('uint8')
    filtered_image = cv2.cvtColor(stage2 * 255, cv2.COLOR_GRAY2RGB)
    # Alternative, pure-OpenCV version: opening to remove specks,
    # then closing with a large kernel to fill holes.
    # kernel = np.ones((3, 3), np.uint8)
    # opening = cv2.morphologyEx(thresh1.astype('uint8') * 255, cv2.MORPH_OPEN, kernel)
    # kernel_large = np.ones((20, 20), np.uint8)  # adjust the kernel size as needed
    # closing = cv2.morphologyEx(opening, cv2.MORPH_CLOSE, kernel_large)
    # filtered_image = cv2.cvtColor(closing, cv2.COLOR_GRAY2RGB)
    return filtered_image


def filter_image(sc, img_list, mode):
    """Process images according to mode.
    :param sc: SparkContext
    :param img_list: list of images
    :param mode: one of
        1. GaussianBlur
        2. medianBlur
        3. SHARPEN
        4. CLAHE
        5. Hole
    :return: list of processed images
    """
    list_rdd = sc.parallelize(img_list)
    result = img_list  # fall back to the input if no mode matches
    # Gaussian blur
    if mode == "GaussianBlur":
        result = list_rdd.map(lambda img: cv2.GaussianBlur(img, (5, 5), 0)).collect()
    # Median blur
    if mode == "medianBlur":
        result = list_rdd.map(lambda img: cv2.medianBlur(img, 5)).collect()
    # Sharpen
    if mode == "SHARPEN":
        kernel = np.array([[0, -1, 0],
                           [-1, 5, -1],
                           [0, -1, 0]], dtype=int)
        # Other kernels to try:
        # 4-neighbour Laplacian: [[0,-1,0],[-1,4,-1],[0,-1,0]]
        # 8-neighbour Laplacian: [[-1,-1,-1],[-1,8,-1],[-1,-1,-1]]
        # 8-neighbour sharpen:   [[-1,-1,-1],[-1,9,-1],[-1,-1,-1]]
        result = list_rdd.map(lambda img: cv2.filter2D(img, -1, kernel)).collect()
    # CLAHE
    if mode == "CLAHE":
        result = list_rdd.map(process_clahe).collect()
    # Hole filling
    if mode == "Hole":
        result = list_rdd.map(process_hole).collect()
    return result
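What `cv2.filter2D` does with the sharpen kernel can be checked per pixel in plain numpy: each output value is the sum of an input patch multiplied elementwise by the kernel. A sketch on single 3x3 patches, ignoring border handling:

```python
import numpy as np

kernel = np.array([[0, -1, 0],
                   [-1, 5, -1],
                   [0, -1, 0]], dtype=int)

def sharpen_pixel(patch, kernel):
    """Correlate one 3x3 patch with the kernel (one output pixel of filter2D)."""
    return int((patch * kernel).sum())

# A flat region is unchanged: 5*10 - 4*10 = 10
flat = np.full((3, 3), 10)
assert sharpen_pixel(flat, kernel) == 10

# A bright centre is amplified: 5*20 - 4*10 = 60
peak = np.full((3, 3), 10)
peak[1, 1] = 20
assert sharpen_pixel(peak, kernel) == 60
```

The kernel weights sum to 1, so uniform areas pass through untouched while local contrast is boosted.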
