Read constraints from the published URDF rather than hard code them.
Also make the webui more responsive for small screens.
This commit is contained in:
+64
-17
@@ -1,11 +1,39 @@
|
||||
import threading
|
||||
import xml.etree.ElementTree as ET
|
||||
|
||||
import rclpy
|
||||
from geometry_msgs.msg import Twist
|
||||
from rclpy.node import Node
|
||||
from rclpy.qos import DurabilityPolicy, QoSProfile, ReliabilityPolicy
|
||||
from sensor_msgs.msg import JointState, Range
|
||||
from std_msgs.msg import ColorRGBA, String
|
||||
|
||||
_TRANSIENT_LOCAL_QOS = QoSProfile(
|
||||
reliability=ReliabilityPolicy.RELIABLE,
|
||||
durability=DurabilityPolicy.TRANSIENT_LOCAL,
|
||||
depth=1,
|
||||
)
|
||||
|
||||
|
||||
def _parse_joint_limits(urdf_xml: str) -> dict:
|
||||
"""Extract lower/upper limits for pan and tilt joints from a URDF string."""
|
||||
try:
|
||||
root = ET.fromstring(urdf_xml)
|
||||
limits = {}
|
||||
for joint in root.findall('joint'):
|
||||
name = joint.get('name')
|
||||
if name in ('pan', 'tilt'):
|
||||
el = joint.find('limit')
|
||||
if el is not None:
|
||||
limits[name] = {
|
||||
'lower': float(el.get('lower', 0)),
|
||||
'upper': float(el.get('upper', 0)),
|
||||
}
|
||||
return limits
|
||||
except Exception as e:
|
||||
print(f'Failed to parse URDF joint limits: {e}')
|
||||
return {}
|
||||
|
||||
|
||||
class RobotBridgeNode(Node):
|
||||
def __init__(self):
|
||||
@@ -16,22 +44,25 @@ class RobotBridgeNode(Node):
|
||||
self._led_color_pub = self.create_publisher(ColorRGBA, '/led/color', 10)
|
||||
self._led_effect_pub = self.create_publisher(String, '/led/effect', 10)
|
||||
|
||||
self.create_subscription(JointState, '/joint_states', self._on_joint_states, 10)
|
||||
self.create_subscription(Range, '/ultrasonic/range', self._on_ultrasonic, 10)
|
||||
self.create_subscription(JointState, '/joint_states', self._on_joint_states, 10)
|
||||
self.create_subscription(Range, '/ultrasonic/range', self._on_ultrasonic, 10)
|
||||
self.create_subscription(String, '/robot_description', self._on_robot_description,
|
||||
_TRANSIENT_LOCAL_QOS)
|
||||
|
||||
self.on_joint_states_callbacks: list = []
|
||||
self.on_ultrasonic_callbacks: list = []
|
||||
self.on_joint_limits_callbacks: list = []
|
||||
self._joint_limits: dict | None = None
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Publishers
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
def publish_cmd_vel(self, linear_x: float, angular_z: float) -> None:
|
||||
# print(f'Publishing cmd_vel: linear_x={linear_x}, angular_z={angular_z}')
|
||||
msg = Twist()
|
||||
msg.linear.x = float(linear_x)
|
||||
msg.angular.z = float(angular_z)
|
||||
try:
|
||||
self._cmd_vel_pub.publish(msg)
|
||||
except Exception as e:
|
||||
print('Failed to publish cmd_vel:', e)
|
||||
# self._cmd_vel_pub.publish(msg)
|
||||
self._cmd_vel_pub.publish(msg)
|
||||
|
||||
def publish_joint_command(self, pan: float, tilt: float) -> None:
|
||||
msg = JointState()
|
||||
@@ -41,17 +72,22 @@ class RobotBridgeNode(Node):
|
||||
self._joint_cmd_pub.publish(msg)
|
||||
|
||||
def publish_led_color(self, r: float, g: float, b: float, a: float) -> None:
|
||||
msg = ColorRGBA()
|
||||
msg.r = float(r)
|
||||
msg.g = float(g)
|
||||
msg.b = float(b)
|
||||
msg.a = float(a)
|
||||
msg = ColorRGBA(r=float(r), g=float(g), b=float(b), a=float(a))
|
||||
self._led_color_pub.publish(msg)
|
||||
|
||||
def publish_led_effect(self, effect: str) -> None:
|
||||
msg = String()
|
||||
msg.data = effect
|
||||
self._led_effect_pub.publish(msg)
|
||||
self._led_effect_pub.publish(String(data=effect))
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Accessors
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
def get_joint_limits(self) -> dict | None:
|
||||
return self._joint_limits
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Subscribers
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
def _on_joint_states(self, msg: JointState) -> None:
|
||||
data = {'type': 'joint_states', 'positions': dict(zip(msg.name, msg.position))}
|
||||
@@ -63,6 +99,17 @@ class RobotBridgeNode(Node):
|
||||
for cb in self.on_ultrasonic_callbacks:
|
||||
cb(data)
|
||||
|
||||
def _on_robot_description(self, msg: String) -> None:
|
||||
limits = _parse_joint_limits(msg.data)
|
||||
self.get_logger().info(f'Received /robot_description, parsed limits: {limits}')
|
||||
if not limits:
|
||||
self.get_logger().warn('No pan/tilt limits found in URDF — check joint names.')
|
||||
return
|
||||
self._joint_limits = limits
|
||||
data = {'type': 'joint_limits', 'limits': limits}
|
||||
for cb in self.on_joint_limits_callbacks:
|
||||
cb(data)
|
||||
|
||||
|
||||
_node: RobotBridgeNode | None = None
|
||||
_spin_thread: threading.Thread | None = None
|
||||
@@ -78,7 +125,7 @@ def start_ros() -> RobotBridgeNode:
|
||||
|
||||
|
||||
def get_node() -> RobotBridgeNode:
|
||||
assert _node is not None, "ROS node not started"
|
||||
assert _node is not None, 'ROS node not started'
|
||||
return _node
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user