Add lider ROS node and move robot control into its own directory

This commit is contained in:
2026-04-21 11:39:01 +00:00
parent 1d49e45240
commit f5ea157e21
28 changed files with 221 additions and 31 deletions
+46
View File
@@ -0,0 +1,46 @@
# syntax=docker/dockerfile:1
# ── Stage 1: build ────────────────────────────────────────────────────────────
FROM ros:kilted AS builder
SHELL ["/bin/bash", "-c"]
RUN apt-get update && apt-get install -y --no-install-recommends \
python3-colcon-common-extensions \
&& rm -rf /var/lib/apt/lists/*
WORKDIR /ws
# Copy the ROS package into the standard colcon src/ layout and build it
COPY robot/src/raspbot_v2/ src/raspbot_v2/
RUN source /opt/ros/${ROS_DISTRO}/setup.bash && \
colcon build --packages-select raspbot_v2
# ── Stage 2: runtime ──────────────────────────────────────────────────────────
FROM ros:kilted-ros-core
RUN apt-get update && apt-get install -y --no-install-recommends \
ros-${ROS_DISTRO}-rclpy \
ros-${ROS_DISTRO}-geometry-msgs \
ros-${ROS_DISTRO}-std-msgs \
ros-${ROS_DISTRO}-sensor-msgs \
ros-${ROS_DISTRO}-launch-ros \
python3-smbus \
&& rm -rf /var/lib/apt/lists/*
# Install the Raspbot hardware library directly into site-packages
COPY robot/raspbot_v2_interface/ /usr/local/lib/python3.12/dist-packages/raspbot_v2_interface/
# Bring across the built ROS overlay
COPY --from=builder /ws/install /ws/install
# Source both ROS base and the workspace overlay on every shell/exec
RUN echo "source /opt/ros/${ROS_DISTRO}/setup.bash" >> /etc/bash.bashrc && \
echo "source /ws/install/setup.bash" >> /etc/bash.bashrc
COPY docker-entrypoint.sh /docker-entrypoint.sh
RUN chmod +x /docker-entrypoint.sh
ENTRYPOINT ["/docker-entrypoint.sh"]
CMD ["ros2", "launch", "raspbot_v2", "robot.launch.py"]
+5
View File
@@ -0,0 +1,5 @@
## Installation Steps (安装步骤)
cd py_install
sudo python3 setup.py install
@@ -0,0 +1,480 @@
#!/usr/bin/env python3
# coding: utf-8
import smbus
import time,random
import math
PI5Car_I2CADDR = 0x2B
class Raspbot():
def get_i2c_device(self, address, i2c_bus):
self._addr = address
if i2c_bus is None:
return smbus.SMBus(1)
else:
return smbus.SMBus(i2c_bus)
def __init__(self):
# Create I2C device.
self._device = self.get_i2c_device(PI5Car_I2CADDR, 0)
#写数据
def write_u8(self, reg, data):
try:
self._device.write_byte_data(self._addr, reg, data)
except:
print ('write_u8 I2C error')
def write_reg(self, reg):
try:
self._device.write_byte(self._addr, reg)
except:
print ('write_u8 I2C error')
def write_array(self, reg, data):
try:
# self._device.write_block_data(self._addr, reg, data)
self._device.write_i2c_block_data(self._addr, reg, data)
except:
print ('write_array I2C error')
#读数据
def read_data_byte(self):
try:
buf = self._device.write_byte(self._addr)
return buf
except:
print ('read_u8 I2C error')
def read_data_array(self,reg,len):
try:
buf = self._device.read_i2c_block_data(self._addr,reg,len)
return buf
except:
print ('read_u8 I2C error')
#控制电机
def Ctrl_Car(self, motor_id, motor_dir,motor_speed):
try:
if(motor_dir !=1)and(motor_dir != 0): #参数非法,方向默认前进
motor_dir = 0
if(motor_speed>255):
motor_speed = 255
elif(motor_speed<0):
motor_speed = 0
reg = 0x01
data = [motor_id, motor_dir, motor_speed]
self.write_array(reg, data)
except:
print ('Ctrl_Car I2C error')
#控制电机正反(-255~255)
def Ctrl_Muto(self, motor_id, motor_speed):
try:
if(motor_speed>255):
motor_speed = 255
if(motor_speed<-255):
motor_speed = -255
if(motor_speed < 0 and motor_speed >= -255): #速度如果是负数则后退
motor_dir = 1
else:motor_dir = 0
reg = 0x01
data = [motor_id, motor_dir, abs(motor_speed)]
self.write_array(reg, data)
except:
print ('Ctrl_Car I2C error')
#控制舵机
def Ctrl_Servo(self, id, angle):
try:
reg = 0x02
data = [id, angle]
if angle < 0:
angle = 0
elif angle > 180:
angle = 180
if(id==2 and angle > 100):angle = 100
self.write_array(reg, data)
except:
print ('Ctrl_Servo I2C error')
#控制灯珠(全部)
def Ctrl_WQ2812_ALL(self, state, color):
try:
reg = 0x03
data = [state, color]
if state < 0:
state = 0
elif state > 1:
state = 1
self.write_array(reg, data)
except:
print ('Ctrl_WQ2812 I2C error')
#单独控制灯珠
def Ctrl_WQ2812_Alone(self, number,state, color):
try:
reg = 0x04
data = [number,state, color]
if state < 0:
state = 0
elif state > 1:
state = 1
self.write_array(reg, data)
except:
print ('Ctrl_WQ2812_Alone I2C error')
#控制亮度(全部)
def Ctrl_WQ2812_brightness_ALL(self, R, G, B):
try:
reg = 0x08
data = [R,G,B]
if R >255:
R =255
if G > 255:
G = 255
if B >255:
B=255
self.write_array(reg, data)
except:
print ('Ctrl_WQ2812 I2C error')
#单独灯珠亮度
def Ctrl_WQ2812_brightness_Alone(self, number, R, G, B):
try:
reg = 0x09
data = [number,R,G,B]
if R >255:
R =255
if G > 255:
G = 255
if B >255:
B=255
self.write_array(reg, data)
except:
print ('Ctrl_WQ2812_Alone I2C error')
#控制红外遥控器开关
def Ctrl_IR_Switch(self, state):
try:
reg = 0x05
data = [state]
if state < 0:
state = 0
elif state > 1:
state = 1
self.write_array(reg, data)
except:
print ('Ctrl_IR_Switch I2C error')
#控制蜂鸣器开关
def Ctrl_BEEP_Switch(self, state):
try:
reg = 0x06
data = [state]
if state < 0:
state = 0
elif state > 1:
state = 1
self.write_array(reg, data)
except:
print ('Ctrl_BEEP_Switch I2C error')
#控制超声波测距开关
def Ctrl_Ulatist_Switch(self, state):
try:
reg = 0x07
data = [state]
if state < 0:
state = 0
elif state > 1:
state = 1
self.write_array(reg, data)
except:
print ('Ctrl_getDis_Switch I2C error')
#控制灯珠特效
class LightShow:
def __init__(self):
self.num_lights = 14
self.last_val = 0
self.MAX_TIME=999999
self.bot=Raspbot()
self.running = True
def execute_effect(self, effect_name,effect_duration,speed,current_color):
try:
if effect_name == 'river':
self.run_river_light(effect_duration,speed)
elif effect_name == 'breathing':
self.breathing_light(effect_duration,speed,current_color)
elif effect_name == 'gradient':
self.gradient_light(effect_duration,speed)
elif effect_name == 'random_running':
self.random_running_light(effect_duration,speed)
elif effect_name == 'starlight':
self.starlight_shimmer(effect_duration,speed)
else:
print("Unknown effect name.")
except KeyboardInterrupt:
self.turn_off_all_lights()
self.running = False
def turn_off_all_lights(self):
self.bot.Ctrl_WQ2812_ALL(0, 0)
def run_river_light(self,effect_duration,speed):
# speed = 0.01
colors = [0, 1, 2, 3, 4, 5, 6]
color_index = 0
end_time = time.time()
while self.running and time.time() - end_time < effect_duration:
for i in range(self.num_lights - 2):
self.bot.Ctrl_WQ2812_Alone(i, 1, colors[color_index])
self.bot.Ctrl_WQ2812_Alone(i+1, 1, colors[color_index])
self.bot.Ctrl_WQ2812_Alone(i+2, 1, colors[color_index])
time.sleep(speed)
self.bot.Ctrl_WQ2812_ALL(0, 0)
time.sleep(speed)
color_index = (color_index + 1) % len(colors)
self.turn_off_all_lights()
def breathing_light(self, effect_duration,speed,current_color):
breath_direction = 0
breath_count = 0
end_time = time.time()
while self.running and time.time() - end_time < effect_duration:
if current_color == 0: # Red
r, g, b = breath_count, 0, 0
elif current_color == 1: # Green
r, g, b = 0, breath_count, 0
elif current_color == 2: # Blue
r, g, b = 0, 0, breath_count
elif current_color == 3: # Yellow
r, g, b = breath_count, breath_count, 0
elif current_color == 4: # Purple
r, g, b = breath_count, 0, breath_count
elif current_color == 5: # Cyan
r, g, b = 0, breath_count, breath_count
elif current_color == 6: # White
r, g, b = breath_count, breath_count, breath_count
else:
r, g, b = 0, 0, 0 # Default to black if invalid color code
self.bot.Ctrl_WQ2812_brightness_ALL(r, g, b)
time.sleep(speed)
if breath_direction == 0:
breath_count += 2
if breath_count >= 255:
breath_count=255
breath_direction = 1
else:
breath_count -= 2
if breath_count < 0:
breath_direction = 0
breath_count = 0
self.turn_off_all_lights()
def random_running_light(self,effect_duration,speed):
# on_time = 0.01
# off_time = 0.01
end_time = time.time()
while self.running and time.time() - end_time < effect_duration:
for i in range(self.num_lights):
color = random.randint(0, 6)
self.bot.Ctrl_WQ2812_Alone(i, 1, color)
time.sleep(speed)
self.turn_off_all_lights()
def starlight_shimmer(self,effect_duration,speed):
min_lights_on = 1
max_lights_on = 7
colors = [0, 1, 2, 3, 4, 5, 6]
end_time = time.time()
while self.running and time.time() - end_time < effect_duration:
for color in colors:
start_time = time.time()
while time.time() - start_time < 1:
for i in range(self.num_lights):
self.bot.Ctrl_WQ2812_Alone(i, 0, 0)
lights_on = random.sample(range(self.num_lights), k=random.randint(min_lights_on, max_lights_on))
for i in lights_on:
self.bot.Ctrl_WQ2812_Alone(i, 1, color)
time.sleep(speed)
for i in range(self.num_lights):
self.bot.Ctrl_WQ2812_Alone(i, 0, 0)
self.turn_off_all_lights()
def gradient_light(self, effect_duration, speed):
grad_color = 0
grad_index = 0
end_time = time.time()
while self.running and time.time() - end_time < effect_duration:
if grad_color % 2 == 0:
gt_red = random.randint(0, 255)
gt_green = random.randint(0, 255)
gt_blue = random.randint(0, 255)
gt_green = self.rgb_remix(gt_green)
gt_red, gt_green, gt_blue = self.rgb_remix_u8(gt_red, gt_green, gt_blue)
grad_color += 1
if grad_color == 1:
if grad_index < 14:
number = (grad_index % 14) + 1 # Adjusting for 1-based indexing
self.bot.Ctrl_WQ2812_brightness_Alone(number, gt_red, gt_green, gt_blue)
grad_index += 1
if grad_index >= 14:
grad_color = 2
grad_index = 0
elif grad_color == 3:
if grad_index < 14:
number = ((14 - grad_index) % 14) # Reverse mapping, adjusted for 1-based indexing
self.bot.Ctrl_WQ2812_brightness_Alone(number, gt_red, gt_green, gt_blue)
grad_index += 1
if grad_index >= 14:
grad_color = 0
grad_index = 0
time.sleep(speed)
self.turn_off_all_lights()
def rgb_remix(self, val):
if abs(val - self.last_val) < val % 30:
val = (val + self.last_val) % 255
self.last_val = val % 255
return self.last_val
def rgb_remix_u8(self, r, g, b):
if r > 50 and g > 50 and b > 50:
index = random.randint(0, 2)
if index == 0:
r = 0
elif index == 1:
g = 0
elif index == 2:
b = 0
return r, g, b
def calculate_breath_color(self, color_code, breath_count):
max_brightness = 255
if color_code == 0: # Red
return max_brightness, breath_count, 0
elif color_code == 1: # Green
return max_brightness, 0, breath_count
elif color_code == 2: # Blue
return max_brightness, 0, 0, breath_count
elif color_code == 3: # Yellow
return max_brightness, breath_count, breath_count, 0
elif color_code == 4: # Purple
return max_brightness, breath_count, 0, breath_count
elif color_code == 5: # Cyan
return max_brightness, 0, breath_count, breath_count
elif color_code == 6: # White
return max_brightness, breath_count, breath_count, breath_count
else:
return 0, 0, 0 # Default to black if invalid color code
def stop(self):
self.running = False
# test
#car = Raspbot()
#读取巡线传感器地址 ,此值只有1位
# track =car.read_data_array(0x0a,1)
# track = int(track[0])
# x1 = (track>>3)&0x01
# x2 = (track>>2)&0x01
# x3 = (track>>1)&0x01
# x4 = (track)&0x01
# print(track,x1,x2,x3,x4)
# 读取超声传感器地址,此值只有2位
# car.Ctrl_Ulatist_Switch(1)#open
# time.sleep(1)
# diss_H =car.read_data_array(0x1b,1)[0]
# diss_L =car.read_data_array(0x1a,1)[0]
# dis = diss_H<< 8 | diss_L
# print(dis+"mm")
# car.Ctrl_Ulatist_Switch(0)#close
#读取红外遥控的值
# car.Ctrl_IR_Switch(1)
# time.sleep(3)
# data =car.read_data_array(0x0c,1)
# print(data)
# car.Ctrl_IR_Switch(0)
#蜂鸣器测试
# car.Ctrl_BEEP_Switch(1)
# time.sleep(1)
# car.Ctrl_BEEP_Switch(0)
# time.sleep(1)
#电机测试
# car.Ctrl_Car(0,0,150) #L1电机 前进 150速度
# car.Ctrl_Car(1,0,150) #L2电机 前进 150速度
# car.Ctrl_Car(2,0,150) #R1电机 前进 150速度
# car.Ctrl_Car(3,0,150) #R2电机 前进 150速度
# time.sleep(1)
# car.Ctrl_Car(0,1,50) #L1电机 后退 50速度
# time.sleep(1)
# car.Ctrl_Car(0,0,0) #L1电机 停止
# car.Ctrl_Car(1,0,0) #L2电机 停止
# car.Ctrl_Car(2,0,0) #R1电机 停止
# car.Ctrl_Car(3,0,0) #R2电机 停止
#舵机测试
# car.Ctrl_Servo(1,0) #s1 0度
# car.Ctrl_Servo(2,180) #s2 180度
# time.sleep(1)
# car.Ctrl_Servo(1,180) #s1 180度
# car.Ctrl_Servo(2,0) #s2 0度
# time.sleep(1)
#灯条测试
# car.Ctrl_WQ2812_ALL(1,0)#红色
# time.sleep(1)
# car.Ctrl_WQ2812_ALL(1,1)#绿色
# time.sleep(1)
# car.Ctrl_WQ2812_ALL(1,2)#蓝色
# time.sleep(1)
# car.Ctrl_WQ2812_ALL(1,3)#黄色
# time.sleep(1)
# car.Ctrl_WQ2812_ALL(0,0) #关闭
#单个灯测试
# car.Ctrl_WQ2812_Alone(1,1,0)#1号红色
# time.sleep(1)
# car.Ctrl_WQ2812_Alone(2,1,3)#1号黄色
# time.sleep(1)
# car.Ctrl_WQ2812_Alone(1,0,3)#1号关
# time.sleep(1)
# car.Ctrl_WQ2812_Alone(14,1,2)#14号绿色
# time.sleep(1)
# car.Ctrl_WQ2812_ALL(0,0) #关闭
#控制亮度测试 全部
# for i in range(255):
# car.Ctrl_WQ2812_brightness_ALL(i,0,0)
# time.sleep(0.01)
@@ -0,0 +1,488 @@
#!/usr/bin/env python3
# coding: utf-8
import smbus
import time,random
import math
PI5Car_I2CADDR = 0x2B
class Raspbot():
def get_i2c_device(self, address, i2c_bus):
self._addr = address
if i2c_bus is None:
return smbus.SMBus(1)
else:
return smbus.SMBus(i2c_bus)
def __init__(self):
# Create I2C device.
self._device = self.get_i2c_device(PI5Car_I2CADDR, 1)
#写数据
def write_u8(self, reg, data):
try:
self._device.write_byte_data(self._addr, reg, data)
except:
print ('write_u8 I2C error')
def write_reg(self, reg):
try:
self._device.write_byte(self._addr, reg)
except:
print ('write_u8 I2C error')
def write_array(self, reg, data):
try:
# self._device.write_block_data(self._addr, reg, data)
self._device.write_i2c_block_data(self._addr, reg, data)
except:
print ('write_array I2C error')
#读数据
def read_data_byte(self):
try:
buf = self._device.write_byte(self._addr)
return buf
except:
print ('read_u8 I2C error')
def read_data_array(self,reg,len):
try:
buf = self._device.read_i2c_block_data(self._addr,reg,len)
return buf
except:
print ('read_u8 I2C error')
#控制电机
def Ctrl_Car(self, motor_id, motor_dir,motor_speed):
try:
if(motor_dir !=1)and(motor_dir != 0): #参数非法,方向默认前进
motor_dir = 0
if(motor_speed>255):
motor_speed = 255
elif(motor_speed<0):
motor_speed = 0
reg = 0x01
data = [motor_id, motor_dir, motor_speed]
self.write_array(reg, data)
except:
print ('Ctrl_Car I2C error')
#控制电机正反(-255~255)
def Ctrl_Muto(self, motor_id, motor_speed):
try:
if(motor_speed>255):
motor_speed = 255
if(motor_speed<-255):
motor_speed = -255
if(motor_speed < 0 and motor_speed >= -255): #速度如果是负数则后退
motor_dir = 1
else:motor_dir = 0
reg = 0x01
data = [motor_id, motor_dir, abs(motor_speed)]
self.write_array(reg, data)
except:
print ('Ctrl_Car I2C error')
#控制舵机
def Ctrl_Servo(self, id, angle):
try:
reg = 0x02
data = [id, angle]
if angle < 0:
angle = 0
elif angle > 180:
angle = 180
if(id==2 and angle > 110):angle = 110
self.write_array(reg, data)
except:
print ('Ctrl_Servo I2C error')
#控制灯珠(全部)
def Ctrl_WQ2812_ALL(self, state, color):
try:
reg = 0x03
data = [state, color]
if state < 0:
state = 0
elif state > 1:
state = 1
self.write_array(reg, data)
except:
print ('Ctrl_WQ2812 I2C error')
#单独控制灯珠
def Ctrl_WQ2812_Alone(self, number,state, color):
try:
reg = 0x04
data = [number,state, color]
if state < 0:
state = 0
elif state > 1:
state = 1
self.write_array(reg, data)
except:
print ('Ctrl_WQ2812_Alone I2C error')
#控制亮度(全部)
def Ctrl_WQ2812_brightness_ALL(self, R, G, B):
try:
reg = 0x08
data = [R,G,B]
if R >255:
R =255
if G > 255:
G = 255
if B >255:
B=255
self.write_array(reg, data)
except:
print ('Ctrl_WQ2812 I2C error')
#单独灯珠亮度
def Ctrl_WQ2812_brightness_Alone(self, number, R, G, B):
try:
reg = 0x09
data = [number,R,G,B]
if R >255:
R =255
if G > 255:
G = 255
if B >255:
B=255
self.write_array(reg, data)
except:
print ('Ctrl_WQ2812_Alone I2C error')
#控制红外遥控器开关
def Ctrl_IR_Switch(self, state):
try:
reg = 0x05
data = [state]
if state < 0:
state = 0
elif state > 1:
state = 1
self.write_array(reg, data)
except:
print ('Ctrl_IR_Switch I2C error')
#控制蜂鸣器开关
def Ctrl_BEEP_Switch(self, state):
try:
reg = 0x06
data = [state]
if state < 0:
state = 0
elif state > 1:
state = 1
self.write_array(reg, data)
except:
print ('Ctrl_BEEP_Switch I2C error')
#控制超声波测距开关
def Ctrl_Ulatist_Switch(self, state):
try:
reg = 0x07
data = [state]
if state < 0:
state = 0
elif state > 1:
state = 1
self.write_array(reg, data)
except:
print ('Ctrl_getDis_Switch I2C error')
def Read_Ultrasonic(self):
try:
diss_H = self.read_data_array(0x1b, 1)[0]
diss_L = self.read_data_array(0x1a, 1)[0]
dis = (diss_H << 8) | diss_L
return dis
except:
print('Read_Ultrasonic I2C error')
return None
#控制灯珠特效
class LightShow:
def __init__(self):
self.num_lights = 14
self.last_val = 0
self.MAX_TIME=999999
self.bot=Raspbot()
self.running = True
def execute_effect(self, effect_name,effect_duration,speed,current_color):
try:
if effect_name == 'river':
self.run_river_light(effect_duration,speed)
elif effect_name == 'breathing':
self.breathing_light(effect_duration,speed,current_color)
elif effect_name == 'gradient':
self.gradient_light(effect_duration,speed)
elif effect_name == 'random_running':
self.random_running_light(effect_duration,speed)
elif effect_name == 'starlight':
self.starlight_shimmer(effect_duration,speed)
else:
print("Unknown effect name.")
except KeyboardInterrupt:
self.turn_off_all_lights()
self.running = False
def turn_off_all_lights(self):
self.bot.Ctrl_WQ2812_ALL(0, 0)
def run_river_light(self,effect_duration,speed):
# speed = 0.01
colors = [0, 1, 2, 3, 4, 5, 6]
color_index = 0
end_time = time.time()
while self.running and time.time() - end_time < effect_duration:
for i in range(self.num_lights - 2):
self.bot.Ctrl_WQ2812_Alone(i, 1, colors[color_index])
self.bot.Ctrl_WQ2812_Alone(i+1, 1, colors[color_index])
self.bot.Ctrl_WQ2812_Alone(i+2, 1, colors[color_index])
time.sleep(speed)
self.bot.Ctrl_WQ2812_ALL(0, 0)
time.sleep(speed)
color_index = (color_index + 1) % len(colors)
self.turn_off_all_lights()
def breathing_light(self, effect_duration,speed,current_color):
breath_direction = 0
breath_count = 0
end_time = time.time()
while self.running and time.time() - end_time < effect_duration:
if current_color == 0: # Red
r, g, b = breath_count, 0, 0
elif current_color == 1: # Green
r, g, b = 0, breath_count, 0
elif current_color == 2: # Blue
r, g, b = 0, 0, breath_count
elif current_color == 3: # Yellow
r, g, b = breath_count, breath_count, 0
elif current_color == 4: # Purple
r, g, b = breath_count, 0, breath_count
elif current_color == 5: # Cyan
r, g, b = 0, breath_count, breath_count
elif current_color == 6: # White
r, g, b = breath_count, breath_count, breath_count
else:
r, g, b = 0, 0, 0 # Default to black if invalid color code
self.bot.Ctrl_WQ2812_brightness_ALL(r, g, b)
time.sleep(speed)
if breath_direction == 0:
breath_count += 2
if breath_count >= 255:
breath_count=255
breath_direction = 1
else:
breath_count -= 2
if breath_count < 0:
breath_direction = 0
breath_count = 0
self.turn_off_all_lights()
def random_running_light(self,effect_duration,speed):
# on_time = 0.01
# off_time = 0.01
end_time = time.time()
while self.running and time.time() - end_time < effect_duration:
for i in range(self.num_lights):
color = random.randint(0, 6)
self.bot.Ctrl_WQ2812_Alone(i, 1, color)
time.sleep(speed)
self.turn_off_all_lights()
def starlight_shimmer(self,effect_duration,speed):
min_lights_on = 1
max_lights_on = 7
colors = [0, 1, 2, 3, 4, 5, 6]
end_time = time.time()
while self.running and time.time() - end_time < effect_duration:
for color in colors:
start_time = time.time()
while time.time() - start_time < 1:
for i in range(self.num_lights):
self.bot.Ctrl_WQ2812_Alone(i, 0, 0)
lights_on = random.sample(range(self.num_lights), k=random.randint(min_lights_on, max_lights_on))
for i in lights_on:
self.bot.Ctrl_WQ2812_Alone(i, 1, color)
time.sleep(speed)
for i in range(self.num_lights):
self.bot.Ctrl_WQ2812_Alone(i, 0, 0)
self.turn_off_all_lights()
def gradient_light(self, effect_duration, speed):
grad_color = 0
grad_index = 0
end_time = time.time()
while self.running and time.time() - end_time < effect_duration:
if grad_color % 2 == 0:
gt_red = random.randint(0, 255)
gt_green = random.randint(0, 255)
gt_blue = random.randint(0, 255)
gt_green = self.rgb_remix(gt_green)
gt_red, gt_green, gt_blue = self.rgb_remix_u8(gt_red, gt_green, gt_blue)
grad_color += 1
if grad_color == 1:
if grad_index < 14:
number = (grad_index % 14) + 1 # Adjusting for 1-based indexing
self.bot.Ctrl_WQ2812_brightness_Alone(number, gt_red, gt_green, gt_blue)
grad_index += 1
if grad_index >= 14:
grad_color = 2
grad_index = 0
elif grad_color == 3:
if grad_index < 14:
number = ((14 - grad_index) % 14) # Reverse mapping, adjusted for 1-based indexing
self.bot.Ctrl_WQ2812_brightness_Alone(number, gt_red, gt_green, gt_blue)
grad_index += 1
if grad_index >= 14:
grad_color = 0
grad_index = 0
time.sleep(speed)
self.turn_off_all_lights()
def rgb_remix(self, val):
if abs(val - self.last_val) < val % 30:
val = (val + self.last_val) % 255
self.last_val = val % 255
return self.last_val
def rgb_remix_u8(self, r, g, b):
if r > 50 and g > 50 and b > 50:
index = random.randint(0, 2)
if index == 0:
r = 0
elif index == 1:
g = 0
elif index == 2:
b = 0
return r, g, b
def calculate_breath_color(self, color_code, breath_count):
max_brightness = 255
if color_code == 0: # Red
return max_brightness, breath_count, 0
elif color_code == 1: # Green
return max_brightness, 0, breath_count
elif color_code == 2: # Blue
return max_brightness, 0, 0, breath_count
elif color_code == 3: # Yellow
return max_brightness, breath_count, breath_count, 0
elif color_code == 4: # Purple
return max_brightness, breath_count, 0, breath_count
elif color_code == 5: # Cyan
return max_brightness, 0, breath_count, breath_count
elif color_code == 6: # White
return max_brightness, breath_count, breath_count, breath_count
else:
return 0, 0, 0 # Default to black if invalid color code
def stop(self):
self.running = False
# test
#car = Raspbot()
#读取巡线传感器地址 ,此值只有1位
# track =car.read_data_array(0x0a,1)
# track = int(track[0])
# x1 = (track>>3)&0x01
# x2 = (track>>2)&0x01
# x3 = (track>>1)&0x01
# x4 = (track)&0x01
# print(track,x1,x2,x3,x4)
# 读取超声传感器地址,此值只有2位
# car.Ctrl_Ulatist_Switch(1)#open
# time.sleep(1)
# diss_H =car.read_data_array(0x1b,1)[0]
# diss_L =car.read_data_array(0x1a,1)[0]
# dis = diss_H<< 8 | diss_L
# print(dis+"mm")
# car.Ctrl_Ulatist_Switch(0)#close
#读取红外遥控的值
# car.Ctrl_IR_Switch(1)
# time.sleep(3)
# data =car.read_data_array(0x0c,1)
# print(data)
# car.Ctrl_IR_Switch(0)
#蜂鸣器测试
# car.Ctrl_BEEP_Switch(1)
# time.sleep(1)
# car.Ctrl_BEEP_Switch(0)
# time.sleep(1)
#电机测试
# car.Ctrl_Car(0,0,150) #L1电机 前进 150速度
# car.Ctrl_Car(1,0,150) #L2电机 前进 150速度
# car.Ctrl_Car(2,0,150) #R1电机 前进 150速度
# car.Ctrl_Car(3,0,150) #R2电机 前进 150速度
# time.sleep(1)
# car.Ctrl_Car(0,1,50) #L1电机 后退 50速度
# time.sleep(1)
# car.Ctrl_Car(0,0,0) #L1电机 停止
# car.Ctrl_Car(1,0,0) #L2电机 停止
# car.Ctrl_Car(2,0,0) #R1电机 停止
# car.Ctrl_Car(3,0,0) #R2电机 停止
#舵机测试
# car.Ctrl_Servo(1,0) #s1 0度
# car.Ctrl_Servo(2,180) #s2 180度
# time.sleep(1)
# car.Ctrl_Servo(1,180) #s1 180度
# car.Ctrl_Servo(2,0) #s2 0度
# time.sleep(1)
#灯条测试
# car.Ctrl_WQ2812_ALL(1,0)#红色
# time.sleep(1)
# car.Ctrl_WQ2812_ALL(1,1)#绿色
# time.sleep(1)
# car.Ctrl_WQ2812_ALL(1,2)#蓝色
# time.sleep(1)
# car.Ctrl_WQ2812_ALL(1,3)#黄色
# time.sleep(1)
# car.Ctrl_WQ2812_ALL(0,0) #关闭
#单个灯测试
# car.Ctrl_WQ2812_Alone(1,1,0)#1号红色
# time.sleep(1)
# car.Ctrl_WQ2812_Alone(2,1,3)#1号黄色
# time.sleep(1)
# car.Ctrl_WQ2812_Alone(1,0,3)#1号关
# time.sleep(1)
# car.Ctrl_WQ2812_Alone(10,1,2)#10号绿色
# time.sleep(1)
# car.Ctrl_WQ2812_ALL(0,0) #关闭
#控制亮度测试 全部
# for i in range(255):
# car.Ctrl_WQ2812_brightness_ALL(i,0,0)
# time.sleep(0.01)
@@ -0,0 +1 @@
from .Raspbot_Lib import Raspbot,LightShow
+3
View File
@@ -0,0 +1,3 @@
from .motor_controller import MotorController
__all__ = ["MotorController"]
@@ -0,0 +1,480 @@
#!/usr/bin/env python3
# coding: utf-8
import smbus
import time,random
import math
PI5Car_I2CADDR = 0x2B
class Raspbot():
def get_i2c_device(self, address, i2c_bus):
self._addr = address
if i2c_bus is None:
return smbus.SMBus(1)
else:
return smbus.SMBus(i2c_bus)
def __init__(self):
# Create I2C device.
self._device = self.get_i2c_device(PI5Car_I2CADDR, 1)
#写数据
def write_u8(self, reg, data):
try:
self._device.write_byte_data(self._addr, reg, data)
except:
print ('write_u8 I2C error')
def write_reg(self, reg):
try:
self._device.write_byte(self._addr, reg)
except:
print ('write_u8 I2C error')
def write_array(self, reg, data):
try:
# self._device.write_block_data(self._addr, reg, data)
self._device.write_i2c_block_data(self._addr, reg, data)
except:
print ('write_array I2C error')
#读数据
def read_data_byte(self):
try:
buf = self._device.write_byte(self._addr)
return buf
except:
print ('read_u8 I2C error')
def read_data_array(self,reg,len):
try:
buf = self._device.read_i2c_block_data(self._addr,reg,len)
return buf
except:
print ('read_u8 I2C error')
#控制电机
def Ctrl_Car(self, motor_id, motor_dir,motor_speed):
try:
if(motor_dir !=1)and(motor_dir != 0): #参数非法,方向默认前进
motor_dir = 0
if(motor_speed>255):
motor_speed = 255
elif(motor_speed<0):
motor_speed = 0
reg = 0x01
data = [motor_id, motor_dir, motor_speed]
self.write_array(reg, data)
except:
print ('Ctrl_Car I2C error')
#控制电机正反(-255~255)
def Ctrl_Muto(self, motor_id, motor_speed):
try:
if(motor_speed>255):
motor_speed = 255
if(motor_speed<-255):
motor_speed = -255
if(motor_speed < 0 and motor_speed >= -255): #速度如果是负数则后退
motor_dir = 1
else:motor_dir = 0
reg = 0x01
data = [motor_id, motor_dir, abs(motor_speed)]
self.write_array(reg, data)
except:
print ('Ctrl_Car I2C error')
#控制舵机
def Ctrl_Servo(self, id, angle):
try:
reg = 0x02
data = [id, angle]
if angle < 0:
angle = 0
elif angle > 180:
angle = 180
if(id==2 and angle > 110):angle = 110
self.write_array(reg, data)
except:
print ('Ctrl_Servo I2C error')
#控制灯珠(全部)
def Ctrl_WQ2812_ALL(self, state, color):
try:
reg = 0x03
data = [state, color]
if state < 0:
state = 0
elif state > 1:
state = 1
self.write_array(reg, data)
except:
print ('Ctrl_WQ2812 I2C error')
#单独控制灯珠
def Ctrl_WQ2812_Alone(self, number,state, color):
try:
reg = 0x04
data = [number,state, color]
if state < 0:
state = 0
elif state > 1:
state = 1
self.write_array(reg, data)
except:
print ('Ctrl_WQ2812_Alone I2C error')
#控制亮度(全部)
def Ctrl_WQ2812_brightness_ALL(self, R, G, B):
try:
reg = 0x08
data = [R,G,B]
if R >255:
R =255
if G > 255:
G = 255
if B >255:
B=255
self.write_array(reg, data)
except:
print ('Ctrl_WQ2812 I2C error')
#单独灯珠亮度
def Ctrl_WQ2812_brightness_Alone(self, number, R, G, B):
try:
reg = 0x09
data = [number,R,G,B]
if R >255:
R =255
if G > 255:
G = 255
if B >255:
B=255
self.write_array(reg, data)
except:
print ('Ctrl_WQ2812_Alone I2C error')
#控制红外遥控器开关
def Ctrl_IR_Switch(self, state):
try:
reg = 0x05
data = [state]
if state < 0:
state = 0
elif state > 1:
state = 1
self.write_array(reg, data)
except:
print ('Ctrl_IR_Switch I2C error')
#控制蜂鸣器开关
def Ctrl_BEEP_Switch(self, state):
try:
reg = 0x06
data = [state]
if state < 0:
state = 0
elif state > 1:
state = 1
self.write_array(reg, data)
except:
print ('Ctrl_BEEP_Switch I2C error')
#控制超声波测距开关
def Ctrl_Ulatist_Switch(self, state):
try:
reg = 0x07
data = [state]
if state < 0:
state = 0
elif state > 1:
state = 1
self.write_array(reg, data)
except:
print ('Ctrl_getDis_Switch I2C error')
#控制灯珠特效
class LightShow:
def __init__(self):
self.num_lights = 14
self.last_val = 0
self.MAX_TIME=999999
self.bot=Raspbot()
self.running = True
def execute_effect(self, effect_name,effect_duration,speed,current_color):
try:
if effect_name == 'river':
self.run_river_light(effect_duration,speed)
elif effect_name == 'breathing':
self.breathing_light(effect_duration,speed,current_color)
elif effect_name == 'gradient':
self.gradient_light(effect_duration,speed)
elif effect_name == 'random_running':
self.random_running_light(effect_duration,speed)
elif effect_name == 'starlight':
self.starlight_shimmer(effect_duration,speed)
else:
print("Unknown effect name.")
except KeyboardInterrupt:
self.turn_off_all_lights()
self.running = False
def turn_off_all_lights(self):
self.bot.Ctrl_WQ2812_ALL(0, 0)
def run_river_light(self,effect_duration,speed):
# speed = 0.01
colors = [0, 1, 2, 3, 4, 5, 6]
color_index = 0
end_time = time.time()
while self.running and time.time() - end_time < effect_duration:
for i in range(self.num_lights - 2):
self.bot.Ctrl_WQ2812_Alone(i, 1, colors[color_index])
self.bot.Ctrl_WQ2812_Alone(i+1, 1, colors[color_index])
self.bot.Ctrl_WQ2812_Alone(i+2, 1, colors[color_index])
time.sleep(speed)
self.bot.Ctrl_WQ2812_ALL(0, 0)
time.sleep(speed)
color_index = (color_index + 1) % len(colors)
self.turn_off_all_lights()
def breathing_light(self, effect_duration,speed,current_color):
breath_direction = 0
breath_count = 0
end_time = time.time()
while self.running and time.time() - end_time < effect_duration:
if current_color == 0: # Red
r, g, b = breath_count, 0, 0
elif current_color == 1: # Green
r, g, b = 0, breath_count, 0
elif current_color == 2: # Blue
r, g, b = 0, 0, breath_count
elif current_color == 3: # Yellow
r, g, b = breath_count, breath_count, 0
elif current_color == 4: # Purple
r, g, b = breath_count, 0, breath_count
elif current_color == 5: # Cyan
r, g, b = 0, breath_count, breath_count
elif current_color == 6: # White
r, g, b = breath_count, breath_count, breath_count
else:
r, g, b = 0, 0, 0 # Default to black if invalid color code
self.bot.Ctrl_WQ2812_brightness_ALL(r, g, b)
time.sleep(speed)
if breath_direction == 0:
breath_count += 2
if breath_count >= 255:
breath_count=255
breath_direction = 1
else:
breath_count -= 2
if breath_count < 0:
breath_direction = 0
breath_count = 0
self.turn_off_all_lights()
def random_running_light(self,effect_duration,speed):
# on_time = 0.01
# off_time = 0.01
end_time = time.time()
while self.running and time.time() - end_time < effect_duration:
for i in range(self.num_lights):
color = random.randint(0, 6)
self.bot.Ctrl_WQ2812_Alone(i, 1, color)
time.sleep(speed)
self.turn_off_all_lights()
def starlight_shimmer(self,effect_duration,speed):
min_lights_on = 1
max_lights_on = 7
colors = [0, 1, 2, 3, 4, 5, 6]
end_time = time.time()
while self.running and time.time() - end_time < effect_duration:
for color in colors:
start_time = time.time()
while time.time() - start_time < 1:
for i in range(self.num_lights):
self.bot.Ctrl_WQ2812_Alone(i, 0, 0)
lights_on = random.sample(range(self.num_lights), k=random.randint(min_lights_on, max_lights_on))
for i in lights_on:
self.bot.Ctrl_WQ2812_Alone(i, 1, color)
time.sleep(speed)
for i in range(self.num_lights):
self.bot.Ctrl_WQ2812_Alone(i, 0, 0)
self.turn_off_all_lights()
def gradient_light(self, effect_duration, speed):
grad_color = 0
grad_index = 0
end_time = time.time()
while self.running and time.time() - end_time < effect_duration:
if grad_color % 2 == 0:
gt_red = random.randint(0, 255)
gt_green = random.randint(0, 255)
gt_blue = random.randint(0, 255)
gt_green = self.rgb_remix(gt_green)
gt_red, gt_green, gt_blue = self.rgb_remix_u8(gt_red, gt_green, gt_blue)
grad_color += 1
if grad_color == 1:
if grad_index < 14:
number = (grad_index % 14) + 1 # Adjusting for 1-based indexing
self.bot.Ctrl_WQ2812_brightness_Alone(number, gt_red, gt_green, gt_blue)
grad_index += 1
if grad_index >= 14:
grad_color = 2
grad_index = 0
elif grad_color == 3:
if grad_index < 14:
number = ((14 - grad_index) % 14) # Reverse mapping, adjusted for 1-based indexing
self.bot.Ctrl_WQ2812_brightness_Alone(number, gt_red, gt_green, gt_blue)
grad_index += 1
if grad_index >= 14:
grad_color = 0
grad_index = 0
time.sleep(speed)
self.turn_off_all_lights()
def rgb_remix(self, val):
if abs(val - self.last_val) < val % 30:
val = (val + self.last_val) % 255
self.last_val = val % 255
return self.last_val
def rgb_remix_u8(self, r, g, b):
if r > 50 and g > 50 and b > 50:
index = random.randint(0, 2)
if index == 0:
r = 0
elif index == 1:
g = 0
elif index == 2:
b = 0
return r, g, b
def calculate_breath_color(self, color_code, breath_count):
max_brightness = 255
if color_code == 0: # Red
return max_brightness, breath_count, 0
elif color_code == 1: # Green
return max_brightness, 0, breath_count
elif color_code == 2: # Blue
return max_brightness, 0, 0, breath_count
elif color_code == 3: # Yellow
return max_brightness, breath_count, breath_count, 0
elif color_code == 4: # Purple
return max_brightness, breath_count, 0, breath_count
elif color_code == 5: # Cyan
return max_brightness, 0, breath_count, breath_count
elif color_code == 6: # White
return max_brightness, breath_count, breath_count, breath_count
else:
return 0, 0, 0 # Default to black if invalid color code
def stop(self):
self.running = False
# test
#car = Raspbot()
#读取巡线传感器地址 ,此值只有1位
# track =car.read_data_array(0x0a,1)
# track = int(track[0])
# x1 = (track>>3)&0x01
# x2 = (track>>2)&0x01
# x3 = (track>>1)&0x01
# x4 = (track)&0x01
# print(track,x1,x2,x3,x4)
# 读取超声传感器地址,此值只有2位
# car.Ctrl_Ulatist_Switch(1)#open
# time.sleep(1)
# diss_H =car.read_data_array(0x1b,1)[0]
# diss_L =car.read_data_array(0x1a,1)[0]
# dis = diss_H<< 8 | diss_L
# print(dis+"mm")
# car.Ctrl_Ulatist_Switch(0)#close
#读取红外遥控的值
# car.Ctrl_IR_Switch(1)
# time.sleep(3)
# data =car.read_data_array(0x0c,1)
# print(data)
# car.Ctrl_IR_Switch(0)
#蜂鸣器测试
# car.Ctrl_BEEP_Switch(1)
# time.sleep(1)
# car.Ctrl_BEEP_Switch(0)
# time.sleep(1)
#电机测试
# car.Ctrl_Car(0,0,150) #L1电机 前进 150速度
# car.Ctrl_Car(1,0,150) #L2电机 前进 150速度
# car.Ctrl_Car(2,0,150) #R1电机 前进 150速度
# car.Ctrl_Car(3,0,150) #R2电机 前进 150速度
# time.sleep(1)
# car.Ctrl_Car(0,1,50) #L1电机 后退 50速度
# time.sleep(1)
# car.Ctrl_Car(0,0,0) #L1电机 停止
# car.Ctrl_Car(1,0,0) #L2电机 停止
# car.Ctrl_Car(2,0,0) #R1电机 停止
# car.Ctrl_Car(3,0,0) #R2电机 停止
#舵机测试
# car.Ctrl_Servo(1,0) #s1 0度
# car.Ctrl_Servo(2,180) #s2 180度
# time.sleep(1)
# car.Ctrl_Servo(1,180) #s1 180度
# car.Ctrl_Servo(2,0) #s2 0度
# time.sleep(1)
#灯条测试
# car.Ctrl_WQ2812_ALL(1,0)#红色
# time.sleep(1)
# car.Ctrl_WQ2812_ALL(1,1)#绿色
# time.sleep(1)
# car.Ctrl_WQ2812_ALL(1,2)#蓝色
# time.sleep(1)
# car.Ctrl_WQ2812_ALL(1,3)#黄色
# time.sleep(1)
# car.Ctrl_WQ2812_ALL(0,0) #关闭
#单个灯测试
# car.Ctrl_WQ2812_Alone(1,1,0)#1号红色
# time.sleep(1)
# car.Ctrl_WQ2812_Alone(2,1,3)#1号黄色
# time.sleep(1)
# car.Ctrl_WQ2812_Alone(1,0,3)#1号关
# time.sleep(1)
# car.Ctrl_WQ2812_Alone(10,1,2)#10号绿色
# time.sleep(1)
# car.Ctrl_WQ2812_ALL(0,0) #关闭
#控制亮度测试 全部
# for i in range(255):
# car.Ctrl_WQ2812_brightness_ALL(i,0,0)
# time.sleep(0.01)
@@ -0,0 +1 @@
from .Raspbot_Lib import Raspbot,LightShow
Binary file not shown.
Binary file not shown.
@@ -0,0 +1,109 @@
from .Raspbot_Lib import Raspbot
# Motor ID assignments (from Raspbot_Lib comments)
_MOTOR_FL = 0 # L1 — front left
_MOTOR_RL = 1 # L2 — rear left
_MOTOR_FR = 2 # R1 — front right
_MOTOR_RR = 3 # R2 — rear right
_DRIVER_MAX = 255
class MotorController:
"""
Thin adapter between the ROS 2 motor controller node and Raspbot_Lib.
Speed convention: the node passes values in the range
[-max_speed, max_speed] (default max_speed = 1.0). This class scales
them linearly to the driver's 0-255 range, so max_speed should match
whatever normalised unit you use (1.0 → full throttle).
"""
def __init__(self, max_speed: float = 1.0):
self._max_speed = max_speed
self._bot: Raspbot | None = None
# Shadow register — Raspbot_Lib has no read-back, so we track state.
self._speeds = [0.0, 0.0, 0.0, 0.0] # [FL, FR, RL, RR]
# ------------------------------------------------------------------
# Lifecycle
# ------------------------------------------------------------------
def initialize(self) -> None:
"""Create the I2C connection and ensure all motors are stopped."""
self._bot = Raspbot()
self._stop_all_motors()
def shutdown(self) -> None:
"""Stop motors and release the driver handle."""
if self._bot is not None:
self._stop_all_motors()
self._bot = None
# ------------------------------------------------------------------
# Motor control
# ------------------------------------------------------------------
def set_speed(
self,
front_left: float,
front_right: float,
rear_left: float,
rear_right: float,
) -> None:
"""
Set all four wheel speeds simultaneously.
Parameters
----------
front_left, front_right, rear_left, rear_right:
Desired speeds in the range [-max_speed, max_speed].
Positive = forward, negative = backward.
"""
self._require_initialized()
mapping = [
(_MOTOR_FL, front_left),
(_MOTOR_RL, rear_left),
(_MOTOR_FR, front_right),
(_MOTOR_RR, rear_right),
]
for motor_id, speed in mapping:
self._bot.Ctrl_Muto(motor_id, self._to_driver_units(speed))
self._speeds = [front_left, front_right, rear_left, rear_right]
def stop_all(self) -> None:
"""Immediately stop all motors."""
self._require_initialized()
self._stop_all_motors()
self._speeds = [0.0, 0.0, 0.0, 0.0]
def get_speeds(self) -> list[float]:
"""
Return the last commanded wheel speeds as [FL, FR, RL, RR].
The Raspbot driver provides no encoder/tachometer read-back, so this
returns the most recently written values.
"""
return list(self._speeds)
# ------------------------------------------------------------------
# Helpers
# ------------------------------------------------------------------
def _to_driver_units(self, speed: float) -> int:
"""Scale a speed value to the driver's integer range [-255, 255]."""
if self._max_speed == 0:
return 0
scaled = (speed / self._max_speed) * _DRIVER_MAX
return int(max(-_DRIVER_MAX, min(_DRIVER_MAX, scaled)))
def _stop_all_motors(self) -> None:
for motor_id in (_MOTOR_FL, _MOTOR_RL, _MOTOR_FR, _MOTOR_RR):
self._bot.Ctrl_Muto(motor_id, 0)
def _require_initialized(self) -> None:
if self._bot is None:
raise RuntimeError(
"MotorController has not been initialized. Call initialize() first."
)
+11
View File
@@ -0,0 +1,11 @@
from setuptools import find_packages, setup
setup(
name='Raspbot_Lib',
version='0.0.2',
py_modules = ['Raspbot_Lib'],
author='Yahboom Team',
url='www.yahboom.com',
packages=find_packages(),
description='Raspbot drvier V0.0.2'
)
@@ -0,0 +1,59 @@
from launch import LaunchDescription
from launch.actions import DeclareLaunchArgument
from launch.substitutions import LaunchConfiguration
from launch_ros.actions import Node
def generate_launch_description():
return LaunchDescription([
# ── Motor controller arguments ────────────────────────────────────
DeclareLaunchArgument('wheel_base', default_value='0.3',
description='Distance between left and right wheels (m)'),
DeclareLaunchArgument('max_speed', default_value='1.0',
description='Maximum motor speed in library units'),
# ── Ultrasonic sensor arguments ───────────────────────────────────
DeclareLaunchArgument('ultrasonic_rate_hz', default_value='10.0',
description='Ultrasonic sensor publish rate (Hz)'),
# ── Camera orientation arguments ──────────────────────────────────
DeclareLaunchArgument('pan_center_deg', default_value='90.0',
description='Pan angle at startup and shutdown (degrees)'),
DeclareLaunchArgument('tilt_center_deg', default_value='60.0',
description='Tilt angle at startup and shutdown (degrees)'),
# ── Nodes ─────────────────────────────────────────────────────────
Node(
package='raspbot_v2',
executable='motor_controller',
name='motor_controller',
parameters=[{
'wheel_base': LaunchConfiguration('wheel_base'),
'max_speed': LaunchConfiguration('max_speed'),
}],
output='screen',
),
Node(
package='raspbot_v2',
executable='camera_orientation',
name='camera_orientation',
parameters=[{
'pan_center_deg': LaunchConfiguration('pan_center_deg'),
'tilt_center_deg': LaunchConfiguration('tilt_center_deg'),
}],
output='screen',
),
Node(
package='raspbot_v2',
executable='ultrasonic',
name='ultrasonic',
parameters=[{
'publish_rate_hz': LaunchConfiguration('ultrasonic_rate_hz'),
}],
output='screen',
),
])
+24
View File
@@ -0,0 +1,24 @@
<?xml version="1.0"?>
<package format="3">
<name>raspbot_v2</name>
<version>0.0.1</version>
<description>Yahboom Raspbot V2 motor and camera orientation controller</description>
<maintainer email="you@example.com">Your Name</maintainer>
<license>Apache-2.0</license>
<exec_depend>rclpy</exec_depend>
<exec_depend>geometry_msgs</exec_depend>
<exec_depend>std_msgs</exec_depend>
<exec_depend>sensor_msgs</exec_depend>
<exec_depend>launch</exec_depend>
<exec_depend>launch_ros</exec_depend>
<buildtool_depend>ament_python</buildtool_depend>
<export>
<build_type>ament_python</build_type>
</export>
<test_depend>ament_copyright</test_depend>
<test_depend>ament_flake8</test_depend>
<test_depend>ament_pep257</test_depend>
</package>
@@ -0,0 +1,199 @@
#!/usr/bin/env python3
"""
Camera orientation controller node.
Subscribes to joint commands and drives the pan/tilt servo pair on the
Raspbot via Ctrl_Servo. Publishes current joint state for downstream
consumers (e.g. tf2, rviz2).
Standard interfaces
-------------------
Command : /joint_command (sensor_msgs/JointState)
Joint names: "pan" horizontal rotation (yaw)
Joint names: "tilt" vertical rotation (pitch)
Positions in **radians** (ROS convention).
State : /joint_states (sensor_msgs/JointState)
Same joint names; position in radians, reflecting the last
commanded angle.
Parameters
----------
pan_servo_id int default 1 Raspbot servo channel for pan
tilt_servo_id int default 2 Raspbot servo channel for tilt
pan_min_deg float default 0.0 hardware lower limit for pan (°)
pan_max_deg float default 180.0
tilt_min_deg float default 0.0 hardware lower limit for tilt (°)
tilt_max_deg float default 110.0 library caps tilt at 110°
pan_center_deg float default 90.0 position commanded on startup
tilt_center_deg float default 60.0
state_rate_hz float default 10.0 joint-state publish rate
"""
import math
import rclpy
from rclpy.node import Node
from sensor_msgs.msg import JointState
# Servo IDs used by Ctrl_Servo
_PAN_SERVO_ID = 1
_TILT_SERVO_ID = 2
def _deg_to_rad(deg: float) -> float:
return deg * math.pi / 180.0
def _rad_to_deg(rad: float) -> float:
return rad * 180.0 / math.pi
class CameraOrientationNode(Node):
def __init__(self):
super().__init__('camera_orientation')
# --- Parameters ---
self.declare_parameter('pan_servo_id', _PAN_SERVO_ID)
self.declare_parameter('tilt_servo_id', _TILT_SERVO_ID)
self.declare_parameter('pan_min_deg', 0.0)
self.declare_parameter('pan_max_deg', 180.0)
self.declare_parameter('tilt_min_deg', 0.0)
self.declare_parameter('tilt_max_deg', 110.0)
self.declare_parameter('pan_center_deg', 90.0)
self.declare_parameter('tilt_center_deg', 60.0)
self.declare_parameter('state_rate_hz', 10.0)
self._pan_id = self.get_parameter('pan_servo_id').value
self._tilt_id = self.get_parameter('tilt_servo_id').value
self._pan_min = self.get_parameter('pan_min_deg').value
self._pan_max = self.get_parameter('pan_max_deg').value
self._tilt_min = self.get_parameter('tilt_min_deg').value
self._tilt_max = self.get_parameter('tilt_max_deg').value
pan_center = self.get_parameter('pan_center_deg').value
tilt_center = self.get_parameter('tilt_center_deg').value
rate_hz = self.get_parameter('state_rate_hz').value
# --- Hardware ---
# Import here so the node can be unit-tested without hardware present
from raspbot_v2_interface.Raspbot_Lib import Raspbot
self._bot = Raspbot()
# --- State (degrees, internal representation) ---
self._pan_deg = pan_center
self._tilt_deg = tilt_center
# Drive servos to the startup centre position
self._apply(self._pan_deg, self._tilt_deg)
# --- Subscriber ---
self._cmd_sub = self.create_subscription(
JointState,
'joint_command',
self._joint_command_cb,
10,
)
# --- Publisher ---
self._state_pub = self.create_publisher(JointState, 'joint_states', 10)
self._state_timer = self.create_timer(1.0 / rate_hz, self._publish_state)
self.get_logger().info(
f'Camera orientation node started '
f'(pan servo {self._pan_id}, tilt servo {self._tilt_id}). '
f'Centred at pan={pan_center}° tilt={tilt_center}°.'
)
# ------------------------------------------------------------------
# Subscriber callback
# ------------------------------------------------------------------
def _joint_command_cb(self, msg: JointState):
"""
Accept a JointState command. Only joints named "pan" or "tilt"
are acted on; other names are silently ignored so this node can
coexist on a shared joint_command topic.
Positions are expected in radians (ROS convention).
"""
if len(msg.name) != len(msg.position):
self.get_logger().warn(
'joint_command message has mismatched name/position lengths; ignoring.'
)
return
pan_deg = self._pan_deg
tilt_deg = self._tilt_deg
for name, pos_rad in zip(msg.name, msg.position):
pos_deg = _rad_to_deg(pos_rad)
if name == 'pan':
pan_deg = self._clamp(pos_deg, self._pan_min, self._pan_max)
elif name == 'tilt':
tilt_deg = self._clamp(pos_deg, self._tilt_min, self._tilt_max)
self._apply(pan_deg, tilt_deg)
# ------------------------------------------------------------------
# Helpers
# ------------------------------------------------------------------
def _apply(self, pan_deg: float, tilt_deg: float):
"""Send angles to hardware and update internal state."""
pan_int = int(round(pan_deg))
tilt_int = int(round(tilt_deg))
try:
self._bot.Ctrl_Servo(self._pan_id, pan_int)
self._bot.Ctrl_Servo(self._tilt_id, tilt_int)
self._pan_deg = pan_deg
self._tilt_deg = tilt_deg
except Exception as exc:
self.get_logger().error(f'Servo write failed: {exc}')
@staticmethod
def _clamp(value: float, lo: float, hi: float) -> float:
return max(lo, min(hi, value))
# ------------------------------------------------------------------
# State publisher
# ------------------------------------------------------------------
def _publish_state(self):
msg = JointState()
msg.header.stamp = self.get_clock().now().to_msg()
msg.name = ['pan', 'tilt']
msg.position = [_deg_to_rad(self._pan_deg), _deg_to_rad(self._tilt_deg)]
self._state_pub.publish(msg)
# ------------------------------------------------------------------
# Shutdown
# ------------------------------------------------------------------
def destroy_node(self):
self.get_logger().info('Parking camera servos at centre...')
pan_center = self.get_parameter('pan_center_deg').value
tilt_center = self.get_parameter('tilt_center_deg').value
try:
self._apply(pan_center, tilt_center)
except Exception:
pass
super().destroy_node()
def main(args=None):
rclpy.init(args=args)
node = CameraOrientationNode()
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
@@ -0,0 +1,119 @@
#!/usr/bin/env python3
import rclpy
from rclpy.node import Node
from geometry_msgs.msg import Twist
from std_msgs.msg import Float32MultiArray
# Import your motor library
from raspbot_v2_interface import MotorController
class MotorControllerNode(Node):
def __init__(self):
super().__init__('motor_controller')
# --- Parameters ---
self.declare_parameter('wheel_base', 0.3) # meters between left/right wheels
self.declare_parameter('max_speed', 1.0) # max motor speed (library units)
self.wheel_base = self.get_parameter('wheel_base').value
self.max_speed = self.get_parameter('max_speed').value
# --- Initialize your motor library ---
self.motors = MotorController()
self.motors.initialize()
# --- Subscribers ---
# cmd_vel is the standard ROS2 topic for velocity commands
self.cmd_vel_sub = self.create_subscription(
Twist,
'cmd_vel',
self.cmd_vel_callback,
10 # QoS queue depth
)
# Optional: direct per-wheel speed control [FL, FR, RL, RR]
self.wheel_speeds_sub = self.create_subscription(
Float32MultiArray,
'wheel_speeds',
self.wheel_speeds_callback,
10
)
# --- Publishers ---
# Publish current wheel speeds for feedback
self.speed_pub = self.create_publisher(Float32MultiArray, 'current_wheel_speeds', 10)
# --- Timer for telemetry ---
self.telemetry_timer = self.create_timer(0.1, self.publish_telemetry) # 10 Hz
self.get_logger().info('Motor controller node started')
def cmd_vel_callback(self, msg: Twist):
"""
Convert a Twist message (linear.x, angular.z) into
differential-drive wheel speeds for a 4-wheeled robot.
"""
linear = msg.linear.x # m/s forward
angular = msg.angular.z # rad/s rotation
# Differential drive kinematics
left_speed = linear - (angular * self.wheel_base / 2.0)
right_speed = linear + (angular * self.wheel_base / 2.0)
# Clamp to max speed
left_speed = max(-self.max_speed, min(self.max_speed, left_speed))
right_speed = max(-self.max_speed, min(self.max_speed, right_speed))
# Send to motors via your library (FL=FR=left, RL=RR=right)
try:
self.motors.set_speed(front_left=left_speed, front_right=right_speed,
rear_left=left_speed, rear_right=right_speed)
except Exception as e:
self.get_logger().error(f'Motor error: {e}')
def wheel_speeds_callback(self, msg: Float32MultiArray):
"""Direct per-wheel control: [FL, FR, RL, RR]"""
if len(msg.data) != 4:
self.get_logger().warn('wheel_speeds expects exactly 4 values [FL, FR, RL, RR]')
return
try:
self.motors.set_speed(front_left=msg.data[0], front_right=msg.data[1],
rear_left=msg.data[2], rear_right=msg.data[3])
except Exception as e:
self.get_logger().error(f'Motor error: {e}')
def publish_telemetry(self):
"""Read and publish current wheel speeds from your library."""
try:
speeds = self.motors.get_speeds() # Expected: [FL, FR, RL, RR]
msg = Float32MultiArray()
msg.data = [float(s) for s in speeds]
self.speed_pub.publish(msg)
except Exception as e:
self.get_logger().warn(f'Telemetry read failed: {e}')
def destroy_node(self):
"""Clean shutdown — stop motors before exiting."""
self.get_logger().info('Shutting down motors...')
try:
self.motors.stop_all()
self.motors.shutdown()
except Exception:
pass
super().destroy_node()
def main(args=None):
rclpy.init(args=args)
node = MotorControllerNode()
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
@@ -0,0 +1,167 @@
#!/usr/bin/env python3
"""
Ultrasonic range sensor node.
Publishes distance readings from the Raspbot's HC-SR04-style sensor as a
standard sensor_msgs/Range message. The sensor is only powered on while at
least one subscriber is active, and automatically powered off when the last
subscriber disconnects.
Standard interface
------------------
Published : /ultrasonic/range (sensor_msgs/Range)
radiation_type = ULTRASOUND
range in metres; +inf when target is beyond max_range,
-inf when closer than min_range (REP-117 convention).
Parameters
----------
publish_rate_hz float default 10.0 sensor poll and publish rate
frame_id str default 'ultrasonic' header frame_id
min_range_m float default 0.02 minimum valid range (m)
max_range_m float default 4.0 maximum valid range (m)
field_of_view float default 0.2618 sensor cone width (rad, ~15°)
warmup_s float default 1.0 seconds to wait after powering the
sensor on before trusting readings
"""
import math
import time
import rclpy
from rclpy.node import Node
from sensor_msgs.msg import Range
class UltrasonicNode(Node):
def __init__(self):
super().__init__('ultrasonic')
# --- Parameters ---
self.declare_parameter('publish_rate_hz', 10.0)
self.declare_parameter('frame_id', 'ultrasonic')
self.declare_parameter('min_range_m', 0.02)
self.declare_parameter('max_range_m', 4.0)
self.declare_parameter('field_of_view', 0.2618) # ~15 degrees
self.declare_parameter('warmup_s', 1.0)
self._frame_id = self.get_parameter('frame_id').value
self._min_range = self.get_parameter('min_range_m').value
self._max_range = self.get_parameter('max_range_m').value
self._fov = self.get_parameter('field_of_view').value
self._warmup_s = self.get_parameter('warmup_s').value
rate_hz = self.get_parameter('publish_rate_hz').value
# --- Hardware ---
from raspbot_v2_interface.Raspbot_Lib import Raspbot
self._bot = Raspbot()
# --- Sensor state ---
self._sensor_on = False
self._enabled_at = 0.0 # monotonic time when sensor was switched on
# --- Publisher ---
self._pub = self.create_publisher(Range, 'ultrasonic/range', 10)
# --- Timer ---
self._timer = self.create_timer(1.0 / rate_hz, self._tick)
self.get_logger().info(
f'Ultrasonic node started ({rate_hz} Hz, '
f'range {self._min_range}{self._max_range} m). '
f'Sensor will activate when subscribers connect.'
)
# ------------------------------------------------------------------
# Timer callback
# ------------------------------------------------------------------
def _tick(self):
has_subscribers = self._pub.get_subscription_count() > 0
# Power the sensor on/off based on subscriber presence
if has_subscribers and not self._sensor_on:
self._set_sensor(True)
return # skip this tick — let the sensor warm up
if not has_subscribers and self._sensor_on:
self._set_sensor(False)
return
if not self._sensor_on:
return
# Still warming up — do not publish unreliable readings
if time.monotonic() - self._enabled_at < self._warmup_s:
return
# Read and publish
raw_mm = self._bot.Read_Ultrasonic()
if raw_mm is None:
self.get_logger().warn('Ultrasonic read returned None', throttle_duration_sec=5.0)
return
self._pub.publish(self._build_msg(raw_mm))
# ------------------------------------------------------------------
# Helpers
# ------------------------------------------------------------------
def _set_sensor(self, on: bool):
try:
self._bot.Ctrl_Ulatist_Switch(1 if on else 0)
self._sensor_on = on
self._enabled_at = time.monotonic()
self.get_logger().info(f'Ultrasonic sensor {"enabled" if on else "disabled"}')
except Exception as exc:
self.get_logger().error(f'Failed to set sensor state: {exc}')
def _build_msg(self, raw_mm: int) -> Range:
msg = Range()
msg.header.stamp = self.get_clock().now().to_msg()
msg.header.frame_id = self._frame_id
msg.radiation_type = Range.ULTRASOUND
msg.field_of_view = self._fov
msg.min_range = self._min_range
msg.max_range = self._max_range
distance_m = raw_mm / 1000.0
# REP-117: report +inf / -inf for out-of-range readings
if distance_m > self._max_range:
msg.range = math.inf
elif distance_m < self._min_range:
msg.range = -math.inf
else:
msg.range = distance_m
return msg
# ------------------------------------------------------------------
# Shutdown
# ------------------------------------------------------------------
def destroy_node(self):
if self._sensor_on:
try:
self._bot.Ctrl_Ulatist_Switch(0)
except Exception:
pass
super().destroy_node()
def main(args=None):
rclpy.init(args=args)
node = UltrasonicNode()
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
+13
View File
@@ -0,0 +1,13 @@
[metadata]
name = raspbot_v2
version = 0.0.1
[options]
packages = find:
install_requires =
setuptools
[develop]
script_dir=$base/lib/raspbot_v2
[install]
install_scripts=$base/lib/raspbot_v2
+23
View File
@@ -0,0 +1,23 @@
from setuptools import setup
package_name = 'raspbot_v2'
setup(
name=package_name,
version='0.0.1',
packages=[package_name],
data_files=[
('share/ament_index/resource_index/packages', ['resource/' + package_name]),
('share/' + package_name, ['package.xml']),
('share/' + package_name + '/launch', ['launch/robot.launch.py']),
],
install_requires=['setuptools'],
zip_safe=True,
entry_points={
'console_scripts': [
'motor_controller = raspbot_v2.motor_controller_node:main',
'camera_orientation = raspbot_v2.camera_orientation_node:main',
'ultrasonic = raspbot_v2.ultrasonic_node:main',
],
},
)