life-tooling/scripts/daemon.py
2026-03-20 09:32:40 -07:00

407 lines
12 KiB
Python

#!/usr/bin/env python3
"""Life Manager Remote Management Daemon.
Lightweight HTTP control API for managing life-manager services remotely.
Uses stdlib-only asyncio HTTP server (no pip dependencies).
Binds to 0.0.0.0 on the port given by LM_DAEMON_PORT (default 3720) for LAN
access. Wraps systemctl --user and docker compose commands to provide
start/stop/restart/status over HTTP.
Auth: If LM_DAEMON_TOKEN is set, POST endpoints require Bearer token.
GET endpoints are always open (no sensitive data exposed).
"""
import asyncio
import json
import logging
import os
import signal
import time
# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------
# TCP port the server listens on; read from LM_DAEMON_PORT, defaulting to 3720.
PORT = int(os.environ.get("LM_DAEMON_PORT", "3720"))
# Bearer token required on POST endpoints. When empty, check_auth() accepts
# every request.
AUTH_TOKEN = os.environ.get("LM_DAEMON_TOKEN", "")
# main() changes the working directory to this path before serving, which is
# where action_start() expects the relative ".env.production" file to live.
INSTALL_DIR = os.path.expanduser("~/.local/share/life-platform")
# systemd user units acted on by start/stop/restart, in iteration order.
UNITS = [
    "life-platform-api.service",
    "life-platform-caddy.service",
    "life-platform-vram.service",
    "life-ai.service",
]
# Short names used as keys for each unit in status and action responses.
UNIT_NAMES = {
    "life-platform-api.service": "api",
    "life-platform-caddy.service": "caddy",
    "life-platform-vram.service": "vram",
    "life-ai.service": "ai",
}
# Docker containers inspected by get_container_status().
CONTAINERS = ["life-platform-postgres", "life-platform-redis"]
# Short names used as keys in the "containers" section of the status response.
CONTAINER_NAMES = {
    "life-platform-postgres": "postgres",
    "life-platform-redis": "redis",
}
# Minimum spacing, in seconds, between accepted POST actions
# (enforced by check_rate_limit()).
RATE_LIMIT_SECONDS = 5
log = logging.getLogger("lm-daemon")
# ---------------------------------------------------------------------------
# Rate limiting
# ---------------------------------------------------------------------------
# Monotonic timestamp of the most recently accepted POST action.
# Starts at -inf rather than 0.0: time.monotonic() has an undefined reference
# point, so a literal 0.0 could sit less than RATE_LIMIT_SECONDS before "now"
# and wrongly reject the very first request.
_last_action_time: float = float("-inf")


def check_rate_limit() -> bool:
    """Allow at most one POST action per RATE_LIMIT_SECONDS.

    The limit is global (shared by all clients and all POST endpoints). A call
    that is allowed also records the current time as the start of the next
    quiet period; a rejected call leaves the recorded time untouched.

    Returns:
        True if the action may proceed, False if it arrived too soon after
        the previously accepted one.
    """
    global _last_action_time
    now = time.monotonic()
    if now - _last_action_time < RATE_LIMIT_SECONDS:
        return False
    _last_action_time = now
    return True
# ---------------------------------------------------------------------------
# Subprocess helpers
# ---------------------------------------------------------------------------
async def run_cmd(*args: str, timeout: float = 30.0) -> tuple[int, str, str]:
    """Run a command asynchronously with a timeout.

    The command is executed directly (no shell). Failures are reported through
    the return value rather than by raising, so callers can treat every
    outcome uniformly.

    Args:
        *args: Program name followed by its arguments.
        timeout: Seconds to wait for the process to finish before killing it.

    Returns:
        ``(returncode, stdout, stderr)`` with both streams decoded as UTF-8
        (undecodable bytes replaced) and stripped of surrounding whitespace.
        If the program cannot be launched the result is
        ``(127, "", <OS error text>)``; on timeout it is ``(1, "", "timeout")``.
    """
    try:
        proc = await asyncio.create_subprocess_exec(
            *args,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
    except OSError as exc:
        # Binary missing or not executable: report it as a failed command
        # (127 is the conventional "command not found" exit status) instead
        # of letting the exception escape into the HTTP handler.
        return 127, "", str(exc)

    try:
        stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=timeout)
    except asyncio.TimeoutError:
        try:
            proc.kill()
        except ProcessLookupError:
            # The process exited on its own just as the timeout fired.
            pass
        await proc.wait()
        return 1, "", "timeout"

    # errors="replace": tool output is not guaranteed to be valid UTF-8 and a
    # decode failure must not mask the command's real result.
    return (
        proc.returncode or 0,
        stdout.decode("utf-8", errors="replace").strip(),
        stderr.decode("utf-8", errors="replace").strip(),
    )
