Java技术栈 —— Spark入门(三)之实时视频流
创始人
2024-09-26 14:53:09
0

Java技术栈 —— Spark入门(三)之实时视频流转灰度图像

  • 一、将摄像头数据发送至kafka
  • 二、Kafka准备topic
  • 三、spark读取kafka图像数据并处理
  • 四、本地显示灰度图像(存在卡顿现象,待优化)

项目整体结构图如下

在这里插入图片描述

参考文章或视频链接
[1] Architecture-for-real-time-video-streaming-analytics

一、将摄像头数据发送至kafka

这个代码将运行在你有摄像头的机器上,缺依赖就装依赖

import cv2 import kafka import numpy as np  # 设置 Kafka Producer # 注意修改你的kafka地址 producer = kafka.KafkaProducer(bootstrap_servers='localhost:9092')  # 打开摄像头(0 为默认摄像头) cap = cv2.VideoCapture(0)  while True:     # 从摄像头捕获帧     ret, frame = cap.read()     if not ret:         break          # 将图像编码为 JPEG 格式     _, buffer = cv2.imencode('.jpg', frame)      # 将图像作为字节数组发送到 Kafka     producer.send('camera-images', buffer.tobytes())      # 显示当前捕获的帧     cv2.imshow('Video', frame)      # 按 'q' 键退出     if cv2.waitKey(1) & 0xFF == ord('q'):         break  # 释放资源 cap.release() cv2.destroyAllWindows() producer.close() 

二、Kafka准备topic

在准备topic之前,要先配置kafka中的config/server.properties文件,否则其它机器无法联通kafka,配置好后重启kafka。

# 找到这两个选项并修改成如下内容 listeners=PLAINTEXT://0.0.0.0:9092 # 改成你的kafka所在服务器ip advertised.listeners=PLAINTEXT://{your_ip}:9092 

如果你之前创建过topic,那就清空这些topic中的数据

# 设置保留时间为0,相当于立即清空数据 #bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter --entity-type topics --entity-name {your_topic_name} --add-config retention.ms=0 # 恢复原始保留设置,立即清空数据后,将数据的保留时间恢复至原有状态 #bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter --entity-type topics --entity-name {your_topic_name} --add-config retention.ms=604800000   

开始正式创建topic

# 创建输入图片所在topic bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic camera-images --partitions 1 --replication-factor 1 # 创建输出的gray灰度图片所在topic bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic result-gray-images --partitions 1 --replication-factor 1  # 准备好后查看下topic list进行验证 bin/kafka-topics.sh --bootstrap-server localhost:9092 --list # 查看某topic中的数据 bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic {your_topic_name} --from-beginning 

三、spark读取kafka图像数据并处理

首先给你的spark脚本所运行的python环境(这个环境一般可以为conda等虚拟环境),安装必要的依赖库

pip install opencv-python-headless 

准备脚本文件

from pyspark.sql import SparkSession from pyspark.sql.functions import udf from pyspark.sql.types import BinaryType import cv2 import numpy as np  bootstrapServers = "localhost:9092"  # 创建 SparkSession spark = SparkSession.builder \     .appName("Kafka-Spark-OpenCV") \     .getOrCreate()  # 初始化 Kafka Producer,用于发送处理后的图像 # 如果不这样做,会出现PicklingError,因为如果UDF中,包含了无法被序列化的对象,例如线程锁(_thread.RLock)或 Kafka 的 KafkaProducer 实例,序列化就会失败。 # 因此,在每个执行器内部,创建 KafkaProducer 实例 producer = None  # 从 Kafka 读取数据流 df = spark \   .readStream \   .format("kafka") \   .option("kafka.bootstrap.servers", "localhost:9092") \   .option("subscribe", "camera-images") \   .load()  # UDF 用于将图像转换为灰度 def convert_to_gray(image_bytes):     global producer      # 创建 KafkaProducer 实例(在每个执行器上只初始化一次)     if producer is None:         producer = KafkaProducer(bootstrap_servers = bootstrapServers)      # 将字节数组转换为 numpy 数组     nparr = np.frombuffer(image_bytes, np.uint8)     # 将 numpy 数组解码为图像     img = cv2.imdecode(nparr, cv2.IMREAD_COLOR)     # 将图像转换为灰度     gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)     # 将灰度图像编码为 JPEG     _, buffer = cv2.imencode('.jpg', gray)          # 将处理后的图像发送到 Kafka 'result-gray-images' 主题     producer.send('result-gray-images', buffer.tobytes())          return buffer.tobytes()  # 注册 UDF convert_to_gray_udf = udf(convert_to_gray, BinaryType())  # 应用 UDF 对数据进行灰度化处理 gray_df = df.withColumn("gray_image", convert_to_gray_udf("value"))  # 将处理后的数据写入文件或其他输出 query = gray_df.writeStream \     .outputMode("append") \     .format("console") \     .start()      # query = gray_df\ #     .writeStream \ #     .format('kafka') \ #     .outputMode('update') \ #     .option("kafka.bootstrap.servers", bootstrapServers) \ #     .option('checkpointLocation', '/spark/job-checkpoint') \ #     .option("topic", "result-gray-images") \ #     .start()  query.awaitTermination() 

