Add automatic hotspot if a known SSID is not available
This commit is contained in:
@@ -0,0 +1,19 @@
|
||||
FROM debian:bookworm-slim
|
||||
|
||||
RUN apt-get update && apt-get install -y --no-install-recommends \
|
||||
network-manager \
|
||||
python3 \
|
||||
&& rm -rf /var/lib/apt/lists/*
|
||||
|
||||
WORKDIR /app
|
||||
COPY wifi_manager.py .
|
||||
|
||||
ENV WIFI_SSID=""
|
||||
ENV HOTSPOT_SSID="raspbot-hotspot"
|
||||
ENV HOTSPOT_PASSWORD="raspbot1234"
|
||||
ENV HOTSPOT_BAND="bg"
|
||||
ENV WIFI_IFACE=""
|
||||
ENV WIFI_TIMEOUT="30"
|
||||
ENV POLL_INTERVAL="15"
|
||||
|
||||
CMD ["python3", "wifi_manager.py"]
|
||||
@@ -0,0 +1,204 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
WiFi hotspot fallback manager.
|
||||
|
||||
Monitors whether the robot is connected to a known SSID. If the SSID is not
|
||||
seen within WIFI_TIMEOUT seconds of startup, or if the connection is later
|
||||
lost, it creates a NetworkManager WiFi hotspot so the robot remains reachable.
|
||||
When the target SSID becomes visible again the hotspot is torn down and the
|
||||
robot reconnects.
|
||||
|
||||
Communicates with the host NetworkManager via nmcli (the host D-Bus socket is
|
||||
bind-mounted into the container).
|
||||
"""
|
||||
import os
|
||||
import subprocess
|
||||
import time
|
||||
import logging
|
||||
|
||||
logging.basicConfig(
|
||||
level=logging.INFO,
|
||||
format='%(asctime)s %(levelname)-7s %(message)s',
|
||||
datefmt='%H:%M:%S',
|
||||
)
|
||||
log = logging.getLogger('wifi_manager')
|
||||
|
||||
WIFI_SSID = os.environ.get('WIFI_SSID', '')
|
||||
HOTSPOT_SSID = os.environ.get('HOTSPOT_SSID', 'raspbot-hotspot')
|
||||
HOTSPOT_PASSWORD = os.environ.get('HOTSPOT_PASSWORD', 'raspbot1234')
|
||||
HOTSPOT_BAND = os.environ.get('HOTSPOT_BAND', 'bg') # bg=2.4 GHz, a=5 GHz
|
||||
WIFI_IFACE = os.environ.get('WIFI_IFACE', '') # auto-detect if empty
|
||||
WIFI_TIMEOUT = int(os.environ.get('WIFI_TIMEOUT', '30'))
|
||||
POLL_INTERVAL = int(os.environ.get('POLL_INTERVAL', '15'))
|
||||
HOTSPOT_CON_NAME = 'raspbot-hotspot'
|
||||
|
||||
|
||||
def _run(cmd: list[str], check=False, capture=True) -> subprocess.CompletedProcess:
|
||||
return subprocess.run(
|
||||
cmd,
|
||||
capture_output=capture,
|
||||
text=True,
|
||||
check=check,
|
||||
)
|
||||
|
||||
|
||||
def _nmcli(*args: str, check=False) -> subprocess.CompletedProcess:
|
||||
return _run(['nmcli', '--terse', '--colors', 'no', *args], check=check)
|
||||
|
||||
|
||||
def get_wifi_interface() -> str | None:
|
||||
"""Return the name of the first WiFi interface NM knows about."""
|
||||
r = _nmcli('-f', 'DEVICE,TYPE', 'device', 'status')
|
||||
for line in r.stdout.splitlines():
|
||||
parts = line.split(':')
|
||||
if len(parts) >= 2 and parts[1] == 'wifi':
|
||||
return parts[0]
|
||||
return None
|
||||
|
||||
|
||||
def is_connected_to(ssid: str) -> bool:
|
||||
"""Return True if any WiFi connection with this SSID is currently active."""
|
||||
r = _nmcli('-f', 'NAME,TYPE,STATE', 'connection', 'show', '--active')
|
||||
for line in r.stdout.splitlines():
|
||||
parts = line.split(':')
|
||||
if len(parts) >= 3 and parts[2].strip() == 'activated':
|
||||
# Also check active device connection SSID via device status
|
||||
pass
|
||||
# More reliable: check active connection SSID via device
|
||||
r2 = _nmcli('-f', 'ACTIVE-CONNECTION.ID,GENERAL.CONNECTION', 'device', 'show')
|
||||
# Simplest approach: scan active connections for the SSID
|
||||
r3 = _nmcli('-f', 'ACTIVE,SSID', 'device', 'wifi', 'list')
|
||||
for line in r3.stdout.splitlines():
|
||||
parts = line.split(':', 1)
|
||||
if len(parts) == 2 and parts[0].strip() == 'yes' and parts[1].strip() == ssid:
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
def ssid_visible(ssid: str, iface: str) -> bool:
|
||||
"""Return True if the target SSID appears in a WiFi scan."""
|
||||
r = _nmcli('-f', 'SSID', 'device', 'wifi', 'list', 'ifname', iface)
|
||||
for line in r.stdout.splitlines():
|
||||
if line.strip() == ssid:
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
def hotspot_active() -> bool:
|
||||
"""Return True if our hotspot connection is currently active."""
|
||||
r = _nmcli('-f', 'NAME,STATE', 'connection', 'show', '--active')
|
||||
for line in r.stdout.splitlines():
|
||||
parts = line.split(':')
|
||||
if len(parts) >= 2 and parts[0] == HOTSPOT_CON_NAME and 'activated' in parts[1]:
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
def start_hotspot(iface: str) -> bool:
|
||||
"""Bring up a WiFi hotspot on *iface*. Returns True on success."""
|
||||
# Remove any stale connection profile first
|
||||
_nmcli('connection', 'delete', HOTSPOT_CON_NAME)
|
||||
|
||||
log.info('Starting hotspot SSID=%s on %s', HOTSPOT_SSID, iface)
|
||||
r = _nmcli(
|
||||
'device', 'wifi', 'hotspot',
|
||||
'ifname', iface,
|
||||
'con-name', HOTSPOT_CON_NAME,
|
||||
'ssid', HOTSPOT_SSID,
|
||||
'band', HOTSPOT_BAND,
|
||||
'password', HOTSPOT_PASSWORD,
|
||||
)
|
||||
if r.returncode != 0:
|
||||
log.error('Failed to start hotspot: %s', r.stderr.strip())
|
||||
return False
|
||||
log.info('Hotspot active — SSID=%s password=%s', HOTSPOT_SSID, HOTSPOT_PASSWORD)
|
||||
return True
|
||||
|
||||
|
||||
def stop_hotspot() -> None:
|
||||
"""Tear down the hotspot connection profile."""
|
||||
log.info('Stopping hotspot')
|
||||
_nmcli('connection', 'down', HOTSPOT_CON_NAME)
|
||||
_nmcli('connection', 'delete', HOTSPOT_CON_NAME)
|
||||
|
||||
|
||||
def connect_to_ssid(ssid: str) -> bool:
|
||||
"""Ask NetworkManager to connect to a previously-known SSID."""
|
||||
log.info('Connecting to %s', ssid)
|
||||
r = _nmcli('connection', 'up', 'id', ssid)
|
||||
if r.returncode != 0:
|
||||
# Fall back to device connect (works if there's a saved profile)
|
||||
r2 = _nmcli('device', 'wifi', 'connect', ssid)
|
||||
if r2.returncode != 0:
|
||||
log.warning('Could not reconnect to %s: %s', ssid, r2.stderr.strip())
|
||||
return False
|
||||
return True
|
||||
|
||||
|
||||
def wait_for_ssid(ssid: str, iface: str, timeout: int) -> bool:
|
||||
"""Poll until *ssid* is connected or *timeout* seconds elapse."""
|
||||
deadline = time.monotonic() + timeout
|
||||
while time.monotonic() < deadline:
|
||||
if is_connected_to(ssid):
|
||||
return True
|
||||
remaining = deadline - time.monotonic()
|
||||
log.info('Waiting for %s (%.0fs remaining)…', ssid, max(remaining, 0))
|
||||
time.sleep(min(POLL_INTERVAL, max(remaining, 1)))
|
||||
return False
|
||||
|
||||
|
||||
def main() -> None:
|
||||
if not WIFI_SSID:
|
||||
log.error('WIFI_SSID not set — nothing to do.')
|
||||
return
|
||||
|
||||
iface = WIFI_IFACE or get_wifi_interface()
|
||||
if not iface:
|
||||
log.error('No WiFi interface found via NetworkManager.')
|
||||
return
|
||||
log.info('Using WiFi interface: %s', iface)
|
||||
|
||||
in_hotspot_mode = False
|
||||
|
||||
# ── startup: wait for the target SSID ───────────────────────────────────
|
||||
log.info('Waiting up to %ds for SSID=%s', WIFI_TIMEOUT, WIFI_SSID)
|
||||
if wait_for_ssid(WIFI_SSID, iface, WIFI_TIMEOUT):
|
||||
log.info('Connected to %s', WIFI_SSID)
|
||||
else:
|
||||
log.info('SSID not available — activating hotspot')
|
||||
start_hotspot(iface)
|
||||
in_hotspot_mode = True
|
||||
|
||||
# ── main loop ────────────────────────────────────────────────────────────
|
||||
while True:
|
||||
time.sleep(POLL_INTERVAL)
|
||||
|
||||
if in_hotspot_mode:
|
||||
# Trigger a fresh scan so NM updates the visible-SSIDs list
|
||||
_nmcli('device', 'wifi', 'rescan', 'ifname', iface)
|
||||
time.sleep(3) # give NM a moment to update scan results
|
||||
|
||||
if ssid_visible(WIFI_SSID, iface):
|
||||
log.info('Target SSID %s visible — switching back', WIFI_SSID)
|
||||
stop_hotspot()
|
||||
in_hotspot_mode = False
|
||||
time.sleep(2)
|
||||
if connect_to_ssid(WIFI_SSID):
|
||||
log.info('Reconnected to %s', WIFI_SSID)
|
||||
else:
|
||||
log.warning('Reconnect failed — falling back to hotspot')
|
||||
start_hotspot(iface)
|
||||
in_hotspot_mode = True
|
||||
else:
|
||||
if not is_connected_to(WIFI_SSID):
|
||||
log.warning('Lost connection to %s', WIFI_SSID)
|
||||
# Brief retry before giving up
|
||||
time.sleep(5)
|
||||
if not is_connected_to(WIFI_SSID):
|
||||
log.info('Connection lost — activating hotspot')
|
||||
start_hotspot(iface)
|
||||
in_hotspot_mode = True
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
Reference in New Issue
Block a user