Initial web ui and control for robot
This commit is contained in:
@@ -0,0 +1,76 @@
|
||||
import threading
|
||||
|
||||
import rclpy
|
||||
from geometry_msgs.msg import Twist
|
||||
from rclpy.node import Node
|
||||
from sensor_msgs.msg import JointState, Range
|
||||
|
||||
|
||||
class RobotBridgeNode(Node):
|
||||
def __init__(self):
|
||||
super().__init__('webui_bridge')
|
||||
|
||||
self._cmd_vel_pub = self.create_publisher(Twist, '/cmd_vel', 10)
|
||||
self._joint_cmd_pub = self.create_publisher(JointState, '/joint_command', 10)
|
||||
|
||||
self.create_subscription(JointState, '/joint_states', self._on_joint_states, 10)
|
||||
self.create_subscription(Range, '/ultrasonic/range', self._on_ultrasonic, 10)
|
||||
|
||||
self.on_joint_states_callbacks: list = []
|
||||
self.on_ultrasonic_callbacks: list = []
|
||||
|
||||
def publish_cmd_vel(self, linear_x: float, angular_z: float) -> None:
|
||||
# print(f'Publishing cmd_vel: linear_x={linear_x}, angular_z={angular_z}')
|
||||
msg = Twist()
|
||||
msg.linear.x = float(linear_x)
|
||||
msg.angular.z = float(angular_z)
|
||||
try:
|
||||
self._cmd_vel_pub.publish(msg)
|
||||
except Exception as e:
|
||||
print('Failed to publish cmd_vel:', e)
|
||||
# self._cmd_vel_pub.publish(msg)
|
||||
|
||||
def publish_joint_command(self, pan: float, tilt: float) -> None:
|
||||
msg = JointState()
|
||||
msg.header.stamp = self.get_clock().now().to_msg()
|
||||
msg.name = ['pan', 'tilt']
|
||||
msg.position = [float(pan), float(tilt)]
|
||||
self._joint_cmd_pub.publish(msg)
|
||||
|
||||
def _on_joint_states(self, msg: JointState) -> None:
|
||||
data = {'type': 'joint_states', 'positions': dict(zip(msg.name, msg.position))}
|
||||
for cb in self.on_joint_states_callbacks:
|
||||
cb(data)
|
||||
|
||||
def _on_ultrasonic(self, msg: Range) -> None:
|
||||
data = {'type': 'ultrasonic', 'range': msg.range}
|
||||
for cb in self.on_ultrasonic_callbacks:
|
||||
cb(data)
|
||||
|
||||
|
||||
_node: RobotBridgeNode | None = None
|
||||
_spin_thread: threading.Thread | None = None
|
||||
|
||||
|
||||
def start_ros() -> RobotBridgeNode:
|
||||
global _node, _spin_thread
|
||||
rclpy.init()
|
||||
_node = RobotBridgeNode()
|
||||
_spin_thread = threading.Thread(target=rclpy.spin, args=(_node,), daemon=True)
|
||||
_spin_thread.start()
|
||||
return _node
|
||||
|
||||
|
||||
def get_node() -> RobotBridgeNode:
|
||||
assert _node is not None, "ROS node not started"
|
||||
return _node
|
||||
|
||||
|
||||
def stop_ros() -> None:
|
||||
global _node
|
||||
try:
|
||||
if _node:
|
||||
_node.destroy_node()
|
||||
rclpy.shutdown()
|
||||
except Exception:
|
||||
pass
|
||||
Reference in New Issue
Block a user