#!/usr/bin/env python3 """ ROS2 node: /camera/image_raw → WebRTC stream (multi-client). A single encode pipeline feeds all connected browsers simultaneously. Each browser gets its own webrtcbin branch spliced onto a shared tee. appsrc → videoconvert → vp8enc → tee ─┬→ queue → rtpvp8pay → webrtcbin (client 1) └→ queue → rtpvp8pay → webrtcbin (client 2) Signaling via WebSocket on PORT (default 8443). Point client.html at this host. Initialisation order note: the ROS node (DDS) must be created before Gst.init() is called. GLib's GObject type-system conflicts with DDS if GStreamer initialises first. WebRTCStreamer.__init__ enforces this by calling super().__init__() before touching GStreamer. """ import asyncio import itertools import json import os import threading import numpy as np import rclpy from rclpy.executors import SingleThreadedExecutor from rclpy.node import Node from rclpy.qos import QoSProfile, ReliabilityPolicy, HistoryPolicy from sensor_msgs.msg import Image import gi gi.require_version('Gst', '1.0') gi.require_version('GstWebRTC', '1.0') gi.require_version('GstSdp', '1.0') from gi.repository import Gst, GstWebRTC, GstSdp, GLib import websockets IMAGE_TOPIC = os.environ.get('IMAGE_TOPIC', '/camera/image_raw') PORT = int(os.environ.get('PORT', '8443')) BITRATE = int(os.environ.get('BITRATE', '2000000')) _client_ids = itertools.count() # ------------------------------------------------------------------ # # GStreamer pipeline # # ------------------------------------------------------------------ # class StreamManager: """Shared encode pipeline. Clients attach/detach webrtcbin branches dynamically.""" def __init__(self, logger): self._log = logger desc = ( "appsrc name=src format=time is-live=true do-timestamp=true block=false ! " "videoconvert ! " f"vp8enc deadline=1 target-bitrate={BITRATE} keyframe-max-dist=5 cpu-used=8 ! " "tee name=t allow-not-linked=true" ) self.pipeline = Gst.parse_launch(desc) self.appsrc = self.pipeline.get_by_name('src') self.tee = self.pipeline.get_by_name('t') self._clients = {} self._lock = threading.Lock() self._caps_set = False self._bus = self.pipeline.get_bus() self._bus.add_signal_watch() self._bus.connect('message::error', self._on_error) self._bus.connect('message::warning', self._on_warning) def _on_error(self, bus, msg): err, dbg = msg.parse_error() self._log.error(f'GStreamer error: {err.message} — {dbg}') def _on_warning(self, bus, msg): warn, dbg = msg.parse_warning() self._log.warning(f'GStreamer warning: {warn.message} — {dbg}') def start(self): self.pipeline.set_state(Gst.State.PLAYING) def stop(self): self.pipeline.set_state(Gst.State.NULL) def push_frame(self, img): h, w = img.shape[:2] if not self._caps_set: self.appsrc.set_property( 'caps', Gst.Caps.from_string( f"video/x-raw,format=BGR,width={w},height={h},framerate=0/1" ) ) self._caps_set = True buf = Gst.Buffer.new_wrapped(bytes(img)) self.appsrc.emit('push-buffer', buf) def add_client(self, client_id, on_offer, on_ice): """Splice a new queue → rtpvp8pay → webrtcbin branch onto the tee.""" queue = Gst.ElementFactory.make('queue', f'queue_{client_id}') pay = Gst.ElementFactory.make('rtpvp8pay', f'pay_{client_id}') capsfilter = Gst.ElementFactory.make('capsfilter', f'caps_{client_id}') webrtcbin = Gst.ElementFactory.make('webrtcbin', f'webrtc_{client_id}') capsfilter.set_property('caps', Gst.Caps.from_string( 'application/x-rtp,media=video,encoding-name=VP8,payload=96' )) webrtcbin.set_property('bundle-policy', GstWebRTC.WebRTCBundlePolicy.MAX_BUNDLE) webrtcbin.connect('on-negotiation-needed', lambda el: on_offer(el)) webrtcbin.connect('on-ice-candidate', lambda el, idx, cand: on_ice(idx, cand)) with self._lock: for el in (queue, pay, capsfilter, webrtcbin): self.pipeline.add(el) queue.link(pay) pay.link(capsfilter) # GStreamer 1.22+: explicit sink pad request required. # capsfilter.link(webrtcbin) silently succeeds but never triggers # on-negotiation-needed because no transceiver is registered. webrtc_sink = webrtcbin.request_pad_simple('sink_%u') capsfilter.get_static_pad('src').link(webrtc_sink) tee_src = self.tee.request_pad_simple('src_%u') tee_src.link(queue.get_static_pad('sink')) for el in (queue, pay, capsfilter, webrtcbin): el.sync_state_with_parent() self._clients[client_id] = { 'elements': (queue, pay, capsfilter, webrtcbin), 'tee_src': tee_src, 'webrtc_sink': webrtc_sink, } self._log.info(f'Client {client_id} added ({len(self._clients)} connected)') return webrtcbin def remove_client(self, client_id): """Unlink and discard a client's pipeline branch.""" with self._lock: if client_id not in self._clients: return info = self._clients.pop(client_id) tee_src = info['tee_src'] webrtc_sink = info['webrtc_sink'] queue, pay, capsfilter, webrtcbin = info['elements'] tee_src.unlink(queue.get_static_pad('sink')) self.tee.release_request_pad(tee_src) capsfilter.get_static_pad('src').unlink(webrtc_sink) webrtcbin.release_request_pad(webrtc_sink) for el in (webrtcbin, capsfilter, pay, queue): el.set_state(Gst.State.NULL) self.pipeline.remove(el) self._log.info(f'Client {client_id} removed ({len(self._clients)} connected)') # ------------------------------------------------------------------ # # ROS2 node — owns the GStreamer pipeline # # ------------------------------------------------------------------ # class WebRTCStreamer(Node): def __init__(self): # DDS must be initialised before GStreamer — see module docstring. super().__init__('webrtc_streamer') Gst.init(None) self._glib_loop = GLib.MainLoop() threading.Thread(target=self._glib_loop.run, daemon=True).start() self.manager = StreamManager(self.get_logger()) self.manager.start() self._frame_count = 0 qos = QoSProfile( # reliability=ReliabilityPolicy.BEST_EFFORT, history=HistoryPolicy.KEEP_LAST, depth=1, ) self.sub = self.create_subscription(Image, IMAGE_TOPIC, self._on_image, qos) self.get_logger().info(f'Subscribed to {IMAGE_TOPIC}') self.get_logger().info(f'WebRTC signaling on ws://0.0.0.0:{PORT}') def _on_image(self, msg): # Decode the raw sensor_msgs/Image without cv_bridge / OpenCV. # v4l2_camera publishes bgr8 or rgb8; videoconvert handles anything else. img = np.frombuffer(msg.data, dtype=np.uint8).reshape(msg.height, msg.width, -1) if msg.encoding == 'rgb8': img = img[:, :, ::-1] # RGB → BGR (view; bytes() below makes it contiguous) self._frame_count += 1 if self._frame_count % 100 == 0: self.get_logger().info(f'Frames: {self._frame_count}') self.manager.push_frame(img) # ------------------------------------------------------------------ # # WebRTC signaling # # ------------------------------------------------------------------ # class WebRTCClient: def __init__(self, client_id, websocket, manager, loop): self.client_id = client_id self.ws = websocket self.loop = loop self.webrtcbin = manager.add_client( client_id, on_offer=self._on_negotiation_needed, on_ice=self._send_ice, ) def _on_negotiation_needed(self, element): promise = Gst.Promise.new_with_change_func(self._on_offer_created, element) element.emit('create-offer', None, promise) def _on_offer_created(self, promise, element): reply = promise.get_reply() offer = reply.get_value('offer') element.emit('set-local-description', offer, Gst.Promise.new()) msg = json.dumps({'type': 'offer', 'sdp': offer.sdp.as_text()}) asyncio.run_coroutine_threadsafe(self.ws.send(msg), self.loop) def _send_ice(self, mline_index, candidate): msg = json.dumps({'type': 'ice', 'mlineindex': mline_index, 'candidate': candidate}) asyncio.run_coroutine_threadsafe(self.ws.send(msg), self.loop) def handle_answer(self, sdp_text): _, sdp = GstSdp.SDPMessage.new_from_text(sdp_text) answer = GstWebRTC.WebRTCSessionDescription.new(GstWebRTC.WebRTCSDPType.ANSWER, sdp) self.webrtcbin.emit('set-remote-description', answer, Gst.Promise.new()) def handle_ice(self, mline_index, candidate): self.webrtcbin.emit('add-ice-candidate', mline_index, candidate) def stop(self, manager): manager.remove_client(self.client_id) async def handler(websocket, manager): client_id = next(_client_ids) client = WebRTCClient(client_id, websocket, manager, asyncio.get_running_loop()) try: async for raw in websocket: msg = json.loads(raw) if msg['type'] == 'answer': client.handle_answer(msg['sdp']) elif msg['type'] == 'ice': client.handle_ice(msg['mlineindex'], msg['candidate']) except websockets.exceptions.ConnectionClosed: pass finally: client.stop(manager) # ------------------------------------------------------------------ # # Entry point # # ------------------------------------------------------------------ # async def run_signaling(manager): async with websockets.serve( lambda ws, *_: handler(ws, manager), '0.0.0.0', PORT ): await asyncio.Future() def main(): rclpy.init() # WebRTCStreamer.__init__ creates the ROS node first (DDS), then GStreamer. node = WebRTCStreamer() executor = SingleThreadedExecutor() executor.add_node(node) def spin(): while rclpy.ok(): try: executor.spin_once(timeout_sec=0.1) except AttributeError: pass # known rclpy/Kilted executor bug; non-fatal threading.Thread(target=spin, daemon=True).start() try: asyncio.run(run_signaling(node.manager)) finally: node.manager.stop() executor.shutdown() node.destroy_node() rclpy.shutdown() if __name__ == '__main__': main()