ef78f19e72
Plus a little freshen up of the readme's
294 lines
11 KiB
Python
294 lines
11 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
ROS2 node: /camera/image_raw → WebRTC stream (multi-client).
|
|
|
|
A single encode pipeline feeds all connected browsers simultaneously.
|
|
Each browser gets its own webrtcbin branch spliced onto a shared tee.
|
|
|
|
appsrc → videoconvert → vp8enc → tee ─┬→ queue → rtpvp8pay → webrtcbin (client 1)
|
|
└→ queue → rtpvp8pay → webrtcbin (client 2)
|
|
|
|
Signaling via WebSocket on PORT (default 8443). Point client.html at this host.
|
|
|
|
Initialisation order note: the ROS node (DDS) must be created before Gst.init()
|
|
is called. GLib's GObject type-system conflicts with DDS if GStreamer initialises
|
|
first. WebRTCStreamer.__init__ enforces this by calling super().__init__() before
|
|
touching GStreamer.
|
|
"""
|
|
|
|
import asyncio
|
|
import itertools
|
|
import json
|
|
import os
|
|
import threading
|
|
|
|
import numpy as np
|
|
|
|
import rclpy
|
|
from rclpy.executors import SingleThreadedExecutor
|
|
from rclpy.node import Node
|
|
from rclpy.qos import QoSProfile, ReliabilityPolicy, HistoryPolicy
|
|
from sensor_msgs.msg import Image
|
|
|
|
import gi
|
|
gi.require_version('Gst', '1.0')
|
|
gi.require_version('GstWebRTC', '1.0')
|
|
gi.require_version('GstSdp', '1.0')
|
|
from gi.repository import Gst, GstWebRTC, GstSdp, GLib
|
|
|
|
import websockets
|
|
|
|
IMAGE_TOPIC = os.environ.get('IMAGE_TOPIC', '/camera/image_raw')
|
|
PORT = int(os.environ.get('PORT', '8443'))
|
|
BITRATE = int(os.environ.get('BITRATE', '2000000'))
|
|
|
|
_client_ids = itertools.count()
|
|
|
|
|
|
# ------------------------------------------------------------------ #
|
|
# GStreamer pipeline #
|
|
# ------------------------------------------------------------------ #
|
|
|
|
class StreamManager:
|
|
"""Shared encode pipeline. Clients attach/detach webrtcbin branches dynamically."""
|
|
|
|
def __init__(self, logger):
|
|
self._log = logger
|
|
desc = (
|
|
"appsrc name=src format=time is-live=true do-timestamp=true block=false ! "
|
|
"videoconvert ! "
|
|
f"vp8enc deadline=1 target-bitrate={BITRATE} keyframe-max-dist=5 cpu-used=8 ! "
|
|
"tee name=t allow-not-linked=true"
|
|
)
|
|
self.pipeline = Gst.parse_launch(desc)
|
|
self.appsrc = self.pipeline.get_by_name('src')
|
|
self.tee = self.pipeline.get_by_name('t')
|
|
self._clients = {}
|
|
self._lock = threading.Lock()
|
|
self._caps_set = False
|
|
|
|
self._bus = self.pipeline.get_bus()
|
|
self._bus.add_signal_watch()
|
|
self._bus.connect('message::error', self._on_error)
|
|
self._bus.connect('message::warning', self._on_warning)
|
|
|
|
def _on_error(self, bus, msg):
|
|
err, dbg = msg.parse_error()
|
|
self._log.error(f'GStreamer error: {err.message} — {dbg}')
|
|
|
|
def _on_warning(self, bus, msg):
|
|
warn, dbg = msg.parse_warning()
|
|
self._log.warning(f'GStreamer warning: {warn.message} — {dbg}')
|
|
|
|
def start(self):
|
|
self.pipeline.set_state(Gst.State.PLAYING)
|
|
|
|
def stop(self):
|
|
self.pipeline.set_state(Gst.State.NULL)
|
|
|
|
def push_frame(self, img):
|
|
h, w = img.shape[:2]
|
|
if not self._caps_set:
|
|
self.appsrc.set_property(
|
|
'caps',
|
|
Gst.Caps.from_string(
|
|
f"video/x-raw,format=BGR,width={w},height={h},framerate=0/1"
|
|
)
|
|
)
|
|
self._caps_set = True
|
|
buf = Gst.Buffer.new_wrapped(bytes(img))
|
|
self.appsrc.emit('push-buffer', buf)
|
|
|
|
def add_client(self, client_id, on_offer, on_ice):
|
|
"""Splice a new queue → rtpvp8pay → webrtcbin branch onto the tee."""
|
|
queue = Gst.ElementFactory.make('queue', f'queue_{client_id}')
|
|
pay = Gst.ElementFactory.make('rtpvp8pay', f'pay_{client_id}')
|
|
capsfilter = Gst.ElementFactory.make('capsfilter', f'caps_{client_id}')
|
|
webrtcbin = Gst.ElementFactory.make('webrtcbin', f'webrtc_{client_id}')
|
|
|
|
capsfilter.set_property('caps', Gst.Caps.from_string(
|
|
'application/x-rtp,media=video,encoding-name=VP8,payload=96'
|
|
))
|
|
webrtcbin.set_property('bundle-policy', GstWebRTC.WebRTCBundlePolicy.MAX_BUNDLE)
|
|
webrtcbin.connect('on-negotiation-needed', lambda el: on_offer(el))
|
|
webrtcbin.connect('on-ice-candidate', lambda el, idx, cand: on_ice(idx, cand))
|
|
|
|
with self._lock:
|
|
for el in (queue, pay, capsfilter, webrtcbin):
|
|
self.pipeline.add(el)
|
|
queue.link(pay)
|
|
pay.link(capsfilter)
|
|
# GStreamer 1.22+: explicit sink pad request required.
|
|
# capsfilter.link(webrtcbin) silently succeeds but never triggers
|
|
# on-negotiation-needed because no transceiver is registered.
|
|
webrtc_sink = webrtcbin.request_pad_simple('sink_%u')
|
|
capsfilter.get_static_pad('src').link(webrtc_sink)
|
|
tee_src = self.tee.request_pad_simple('src_%u')
|
|
tee_src.link(queue.get_static_pad('sink'))
|
|
for el in (queue, pay, capsfilter, webrtcbin):
|
|
el.sync_state_with_parent()
|
|
self._clients[client_id] = {
|
|
'elements': (queue, pay, capsfilter, webrtcbin),
|
|
'tee_src': tee_src,
|
|
'webrtc_sink': webrtc_sink,
|
|
}
|
|
self._log.info(f'Client {client_id} added ({len(self._clients)} connected)')
|
|
return webrtcbin
|
|
|
|
def remove_client(self, client_id):
|
|
"""Unlink and discard a client's pipeline branch."""
|
|
with self._lock:
|
|
if client_id not in self._clients:
|
|
return
|
|
info = self._clients.pop(client_id)
|
|
tee_src = info['tee_src']
|
|
webrtc_sink = info['webrtc_sink']
|
|
queue, pay, capsfilter, webrtcbin = info['elements']
|
|
tee_src.unlink(queue.get_static_pad('sink'))
|
|
self.tee.release_request_pad(tee_src)
|
|
capsfilter.get_static_pad('src').unlink(webrtc_sink)
|
|
webrtcbin.release_request_pad(webrtc_sink)
|
|
for el in (webrtcbin, capsfilter, pay, queue):
|
|
el.set_state(Gst.State.NULL)
|
|
self.pipeline.remove(el)
|
|
self._log.info(f'Client {client_id} removed ({len(self._clients)} connected)')
|
|
|
|
|
|
# ------------------------------------------------------------------ #
|
|
# ROS2 node — owns the GStreamer pipeline #
|
|
# ------------------------------------------------------------------ #
|
|
|
|
class WebRTCStreamer(Node):
|
|
def __init__(self):
|
|
# DDS must be initialised before GStreamer — see module docstring.
|
|
super().__init__('webrtc_streamer')
|
|
|
|
Gst.init(None)
|
|
self._glib_loop = GLib.MainLoop()
|
|
threading.Thread(target=self._glib_loop.run, daemon=True).start()
|
|
|
|
self.manager = StreamManager(self.get_logger())
|
|
self.manager.start()
|
|
|
|
self._frame_count = 0
|
|
qos = QoSProfile(
|
|
# reliability=ReliabilityPolicy.BEST_EFFORT,
|
|
history=HistoryPolicy.KEEP_LAST,
|
|
depth=1,
|
|
)
|
|
self.sub = self.create_subscription(Image, IMAGE_TOPIC, self._on_image, qos)
|
|
|
|
self.get_logger().info(f'Subscribed to {IMAGE_TOPIC}')
|
|
self.get_logger().info(f'WebRTC signaling on ws://0.0.0.0:{PORT}')
|
|
|
|
def _on_image(self, msg):
|
|
# Decode the raw sensor_msgs/Image without cv_bridge / OpenCV.
|
|
# v4l2_camera publishes bgr8 or rgb8; videoconvert handles anything else.
|
|
img = np.frombuffer(msg.data, dtype=np.uint8).reshape(msg.height, msg.width, -1)
|
|
if msg.encoding == 'rgb8':
|
|
img = img[:, :, ::-1] # RGB → BGR (view; bytes() below makes it contiguous)
|
|
self._frame_count += 1
|
|
if self._frame_count % 100 == 0:
|
|
self.get_logger().info(f'Frames: {self._frame_count}')
|
|
self.manager.push_frame(img)
|
|
|
|
|
|
# ------------------------------------------------------------------ #
|
|
# WebRTC signaling #
|
|
# ------------------------------------------------------------------ #
|
|
|
|
class WebRTCClient:
|
|
def __init__(self, client_id, websocket, manager, loop):
|
|
self.client_id = client_id
|
|
self.ws = websocket
|
|
self.loop = loop
|
|
self.webrtcbin = manager.add_client(
|
|
client_id,
|
|
on_offer=self._on_negotiation_needed,
|
|
on_ice=self._send_ice,
|
|
)
|
|
|
|
def _on_negotiation_needed(self, element):
|
|
promise = Gst.Promise.new_with_change_func(self._on_offer_created, element)
|
|
element.emit('create-offer', None, promise)
|
|
|
|
def _on_offer_created(self, promise, element):
|
|
reply = promise.get_reply()
|
|
offer = reply.get_value('offer')
|
|
element.emit('set-local-description', offer, Gst.Promise.new())
|
|
msg = json.dumps({'type': 'offer', 'sdp': offer.sdp.as_text()})
|
|
asyncio.run_coroutine_threadsafe(self.ws.send(msg), self.loop)
|
|
|
|
def _send_ice(self, mline_index, candidate):
|
|
msg = json.dumps({'type': 'ice', 'mlineindex': mline_index, 'candidate': candidate})
|
|
asyncio.run_coroutine_threadsafe(self.ws.send(msg), self.loop)
|
|
|
|
def handle_answer(self, sdp_text):
|
|
_, sdp = GstSdp.SDPMessage.new_from_text(sdp_text)
|
|
answer = GstWebRTC.WebRTCSessionDescription.new(GstWebRTC.WebRTCSDPType.ANSWER, sdp)
|
|
self.webrtcbin.emit('set-remote-description', answer, Gst.Promise.new())
|
|
|
|
def handle_ice(self, mline_index, candidate):
|
|
self.webrtcbin.emit('add-ice-candidate', mline_index, candidate)
|
|
|
|
def stop(self, manager):
|
|
manager.remove_client(self.client_id)
|
|
|
|
|
|
async def handler(websocket, manager):
|
|
client_id = next(_client_ids)
|
|
client = WebRTCClient(client_id, websocket, manager, asyncio.get_running_loop())
|
|
try:
|
|
async for raw in websocket:
|
|
msg = json.loads(raw)
|
|
if msg['type'] == 'answer':
|
|
client.handle_answer(msg['sdp'])
|
|
elif msg['type'] == 'ice':
|
|
client.handle_ice(msg['mlineindex'], msg['candidate'])
|
|
except websockets.exceptions.ConnectionClosed:
|
|
pass
|
|
finally:
|
|
client.stop(manager)
|
|
|
|
|
|
# ------------------------------------------------------------------ #
|
|
# Entry point #
|
|
# ------------------------------------------------------------------ #
|
|
|
|
async def run_signaling(manager):
|
|
async with websockets.serve(
|
|
lambda ws, *_: handler(ws, manager), '0.0.0.0', PORT
|
|
):
|
|
await asyncio.Future()
|
|
|
|
|
|
def main():
|
|
rclpy.init()
|
|
|
|
# WebRTCStreamer.__init__ creates the ROS node first (DDS), then GStreamer.
|
|
node = WebRTCStreamer()
|
|
|
|
executor = SingleThreadedExecutor()
|
|
executor.add_node(node)
|
|
|
|
def spin():
|
|
while rclpy.ok():
|
|
try:
|
|
executor.spin_once(timeout_sec=0.1)
|
|
except AttributeError:
|
|
pass # known rclpy/Kilted executor bug; non-fatal
|
|
|
|
threading.Thread(target=spin, daemon=True).start()
|
|
|
|
try:
|
|
asyncio.run(run_signaling(node.manager))
|
|
finally:
|
|
node.manager.stop()
|
|
executor.shutdown()
|
|
node.destroy_node()
|
|
rclpy.shutdown()
|
|
|
|
|
|
if __name__ == '__main__':
|
|
main()
|