Files
m5p3nc3r ef78f19e72 Add image and streaming from USB camera
Plus a little freshen up of the readme's
2026-05-07 16:38:36 +00:00

125 lines
3.9 KiB
Python

#!/usr/bin/env python3
"""
Minimal ROS2 camera publisher using GStreamer v4l2src.
Publishes sensor_msgs/Image to /<CAMERA_NAMESPACE>/image_raw.
No OpenCV or cv_bridge required.
"""
import os
import threading
import gi
gi.require_version('Gst', '1.0')
from gi.repository import Gst, GLib
import rclpy
from rclpy.executors import SingleThreadedExecutor
from rclpy.node import Node
from rclpy.qos import QoSProfile, ReliabilityPolicy, HistoryPolicy
from sensor_msgs.msg import Image
DEVICE = os.environ.get('VIDEO_DEVICE', '/dev/video0')
WIDTH = int(os.environ.get('WIDTH', '640'))
HEIGHT = int(os.environ.get('HEIGHT', '480'))
FPS = int(os.environ.get('FPS', '30'))
FORMAT = os.environ.get('CAMERA_FORMAT', 'raw').lower()
NAMESPACE = os.environ.get('CAMERA_NAMESPACE', '/camera')
TOPIC = f'{NAMESPACE}/image_raw'
class CameraPublisher(Node):
def __init__(self):
# DDS must initialise before GStreamer — same constraint as webrtc_streamer.
super().__init__('camera_publisher')
qos = QoSProfile(
# reliability=ReliabilityPolicy.BEST_EFFORT,
history=HistoryPolicy.KEEP_LAST,
depth=1,
)
self.pub = self.create_publisher(Image, TOPIC, qos)
self.get_logger().info(f'Publishing to {TOPIC}')
Gst.init(None)
glib_loop = GLib.MainLoop()
threading.Thread(target=glib_loop.run, daemon=True).start()
if FORMAT == 'mjpeg':
src = (
f'v4l2src device={DEVICE} ! '
f'image/jpeg,width={WIDTH},height={HEIGHT},framerate={FPS}/1 ! '
'jpegdec ! '
)
else:
src = (
f'v4l2src device={DEVICE} ! '
f'video/x-raw,width={WIDTH},height={HEIGHT},framerate={FPS}/1 ! '
)
desc = src + 'videoconvert ! video/x-raw,format=BGR ! appsink name=sink emit-signals=true sync=false max-buffers=1 drop=true'
self.get_logger().info(f'Pipeline: {desc}')
self._pipeline = Gst.parse_launch(desc)
sink = self._pipeline.get_by_name('sink')
sink.connect('new-sample', self._on_frame)
self._bus = self._pipeline.get_bus()
self._bus.add_signal_watch()
self._bus.connect('message::error', self._on_error)
self._bus.connect('message::warning', self._on_warning)
self._pipeline.set_state(Gst.State.PLAYING)
self._frame_count = 0
def _on_error(self, bus, msg):
err, debug = msg.parse_error()
self.get_logger().error(f'GStreamer error: {err.message}')
if debug:
self.get_logger().error(f'GStreamer debug: {debug}')
def _on_warning(self, bus, msg):
self.get_logger().warning(f'GStreamer warning: {msg.parse_warning()[0].message}')
def _on_frame(self, sink):
sample = sink.emit('pull-sample')
buf = sample.get_buffer()
caps = sample.get_caps().get_structure(0)
w = caps.get_value('width')
h = caps.get_value('height')
msg = Image()
msg.header.stamp = self.get_clock().now().to_msg()
msg.header.frame_id = 'camera'
msg.height = h
msg.width = w
msg.encoding = 'bgr8'
msg.step = w * 3
msg.is_bigendian = False
msg.data = buf.extract_dup(0, buf.get_size())
self.pub.publish(msg)
self._frame_count += 1
if self._frame_count % 100 == 0:
self.get_logger().info(f'Frames published: {self._frame_count}')
return Gst.FlowReturn.OK
def main():
rclpy.init()
node = CameraPublisher()
executor = SingleThreadedExecutor()
executor.add_node(node)
try:
executor.spin()
finally:
node._pipeline.set_state(Gst.State.NULL)
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()