async def systemctl(*args: str) -> tuple[int, str, str]:
    """Invoke ``systemctl --user`` with the given arguments via run_cmd()."""
    command = ("systemctl", "--user", *args)
    return await run_cmd(*command)
# ---------------------------------------------------------------------------
# Status
# ---------------------------------------------------------------------------
async def get_service_status(unit: str) -> dict:
    """Report the ``systemctl --user is-active`` state of one unit.

    The exit code is ignored; only the printed state matters. An empty
    output is reported as ``"unknown"``.
    """
    outcome = await systemctl("is-active", unit)
    reported = outcome[1]
    state = reported or "unknown"
    is_active = state == "active"
    return {"active": is_active, "state": state}
async def get_container_status(name: str) -> dict:
    """Report whether a docker container is running and its health status.

    Uses ``docker inspect`` with a template that prints ``<running>|<health>``.
    A non-zero exit from docker is reported as not running / ``"not_found"``.
    """
    template = "{{.State.Running}}|{{if .State.Health}}{{.State.Health.Status}}{{else}}none{{end}}"
    exit_code, output, _ = await run_cmd("docker", "inspect", "--format", template, name)
    if exit_code != 0:
        return {"running": False, "health": "not_found"}

    # Everything before the first "|" is the running flag; the rest is health.
    running_text, separator, health_text = output.partition("|")
    if not separator:
        health_text = "unknown"
    return {"running": running_text.lower() == "true", "health": health_text}
async def get_full_status() -> dict:
    """Collect the state of every unit and container in one concurrent pass.

    Returns a dict with per-service and per-container entries plus an
    ``overall`` verdict: ``"healthy"`` when everything is up, ``"degraded"``
    when only some things are up, ``"stopped"`` when nothing is.
    """
    unit_probes = [get_service_status(u) for u in UNITS]
    container_probes = [get_container_status(c) for c in CONTAINERS]
    results = await asyncio.gather(*unit_probes, *container_probes)

    # gather() preserves argument order: unit results first, then containers.
    split_at = len(UNITS)
    services = {
        UNIT_NAMES[unit]: status for unit, status in zip(UNITS, results[:split_at])
    }
    containers = {
        CONTAINER_NAMES[cname]: status
        for cname, status in zip(CONTAINERS, results[split_at:])
    }

    service_flags = [entry["active"] for entry in services.values()]
    container_flags = [entry["running"] for entry in containers.values()]

    if all(service_flags) and all(container_flags):
        overall = "healthy"
    elif any(service_flags) or any(container_flags):
        overall = "degraded"
    else:
        overall = "stopped"

    return {"services": services, "containers": containers, "overall": overall}
# ---------------------------------------------------------------------------
# Actions
# ---------------------------------------------------------------------------
async def action_start() -> dict:
    """Bring up the docker compose stack, then start each systemd unit.

    Units are started even if the compose step fails; every step's outcome is
    recorded and the overall ``success`` is true only if all steps succeeded.
    """
    outcome: dict = {}

    compose_cmd = ("docker", "compose", "--env-file", ".env.production", "up", "-d")
    rc, _, err = await run_cmd(*compose_cmd, timeout=60.0)
    docker_entry = {"success": rc == 0}
    if rc != 0:
        docker_entry["error"] = err
    outcome["docker"] = docker_entry

    for unit in UNITS:
        rc, _, err = await systemctl("start", unit)
        unit_entry = {"success": rc == 0}
        if rc != 0:
            unit_entry["error"] = err
        outcome[UNIT_NAMES[unit]] = unit_entry

    succeeded = all(entry["success"] for entry in outcome.values())
    return {"action": "start", "success": succeeded, "results": outcome}
async def action_stop() -> dict:
    """Stop every systemd unit in UNITS; docker containers are left alone.

    Returns the per-unit outcome and an overall ``success`` flag that is true
    only when every ``systemctl stop`` exited with status 0.
    """
    per_unit: dict = {}
    for unit in UNITS:
        exit_code, _out, err_text = await systemctl("stop", unit)
        stopped = exit_code == 0
        report = {"success": stopped}
        if not stopped:
            report["error"] = err_text
        per_unit[UNIT_NAMES[unit]] = report

    return {
        "action": "stop",
        "success": all(report["success"] for report in per_unit.values()),
        "results": per_unit,
    }
async def action_restart() -> dict:
    """Restart every systemd unit in UNITS, one after another.

    Returns the per-unit outcome and an overall ``success`` flag that is true
    only when every ``systemctl restart`` exited with status 0.
    """
    summary: dict = {}
    for unit in UNITS:
        status, _, problem = await systemctl("restart", unit)
        label = UNIT_NAMES[unit]
        # Failed units carry the stderr text alongside the success flag.
        summary[label] = (
            {"success": True}
            if status == 0
            else {"success": False, "error": problem}
        )

    everything_ok = all(item["success"] for item in summary.values())
    return {"action": "restart", "success": everything_ok, "results": summary}
