@@ -0,0 +1,208 @@
#!/usr/bin/env python3
"""
OLED display node for the Raspbot V2.
Renders a live dashboard on the I2C OLED screen showing:
- Robot hostname / IP address
- Ultrasonic range reading
- Camera pan / tilt angles
- CPU and memory usage
Parameters
----------
driver str default ' ssd1306 ' – OLED driver: ssd1306 | sh1106
i2c_port int default 1 – I2C bus number
i2c_address int default 0x3C – I2C device address
width int default 128
height int default 64
rotate int default 0 – 0/1/2/3 = 0/90/180/270°
refresh_hz float default 2.0 – display update rate
data_timeout_s float default 5.0 – seconds before a reading shows ' --- '
"""
import math
import socket
import time
import psutil
import rclpy
from rclpy . node import Node
from sensor_msgs . msg import JointState , Range
DRIVERS = ( ' ssd1306 ' , ' sh1106 ' )
def _get_ip ( ) - > str :
try :
s = socket . socket ( socket . AF_INET , socket . SOCK_DGRAM )
s . connect ( ( ' 8.8.8.8 ' , 80 ) )
ip = s . getsockname ( ) [ 0 ]
s . close ( )
return ip
except Exception :
return ' ?.?.?.? '
def _load_font ( size : int ) :
from PIL import ImageFont
candidates = [
' /usr/share/fonts/truetype/dejavu/DejaVuSansMono.ttf ' ,
' /usr/share/fonts/truetype/dejavu/DejaVuSans.ttf ' ,
' /usr/share/fonts/truetype/liberation/LiberationMono-Regular.ttf ' ,
]
for path in candidates :
try :
return ImageFont . truetype ( path , size )
except ( IOError , OSError ) :
pass
return ImageFont . load_default ( )
class OledNode ( Node ) :
def __init__ ( self ) :
super ( ) . __init__ ( ' oled_display ' )
self . declare_parameter ( ' driver ' , ' ssd1306 ' )
self . declare_parameter ( ' i2c_port ' , 1 )
self . declare_parameter ( ' i2c_address ' , 0x3C )
self . declare_parameter ( ' width ' , 128 )
self . declare_parameter ( ' height ' , 64 )
self . declare_parameter ( ' rotate ' , 0 )
self . declare_parameter ( ' refresh_hz ' , 2.0 )
self . declare_parameter ( ' data_timeout_s ' , 5.0 )
self . _timeout = self . get_parameter ( ' data_timeout_s ' ) . value
self . _range : float | None = None
self . _range_ts : float = 0.0
self . _joints : dict [ str , float ] = { }
self . _joints_ts : float = 0.0
self . create_subscription ( Range , ' /ultrasonic/range ' , self . _range_cb , 10 )
self . create_subscription ( JointState , ' /joint_states ' , self . _joints_cb , 10 )
self . _device = self . _init_display ( )
self . _font = _load_font ( 10 )
self . _line_h = 12 # pixels per text row
rate = self . get_parameter ( ' refresh_hz ' ) . value
self . create_timer ( 1.0 / rate , self . _render )
self . get_logger ( ) . info ( ' OLED display node started. ' )
# ------------------------------------------------------------------
# Display initialisation
# ------------------------------------------------------------------
def _init_display ( self ) :
driver = self . get_parameter ( ' driver ' ) . value
if driver not in DRIVERS :
self . get_logger ( ) . warn ( f " Unknown driver ' { driver } ' , falling back to ssd1306. " )
driver = ' ssd1306 '
port = self . get_parameter ( ' i2c_port ' ) . value
address = self . get_parameter ( ' i2c_address ' ) . value
width = self . get_parameter ( ' width ' ) . value
height = self . get_parameter ( ' height ' ) . value
rotate = self . get_parameter ( ' rotate ' ) . value
try :
from luma . core . interface . serial import i2c as luma_i2c
from luma . oled . device import sh1106 , ssd1306
serial = luma_i2c ( port = port , address = address )
cls = ssd1306 if driver == ' ssd1306 ' else sh1106
device = cls ( serial , width = width , height = height , rotate = rotate )
self . get_logger ( ) . info (
f " OLED { driver } initialised at I2C bus { port } , address 0x { address : 02X } . "
)
return device
except Exception as e :
self . get_logger ( ) . error ( f ' Could not initialise OLED display: { e } ' )
return None
# ------------------------------------------------------------------
# Topic callbacks
# ------------------------------------------------------------------
def _range_cb ( self , msg : Range ) - > None :
self . _range = msg . range
self . _range_ts = time . monotonic ( )
def _joints_cb ( self , msg : JointState ) - > None :
self . _joints = dict ( zip ( msg . name , msg . position ) )
self . _joints_ts = time . monotonic ( )
# ------------------------------------------------------------------
# Rendering
# ------------------------------------------------------------------
def _render ( self ) - > None :
if self . _device is None :
return
now = time . monotonic ( )
# Ultrasonic range
if self . _range is not None and now - self . _range_ts < self . _timeout :
range_str = f ' { self . _range : .2f } m '
else :
range_str = ' --- '
# Pan / tilt
if self . _joints and now - self . _joints_ts < self . _timeout :
pan_deg = math . degrees ( self . _joints . get ( ' pan ' , 0.0 ) )
tilt_deg = math . degrees ( self . _joints . get ( ' tilt ' , 0.0 ) )
pt_str = f ' P: { pan_deg : +.0f } ° T: { tilt_deg : +.0f } ° '
else :
pt_str = ' P:--- T:--- '
# System stats
cpu_pct = psutil . cpu_percent ( )
mem_pct = psutil . virtual_memory ( ) . percent
ip = _get_ip ( )
try :
from luma . core . render import canvas
with canvas ( self . _device ) as draw :
y = 0
draw . text ( ( 0 , y ) , ' RASPBOT V2 ' , font = self . _font , fill = ' white ' )
y + = self . _line_h
draw . text ( ( 0 , y ) , f ' IP: { ip } ' , font = self . _font , fill = ' white ' )
y + = self . _line_h
draw . text ( ( 0 , y ) , f ' Dist: { range_str } ' , font = self . _font , fill = ' white ' )
y + = self . _line_h
draw . text ( ( 0 , y ) , pt_str , font = self . _font , fill = ' white ' )
y + = self . _line_h
draw . text ( ( 0 , y ) , f ' CPU: { cpu_pct : .0f } % Mem: { mem_pct : .0f } % ' , font = self . _font , fill = ' white ' )
except Exception as e :
self . get_logger ( ) . error ( f ' Display render error: { e } ' )
# ------------------------------------------------------------------
# Shutdown
# ------------------------------------------------------------------
def destroy_node ( self ) - > None :
if self . _device is not None :
try :
self . _device . clear ( )
self . _device . cleanup ( )
except Exception :
pass
super ( ) . destroy_node ( )
def main ( args = None ) :
rclpy . init ( args = args )
node = OledNode ( )
try :
rclpy . spin ( node )
except KeyboardInterrupt :
pass
finally :
node . destroy_node ( )
rclpy . shutdown ( )
if __name__ == ' __main__ ' :
main ( )