spark-submit提交脚本文件:

# 1.提高内存 # 2.调整 Kafka 批次大小,减少单个批次的数据量,从而降低内存使用(这个步骤存疑) /opt/spark-3.5.2-bin-hadoop3/bin/spark-submit \ --executor-memory 4g \ --driver-memory 4g \ --conf "spark.kafka.maxOffsetsPerTrigger=1000" \ --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.2,org.apache.kafka:kafka-clients:3.5.2 \ /opt/spark-3.5.2-bin-hadoop3/jobs/pyjobs/kafka_to_spark.py 

四、本地显示灰度图像(存在卡顿现象,待优化)

import cv2 import numpy as np from kafka import KafkaConsumer  # 设置 Kafka Consumer consumer = KafkaConsumer(     'result-gray-images',     bootstrap_servers='{your_kafka_ip}:9092',     auto_offset_reset='latest',     enable_auto_commit=True,     # group_id='image-display-group' )  # 从 Kafka 主题读取灰度图像并显示 for message in consumer:     # print("reading gray image.... ")     # 将消息转换为 numpy 数组     nparr = np.frombuffer(message.value, np.uint8)     # 解码为图像     gray_img = cv2.imdecode(nparr, cv2.IMREAD_GRAYSCALE)     # 显示灰度图像     cv2.imshow('Gray Video', gray_img)     if cv2.waitKey(1) & 0xFF == ord('q'):         break  # 释放资源 cv2.destroyAllWindows() consumer.close() 

相关内容

热门资讯

每日必看教程透视!(WEPok... 每日必看教程透视!(WEPoke)外挂透明挂辅助透视(透视挂)软件透明挂(2025已更新)(哔哩哔哩...
实测揭晓!wepoker插件功... 《wepoker软件透明挂》是一款多人竞技的wepoker辅助透视游戏,你将微扑克对手来到同一个战场...
解密关于神器!(wePOke)... 解密关于神器!(wePOke)外挂透明挂辅助神器(透视挂)软件透明挂(2021已更新)(哔哩哔哩)是...
一分钟了解(wpk规律)外挂透... 一分钟了解(wpk规律)外挂透明挂辅助作弊(透视)辅助作弊(2023已更新)(哔哩哔哩)是一款可以让...
八分钟合作!(WEPoker)... 八分钟合作!(WEPoker)辅助透视脚本,(透视)wepoker有插件(2024已更新)(哔哩哔哩...
总算明白!(夜猫麻将)外挂透明... 总算明白!(夜猫麻将)外挂透明挂辅助测试,wpk透视辅助哪里下载,详细教程(2024已更新)(哔哩哔...
揭秘真相!wpk系统是否存在作... 揭秘真相!wpk系统是否存在作弊行为,wpk透视是真的吗,技巧教程(有挂教学)-哔哩哔哩是一款可以让...
记者爆料神器!(wepOKE)... 记者爆料神器!(wepOKE)外挂透明挂辅助神器(辅助透视)软件透明挂(2025已更新)(哔哩哔哩)...
大神推荐(wpk测试)外挂透明... 大神推荐(wpk测试)外挂透明挂辅助插件(透视)辅助作弊(2022已更新)(哔哩哔哩);超受欢迎的w...
6分钟猫腻!(wePoKer)... 6分钟猫腻!(wePoKer)辅助透视脚本,(透视)we poker辅助器v3.3(2024已更新)...