# ---------------------------------------------------------------------------
# HTTP Server
# ---------------------------------------------------------------------------
# Reason phrases for the HTTP status codes this daemon emits.
# build_response() falls back to "Unknown" for any code not listed here.
STATUS_TEXT = {
    200: "OK",
    401: "Unauthorized",
    404: "Not Found",
    405: "Method Not Allowed",
    429: "Too Many Requests",
    500: "Internal Server Error",
}
def parse_request(data: bytes) -> tuple[str, str, dict[str, str]]:
    """Parse raw HTTP request bytes into (method, path, headers).

    Parsing is deliberately lenient: both CRLF and bare LF line endings are
    accepted, and runs of whitespace in the request line are treated as a
    single separator. Anything after the first blank line (the body) is
    ignored.

    Args:
        data: The bytes read from the client; invalid UTF-8 is replaced
            rather than rejected.

    Returns:
        ``(method, path, headers)``. ``method`` and ``path`` are empty strings
        when the request line does not contain them. ``path`` still includes
        any query string. Header names are lower-cased and stripped; when a
        header is repeated, the last occurrence wins.
    """
    text = data.decode("utf-8", errors="replace")
    # Normalise CRLF to LF first so clients that send bare LF are still parsed.
    lines = text.replace("\r\n", "\n").split("\n")

    # split(None, ...) collapses repeated spaces and ignores leading whitespace.
    parts = lines[0].split(None, 2)
    method = parts[0] if parts else ""
    path = parts[1] if len(parts) >= 2 else ""

    headers: dict[str, str] = {}
    for line in lines[1:]:
        if not line:
            # Blank line: end of the header section.
            break
        key, separator, value = line.partition(":")
        if separator:
            headers[key.strip().lower()] = value.strip()
    return method, path, headers
def build_response(status: int, body: dict) -> bytes:
    """Serialise *body* as JSON and wrap it in a complete HTTP/1.1 response.

    The reason phrase comes from STATUS_TEXT ("Unknown" for unlisted codes),
    and the response always announces ``Connection: close``.
    """
    payload = json.dumps(body).encode("utf-8")
    reason = STATUS_TEXT.get(status, 'Unknown')
    head = "".join(
        (
            f"HTTP/1.1 {status} {reason}\r\n",
            "Content-Type: application/json\r\n",
            f"Content-Length: {len(payload)}\r\n",
            "Connection: close\r\n",
            "\r\n",
        )
    )
    return head.encode("utf-8") + payload
def check_auth(headers: dict[str, str]) -> bool:
    """Validate the bearer token if AUTH_TOKEN is configured.

    Args:
        headers: Request headers keyed by lower-cased name, as produced by
            parse_request().

    Returns:
        True when no token is configured, or when the ``authorization`` header
        is exactly ``Bearer <AUTH_TOKEN>``; False otherwise.
    """
    # Imported locally so this function does not depend on the module's
    # top-level import list.
    import hmac

    if not AUTH_TOKEN:
        return True

    supplied = headers.get("authorization", "")
    expected = f"Bearer {AUTH_TOKEN}"
    # Constant-time comparison: a plain == returns as soon as the first
    # differing character is found, which lets a network client recover the
    # token by timing. Compare as bytes because compare_digest() only accepts
    # str arguments that are pure ASCII; "surrogatepass" keeps the encode from
    # failing on environment values that contain undecodable bytes.
    return hmac.compare_digest(
        supplied.encode("utf-8", "surrogatepass"),
        expected.encode("utf-8", "surrogatepass"),
    )
# Route table: (method, path) -> handler name.
# The names are resolved to coroutine functions through HANDLERS.
ROUTES: dict[tuple[str, str], str] = {
    ("GET", "/health"): "handle_health",
    ("GET", "/status"): "handle_status",
    ("POST", "/start"): "handle_start",
    ("POST", "/stop"): "handle_stop",
    ("POST", "/restart"): "handle_restart",
}
# Every routed path regardless of method; handle_client() uses it to answer
# 405 (known path, wrong method) instead of 404 (unknown path).
KNOWN_PATHS = {path for _, path in ROUTES}
async def handle_health() -> dict:
    """Return a fixed payload identifying this daemon, without running any checks."""
    payload = {"status": "ok", "service": "life-manager-daemon"}
    return payload
async def handle_status() -> dict:
    """GET /status: return the combined service and container status."""
    snapshot = await get_full_status()
    return snapshot
async def handle_start() -> dict:
    """POST /start: run the start action and return its report."""
    report = await action_start()
    return report
async def handle_stop() -> dict:
    """POST /stop: run the stop action and return its report."""
    report = await action_stop()
    return report
