MQTT = Message Queuing Telemetry Transport, Message Bus for Enterprise Orchestration
Here’s the nutshell up front, then a complete, commented, runnable mini-stack you can drop onto a Raspberry Pi 5 + an NVIDIA Jetson + a laptop/phone (for smart-glasses/Web UI):
Topic convention (namespaced, role-aware):
Example: site/sd-shop/edge/pi5-A/sensor/temp, site/sd-shop/voice/glasses-12/intent
Security essentials:
–––
docker-compose.yml
version: "3.9"<br>services:<br> mqtt:<br> image: eclipse-mosquitto:2<br> container_name: mqtt<br> restart: unless-stopped<br> ports:<br> - "1883:1883" # MQTT TCP<br> - "9001:9001" # MQTT over WebSocket (for glasses/web)<br> volumes:<br> - ./mosquitto.conf:/mosquitto/config/mosquitto.conf:ro<br> - ./data/mosquitto:/mosquitto/data<br> - ./log/mosquitto:/mosquitto/log<br> # Optional: a lightweight MQTT web console could be added later<br>
mosquitto.conf
persistence true<br>persistence_location /mosquitto/data/<br>log_dest file /mosquitto/log/mosquitto.log<br>listener 1883<br>allow_anonymous false<br>password_file /mosquitto/config/passwd<br><br># WebSockets for smart glasses / web HUD<br>listener 9001<br>protocol websockets<br><br># (For production add TLS listeners and point to certs)<br># listener 8883<br># cafile /mosquitto/config/ca.crt<br># certfile /mosquitto/config/server.crt<br># keyfile /mosquitto/config/server.key<br>
.env.example
MQTT_HOST=localhost<br>MQTT_PORT=1883<br>MQTT_WS=ws://localhost:9001/mqtt<br>MQTT_USER=shopuser<br>MQTT_PASS=shoppass<br>SITE_ID=sd-shop<br>PI_DEVICE_ID=pi5-A<br>JETSON_NODE_ID=orin-1<br>
requirements.txt
paho-mqtt==1.6.1<br>faster-whisper==1.0.3<br>websockets==12.0<br>uvicorn==0.30.1<br>fastapi==0.115.0<br>python-dotenv==1.0.1<br>opencv-python==4.10.0.84<br>numpy==1.26.4<br>
lib/schema.py
# Shared message helpers to keep payloads consistent.<br># Use simple dicts; easy for MQTT and logging.<br><br>from datetime import datetime, timezone<br>from typing import Dict, Any<br><br>def now_iso():<br> return datetime.now(timezone.utc).isoformat()<br><br>def sensor_payload(site_id: str, device_id: str, name: str, value: Any, unit: str = "", meta: Dict[str, Any] = None):<br> return {<br> "ts": now_iso(),<br> "siteId": site_id,<br> "deviceId": device_id,<br> "type": "sensor",<br> "name": name,<br> "value": value,<br> "unit": unit,<br> "meta": meta or {}<br> }<br><br>def detection_payload(site_id: str, node_id: str, stream: str, label: str, score: float, bbox=None, meta=None):<br> return {<br> "ts": now_iso(),<br> "siteId": site_id,<br> "nodeId": node_id,<br> "type": "detection",<br> "stream": stream,<br> "label": label,<br> "score": score,<br> "bbox": bbox or [],<br> "meta": meta or {}<br> }<br><br>def intent_payload(site_id: str, device_id: str, intent: str, slots=None, transcript:str=""):<br> return {<br> "ts": now_iso(),<br> "siteId": site_id,<br> "deviceId": device_id,<br> "type": "intent",<br> "intent": intent, # e.g., "OPEN_BAY", "SET_FAN", "WHERE_IS"<br> "slots": slots or {}, # e.g., {"bay":"1"} or {"percent":60}<br> "transcript": transcript<br> }<br><br>def command_payload(site_id: str, target: str, action: str, args=None, reason:str=""):<br> return {<br> "ts": now_iso(),<br> "siteId": site_id,<br> "target": target, # e.g., "edge/pi5-A/fan" or "edge/*/doors/bay1"<br> "action": action, # e.g., "SET", "OPEN", "CLOSE"<br> "args": args or {},<br> "reason": reason<br> }<br><br>def notify_payload(site_id: str, role: str, text: str, severity:str="info", meta=None):<br> return {<br> "ts": now_iso(),<br> "siteId": site_id,<br> "role": role, # "tech", "safety", "admin"<br> "text": text,<br> "severity": severity,<br> "meta": meta or {}<br> }<br>
services/pi_edge/sensor_publisher.py
# Raspberry Pi 5 edge publisher:<br># - Publishes temperature/humidity and a fake door sensor.<br># - Subscribes for control commands (e.g., fan setpoint) to show bidirectional flow.<br><br>import os, json, random, time<br>from dotenv import load_dotenv<br>import paho.mqtt.client as mqtt<br>from lib.schema import sensor_payload<br>load_dotenv()<br><br>MQTT_HOST = os.getenv("MQTT_HOST","localhost")<br>MQTT_PORT = int(os.getenv("MQTT_PORT","1883"))<br>MQTT_USER = os.getenv("MQTT_USER","shopuser")<br>MQTT_PASS = os.getenv("MQTT_PASS","shoppass")<br>SITE_ID = os.getenv("SITE_ID","sd-shop")<br>DEVICE_ID = os.getenv("PI_DEVICE_ID","pi5-A")<br><br># Topic helpers<br>def t_sensor(name): # publishes<br> return f"site/{SITE_ID}/edge/{DEVICE_ID}/sensor/{name}"<br><br>CMD_TOPIC = f"site/{SITE_ID}/cmd/edge/{DEVICE_ID}/#"<br><br>fan_setpoint = 0 # pretend we have a PWM fan<br><br>def on_connect(client, userdata, flags, rc):<br> print("Pi edge connected rc=", rc)<br> client.subscribe(CMD_TOPIC, qos=1)<br><br>def on_message(client, userdata, msg):<br> global fan_setpoint<br> try:<br> payload = json.loads(msg.payload.decode("utf-8"))<br> if payload.get("action") == "SET" and "percent" in payload.get("args",{}):<br> fan_setpoint = int(payload["args"]["percent"])<br> print(f"[EDGE] Fan setpoint updated -> {fan_setpoint}% (reason: {payload.get('reason','')})")<br> else:<br> print("[EDGE] Unknown command:", payload)<br> except Exception as e:<br> print("Command parse error:", e)<br><br>client = mqtt.Client(client_id=f"pi-edge-{DEVICE_ID}", clean_session=True)<br>client.username_pw_set(MQTT_USER, MQTT_PASS)<br>client.on_connect = on_connect<br>client.on_message = on_message<br>client.will_set(f"site/{SITE_ID}/edge/{DEVICE_ID}/status", "offline", retain=True, qos=1)<br>client.connect(MQTT_HOST, MQTT_PORT, 60)<br>client.loop_start()<br><br># Mark this node online<br>client.publish(f"site/{SITE_ID}/edge/{DEVICE_ID}/status", "online", retain=True, qos=1)<br><br>try:<br> while True:<br> # Replace with real sensor reads (e.g., from I2C, SPI, GPIO)<br> temp_c = round(20 + random.uniform(-1.0, 3.0), 2)<br> hum = round(50 + random.uniform(-3.0, 5.0), 2)<br> door = random.choice(["open","closed"]) if random.random()<0.05 else "closed"<br><br> client.publish(t_sensor("tempC"), json.dumps(sensor_payload(SITE_ID, DEVICE_ID, "tempC", temp_c, "C")), qos=1, retain=True)<br> client.publish(t_sensor("humidity"), json.dumps(sensor_payload(SITE_ID, DEVICE_ID, "humidity", hum, "%")), qos=1)<br> client.publish(t_sensor("bay1Door"), json.dumps(sensor_payload(SITE_ID, DEVICE_ID, "bay1Door", door)), qos=1)<br><br> # Report our current fan setpoint as a "sensor" for visibility<br> client.publish(t_sensor("fanSetpoint"), json.dumps(sensor_payload(SITE_ID, DEVICE_ID, "fanSetpoint", fan_setpoint, "%")), qos=1, retain=True)<br><br> time.sleep(3)<br>except KeyboardInterrupt:<br> pass<br>finally:<br> client.loop_stop()<br> client.disconnect()<br>
services/jetson_vision/vision_publisher.py
# Jetson Orin vision publisher:<br># - Simulates detections from a camera stream.<br># - Replace simulate_detections() with your actual CV/DeepStream/YOLO code.<br><br>import os, json, time, random<br>from dotenv import load_dotenv<br>import paho.mqtt.client as mqtt<br>from lib.schema import detection_payload<br>load_dotenv()<br><br>MQTT_HOST = os.getenv("MQTT_HOST","localhost")<br>MQTT_PORT = int(os.getenv("MQTT_PORT","1883"))<br>MQTT_USER = os.getenv("MQTT_USER","shopuser")<br>MQTT_PASS = os.getenv("MQTT_PASS","shoppass")<br>SITE_ID = os.getenv("SITE_ID","sd-shop")<br>NODE_ID = os.getenv("JETSON_NODE_ID","orin-1")<br>STREAM = "bay1-cam"<br><br>def topic():<br> return f"site/{SITE_ID}/jetson/{NODE_ID}/vision/{STREAM}"<br><br>client = mqtt.Client(client_id=f"jetson-{NODE_ID}", clean_session=True)<br>client.username_pw_set(MQTT_USER, MQTT_PASS)<br>client.will_set(f"site/{SITE_ID}/jetson/{NODE_ID}/status", "offline", retain=True, qos=1)<br>client.connect(MQTT_HOST, MQTT_PORT, 60)<br>client.loop_start()<br>client.publish(f"site/{SITE_ID}/jetson/{NODE_ID}/status", "online", retain=True, qos=1)<br><br>labels = ["person", "forklift", "no-hardhat", "spill", "tool"]<br><br>try:<br> while True:<br> label = random.choice(labels)<br> score = round(random.uniform(0.55, 0.99), 2)<br> det = detection_payload(SITE_ID, NODE_ID, STREAM, label, score, bbox=[100,120,200,250])<br> client.publish(topic(), json.dumps(det), qos=1)<br> time.sleep(5)<br>except KeyboardInterrupt:<br> pass<br>finally:<br> client.loop_stop()<br> client.disconnect()<br>
services/voice_transcriber/stt_server.py
# Voice → Intent service:<br># - Accepts short WAV/MP3 uploads via FastAPI endpoint /api/transcribe<br># - Transcribes with faster-whisper (local, Jetson-friendly if CUDA available)<br># - Very simple intent parser turns transcript into {intent, slots}<br># - Publishes to MQTT for the orchestrator.<br><br>import os, io, json<br>from dotenv import load_dotenv<br>from fastapi import FastAPI, UploadFile, File<br>import uvicorn<br>import paho.mqtt.client as mqtt<br>from faster_whisper import WhisperModel<br>from lib.schema import intent_payload<br><br>load_dotenv()<br>MQTT_HOST = os.getenv("MQTT_HOST","localhost")<br>MQTT_PORT = int(os.getenv("MQTT_PORT","1883"))<br>MQTT_USER = os.getenv("MQTT_USER","shopuser")<br>MQTT_PASS = os.getenv("MQTT_PASS","shoppass")<br>SITE_ID = os.getenv("SITE_ID","sd-shop")<br><br>DEVICE_ID = "stt-1" # could be the ID of the glasses or mic device<br><br># Load lightweight model first; swap to medium/large as hardware allows<br>model = WhisperModel("small", device="auto", compute_type="auto")<br><br>client = mqtt.Client(client_id="stt-server", clean_session=True)<br>client.username_pw_set(MQTT_USER, MQTT_PASS)<br>client.connect(MQTT_HOST, MQTT_PORT, 60)<br>client.loop_start()<br><br>app = FastAPI()<br><br>def topic_intent(dev_id):<br> return f"site/{SITE_ID}/voice/{dev_id}/intent"<br><br>def parse_intent(text: str):<br> t = text.lower()<br> # naive parser to prove the pipeline; replace with Rasa/LUIS/Transformers later<br> if "open bay" in t:<br> return "OPEN_BAY", {"bay": "".join([c for c in t if c.isdigit()]) or "1"}<br> if "close bay" in t:<br> return "CLOSE_BAY", {"bay": "".join([c for c in t if c.isdigit()]) or "1"}<br> if "set fan" in t or "fan" in t:<br> # extract percent<br> import re<br> m = re.search(r"(\d+)\s*%", t)<br> pct = int(m.group(1)) if m else 50<br> return "SET_FAN", {"percent": pct}<br> if "where is" in t:<br> # e.g., "where is torque wrench"<br> item = t.split("where is")[-1].strip()<br> return "WHERE_IS", {"item": item}<br> return "UNKNOWN", {}<br><br>@app.post("/api/transcribe")<br>async def transcribe(file: UploadFile = File(...), deviceId: str = "glasses-00"):<br> audio_bytes = await file.read()<br> segments, info = model.transcribe(io.BytesIO(audio_bytes), beam_size=1)<br> transcript = " ".join([seg.text for seg in segments]).strip()<br><br> intent, slots = parse_intent(transcript)<br> payload = intent_payload(SITE_ID, deviceId, intent, slots, transcript)<br> client.publish(topic_intent(deviceId), json.dumps(payload), qos=1)<br> return {"intent": intent, "slots": slots, "transcript": transcript}<br><br>if __name__ == "__main__":<br> uvicorn.run(app, host="0.0.0.0", port=8088)<br>
services/orchestrator/command_router.py
# Orchestrator:<br># - Subscribes to intents, detections, and key sensors.<br># - Applies rules → emits commands & human notifications.<br># - This is where "enterprise behavior" lives; expand with a rules engine later.<br><br>import os, json<br>from dotenv import load_dotenv<br>import paho.mqtt.client as mqtt<br>from lib.schema import command_payload, notify_payload<br>load_dotenv()<br><br>MQTT_HOST = os.getenv("MQTT_HOST","localhost")<br>MQTT_PORT = int(os.getenv("MQTT_PORT","1883"))<br>MQTT_USER = os.getenv("MQTT_USER","shopuser")<br>MQTT_PASS = os.getenv("MQTT_PASS","shoppass")<br>SITE_ID = os.getenv("SITE_ID","sd-shop")<br><br>INTENTS = f"site/{SITE_ID}/voice/+/intent"<br>DETECT = f"site/{SITE_ID}/jetson/+/vision/+"<br>SENSORS = f"site/{SITE_ID}/edge/+/sensor/+"<br><br>client = mqtt.Client(client_id="orchestrator", clean_session=True)<br>client.username_pw_set(MQTT_USER, MQTT_PASS)<br><br>def publish_cmd(target: str, action: str, args=None, reason=""):<br> topic = f"site/{SITE_ID}/cmd/{target}"<br> payload = command_payload(SITE_ID, target, action, args or {}, reason)<br> client.publish(topic, json.dumps(payload), qos=1)<br><br>def notify(role: str, text: str, severity="info", meta=None):<br> topic = f"site/{SITE_ID}/notify/{role}/general"<br> payload = notify_payload(SITE_ID, role, text, severity, meta)<br> client.publish(topic, json.dumps(payload), qos=0)<br><br>def on_connect(c,u,f,rc):<br> print("Orchestrator connected rc=", rc)<br> c.subscribe(INTENTS, qos=1)<br> c.subscribe(DETECT, qos=1)<br> c.subscribe(SENSORS, qos=0)<br><br>def handle_intent(payload):<br> intent = payload.get("intent")<br> slots = payload.get("slots", {})<br> dev = payload.get("deviceId","unknown")<br> if intent == "OPEN_BAY":<br> bay = slots.get("bay","1")<br> publish_cmd(f"edge/+/doors/bay{bay}", "OPEN", reason=f"Voice by {dev}")<br> notify("tech", f"Opening Bay {bay} (via {dev})")<br> elif intent == "CLOSE_BAY":<br> bay = slots.get("bay","1")<br> publish_cmd(f"edge/+/doors/bay{bay}", "CLOSE", reason=f"Voice by {dev}")<br> notify("tech", f"Closing Bay {bay} (via {dev})")<br> elif intent == "SET_FAN":<br> pct = int(slots.get("percent",50))<br> publish_cmd("edge/pi5-A/fan", "SET", args={"percent": pct}, reason=f"Voice by {dev}")<br> notify("tech", f"Setting exhaust fan to {pct}% (via {dev})")<br> elif intent == "WHERE_IS":<br> item = slots.get("item","unknown tool")<br> # In a real system you'd query inventory/RTLS. Here we notify.<br> notify("tech", f"Last seen location for {item}: Tool Cage A")<br> else:<br> notify("tech", f"Unrecognized voice command from {dev}: {payload.get('transcript','')}", "warning")<br><br>def handle_detection(payload):<br> label = payload.get("label")<br> score = payload.get("score",0)<br> if label == "no-hardhat" and score > 0.7:<br> notify("safety", "Hardhat missing detected at Bay 1", "warning", {"score": score})<br> if label == "spill" and score > 0.6:<br> publish_cmd("edge/+/beacon/floor", "SET", args={"color":"yellow","blink":True})<br> notify("safety", "Spill detected near Bay 1. Beacon activated.", "critical")<br><br>def handle_sensor(topic, payload):<br> # Example: auto-open fans if temp too high<br> if topic.endswith("/sensor/tempC"):<br> val = payload.get("value", 0)<br> if val > 28:<br> publish_cmd("edge/pi5-A/fan", "SET", args={"percent": 70}, reason="Temp > 28C")<br> notify("tech", "Auto cooling: fan set to 70% (temp high)")<br> # Door status could drive geofencing, etc.<br><br>def on_message(c,u,msg):<br> try:<br> payload = json.loads(msg.payload.decode("utf-8"))<br> except:<br> print("Bad JSON on", msg.topic); return<br><br> topic = msg.topic<br> if "/voice/" in topic and topic.endswith("/intent"):<br> handle_intent(payload)<br> elif "/vision/" in topic:<br> handle_detection(payload)<br> elif "/sensor/" in topic:<br> handle_sensor(topic, payload)<br><br>client.on_connect = on_connect<br>client.on_message = on_message<br>client.connect(MQTT_HOST, MQTT_PORT, 60)<br>client.loop_forever()<br>
services/glasses_client/glasses.html
<!-- Smart-glasses / phone HUD:<br> - Connects via MQTT over WebSockets<br> - Shows live notifications and lets you push simple voice intents by text (demo)<br> - For real glasses: use the device’s browser or wrap with a WebView. --><br><br><!DOCTYPE html><br><html><br><head><br> <meta charset="utf-8" /><br> <title>Empire HUD</title><br> <meta name="viewport" content="width=device-width, initial-scale=1" /><br> <style><br> body { font-family: system-ui, sans-serif; background:#0a0f18; color:#eaf6ff; padding:16px; }<br> .card { background:#0d1624; border:1px solid #113a59; border-radius:16px; padding:16px; margin-bottom:12px; box-shadow:0 0 20px rgba(0,204,255,0.1); }<br> input, button { font-size:16px; padding:10px; border-radius:10px; border:1px solid #113a59; background:#09101a; color:#eaf6ff; }<br> button { cursor: pointer; }<br> .notif { border-left:4px solid #3fb; padding-left:12px; }<br> </style><br></head><br><body><br> <div class="card"><br> <h2>Empire HUD</h2><br> <div>Status: <span id="status">disconnected</span></div><br> <div><small>MQTT over WS: <code id="ws"></code></small></div><br> </div><br><br> <div class="card"><br> <h3>Quick Command (demo)</h3><br> <p>Examples: "open bay 1", "set fan 60%", "where is torque wrench"</p><br> <input id="cmd" placeholder="type a command…" /><br> <button onclick="sendIntent()">Send</button><br> </div><br><br> <div class="card"><br> <h3>Notifications</h3><br> <div id="feed"></div><br> </div><br><br> <!-- Paho MQTT JS (CDN) --><br> <script src="https://unpkg.com/[email protected]/paho-mqtt-min.js"></script><br> <script><br> const SITE_ID = 'sd-shop';<br> const DEVICE_ID = 'glasses-12';<br> const WS_URL = (new URLSearchParams(location.search)).get('ws') || 'ws://localhost:9001/mqtt';<br> document.getElementById('ws').textContent = WS_URL;<br><br> const client = new Paho.MQTT.Client(WS_URL, "hud-" + Math.random().toString(16).slice(2));<br> client.onConnectionLost = (resp) => { document.getElementById('status').textContent = "lost"; };<br> client.onMessageArrived = (msg) => {<br> const el = document.createElement('div');<br> el.className = 'notif';<br> try {<br> const p = JSON.parse(msg.payloadString);<br> el.textContent = `[${p.severity}] ${p.text}`;<br> } catch { el.textContent = msg.payloadString; }<br> document.getElementById('feed').prepend(el);<br> };<br><br> client.connect({<br> timeout: 5,<br> onSuccess: () => {<br> document.getElementById('status').textContent = "connected";<br> client.subscribe(`site/${SITE_ID}/notify/+/+`, {qos:0});<br> },<br> onFailure: () => document.getElementById('status').textContent = "failed",<br> useSSL: false, // set true when you enable TLS for WS<br> userName: "shopuser", // match mosquitto credentials<br> password: "shoppass"<br> });<br><br> function sendIntent(){<br> const text = document.getElementById('cmd').value || '';<br> if(!text) return;<br> const payload = {<br> ts: new Date().toISOString(),<br> siteId: SITE_ID,<br> deviceId: DEVICE_ID,<br> type: "intent",<br> intent: "FREE_TEXT",<br> slots: {},<br> transcript: text<br> };<br> // Let the orchestrator handle unknowns or add a tiny local parser here<br> const msg = new Paho.MQTT.Message(JSON.stringify<br>
Here’s the nutshell up front, then a complete, commented, runnable mini-stack you can drop onto a Raspberry Pi 5 + an NVIDIA Jetson + a laptop/phone (for smart-glasses/Web UI):
- MQTT = Message Queuing Telemetry Transport. It’s a super-light pub/sub protocol over TCP (often on port 1883 or 8883/TLS) designed for unreliable links and tiny devices. Core wins: topics, QoS (0/1/2), retained messages, will messages, tiny headers, and easy auth/TLS.
- Why it’s perfect here: it’s the “message bus” that lets your Pi sensors, Jetson vision, voice-to-text ML, and smart glasses publish/subscribe to the same live event fabric. Each component stays decoupled, but the enterprise behaves like one coordinated organism.
Architecture (practical & scalable)
- Broker (Eclipse Mosquitto) = the hub. Turn on username/password + TLS, define topic ACLs.
- Edge (Raspberry Pi 5) = publishes sensor readings (env, RFID/NFC, door states), subscribes to control commands.
- Jetson (Orin) = runs vision models, publishes detections (person/plate/tool), subscribes to inference control.
- Voice STT/NLU = transcribes mic audio into intents (“open bay one”, “start exhaust fan 60%”), publishes intents.
- Orchestrator = subscribes to intents + detections + sensors, emits deterministic actions and notifications.
- Smart Glasses / HUD (Web App) = connects via MQTT over WebSockets, shows alerts/tasks, supports push-to-talk.
- Logging/BI (optional) = a timeseries sink (e.g., Influx, SQL via worker) that subscribes to everything.
Topic convention (namespaced, role-aware):
- site/{siteId}/edge/{deviceId}/sensor/{name}
- site/{siteId}/jetson/{nodeId}/vision/{stream}
- site/{siteId}/voice/{deviceId}/intent
- site/{siteId}/cmd/{target}/{action}
- site/{siteId}/notify/{role}/{channel}
Example: site/sd-shop/edge/pi5-A/sensor/temp, site/sd-shop/voice/glasses-12/intent
Security essentials:
- TLS on 8883 + per-client creds, ACLs by prefix (least privilege).
- Use retained messages for “last known state” sensors.
- LWT (Last Will & Testament) on each client to flip health flags if it drops.
Files & Full Code (commented)
- docker-compose.yml
- mosquitto.conf
- .env.example
- requirements.txt
- lib/schema.py (shared payload helpers)
- services/pi_edge/sensor_publisher.py (Pi 5)
- services/jetson_vision/vision_publisher.py (Jetson)
- services/voice_transcriber/stt_server.py (voice → intent; Whisper/faster-whisper ready)
- services/orchestrator/command_router.py (enterprise rules / routing)
- services/glasses_client/glasses.html (HUD over WebSockets; works on a phone or smart-glasses browser)
–––
docker-compose.yml
version: "3.9"<br>services:<br> mqtt:<br> image: eclipse-mosquitto:2<br> container_name: mqtt<br> restart: unless-stopped<br> ports:<br> - "1883:1883" # MQTT TCP<br> - "9001:9001" # MQTT over WebSocket (for glasses/web)<br> volumes:<br> - ./mosquitto.conf:/mosquitto/config/mosquitto.conf:ro<br> - ./data/mosquitto:/mosquitto/data<br> - ./log/mosquitto:/mosquitto/log<br> # Optional: a lightweight MQTT web console could be added later<br>
mosquitto.conf
persistence true<br>persistence_location /mosquitto/data/<br>log_dest file /mosquitto/log/mosquitto.log<br>listener 1883<br>allow_anonymous false<br>password_file /mosquitto/config/passwd<br><br># WebSockets for smart glasses / web HUD<br>listener 9001<br>protocol websockets<br><br># (For production add TLS listeners and point to certs)<br># listener 8883<br># cafile /mosquitto/config/ca.crt<br># certfile /mosquitto/config/server.crt<br># keyfile /mosquitto/config/server.key<br>
.env.example
MQTT_HOST=localhost<br>MQTT_PORT=1883<br>MQTT_WS=ws://localhost:9001/mqtt<br>MQTT_USER=shopuser<br>MQTT_PASS=shoppass<br>SITE_ID=sd-shop<br>PI_DEVICE_ID=pi5-A<br>JETSON_NODE_ID=orin-1<br>
requirements.txt
paho-mqtt==1.6.1<br>faster-whisper==1.0.3<br>websockets==12.0<br>uvicorn==0.30.1<br>fastapi==0.115.0<br>python-dotenv==1.0.1<br>opencv-python==4.10.0.84<br>numpy==1.26.4<br>
lib/schema.py
# Shared message helpers to keep payloads consistent.<br># Use simple dicts; easy for MQTT and logging.<br><br>from datetime import datetime, timezone<br>from typing import Dict, Any<br><br>def now_iso():<br> return datetime.now(timezone.utc).isoformat()<br><br>def sensor_payload(site_id: str, device_id: str, name: str, value: Any, unit: str = "", meta: Dict[str, Any] = None):<br> return {<br> "ts": now_iso(),<br> "siteId": site_id,<br> "deviceId": device_id,<br> "type": "sensor",<br> "name": name,<br> "value": value,<br> "unit": unit,<br> "meta": meta or {}<br> }<br><br>def detection_payload(site_id: str, node_id: str, stream: str, label: str, score: float, bbox=None, meta=None):<br> return {<br> "ts": now_iso(),<br> "siteId": site_id,<br> "nodeId": node_id,<br> "type": "detection",<br> "stream": stream,<br> "label": label,<br> "score": score,<br> "bbox": bbox or [],<br> "meta": meta or {}<br> }<br><br>def intent_payload(site_id: str, device_id: str, intent: str, slots=None, transcript:str=""):<br> return {<br> "ts": now_iso(),<br> "siteId": site_id,<br> "deviceId": device_id,<br> "type": "intent",<br> "intent": intent, # e.g., "OPEN_BAY", "SET_FAN", "WHERE_IS"<br> "slots": slots or {}, # e.g., {"bay":"1"} or {"percent":60}<br> "transcript": transcript<br> }<br><br>def command_payload(site_id: str, target: str, action: str, args=None, reason:str=""):<br> return {<br> "ts": now_iso(),<br> "siteId": site_id,<br> "target": target, # e.g., "edge/pi5-A/fan" or "edge/*/doors/bay1"<br> "action": action, # e.g., "SET", "OPEN", "CLOSE"<br> "args": args or {},<br> "reason": reason<br> }<br><br>def notify_payload(site_id: str, role: str, text: str, severity:str="info", meta=None):<br> return {<br> "ts": now_iso(),<br> "siteId": site_id,<br> "role": role, # "tech", "safety", "admin"<br> "text": text,<br> "severity": severity,<br> "meta": meta or {}<br> }<br>
services/pi_edge/sensor_publisher.py
# Raspberry Pi 5 edge publisher:<br># - Publishes temperature/humidity and a fake door sensor.<br># - Subscribes for control commands (e.g., fan setpoint) to show bidirectional flow.<br><br>import os, json, random, time<br>from dotenv import load_dotenv<br>import paho.mqtt.client as mqtt<br>from lib.schema import sensor_payload<br>load_dotenv()<br><br>MQTT_HOST = os.getenv("MQTT_HOST","localhost")<br>MQTT_PORT = int(os.getenv("MQTT_PORT","1883"))<br>MQTT_USER = os.getenv("MQTT_USER","shopuser")<br>MQTT_PASS = os.getenv("MQTT_PASS","shoppass")<br>SITE_ID = os.getenv("SITE_ID","sd-shop")<br>DEVICE_ID = os.getenv("PI_DEVICE_ID","pi5-A")<br><br># Topic helpers<br>def t_sensor(name): # publishes<br> return f"site/{SITE_ID}/edge/{DEVICE_ID}/sensor/{name}"<br><br>CMD_TOPIC = f"site/{SITE_ID}/cmd/edge/{DEVICE_ID}/#"<br><br>fan_setpoint = 0 # pretend we have a PWM fan<br><br>def on_connect(client, userdata, flags, rc):<br> print("Pi edge connected rc=", rc)<br> client.subscribe(CMD_TOPIC, qos=1)<br><br>def on_message(client, userdata, msg):<br> global fan_setpoint<br> try:<br> payload = json.loads(msg.payload.decode("utf-8"))<br> if payload.get("action") == "SET" and "percent" in payload.get("args",{}):<br> fan_setpoint = int(payload["args"]["percent"])<br> print(f"[EDGE] Fan setpoint updated -> {fan_setpoint}% (reason: {payload.get('reason','')})")<br> else:<br> print("[EDGE] Unknown command:", payload)<br> except Exception as e:<br> print("Command parse error:", e)<br><br>client = mqtt.Client(client_id=f"pi-edge-{DEVICE_ID}", clean_session=True)<br>client.username_pw_set(MQTT_USER, MQTT_PASS)<br>client.on_connect = on_connect<br>client.on_message = on_message<br>client.will_set(f"site/{SITE_ID}/edge/{DEVICE_ID}/status", "offline", retain=True, qos=1)<br>client.connect(MQTT_HOST, MQTT_PORT, 60)<br>client.loop_start()<br><br># Mark this node online<br>client.publish(f"site/{SITE_ID}/edge/{DEVICE_ID}/status", "online", retain=True, qos=1)<br><br>try:<br> while True:<br> # Replace with real sensor reads (e.g., from I2C, SPI, GPIO)<br> temp_c = round(20 + random.uniform(-1.0, 3.0), 2)<br> hum = round(50 + random.uniform(-3.0, 5.0), 2)<br> door = random.choice(["open","closed"]) if random.random()<0.05 else "closed"<br><br> client.publish(t_sensor("tempC"), json.dumps(sensor_payload(SITE_ID, DEVICE_ID, "tempC", temp_c, "C")), qos=1, retain=True)<br> client.publish(t_sensor("humidity"), json.dumps(sensor_payload(SITE_ID, DEVICE_ID, "humidity", hum, "%")), qos=1)<br> client.publish(t_sensor("bay1Door"), json.dumps(sensor_payload(SITE_ID, DEVICE_ID, "bay1Door", door)), qos=1)<br><br> # Report our current fan setpoint as a "sensor" for visibility<br> client.publish(t_sensor("fanSetpoint"), json.dumps(sensor_payload(SITE_ID, DEVICE_ID, "fanSetpoint", fan_setpoint, "%")), qos=1, retain=True)<br><br> time.sleep(3)<br>except KeyboardInterrupt:<br> pass<br>finally:<br> client.loop_stop()<br> client.disconnect()<br>
services/jetson_vision/vision_publisher.py
# Jetson Orin vision publisher:<br># - Simulates detections from a camera stream.<br># - Replace simulate_detections() with your actual CV/DeepStream/YOLO code.<br><br>import os, json, time, random<br>from dotenv import load_dotenv<br>import paho.mqtt.client as mqtt<br>from lib.schema import detection_payload<br>load_dotenv()<br><br>MQTT_HOST = os.getenv("MQTT_HOST","localhost")<br>MQTT_PORT = int(os.getenv("MQTT_PORT","1883"))<br>MQTT_USER = os.getenv("MQTT_USER","shopuser")<br>MQTT_PASS = os.getenv("MQTT_PASS","shoppass")<br>SITE_ID = os.getenv("SITE_ID","sd-shop")<br>NODE_ID = os.getenv("JETSON_NODE_ID","orin-1")<br>STREAM = "bay1-cam"<br><br>def topic():<br> return f"site/{SITE_ID}/jetson/{NODE_ID}/vision/{STREAM}"<br><br>client = mqtt.Client(client_id=f"jetson-{NODE_ID}", clean_session=True)<br>client.username_pw_set(MQTT_USER, MQTT_PASS)<br>client.will_set(f"site/{SITE_ID}/jetson/{NODE_ID}/status", "offline", retain=True, qos=1)<br>client.connect(MQTT_HOST, MQTT_PORT, 60)<br>client.loop_start()<br>client.publish(f"site/{SITE_ID}/jetson/{NODE_ID}/status", "online", retain=True, qos=1)<br><br>labels = ["person", "forklift", "no-hardhat", "spill", "tool"]<br><br>try:<br> while True:<br> label = random.choice(labels)<br> score = round(random.uniform(0.55, 0.99), 2)<br> det = detection_payload(SITE_ID, NODE_ID, STREAM, label, score, bbox=[100,120,200,250])<br> client.publish(topic(), json.dumps(det), qos=1)<br> time.sleep(5)<br>except KeyboardInterrupt:<br> pass<br>finally:<br> client.loop_stop()<br> client.disconnect()<br>
services/voice_transcriber/stt_server.py
# Voice → Intent service:<br># - Accepts short WAV/MP3 uploads via FastAPI endpoint /api/transcribe<br># - Transcribes with faster-whisper (local, Jetson-friendly if CUDA available)<br># - Very simple intent parser turns transcript into {intent, slots}<br># - Publishes to MQTT for the orchestrator.<br><br>import os, io, json<br>from dotenv import load_dotenv<br>from fastapi import FastAPI, UploadFile, File<br>import uvicorn<br>import paho.mqtt.client as mqtt<br>from faster_whisper import WhisperModel<br>from lib.schema import intent_payload<br><br>load_dotenv()<br>MQTT_HOST = os.getenv("MQTT_HOST","localhost")<br>MQTT_PORT = int(os.getenv("MQTT_PORT","1883"))<br>MQTT_USER = os.getenv("MQTT_USER","shopuser")<br>MQTT_PASS = os.getenv("MQTT_PASS","shoppass")<br>SITE_ID = os.getenv("SITE_ID","sd-shop")<br><br>DEVICE_ID = "stt-1" # could be the ID of the glasses or mic device<br><br># Load lightweight model first; swap to medium/large as hardware allows<br>model = WhisperModel("small", device="auto", compute_type="auto")<br><br>client = mqtt.Client(client_id="stt-server", clean_session=True)<br>client.username_pw_set(MQTT_USER, MQTT_PASS)<br>client.connect(MQTT_HOST, MQTT_PORT, 60)<br>client.loop_start()<br><br>app = FastAPI()<br><br>def topic_intent(dev_id):<br> return f"site/{SITE_ID}/voice/{dev_id}/intent"<br><br>def parse_intent(text: str):<br> t = text.lower()<br> # naive parser to prove the pipeline; replace with Rasa/LUIS/Transformers later<br> if "open bay" in t:<br> return "OPEN_BAY", {"bay": "".join([c for c in t if c.isdigit()]) or "1"}<br> if "close bay" in t:<br> return "CLOSE_BAY", {"bay": "".join([c for c in t if c.isdigit()]) or "1"}<br> if "set fan" in t or "fan" in t:<br> # extract percent<br> import re<br> m = re.search(r"(\d+)\s*%", t)<br> pct = int(m.group(1)) if m else 50<br> return "SET_FAN", {"percent": pct}<br> if "where is" in t:<br> # e.g., "where is torque wrench"<br> item = t.split("where is")[-1].strip()<br> return "WHERE_IS", {"item": item}<br> return "UNKNOWN", {}<br><br>@app.post("/api/transcribe")<br>async def transcribe(file: UploadFile = File(...), deviceId: str = "glasses-00"):<br> audio_bytes = await file.read()<br> segments, info = model.transcribe(io.BytesIO(audio_bytes), beam_size=1)<br> transcript = " ".join([seg.text for seg in segments]).strip()<br><br> intent, slots = parse_intent(transcript)<br> payload = intent_payload(SITE_ID, deviceId, intent, slots, transcript)<br> client.publish(topic_intent(deviceId), json.dumps(payload), qos=1)<br> return {"intent": intent, "slots": slots, "transcript": transcript}<br><br>if __name__ == "__main__":<br> uvicorn.run(app, host="0.0.0.0", port=8088)<br>
services/orchestrator/command_router.py
# Orchestrator:<br># - Subscribes to intents, detections, and key sensors.<br># - Applies rules → emits commands & human notifications.<br># - This is where "enterprise behavior" lives; expand with a rules engine later.<br><br>import os, json<br>from dotenv import load_dotenv<br>import paho.mqtt.client as mqtt<br>from lib.schema import command_payload, notify_payload<br>load_dotenv()<br><br>MQTT_HOST = os.getenv("MQTT_HOST","localhost")<br>MQTT_PORT = int(os.getenv("MQTT_PORT","1883"))<br>MQTT_USER = os.getenv("MQTT_USER","shopuser")<br>MQTT_PASS = os.getenv("MQTT_PASS","shoppass")<br>SITE_ID = os.getenv("SITE_ID","sd-shop")<br><br>INTENTS = f"site/{SITE_ID}/voice/+/intent"<br>DETECT = f"site/{SITE_ID}/jetson/+/vision/+"<br>SENSORS = f"site/{SITE_ID}/edge/+/sensor/+"<br><br>client = mqtt.Client(client_id="orchestrator", clean_session=True)<br>client.username_pw_set(MQTT_USER, MQTT_PASS)<br><br>def publish_cmd(target: str, action: str, args=None, reason=""):<br> topic = f"site/{SITE_ID}/cmd/{target}"<br> payload = command_payload(SITE_ID, target, action, args or {}, reason)<br> client.publish(topic, json.dumps(payload), qos=1)<br><br>def notify(role: str, text: str, severity="info", meta=None):<br> topic = f"site/{SITE_ID}/notify/{role}/general"<br> payload = notify_payload(SITE_ID, role, text, severity, meta)<br> client.publish(topic, json.dumps(payload), qos=0)<br><br>def on_connect(c,u,f,rc):<br> print("Orchestrator connected rc=", rc)<br> c.subscribe(INTENTS, qos=1)<br> c.subscribe(DETECT, qos=1)<br> c.subscribe(SENSORS, qos=0)<br><br>def handle_intent(payload):<br> intent = payload.get("intent")<br> slots = payload.get("slots", {})<br> dev = payload.get("deviceId","unknown")<br> if intent == "OPEN_BAY":<br> bay = slots.get("bay","1")<br> publish_cmd(f"edge/+/doors/bay{bay}", "OPEN", reason=f"Voice by {dev}")<br> notify("tech", f"Opening Bay {bay} (via {dev})")<br> elif intent == "CLOSE_BAY":<br> bay = slots.get("bay","1")<br> publish_cmd(f"edge/+/doors/bay{bay}", "CLOSE", reason=f"Voice by {dev}")<br> notify("tech", f"Closing Bay {bay} (via {dev})")<br> elif intent == "SET_FAN":<br> pct = int(slots.get("percent",50))<br> publish_cmd("edge/pi5-A/fan", "SET", args={"percent": pct}, reason=f"Voice by {dev}")<br> notify("tech", f"Setting exhaust fan to {pct}% (via {dev})")<br> elif intent == "WHERE_IS":<br> item = slots.get("item","unknown tool")<br> # In a real system you'd query inventory/RTLS. Here we notify.<br> notify("tech", f"Last seen location for {item}: Tool Cage A")<br> else:<br> notify("tech", f"Unrecognized voice command from {dev}: {payload.get('transcript','')}", "warning")<br><br>def handle_detection(payload):<br> label = payload.get("label")<br> score = payload.get("score",0)<br> if label == "no-hardhat" and score > 0.7:<br> notify("safety", "Hardhat missing detected at Bay 1", "warning", {"score": score})<br> if label == "spill" and score > 0.6:<br> publish_cmd("edge/+/beacon/floor", "SET", args={"color":"yellow","blink":True})<br> notify("safety", "Spill detected near Bay 1. Beacon activated.", "critical")<br><br>def handle_sensor(topic, payload):<br> # Example: auto-open fans if temp too high<br> if topic.endswith("/sensor/tempC"):<br> val = payload.get("value", 0)<br> if val > 28:<br> publish_cmd("edge/pi5-A/fan", "SET", args={"percent": 70}, reason="Temp > 28C")<br> notify("tech", "Auto cooling: fan set to 70% (temp high)")<br> # Door status could drive geofencing, etc.<br><br>def on_message(c,u,msg):<br> try:<br> payload = json.loads(msg.payload.decode("utf-8"))<br> except:<br> print("Bad JSON on", msg.topic); return<br><br> topic = msg.topic<br> if "/voice/" in topic and topic.endswith("/intent"):<br> handle_intent(payload)<br> elif "/vision/" in topic:<br> handle_detection(payload)<br> elif "/sensor/" in topic:<br> handle_sensor(topic, payload)<br><br>client.on_connect = on_connect<br>client.on_message = on_message<br>client.connect(MQTT_HOST, MQTT_PORT, 60)<br>client.loop_forever()<br>
services/glasses_client/glasses.html
<!-- Smart-glasses / phone HUD:<br> - Connects via MQTT over WebSockets<br> - Shows live notifications and lets you push simple voice intents by text (demo)<br> - For real glasses: use the device’s browser or wrap with a WebView. --><br><br><!DOCTYPE html><br><html><br><head><br> <meta charset="utf-8" /><br> <title>Empire HUD</title><br> <meta name="viewport" content="width=device-width, initial-scale=1" /><br> <style><br> body { font-family: system-ui, sans-serif; background:#0a0f18; color:#eaf6ff; padding:16px; }<br> .card { background:#0d1624; border:1px solid #113a59; border-radius:16px; padding:16px; margin-bottom:12px; box-shadow:0 0 20px rgba(0,204,255,0.1); }<br> input, button { font-size:16px; padding:10px; border-radius:10px; border:1px solid #113a59; background:#09101a; color:#eaf6ff; }<br> button { cursor: pointer; }<br> .notif { border-left:4px solid #3fb; padding-left:12px; }<br> </style><br></head><br><body><br> <div class="card"><br> <h2>Empire HUD</h2><br> <div>Status: <span id="status">disconnected</span></div><br> <div><small>MQTT over WS: <code id="ws"></code></small></div><br> </div><br><br> <div class="card"><br> <h3>Quick Command (demo)</h3><br> <p>Examples: "open bay 1", "set fan 60%", "where is torque wrench"</p><br> <input id="cmd" placeholder="type a command…" /><br> <button onclick="sendIntent()">Send</button><br> </div><br><br> <div class="card"><br> <h3>Notifications</h3><br> <div id="feed"></div><br> </div><br><br> <!-- Paho MQTT JS (CDN) --><br> <script src="https://unpkg.com/[email protected]/paho-mqtt-min.js"></script><br> <script><br> const SITE_ID = 'sd-shop';<br> const DEVICE_ID = 'glasses-12';<br> const WS_URL = (new URLSearchParams(location.search)).get('ws') || 'ws://localhost:9001/mqtt';<br> document.getElementById('ws').textContent = WS_URL;<br><br> const client = new Paho.MQTT.Client(WS_URL, "hud-" + Math.random().toString(16).slice(2));<br> client.onConnectionLost = (resp) => { document.getElementById('status').textContent = "lost"; };<br> client.onMessageArrived = (msg) => {<br> const el = document.createElement('div');<br> el.className = 'notif';<br> try {<br> const p = JSON.parse(msg.payloadString);<br> el.textContent = `[${p.severity}] ${p.text}`;<br> } catch { el.textContent = msg.payloadString; }<br> document.getElementById('feed').prepend(el);<br> };<br><br> client.connect({<br> timeout: 5,<br> onSuccess: () => {<br> document.getElementById('status').textContent = "connected";<br> client.subscribe(`site/${SITE_ID}/notify/+/+`, {qos:0});<br> },<br> onFailure: () => document.getElementById('status').textContent = "failed",<br> useSSL: false, // set true when you enable TLS for WS<br> userName: "shopuser", // match mosquitto credentials<br> password: "shoppass"<br> });<br><br> function sendIntent(){<br> const text = document.getElementById('cmd').value || '';<br> if(!text) return;<br> const payload = {<br> ts: new Date().toISOString(),<br> siteId: SITE_ID,<br> deviceId: DEVICE_ID,<br> type: "intent",<br> intent: "FREE_TEXT",<br> slots: {},<br> transcript: text<br> };<br> // Let the orchestrator handle unknowns or add a tiny local parser here<br> const msg = new Paho.MQTT.Message(JSON.stringify<br>