Files
ros-raspbot-v2/webrtc_streamer/webrtc_streamer.py
T
m5p3nc3r ef78f19e72 Add image and streaming from USB camera
Plus a little freshen up of the readme's
2026-05-07 16:38:36 +00:00

294 lines
11 KiB
Python

#!/usr/bin/env python3
"""
ROS2 node: /camera/image_raw → WebRTC stream (multi-client).
A single encode pipeline feeds all connected browsers simultaneously.
Each browser gets its own webrtcbin branch spliced onto a shared tee.
appsrc → videoconvert → vp8enc → tee ─┬→ queue → rtpvp8pay → webrtcbin (client 1)
└→ queue → rtpvp8pay → webrtcbin (client 2)
Signaling via WebSocket on PORT (default 8443). Point client.html at this host.
Initialisation order note: the ROS node (DDS) must be created before Gst.init()
is called. GLib's GObject type-system conflicts with DDS if GStreamer initialises
first. WebRTCStreamer.__init__ enforces this by calling super().__init__() before
touching GStreamer.
"""
import asyncio
import itertools
import json
import os
import threading
import numpy as np
import rclpy
from rclpy.executors import SingleThreadedExecutor
from rclpy.node import Node
from rclpy.qos import QoSProfile, ReliabilityPolicy, HistoryPolicy
from sensor_msgs.msg import Image
import gi
gi.require_version('Gst', '1.0')
gi.require_version('GstWebRTC', '1.0')
gi.require_version('GstSdp', '1.0')
from gi.repository import Gst, GstWebRTC, GstSdp, GLib
import websockets
IMAGE_TOPIC = os.environ.get('IMAGE_TOPIC', '/camera/image_raw')
PORT = int(os.environ.get('PORT', '8443'))
BITRATE = int(os.environ.get('BITRATE', '2000000'))
_client_ids = itertools.count()
# ------------------------------------------------------------------ #
# GStreamer pipeline #
# ------------------------------------------------------------------ #
class StreamManager:
"""Shared encode pipeline. Clients attach/detach webrtcbin branches dynamically."""
def __init__(self, logger):
self._log = logger
desc = (
"appsrc name=src format=time is-live=true do-timestamp=true block=false ! "
"videoconvert ! "
f"vp8enc deadline=1 target-bitrate={BITRATE} keyframe-max-dist=5 cpu-used=8 ! "
"tee name=t allow-not-linked=true"
)
self.pipeline = Gst.parse_launch(desc)
self.appsrc = self.pipeline.get_by_name('src')
self.tee = self.pipeline.get_by_name('t')
self._clients = {}
self._lock = threading.Lock()
self._caps_set = False
self._bus = self.pipeline.get_bus()
self._bus.add_signal_watch()
self._bus.connect('message::error', self._on_error)
self._bus.connect('message::warning', self._on_warning)
def _on_error(self, bus, msg):
err, dbg = msg.parse_error()
self._log.error(f'GStreamer error: {err.message}{dbg}')
def _on_warning(self, bus, msg):
warn, dbg = msg.parse_warning()
self._log.warning(f'GStreamer warning: {warn.message}{dbg}')
def start(self):
self.pipeline.set_state(Gst.State.PLAYING)
def stop(self):
self.pipeline.set_state(Gst.State.NULL)
def push_frame(self, img):
h, w = img.shape[:2]
if not self._caps_set:
self.appsrc.set_property(
'caps',
Gst.Caps.from_string(
f"video/x-raw,format=BGR,width={w},height={h},framerate=0/1"
)
)
self._caps_set = True
buf = Gst.Buffer.new_wrapped(bytes(img))
self.appsrc.emit('push-buffer', buf)
def add_client(self, client_id, on_offer, on_ice):
"""Splice a new queue → rtpvp8pay → webrtcbin branch onto the tee."""
queue = Gst.ElementFactory.make('queue', f'queue_{client_id}')
pay = Gst.ElementFactory.make('rtpvp8pay', f'pay_{client_id}')
capsfilter = Gst.ElementFactory.make('capsfilter', f'caps_{client_id}')
webrtcbin = Gst.ElementFactory.make('webrtcbin', f'webrtc_{client_id}')
capsfilter.set_property('caps', Gst.Caps.from_string(
'application/x-rtp,media=video,encoding-name=VP8,payload=96'
))
webrtcbin.set_property('bundle-policy', GstWebRTC.WebRTCBundlePolicy.MAX_BUNDLE)
webrtcbin.connect('on-negotiation-needed', lambda el: on_offer(el))
webrtcbin.connect('on-ice-candidate', lambda el, idx, cand: on_ice(idx, cand))
with self._lock:
for el in (queue, pay, capsfilter, webrtcbin):
self.pipeline.add(el)
queue.link(pay)
pay.link(capsfilter)
# GStreamer 1.22+: explicit sink pad request required.
# capsfilter.link(webrtcbin) silently succeeds but never triggers
# on-negotiation-needed because no transceiver is registered.
webrtc_sink = webrtcbin.request_pad_simple('sink_%u')
capsfilter.get_static_pad('src').link(webrtc_sink)
tee_src = self.tee.request_pad_simple('src_%u')
tee_src.link(queue.get_static_pad('sink'))
for el in (queue, pay, capsfilter, webrtcbin):
el.sync_state_with_parent()
self._clients[client_id] = {
'elements': (queue, pay, capsfilter, webrtcbin),
'tee_src': tee_src,
'webrtc_sink': webrtc_sink,
}
self._log.info(f'Client {client_id} added ({len(self._clients)} connected)')
return webrtcbin
def remove_client(self, client_id):
"""Unlink and discard a client's pipeline branch."""
with self._lock:
if client_id not in self._clients:
return
info = self._clients.pop(client_id)
tee_src = info['tee_src']
webrtc_sink = info['webrtc_sink']
queue, pay, capsfilter, webrtcbin = info['elements']
tee_src.unlink(queue.get_static_pad('sink'))
self.tee.release_request_pad(tee_src)
capsfilter.get_static_pad('src').unlink(webrtc_sink)
webrtcbin.release_request_pad(webrtc_sink)
for el in (webrtcbin, capsfilter, pay, queue):
el.set_state(Gst.State.NULL)
self.pipeline.remove(el)
self._log.info(f'Client {client_id} removed ({len(self._clients)} connected)')
# ------------------------------------------------------------------ #
# ROS2 node — owns the GStreamer pipeline #
# ------------------------------------------------------------------ #
class WebRTCStreamer(Node):
def __init__(self):
# DDS must be initialised before GStreamer — see module docstring.
super().__init__('webrtc_streamer')
Gst.init(None)
self._glib_loop = GLib.MainLoop()
threading.Thread(target=self._glib_loop.run, daemon=True).start()
self.manager = StreamManager(self.get_logger())
self.manager.start()
self._frame_count = 0
qos = QoSProfile(
# reliability=ReliabilityPolicy.BEST_EFFORT,
history=HistoryPolicy.KEEP_LAST,
depth=1,
)
self.sub = self.create_subscription(Image, IMAGE_TOPIC, self._on_image, qos)
self.get_logger().info(f'Subscribed to {IMAGE_TOPIC}')
self.get_logger().info(f'WebRTC signaling on ws://0.0.0.0:{PORT}')
def _on_image(self, msg):
# Decode the raw sensor_msgs/Image without cv_bridge / OpenCV.
# v4l2_camera publishes bgr8 or rgb8; videoconvert handles anything else.
img = np.frombuffer(msg.data, dtype=np.uint8).reshape(msg.height, msg.width, -1)
if msg.encoding == 'rgb8':
img = img[:, :, ::-1] # RGB → BGR (view; bytes() below makes it contiguous)
self._frame_count += 1
if self._frame_count % 100 == 0:
self.get_logger().info(f'Frames: {self._frame_count}')
self.manager.push_frame(img)
# ------------------------------------------------------------------ #
# WebRTC signaling #
# ------------------------------------------------------------------ #
class WebRTCClient:
def __init__(self, client_id, websocket, manager, loop):
self.client_id = client_id
self.ws = websocket
self.loop = loop
self.webrtcbin = manager.add_client(
client_id,
on_offer=self._on_negotiation_needed,
on_ice=self._send_ice,
)
def _on_negotiation_needed(self, element):
promise = Gst.Promise.new_with_change_func(self._on_offer_created, element)
element.emit('create-offer', None, promise)
def _on_offer_created(self, promise, element):
reply = promise.get_reply()
offer = reply.get_value('offer')
element.emit('set-local-description', offer, Gst.Promise.new())
msg = json.dumps({'type': 'offer', 'sdp': offer.sdp.as_text()})
asyncio.run_coroutine_threadsafe(self.ws.send(msg), self.loop)
def _send_ice(self, mline_index, candidate):
msg = json.dumps({'type': 'ice', 'mlineindex': mline_index, 'candidate': candidate})
asyncio.run_coroutine_threadsafe(self.ws.send(msg), self.loop)
def handle_answer(self, sdp_text):
_, sdp = GstSdp.SDPMessage.new_from_text(sdp_text)
answer = GstWebRTC.WebRTCSessionDescription.new(GstWebRTC.WebRTCSDPType.ANSWER, sdp)
self.webrtcbin.emit('set-remote-description', answer, Gst.Promise.new())
def handle_ice(self, mline_index, candidate):
self.webrtcbin.emit('add-ice-candidate', mline_index, candidate)
def stop(self, manager):
manager.remove_client(self.client_id)
async def handler(websocket, manager):
client_id = next(_client_ids)
client = WebRTCClient(client_id, websocket, manager, asyncio.get_running_loop())
try:
async for raw in websocket:
msg = json.loads(raw)
if msg['type'] == 'answer':
client.handle_answer(msg['sdp'])
elif msg['type'] == 'ice':
client.handle_ice(msg['mlineindex'], msg['candidate'])
except websockets.exceptions.ConnectionClosed:
pass
finally:
client.stop(manager)
# ------------------------------------------------------------------ #
# Entry point #
# ------------------------------------------------------------------ #
async def run_signaling(manager):
async with websockets.serve(
lambda ws, *_: handler(ws, manager), '0.0.0.0', PORT
):
await asyncio.Future()
def main():
rclpy.init()
# WebRTCStreamer.__init__ creates the ROS node first (DDS), then GStreamer.
node = WebRTCStreamer()
executor = SingleThreadedExecutor()
executor.add_node(node)
def spin():
while rclpy.ok():
try:
executor.spin_once(timeout_sec=0.1)
except AttributeError:
pass # known rclpy/Kilted executor bug; non-fatal
threading.Thread(target=spin, daemon=True).start()
try:
asyncio.run(run_signaling(node.manager))
finally:
node.manager.stop()
executor.shutdown()
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()