Gradio 的百万月活用户之旅!
阅读更多Gradio 的百万月活用户之旅!
阅读更多在本指南中,我们将使用 RT-DETR 模型来检测用户上传视频中的物体。我们将使用 Gradio 5.0 中引入的新的视频流式传输功能从服务器流式传输结果。
首先,我们将在我们的系统中安装以下要求
opencv-python
torch
transformers>=4.43.0
spaces
然后,我们将从 Hugging Face Hub 下载模型
from transformers import RTDetrForObjectDetection, RTDetrImageProcessor
image_processor = RTDetrImageProcessor.from_pretrained("PekingU/rtdetr_r50vd")
model = RTDetrForObjectDetection.from_pretrained("PekingU/rtdetr_r50vd").to("cuda")
我们正在将模型移动到 GPU。我们将把我们的模型部署到 Hugging Face Spaces 并在 免费 ZeroGPU 集群 中运行推理。
我们的推理函数将接受一个视频和一个期望的置信度阈值。物体检测模型识别许多物体并为每个物体分配一个置信度分数。置信度越低,误报的可能性越高。因此,我们将让用户设置会议阈值。
我们的函数将迭代视频中的帧,并在每一帧上运行 RT-DETR 模型。然后,我们将在帧中为每个检测到的物体绘制边界框,并将帧保存到新的输出视频中。该函数将以两秒的块为单位生成每个输出视频。
为了尽可能降低 ZeroGPU 上的推理时间(存在基于时间的配额),我们将输出视频中的原始每秒帧数减半,并在运行模型之前将输入帧调整为原始尺寸的一半。
以下是推理函数的代码 - 我们将逐段介绍它。
import spaces
import cv2
from PIL import Image
import torch
import time
import numpy as np
import uuid
from draw_boxes import draw_bounding_boxes
SUBSAMPLE = 2
@spaces.GPU
def stream_object_detection(video, conf_threshold):
cap = cv2.VideoCapture(video)
# This means we will output mp4 videos
video_codec = cv2.VideoWriter_fourcc(*"mp4v") # type: ignore
fps = int(cap.get(cv2.CAP_PROP_FPS))
desired_fps = fps // SUBSAMPLE
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) // 2
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) // 2
iterating, frame = cap.read()
n_frames = 0
# Use UUID to create a unique video file
output_video_name = f"output_{uuid.uuid4()}.mp4"
# Output Video
output_video = cv2.VideoWriter(output_video_name, video_codec, desired_fps, (width, height)) # type: ignore
batch = []
while iterating:
frame = cv2.resize( frame, (0,0), fx=0.5, fy=0.5)
frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
if n_frames % SUBSAMPLE == 0:
batch.append(frame)
if len(batch) == 2 * desired_fps:
inputs = image_processor(images=batch, return_tensors="pt").to("cuda")
with torch.no_grad():
outputs = model(**inputs)
boxes = image_processor.post_process_object_detection(
outputs,
target_sizes=torch.tensor([(height, width)] * len(batch)),
threshold=conf_threshold)
for i, (array, box) in enumerate(zip(batch, boxes)):
pil_image = draw_bounding_boxes(Image.fromarray(array), box, model, conf_threshold)
frame = np.array(pil_image)
# Convert RGB to BGR
frame = frame[:, :, ::-1].copy()
output_video.write(frame)
batch = []
output_video.release()
yield output_video_name
output_video_name = f"output_{uuid.uuid4()}.mp4"
output_video = cv2.VideoWriter(output_video_name, video_codec, desired_fps, (width, height)) # type: ignore
iterating, frame = cap.read()
n_frames += 1
在 python 中创建视频的行业标准之一是 OpenCV,因此我们将在本应用中使用它。
cap
变量是我们如何从输入视频中读取的方式。每当我们调用 cap.read()
时,我们都在读取视频中的下一帧。
为了在 Gradio 中流式传输视频,我们需要为输出视频的每个“块”生成不同的视频文件。我们使用 output_video = cv2.VideoWriter(output_video_name, video_codec, desired_fps, (width, height))
行创建要写入的下一个视频文件。video_codec
是我们指定视频文件类型的方式。目前视频流式传输仅支持“mp4”和“ts”文件。
对于视频中的每一帧,我们将调整其大小为一半大小。OpenCV 以 BGR
格式读取文件,因此将转换为 transfomers 期望的 RGB
格式。这就是 while 循环的前两行正在做的事情。
我们每隔一帧取一帧并将其添加到 batch
列表中,以便输出视频的 FPS 为原始 FPS 的一半。当批次覆盖两秒的视频时,我们将运行模型。选择两秒阈值是为了使每个批次的处理时间足够短,以便视频在服务器中平滑显示,同时又不需要太多单独的前向传递。为了使视频流式传输在 Gradio 中正常工作,批次大小应至少为 1 秒。
我们运行模型的前向传递,然后使用模型的 post_process_object_detection
方法将检测到的边界框缩放到输入帧的大小。
我们使用自定义函数来绘制边界框(来源 此处)。然后我们必须从 RGB
转换为 BGR
,然后再写回输出视频。
完成批次处理后,我们为下一个批次创建一个新的输出视频文件。
UI 代码与其他类型的 Gradio 应用非常相似。我们将使用标准的双列布局,以便用户可以并排看到输入和输出视频。
为了使流式传输工作,我们必须在输出视频中设置 streaming=True
。将视频设置为自动播放不是必需的,但它可以为用户提供更好的体验。
import gradio as gr
with gr.Blocks() as app:
gr.HTML(
"""
<h1 style='text-align: center'>
Video Object Detection with <a href='https://hugging-face.cn/PekingU/rtdetr_r101vd_coco_o365' target='_blank'>RT-DETR</a>
</h1>
""")
with gr.Row():
with gr.Column():
video = gr.Video(label="Video Source")
conf_threshold = gr.Slider(
label="Confidence Threshold",
minimum=0.0,
maximum=1.0,
step=0.05,
value=0.30,
)
with gr.Column():
output_video = gr.Video(label="Processed Video", streaming=True, autoplay=True)
video.upload(
fn=stream_object_detection,
inputs=[video, conf_threshold],
outputs=[output_video],
)
你可以在 Hugging Face Spaces 上查看我们的演示 此处。
它也嵌入在此页面下方