Files
ros-raspbot-v2/webui/backend/ros_node.py
T
2026-05-01 14:28:50 +00:00

139 lines
4.8 KiB
Python

import threading
import xml.etree.ElementTree as ET
import rclpy
from geometry_msgs.msg import Twist
from rclpy.node import Node
from rclpy.qos import DurabilityPolicy, QoSProfile, ReliabilityPolicy
from sensor_msgs.msg import JointState, Range
from std_msgs.msg import ColorRGBA, String
_TRANSIENT_LOCAL_QOS = QoSProfile(
reliability=ReliabilityPolicy.RELIABLE,
durability=DurabilityPolicy.TRANSIENT_LOCAL,
depth=1,
)
def _parse_joint_limits(urdf_xml: str) -> dict:
"""Extract lower/upper limits for pan and tilt joints from a URDF string."""
try:
root = ET.fromstring(urdf_xml)
limits = {}
for joint in root.findall('joint'):
name = joint.get('name')
if name in ('pan', 'tilt'):
el = joint.find('limit')
if el is not None:
limits[name] = {
'lower': float(el.get('lower', 0)),
'upper': float(el.get('upper', 0)),
}
return limits
except Exception as e:
print(f'Failed to parse URDF joint limits: {e}')
return {}
class RobotBridgeNode(Node):
def __init__(self):
super().__init__('webui_bridge')
self._cmd_vel_pub = self.create_publisher(Twist, '/cmd_vel', 10)
self._joint_cmd_pub = self.create_publisher(JointState, '/joint_command', 10)
self._led_color_pub = self.create_publisher(ColorRGBA, '/led/color', 10)
self._led_effect_pub = self.create_publisher(String, '/led/effect', 10)
self.create_subscription(JointState, '/joint_states', self._on_joint_states, 10)
self.create_subscription(Range, '/ultrasonic/range', self._on_ultrasonic, 10)
self.create_subscription(String, '/robot_description', self._on_robot_description,
_TRANSIENT_LOCAL_QOS)
self.on_joint_states_callbacks: list = []
self.on_ultrasonic_callbacks: list = []
self.on_joint_limits_callbacks: list = []
self._joint_limits: dict | None = None
# ------------------------------------------------------------------
# Publishers
# ------------------------------------------------------------------
def publish_cmd_vel(self, linear_x: float, angular_z: float) -> None:
msg = Twist()
msg.linear.x = float(linear_x)
msg.angular.z = float(angular_z)
self._cmd_vel_pub.publish(msg)
def publish_joint_command(self, pan: float, tilt: float) -> None:
msg = JointState()
msg.header.stamp = self.get_clock().now().to_msg()
msg.name = ['pan', 'tilt']
msg.position = [float(pan), float(tilt)]
self._joint_cmd_pub.publish(msg)
def publish_led_color(self, r: float, g: float, b: float, a: float) -> None:
msg = ColorRGBA(r=float(r), g=float(g), b=float(b), a=float(a))
self._led_color_pub.publish(msg)
def publish_led_effect(self, effect: str) -> None:
self._led_effect_pub.publish(String(data=effect))
# ------------------------------------------------------------------
# Accessors
# ------------------------------------------------------------------
def get_joint_limits(self) -> dict | None:
return self._joint_limits
# ------------------------------------------------------------------
# Subscribers
# ------------------------------------------------------------------
def _on_joint_states(self, msg: JointState) -> None:
data = {'type': 'joint_states', 'positions': dict(zip(msg.name, msg.position))}
for cb in self.on_joint_states_callbacks:
cb(data)
def _on_ultrasonic(self, msg: Range) -> None:
data = {'type': 'ultrasonic', 'range': msg.range}
for cb in self.on_ultrasonic_callbacks:
cb(data)
def _on_robot_description(self, msg: String) -> None:
limits = _parse_joint_limits(msg.data)
if not limits:
self.get_logger().warn('No pan/tilt limits found in URDF — check joint names.')
return
self._joint_limits = limits
data = {'type': 'joint_limits', 'limits': limits}
for cb in self.on_joint_limits_callbacks:
cb(data)
_node: RobotBridgeNode | None = None
_spin_thread: threading.Thread | None = None
def start_ros() -> RobotBridgeNode:
global _node, _spin_thread
rclpy.init()
_node = RobotBridgeNode()
_spin_thread = threading.Thread(target=rclpy.spin, args=(_node,), daemon=True)
_spin_thread.start()
return _node
def get_node() -> RobotBridgeNode:
assert _node is not None, 'ROS node not started'
return _node
def stop_ros() -> None:
global _node
try:
if _node:
_node.destroy_node()
rclpy.shutdown()
except Exception:
pass