ec554bcf2c
Uses the mecanum controller properly across physical and virtual There is a timing issue with i2c which is why the control update is limited to 10hz The sonar and LED's arent yet working, this will come soon.
140 lines
4.8 KiB
Python
140 lines
4.8 KiB
Python
import threading
|
|
import xml.etree.ElementTree as ET
|
|
|
|
import rclpy
|
|
from geometry_msgs.msg import TwistStamped
|
|
from rclpy.node import Node
|
|
from rclpy.qos import DurabilityPolicy, QoSProfile, ReliabilityPolicy
|
|
from sensor_msgs.msg import JointState, Range
|
|
from std_msgs.msg import ColorRGBA, String
|
|
|
|
_TRANSIENT_LOCAL_QOS = QoSProfile(
|
|
reliability=ReliabilityPolicy.RELIABLE,
|
|
durability=DurabilityPolicy.TRANSIENT_LOCAL,
|
|
depth=1,
|
|
)
|
|
|
|
|
|
def _parse_joint_limits(urdf_xml: str) -> dict:
|
|
"""Extract lower/upper limits for pan and tilt joints from a URDF string."""
|
|
try:
|
|
root = ET.fromstring(urdf_xml)
|
|
limits = {}
|
|
for joint in root.findall('joint'):
|
|
name = joint.get('name')
|
|
if name in ('pan', 'tilt'):
|
|
el = joint.find('limit')
|
|
if el is not None:
|
|
limits[name] = {
|
|
'lower': float(el.get('lower', 0)),
|
|
'upper': float(el.get('upper', 0)),
|
|
}
|
|
return limits
|
|
except Exception as e:
|
|
print(f'Failed to parse URDF joint limits: {e}')
|
|
return {}
|
|
|
|
|
|
class RobotBridgeNode(Node):
|
|
def __init__(self):
|
|
super().__init__('webui_bridge')
|
|
|
|
self._cmd_vel_pub = self.create_publisher(TwistStamped, '/cmd_vel', 10)
|
|
self._joint_cmd_pub = self.create_publisher(JointState, '/joint_command', 10)
|
|
self._led_color_pub = self.create_publisher(ColorRGBA, '/led/color', 10)
|
|
self._led_effect_pub = self.create_publisher(String, '/led/effect', 10)
|
|
|
|
self.create_subscription(JointState, '/joint_states', self._on_joint_states, 10)
|
|
self.create_subscription(Range, '/ultrasonic/range', self._on_ultrasonic, 10)
|
|
self.create_subscription(String, '/robot_description', self._on_robot_description,
|
|
_TRANSIENT_LOCAL_QOS)
|
|
|
|
self.on_joint_states_callbacks: list = []
|
|
self.on_ultrasonic_callbacks: list = []
|
|
self.on_joint_limits_callbacks: list = []
|
|
self._joint_limits: dict | None = None
|
|
|
|
# ------------------------------------------------------------------
|
|
# Publishers
|
|
# ------------------------------------------------------------------
|
|
|
|
def publish_cmd_vel(self, linear_x: float, angular_z: float) -> None:
|
|
msg = TwistStamped()
|
|
msg.header.stamp = self.get_clock().now().to_msg()
|
|
msg.twist.linear.x = float(linear_x)
|
|
msg.twist.angular.z = float(angular_z)
|
|
self._cmd_vel_pub.publish(msg)
|
|
|
|
def publish_joint_command(self, pan: float, tilt: float) -> None:
|
|
msg = JointState()
|
|
msg.header.stamp = self.get_clock().now().to_msg()
|
|
msg.name = ['pan', 'tilt']
|
|
msg.position = [float(pan), float(tilt)]
|
|
self._joint_cmd_pub.publish(msg)
|
|
|
|
def publish_led_color(self, r: float, g: float, b: float, a: float) -> None:
|
|
msg = ColorRGBA(r=float(r), g=float(g), b=float(b), a=float(a))
|
|
self._led_color_pub.publish(msg)
|
|
|
|
def publish_led_effect(self, effect: str) -> None:
|
|
self._led_effect_pub.publish(String(data=effect))
|
|
|
|
# ------------------------------------------------------------------
|
|
# Accessors
|
|
# ------------------------------------------------------------------
|
|
|
|
def get_joint_limits(self) -> dict | None:
|
|
return self._joint_limits
|
|
|
|
# ------------------------------------------------------------------
|
|
# Subscribers
|
|
# ------------------------------------------------------------------
|
|
|
|
def _on_joint_states(self, msg: JointState) -> None:
|
|
data = {'type': 'joint_states', 'positions': dict(zip(msg.name, msg.position))}
|
|
for cb in self.on_joint_states_callbacks:
|
|
cb(data)
|
|
|
|
def _on_ultrasonic(self, msg: Range) -> None:
|
|
data = {'type': 'ultrasonic', 'range': msg.range}
|
|
for cb in self.on_ultrasonic_callbacks:
|
|
cb(data)
|
|
|
|
def _on_robot_description(self, msg: String) -> None:
|
|
limits = _parse_joint_limits(msg.data)
|
|
if not limits:
|
|
self.get_logger().warn('No pan/tilt limits found in URDF — check joint names.')
|
|
return
|
|
self._joint_limits = limits
|
|
data = {'type': 'joint_limits', 'limits': limits}
|
|
for cb in self.on_joint_limits_callbacks:
|
|
cb(data)
|
|
|
|
|
|
_node: RobotBridgeNode | None = None
|
|
_spin_thread: threading.Thread | None = None
|
|
|
|
|
|
def start_ros() -> RobotBridgeNode:
|
|
global _node, _spin_thread
|
|
rclpy.init()
|
|
_node = RobotBridgeNode()
|
|
_spin_thread = threading.Thread(target=rclpy.spin, args=(_node,), daemon=True)
|
|
_spin_thread.start()
|
|
return _node
|
|
|
|
|
|
def get_node() -> RobotBridgeNode:
|
|
assert _node is not None, 'ROS node not started'
|
|
return _node
|
|
|
|
|
|
def stop_ros() -> None:
|
|
global _node
|
|
try:
|
|
if _node:
|
|
_node.destroy_node()
|
|
rclpy.shutdown()
|
|
except Exception:
|
|
pass
|