async def handle_restart() -> dict:
    """POST /restart: run the restart action and return its report."""
    report = await action_restart()
    return report
# Resolves the handler names stored in ROUTES to their coroutine functions.
# handle_client() looks a request up as HANDLERS[ROUTES[(method, path)]].
HANDLERS = {
    "handle_health": handle_health,
    "handle_status": handle_status,
    "handle_start": handle_start,
    "handle_stop": handle_stop,
    "handle_restart": handle_restart,
}
async def _read_request_head(
    reader: asyncio.StreamReader, limit: int = 8192
) -> bytes:
    """Read from *reader* until the HTTP header section is complete.

    A single ``StreamReader.read(n)`` returns as soon as any data is
    available, so it can hand back only part of a request when the client's
    bytes arrive in several pieces. This keeps reading until the blank line
    that ends the headers has been seen, the peer signals EOF, or *limit*
    bytes have been collected, whichever comes first.
    """
    received = bytearray()
    while len(received) < limit:
        chunk = await reader.read(limit - len(received))
        if not chunk:
            # EOF: the peer closed its sending side; use what we have.
            break
        received += chunk
        if b"\r\n\r\n" in received or b"\n\n" in received:
            break
    return bytes(received)


async def handle_client(
    reader: asyncio.StreamReader, writer: asyncio.StreamWriter
) -> None:
    """Handle a single HTTP connection: one request, one JSON response.

    Dispatch order for a request:
      1. Unknown (method, path): 405 if the path exists under another
         method, otherwise 404.
      2. POST without a valid bearer token: 401.
      3. POST arriving inside the rate-limit window: 429.
      4. Otherwise the routed handler runs; its result is sent as 200, and
         any exception it raises is logged and reported as 500.

    Clients that send nothing (or not a complete header section) within five
    seconds, and connections that break mid-exchange, are dropped silently.
    The connection is always closed afterwards.

    Args:
        reader: Stream carrying the client's request bytes.
        writer: Stream used to send the response and close the connection.
    """
    try:
        # One deadline covers the whole header read, however many pieces the
        # request arrives in.
        data = await asyncio.wait_for(_read_request_head(reader), timeout=5.0)
        if not data:
            return

        method, path, headers = parse_request(data)
        # Routing ignores the query string.
        path = path.split("?", 1)[0]
        route_key = (method, path)

        if route_key not in ROUTES:
            if path in KNOWN_PATHS:
                response = build_response(405, {"error": "method not allowed"})
            else:
                response = build_response(404, {"error": "not found"})
        elif method == "POST" and not check_auth(headers):
            response = build_response(401, {"error": "unauthorized"})
        # Checked after auth so unauthenticated requests cannot consume the
        # rate-limit slot.
        elif method == "POST" and not check_rate_limit():
            response = build_response(
                429, {"error": "rate limited", "retry_after": RATE_LIMIT_SECONDS}
            )
        else:
            try:
                handler = HANDLERS[ROUTES[route_key]]
                body = await handler()
                response = build_response(200, body)
            except Exception as exc:
                # Top-level boundary for handler failures: log the traceback
                # and turn it into a 500 instead of dropping the connection.
                log.exception("Handler error")
                response = build_response(500, {"error": str(exc)})

        writer.write(response)
        await writer.drain()
    except (asyncio.TimeoutError, ConnectionError):
        # Slow or vanished client: nothing useful to send back.
        pass
    finally:
        writer.close()
        try:
            await writer.wait_closed()
        except ConnectionError:
            # BrokenPipeError is a ConnectionError subclass, so this also
            # covers a peer that is already gone.
            pass
# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------
async def main() -> None:
    """Configure logging, enter INSTALL_DIR, and serve until SIGTERM/SIGINT.

    The server listens on all interfaces on PORT. A warning is logged when no
    bearer token is configured, since POST endpoints are then open.
    """
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s [%(name)s] %(levelname)s %(message)s",
    )

    # Relative paths used by the actions are resolved against this directory.
    os.chdir(INSTALL_DIR)
    log.info("Working directory: %s", INSTALL_DIR)

    if not AUTH_TOKEN:
        log.warning("No LM_DAEMON_TOKEN set - POST endpoints are unprotected")
    else:
        log.info("Bearer token authentication enabled")

    server = await asyncio.start_server(handle_client, "0.0.0.0", PORT)
    log.info("Daemon listening on 0.0.0.0:%d", PORT)

    loop = asyncio.get_running_loop()
    stop = loop.create_future()

    def _request_stop() -> None:
        # A second signal must not try to resolve the future again.
        if not stop.done():
            stop.set_result(None)

    for signum in (signal.SIGTERM, signal.SIGINT):
        loop.add_signal_handler(signum, _request_stop)

    async with server:
        await stop
    log.info("Shutting down")


if __name__ == "__main__":
    asyncio.run(main())