diff --git a/docker-compose.yml b/docker-compose.yml index 7018cd5..6f2baa1 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -61,6 +61,28 @@ services: - ROS_DOMAIN_ID=${ROS_DOMAIN_ID:-0} restart: unless-stopped + wifi: + build: + context: wifi + dockerfile: Dockerfile + platforms: + - linux/arm64 + image: raspbot_v2_wifi:latest + network_mode: host + cap_add: + - NET_ADMIN + volumes: + - /run/dbus/system_bus_socket:/run/dbus/system_bus_socket + environment: + - WIFI_SSID=${WIFI_SSID:-} + - HOTSPOT_SSID=${HOTSPOT_SSID:-raspbot-hotspot} + - HOTSPOT_PASSWORD=${HOTSPOT_PASSWORD:-raspbot1234} + - HOTSPOT_BAND=${HOTSPOT_BAND:-bg} + - WIFI_IFACE=${WIFI_IFACE:-} + - WIFI_TIMEOUT=${WIFI_TIMEOUT:-30} + - POLL_INTERVAL=${POLL_INTERVAL:-15} + restart: unless-stopped + webui: build: context: webui diff --git a/wifi/Dockerfile b/wifi/Dockerfile new file mode 100644 index 0000000..ce59495 --- /dev/null +++ b/wifi/Dockerfile @@ -0,0 +1,19 @@ +FROM debian:bookworm-slim + +RUN apt-get update && apt-get install -y --no-install-recommends \ + network-manager \ + python3 \ + && rm -rf /var/lib/apt/lists/* + +WORKDIR /app +COPY wifi_manager.py . + +ENV WIFI_SSID="" +ENV HOTSPOT_SSID="raspbot-hotspot" +ENV HOTSPOT_PASSWORD="raspbot1234" +ENV HOTSPOT_BAND="bg" +ENV WIFI_IFACE="" +ENV WIFI_TIMEOUT="30" +ENV POLL_INTERVAL="15" + +CMD ["python3", "wifi_manager.py"] diff --git a/wifi/wifi_manager.py b/wifi/wifi_manager.py new file mode 100644 index 0000000..bb57de8 --- /dev/null +++ b/wifi/wifi_manager.py @@ -0,0 +1,204 @@ +#!/usr/bin/env python3 +""" +WiFi hotspot fallback manager. + +Monitors whether the robot is connected to a known SSID. If the SSID is not +seen within WIFI_TIMEOUT seconds of startup, or if the connection is later +lost, it creates a NetworkManager WiFi hotspot so the robot remains reachable. +When the target SSID becomes visible again the hotspot is torn down and the +robot reconnects. + +Communicates with the host NetworkManager via nmcli (the host D-Bus socket is +bind-mounted into the container). +""" +import os +import subprocess +import time +import logging + +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s %(levelname)-7s %(message)s', + datefmt='%H:%M:%S', +) +log = logging.getLogger('wifi_manager') + +WIFI_SSID = os.environ.get('WIFI_SSID', '') +HOTSPOT_SSID = os.environ.get('HOTSPOT_SSID', 'raspbot-hotspot') +HOTSPOT_PASSWORD = os.environ.get('HOTSPOT_PASSWORD', 'raspbot1234') +HOTSPOT_BAND = os.environ.get('HOTSPOT_BAND', 'bg') # bg=2.4 GHz, a=5 GHz +WIFI_IFACE = os.environ.get('WIFI_IFACE', '') # auto-detect if empty +WIFI_TIMEOUT = int(os.environ.get('WIFI_TIMEOUT', '30')) +POLL_INTERVAL = int(os.environ.get('POLL_INTERVAL', '15')) +HOTSPOT_CON_NAME = 'raspbot-hotspot' + + +def _run(cmd: list[str], check=False, capture=True) -> subprocess.CompletedProcess: + return subprocess.run( + cmd, + capture_output=capture, + text=True, + check=check, + ) + + +def _nmcli(*args: str, check=False) -> subprocess.CompletedProcess: + return _run(['nmcli', '--terse', '--colors', 'no', *args], check=check) + + +def get_wifi_interface() -> str | None: + """Return the name of the first WiFi interface NM knows about.""" + r = _nmcli('-f', 'DEVICE,TYPE', 'device', 'status') + for line in r.stdout.splitlines(): + parts = line.split(':') + if len(parts) >= 2 and parts[1] == 'wifi': + return parts[0] + return None + + +def is_connected_to(ssid: str) -> bool: + """Return True if any WiFi connection with this SSID is currently active.""" + r = _nmcli('-f', 'NAME,TYPE,STATE', 'connection', 'show', '--active') + for line in r.stdout.splitlines(): + parts = line.split(':') + if len(parts) >= 3 and parts[2].strip() == 'activated': + # Also check active device connection SSID via device status + pass + # More reliable: check active connection SSID via device + r2 = _nmcli('-f', 'ACTIVE-CONNECTION.ID,GENERAL.CONNECTION', 'device', 'show') + # Simplest approach: scan active connections for the SSID + r3 = _nmcli('-f', 'ACTIVE,SSID', 'device', 'wifi', 'list') + for line in r3.stdout.splitlines(): + parts = line.split(':', 1) + if len(parts) == 2 and parts[0].strip() == 'yes' and parts[1].strip() == ssid: + return True + return False + + +def ssid_visible(ssid: str, iface: str) -> bool: + """Return True if the target SSID appears in a WiFi scan.""" + r = _nmcli('-f', 'SSID', 'device', 'wifi', 'list', 'ifname', iface) + for line in r.stdout.splitlines(): + if line.strip() == ssid: + return True + return False + + +def hotspot_active() -> bool: + """Return True if our hotspot connection is currently active.""" + r = _nmcli('-f', 'NAME,STATE', 'connection', 'show', '--active') + for line in r.stdout.splitlines(): + parts = line.split(':') + if len(parts) >= 2 and parts[0] == HOTSPOT_CON_NAME and 'activated' in parts[1]: + return True + return False + + +def start_hotspot(iface: str) -> bool: + """Bring up a WiFi hotspot on *iface*. Returns True on success.""" + # Remove any stale connection profile first + _nmcli('connection', 'delete', HOTSPOT_CON_NAME) + + log.info('Starting hotspot SSID=%s on %s', HOTSPOT_SSID, iface) + r = _nmcli( + 'device', 'wifi', 'hotspot', + 'ifname', iface, + 'con-name', HOTSPOT_CON_NAME, + 'ssid', HOTSPOT_SSID, + 'band', HOTSPOT_BAND, + 'password', HOTSPOT_PASSWORD, + ) + if r.returncode != 0: + log.error('Failed to start hotspot: %s', r.stderr.strip()) + return False + log.info('Hotspot active — SSID=%s password=%s', HOTSPOT_SSID, HOTSPOT_PASSWORD) + return True + + +def stop_hotspot() -> None: + """Tear down the hotspot connection profile.""" + log.info('Stopping hotspot') + _nmcli('connection', 'down', HOTSPOT_CON_NAME) + _nmcli('connection', 'delete', HOTSPOT_CON_NAME) + + +def connect_to_ssid(ssid: str) -> bool: + """Ask NetworkManager to connect to a previously-known SSID.""" + log.info('Connecting to %s', ssid) + r = _nmcli('connection', 'up', 'id', ssid) + if r.returncode != 0: + # Fall back to device connect (works if there's a saved profile) + r2 = _nmcli('device', 'wifi', 'connect', ssid) + if r2.returncode != 0: + log.warning('Could not reconnect to %s: %s', ssid, r2.stderr.strip()) + return False + return True + + +def wait_for_ssid(ssid: str, iface: str, timeout: int) -> bool: + """Poll until *ssid* is connected or *timeout* seconds elapse.""" + deadline = time.monotonic() + timeout + while time.monotonic() < deadline: + if is_connected_to(ssid): + return True + remaining = deadline - time.monotonic() + log.info('Waiting for %s (%.0fs remaining)…', ssid, max(remaining, 0)) + time.sleep(min(POLL_INTERVAL, max(remaining, 1))) + return False + + +def main() -> None: + if not WIFI_SSID: + log.error('WIFI_SSID not set — nothing to do.') + return + + iface = WIFI_IFACE or get_wifi_interface() + if not iface: + log.error('No WiFi interface found via NetworkManager.') + return + log.info('Using WiFi interface: %s', iface) + + in_hotspot_mode = False + + # ── startup: wait for the target SSID ─────────────────────────────────── + log.info('Waiting up to %ds for SSID=%s', WIFI_TIMEOUT, WIFI_SSID) + if wait_for_ssid(WIFI_SSID, iface, WIFI_TIMEOUT): + log.info('Connected to %s', WIFI_SSID) + else: + log.info('SSID not available — activating hotspot') + start_hotspot(iface) + in_hotspot_mode = True + + # ── main loop ──────────────────────────────────────────────────────────── + while True: + time.sleep(POLL_INTERVAL) + + if in_hotspot_mode: + # Trigger a fresh scan so NM updates the visible-SSIDs list + _nmcli('device', 'wifi', 'rescan', 'ifname', iface) + time.sleep(3) # give NM a moment to update scan results + + if ssid_visible(WIFI_SSID, iface): + log.info('Target SSID %s visible — switching back', WIFI_SSID) + stop_hotspot() + in_hotspot_mode = False + time.sleep(2) + if connect_to_ssid(WIFI_SSID): + log.info('Reconnected to %s', WIFI_SSID) + else: + log.warning('Reconnect failed — falling back to hotspot') + start_hotspot(iface) + in_hotspot_mode = True + else: + if not is_connected_to(WIFI_SSID): + log.warning('Lost connection to %s', WIFI_SSID) + # Brief retry before giving up + time.sleep(5) + if not is_connected_to(WIFI_SSID): + log.info('Connection lost — activating hotspot') + start_hotspot(iface) + in_hotspot_mode = True + + +if __name__ == '__main__': + main()