目标:编写一个Python节点读取USB摄像头数据。
依赖条件 web camera 和 cv2
sudo apt install python3-pip
pip install numpy opencv-python pyarrow
新建一个文件夹 yolo_project (后续运行yolo目标检测),从dora 官方 github拷贝文件
wget https://raw.githubusercontent.com/dora-rs/dora/main/examples/python-operator-dataflow/webcam.py
wget https://raw.githubusercontent.com/dora-rs/dora/main/examples/python-operator-dataflow/plot.py
wget https://raw.githubusercontent.com/dora-rs/dora/main/examples/python-operator-dataflow/utils.py
创建数据流文件 dataflow_webcam.yml
nodes:
- id: web_cam
operator:
python: op_webcam/webcam.py
inputs:
tick: dora/timer/millis/30
outputs:
- image
- id: plot
operator:
python: op_webcam/plot.py
inputs:
image: web_cam/image
dora up
dora start dataflow_webcam.yml --attach --hot-reload --name webcam
上述参数中:
–attach:等待数据流完成后再返回。
–hot-reload:在数据流运行时修改Python Operator。
–name:命名数据流(可能比 UUID 简洁)。
程序运行以后会在桌面上新建一个窗口显示图像,持续20秒左右。
若需要结束数据流可以使用 dora stop 指令或是用 ctrl+C
在上述的功能中,从数据流文件 dataflow_webcam.yml可以看出 程序使用了两个python文件分别是 webcam.py 和 plot.py。webcam.py 内容如下,其利用opencv中的capture类读取USB 摄像头数据,将图像数据序列化以后发送到节点 plot.py 显示在窗口中。
通过yml文件中的参数(tick: dora/timer/millis/30)可以控制 webcam.py 中 def on_event(…)函数调用的周期,这里的tick参数有点类似时钟节拍的含义,每来一个节拍就进入 on_event(…)函数一次
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import time
from typing import Callable, Optional
import os
import cv2
import numpy as np
import pyarrow as pa
fr建工程om dora import DoraStatus
CAMERA_WIDTH = 640
CAMERA_HEIGHT = 480
CAMERA_INDEX = int(os.getenv("CAMERA_INDEX", 0))
font = cv2.FONT_HERSHEY_SIMPLEX
class Operator:
"""
Sending image from webcam to the dataflow
"""
def __init__(self):
self.video_capture = cv2.VideoCapture(CAMERA_INDEX)
self.start_time = time.time()
self.video_capture.set(cv2.CAP_PROP_FRAME_WIDTH, CAMERA_WIDTH)
self.video_capture.set(cv2.CAP_PROP_FRAME_HEIGHT, CAMERA_HEIGHT)
def on_event(
self,
dora_event: str,
send_output: Callable[[str, bytes | pa.Array, Optional[dict]], None],
) -> DoraStatus:
match dora_event["type"]:
case "INPUT":
ret, frame = self.video_capture.read()
if ret:
frame = cv2.resize(frame, (CAMERA_WIDTH, CA建工程MERA_HEIGHT))
## Push an error image in case the camera is not available.
else:
frame = np.zeros((CAMERA_HEIGHT, CAMERA_WIDTH, 3), dtype=np.uint8)
cv2.putText(
frame,
"No Webcam was found at index %d" % (CAMERA_INDEX),
(int(30), int(30)),
font,
0.75,
(255, 255, 255),
2,
1,
)
send_output(
"image",
pa.array(frame.ravel()),
dora_event["metadata"],
)
case "STOP":
print("received stop")
case other:
print("received unexpected event:", other)
if time.time() - self.start_time < 20:
return DoraStatus.CONTINUE
else:
return DoraStatus.STOP
def __del__(self):
self.video_capture.release()
[1] https://dora.carsmos.ai/docs/guides/getting-started/webcam_plot
dora-rs目前资料较少 欢迎大家点赞在评论区交流讨论(cenruping@vip.qq.com) O(∩_∩)O
或者加群水一波(1149897304)