Files
ros-raspbot-v2/webui/backend/webrtc.py
T

40 lines
1.1 KiB
Python

import asyncio
import os
import av
import cv2
from aiortc import VideoStreamTrack
# Capture device index handed to cv2.VideoCapture; overridable through the
# CAMERA_DEVICE environment variable, falling back to device 0 when unset.
CAMERA_DEVICE = int(os.getenv('CAMERA_DEVICE', '0'))
class CameraVideoTrack(VideoStreamTrack):
    """Video track that serves frames read from a local OpenCV capture device.

    The device selected by ``CAMERA_DEVICE`` is opened once in the
    constructor and read on every ``recv()`` call. When a read fails, a black
    frame is returned instead, so consumers keep receiving frames.
    """

    # Capture settings requested from the device. The width and height are
    # also used for the black fallback frame so both stay in sync.
    _FRAME_WIDTH = 640
    _FRAME_HEIGHT = 480
    _FRAME_RATE = 30

    def __init__(self) -> None:
        """Open the capture device and request the desired capture settings."""
        super().__init__()
        self._cap = cv2.VideoCapture(CAMERA_DEVICE)
        self._cap.set(cv2.CAP_PROP_FRAME_WIDTH, self._FRAME_WIDTH)
        self._cap.set(cv2.CAP_PROP_FRAME_HEIGHT, self._FRAME_HEIGHT)
        self._cap.set(cv2.CAP_PROP_FPS, self._FRAME_RATE)
        # Ask the backend to buffer a single frame only; presumably this is
        # meant to keep latency low -- not every backend honours it.
        self._cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)

    async def recv(self) -> av.VideoFrame:
        """Return the next camera frame as an RGB ``av.VideoFrame``.

        The presentation timestamp and time base come from the base class's
        ``next_timestamp()``. If the capture read fails, a black frame of
        ``_FRAME_HEIGHT`` x ``_FRAME_WIDTH`` pixels is returned instead.
        """
        pts, time_base = await self.next_timestamp()

        # We are inside a coroutine, so a loop is guaranteed to be running;
        # get_running_loop() is the documented way to obtain it here.
        loop = asyncio.get_running_loop()
        # VideoCapture.read() blocks, so run it on the default executor
        # rather than stalling the event loop.
        ok, frame = await loop.run_in_executor(None, self._cap.read)

        if ok:
            # OpenCV delivers BGR; the frame below is declared as rgb24.
            frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        else:
            # Imported lazily: only needed on the failure path.
            import numpy as np

            frame = np.zeros(
                (self._FRAME_HEIGHT, self._FRAME_WIDTH, 3), dtype=np.uint8
            )

        video_frame = av.VideoFrame.from_ndarray(frame, format='rgb24')
        video_frame.pts = pts
        video_frame.time_base = time_base
        return video_frame

    def __del__(self) -> None:
        """Release the capture device when the track is garbage-collected."""
        # __del__ also runs when __init__ raised before `_cap` was assigned,
        # so the attribute must be looked up defensively.
        cap = getattr(self, '_cap', None)
        if cap is not None:
            cap.release()