Add contol node for the LEDs
This commit is contained in:
@@ -30,10 +30,10 @@ def generate_launch_description():
|
||||
description='Ultrasonic sensor publish rate (Hz)'),
|
||||
|
||||
# ── Camera orientation arguments ──────────────────────────────────
|
||||
DeclareLaunchArgument('pan_center_deg', default_value='90.0',
|
||||
description='Pan angle at startup and shutdown (degrees)'),
|
||||
DeclareLaunchArgument('tilt_center_deg', default_value='60.0',
|
||||
description='Tilt angle at startup and shutdown (degrees)'),
|
||||
DeclareLaunchArgument('pan_center_deg', default_value='0.0',
|
||||
description='Pan angle at startup and shutdown (degrees, ROS convention)'),
|
||||
DeclareLaunchArgument('tilt_center_deg', default_value='-15.0',
|
||||
description='Tilt angle at startup and shutdown (degrees, ROS convention)'),
|
||||
|
||||
# ── TF / robot description ────────────────────────────────────────
|
||||
Node(
|
||||
@@ -95,4 +95,11 @@ def generate_launch_description():
|
||||
output='screen',
|
||||
),
|
||||
|
||||
Node(
|
||||
package='raspbot_v2',
|
||||
executable='led_controller',
|
||||
name='led_controller',
|
||||
output='screen',
|
||||
),
|
||||
|
||||
])
|
||||
|
||||
@@ -0,0 +1,136 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
LED bar controller node.
|
||||
|
||||
Controls the 14-LED WS2812 bar on the Raspbot via two topics:
|
||||
|
||||
Topics
|
||||
------
|
||||
/led/color (std_msgs/ColorRGBA)
|
||||
Set all LEDs to a solid RGB colour. r/g/b are 0.0–1.0; a=0.0 turns
|
||||
the bar off. Publishing this topic also cancels any running effect.
|
||||
|
||||
/led/effect (std_msgs/String)
|
||||
Start a named light effect (runs until a new command arrives):
|
||||
river | breathing | gradient | random_running | starlight | off
|
||||
|
||||
Parameters
|
||||
----------
|
||||
effect_speed float default 0.05 – delay between effect frames (s)
|
||||
effect_color int default 0 – colour index for breathing effect
|
||||
0=red 1=green 2=blue 3=yellow
|
||||
4=purple 5=cyan 6=white
|
||||
"""
|
||||
|
||||
import threading
|
||||
|
||||
import rclpy
|
||||
from rclpy.node import Node
|
||||
from std_msgs.msg import ColorRGBA, String
|
||||
|
||||
VALID_EFFECTS = {'river', 'breathing', 'gradient', 'random_running', 'starlight', 'off'}
|
||||
|
||||
|
||||
class LedNode(Node):
|
||||
|
||||
def __init__(self):
|
||||
super().__init__('led_controller')
|
||||
|
||||
self.declare_parameter('effect_speed', 0.05)
|
||||
self.declare_parameter('effect_color', 0)
|
||||
|
||||
from raspbot_v2_interface.Raspbot_Lib import Raspbot, LightShow
|
||||
self._Raspbot = Raspbot
|
||||
self._LightShow = LightShow
|
||||
self._bot = Raspbot()
|
||||
|
||||
self._light_show: object | None = None
|
||||
self._effect_thread: threading.Thread | None = None
|
||||
|
||||
self.create_subscription(ColorRGBA, 'led/color', self._color_cb, 10)
|
||||
self.create_subscription(String, 'led/effect', self._effect_cb, 10)
|
||||
|
||||
self._bot.Ctrl_WQ2812_ALL(0, 0)
|
||||
self.get_logger().info('LED controller started (14 LEDs, bar off).')
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Callbacks
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
def _color_cb(self, msg: ColorRGBA) -> None:
|
||||
self._stop_effect()
|
||||
if msg.a == 0.0:
|
||||
self._bot.Ctrl_WQ2812_ALL(0, 0)
|
||||
else:
|
||||
r = max(0, min(255, int(msg.r * 255)))
|
||||
g = max(0, min(255, int(msg.g * 255)))
|
||||
b = max(0, min(255, int(msg.b * 255)))
|
||||
self._bot.Ctrl_WQ2812_brightness_ALL(r, g, b)
|
||||
|
||||
def _effect_cb(self, msg: String) -> None:
|
||||
effect = msg.data.strip()
|
||||
if effect not in VALID_EFFECTS:
|
||||
self.get_logger().warn(
|
||||
f"Unknown effect '{effect}'. Valid: {', '.join(sorted(VALID_EFFECTS))}"
|
||||
)
|
||||
return
|
||||
|
||||
self._stop_effect()
|
||||
|
||||
if effect == 'off':
|
||||
self._bot.Ctrl_WQ2812_ALL(0, 0)
|
||||
return
|
||||
|
||||
speed = self.get_parameter('effect_speed').value
|
||||
color = self.get_parameter('effect_color').value
|
||||
|
||||
ls = self._LightShow()
|
||||
self._light_show = ls
|
||||
|
||||
def _run():
|
||||
ls.execute_effect(effect, ls.MAX_TIME, speed, color)
|
||||
|
||||
self._effect_thread = threading.Thread(target=_run, daemon=True, name=f'led_{effect}')
|
||||
self._effect_thread.start()
|
||||
self.get_logger().info(f"LED effect '{effect}' started.")
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Helpers
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
def _stop_effect(self) -> None:
|
||||
if self._light_show is not None:
|
||||
self._light_show.stop()
|
||||
if self._effect_thread is not None and self._effect_thread.is_alive():
|
||||
self._effect_thread.join(timeout=2.0)
|
||||
self._light_show = None
|
||||
self._effect_thread = None
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Shutdown
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
def destroy_node(self) -> None:
|
||||
self.get_logger().info('Shutting down LED controller — turning off bar.')
|
||||
self._stop_effect()
|
||||
try:
|
||||
self._bot.Ctrl_WQ2812_ALL(0, 0)
|
||||
except Exception:
|
||||
pass
|
||||
super().destroy_node()
|
||||
|
||||
|
||||
def main(args=None):
|
||||
rclpy.init(args=args)
|
||||
node = LedNode()
|
||||
try:
|
||||
rclpy.spin(node)
|
||||
except KeyboardInterrupt:
|
||||
pass
|
||||
finally:
|
||||
node.destroy_node()
|
||||
rclpy.shutdown()
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
@@ -19,6 +19,7 @@ setup(
|
||||
'motor_controller = raspbot_v2.motor_controller_node:main',
|
||||
'camera_orientation = raspbot_v2.camera_orientation_node:main',
|
||||
'ultrasonic = raspbot_v2.ultrasonic_node:main',
|
||||
'led_controller = raspbot_v2.led_node:main',
|
||||
],
|
||||
},
|
||||
)
|
||||
Reference in New Issue
Block a user