407 lines
12 KiB
Python
407 lines
12 KiB
Python
#!/usr/bin/env python3
|
|
"""Life Manager Remote Management Daemon.
|
|
|
|
Lightweight HTTP control API for managing life-manager services remotely.
|
|
Uses stdlib-only asyncio HTTP server (no pip dependencies).
|
|
|
|
Binds to 0.0.0.0:3710 for LAN access. Wraps systemctl --user and docker compose
|
|
commands to provide start/stop/restart/status over HTTP.
|
|
|
|
Auth: If LM_DAEMON_TOKEN is set, POST endpoints require Bearer token.
|
|
GET endpoints are always open (no sensitive data exposed).
|
|
"""
|
|
|
|
import asyncio
import hmac
import json
import logging
import os
import signal
import time
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Configuration
|
|
# ---------------------------------------------------------------------------
|
|
|
|
# TCP port the HTTP API listens on (env LM_DAEMON_PORT, default 3720).
PORT = int(os.environ.get("LM_DAEMON_PORT", "3720"))
# Bearer token required on POST endpoints; empty string disables auth.
AUTH_TOKEN = os.environ.get("LM_DAEMON_TOKEN", "")
# Directory main() chdirs into; relative paths used by actions resolve here.
INSTALL_DIR = os.path.expanduser("~/.local/share/life-platform")

# systemd --user unit -> short name used in API responses. Insertion order is
# the order in which units are started/stopped/restarted and reported.
UNIT_NAMES = {
    "life-platform-api.service": "api",
    "life-platform-caddy.service": "caddy",
    "life-platform-vram.service": "vram",
    "life-ai.service": "ai",
}
# Derived from UNIT_NAMES so the two can never drift apart (code elsewhere
# looks up UNIT_NAMES[unit] for every unit and would raise KeyError).
UNITS = list(UNIT_NAMES)

# Docker container name -> short name used in API responses.
CONTAINER_NAMES = {
    "life-platform-postgres": "postgres",
    "life-platform-redis": "redis",
}
# Derived from CONTAINER_NAMES for the same single-source-of-truth reason.
CONTAINERS = list(CONTAINER_NAMES)

# Minimum spacing, in seconds, between accepted POST actions.
RATE_LIMIT_SECONDS = 5

log = logging.getLogger("lm-daemon")
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Rate limiting
|
|
# ---------------------------------------------------------------------------
|
|
|
|
# Monotonic timestamp of the last accepted POST action. Starts at -inf so the
# first action is always allowed: time.monotonic() has an undefined reference
# point (it may be small, e.g. shortly after boot), so 0.0 is not a safe
# "never happened" sentinel.
_last_action_time: float = float("-inf")


def check_rate_limit() -> bool:
    """Allow at most one POST action per RATE_LIMIT_SECONDS.

    Returns True and records the current time if the action may proceed, or
    False if the previous accepted action was less than RATE_LIMIT_SECONDS
    ago. The window is a single module-level timestamp, shared by all
    clients rather than tracked per client.
    """
    global _last_action_time
    now = time.monotonic()
    if now - _last_action_time < RATE_LIMIT_SECONDS:
        return False
    _last_action_time = now
    return True
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Subprocess helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
async def run_cmd(*args: str, timeout: float = 30.0) -> tuple[int, str, str]:
    """Run a command asynchronously and capture its output.

    Args:
        *args: Program and its arguments, executed directly (no shell).
        timeout: Seconds to wait for the command to finish before killing it.

    Returns:
        ``(returncode, stdout, stderr)``. Both streams are decoded as UTF-8,
        with undecodable bytes replaced, and then stripped. If the program
        cannot be launched, the result is ``(127, "", <error message>)``. On
        timeout the process is killed and ``(1, "", "timeout")`` is returned.
    """
    try:
        proc = await asyncio.create_subprocess_exec(
            *args,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
    except OSError as exc:
        # e.g. docker/systemctl missing or not executable. Report a failed
        # command (127 mirrors the shell's "command not found") so callers
        # build a normal result instead of the request failing with a 500.
        return 127, "", str(exc)

    try:
        stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=timeout)
    except asyncio.TimeoutError:
        try:
            proc.kill()
        except ProcessLookupError:
            # The process exited between the timeout firing and kill().
            pass
        await proc.wait()
        return 1, "", "timeout"

    return (
        proc.returncode or 0,
        stdout.decode("utf-8", errors="replace").strip(),
        stderr.decode("utf-8", errors="replace").strip(),
    )
|
|
|
|
|
|
async def systemctl(*args: str) -> tuple[int, str, str]:
    """Invoke ``systemctl --user`` with *args* through run_cmd.

    Returns run_cmd's ``(returncode, stdout, stderr)`` tuple unchanged, using
    run_cmd's default timeout.
    """
    command = ("systemctl", "--user") + args
    return await run_cmd(*command)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Status
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
async def get_service_status(unit: str) -> dict:
    """Report whether a systemd user unit is active.

    Returns ``{"active": bool, "state": str}``. ``state`` is the text printed
    by ``systemctl --user is-active <unit>``, or ``"unknown"`` when it printed
    nothing. The exit code is deliberately ignored; only the printed state
    matters.
    """
    outcome = await systemctl("is-active", unit)
    state = outcome[1] or "unknown"
    return dict(active=(state == "active"), state=state)
|
|
|
|
|
|
async def get_container_status(name: str) -> dict:
    """Inspect a Docker container's running flag and health-check status.

    Returns ``{"running": bool, "health": str}``. ``health`` has one of these
    values:

    - Docker's health status.
    - ``"none"`` when the container defines no health check.
    - ``"unknown"`` if the inspect output lacked a health field.
    - ``"not_found"`` (with ``running`` False) whenever ``docker inspect``
      exits non-zero.
    """
    template = (
        "{{.State.Running}}|"
        "{{if .State.Health}}{{.State.Health.Status}}{{else}}none{{end}}"
    )
    exit_code, output, _ = await run_cmd(
        "docker", "inspect", "--format", template, name
    )
    if exit_code:
        return {"running": False, "health": "not_found"}

    running_text, separator, health_text = output.partition("|")
    return {
        "running": running_text.lower() == "true",
        "health": health_text if separator else "unknown",
    }
|
|
|
|
|
|
async def get_full_status() -> dict:
    """Collect the state of every managed unit and container in one snapshot.

    All probes run concurrently via asyncio.gather. The result has three keys:

    - ``services``: short unit name -> get_service_status() result.
    - ``containers``: short container name -> get_container_status() result.
    - ``overall``: ``"healthy"`` when every unit is active and every container
      is running, ``"degraded"`` when at least one of them is up, and
      ``"stopped"`` otherwise.
    """
    probes = [get_service_status(unit) for unit in UNITS]
    probes.extend(get_container_status(name) for name in CONTAINERS)
    outcomes = await asyncio.gather(*probes)

    # gather preserves input order: unit results first, then container results.
    unit_count = len(UNITS)
    services = {
        UNIT_NAMES[unit]: outcome
        for unit, outcome in zip(UNITS, outcomes[:unit_count])
    }
    containers = {
        CONTAINER_NAMES[name]: outcome
        for name, outcome in zip(CONTAINERS, outcomes[unit_count:])
    }

    up_flags = [svc["active"] for svc in services.values()]
    up_flags += [ctr["running"] for ctr in containers.values()]

    if all(up_flags):
        overall = "healthy"
    elif any(up_flags):
        overall = "degraded"
    else:
        overall = "stopped"

    return {"services": services, "containers": containers, "overall": overall}
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Actions
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
async def action_start() -> dict:
    """Bring the stack up: Docker infrastructure first, then the systemd units.

    The steps are:

    1. Run ``docker compose --env-file .env.production up -d`` with a 60 s
       timeout. It relies on the current working directory being the compose
       project directory.
    2. Run ``systemctl --user start`` for each unit in UNITS, in order.

    A failed step does not abort the remaining ones. Returns
    ``{"action": "start", "success": bool, "results": {...}}``. Each entry in
    ``results`` has ``success`` and, on failure, ``error`` holding stderr.
    """
    outcome: dict = {}

    rc, _out, err = await run_cmd(
        "docker", "compose", "--env-file", ".env.production", "up", "-d",
        timeout=60.0,
    )
    outcome["docker"] = (
        {"success": True} if rc == 0 else {"success": False, "error": err}
    )

    for unit in UNITS:
        rc, _out, err = await systemctl("start", unit)
        entry = {"success": rc == 0}
        if rc != 0:
            entry["error"] = err
        outcome[UNIT_NAMES[unit]] = entry

    overall_ok = all(entry["success"] for entry in outcome.values())
    return {"action": "start", "success": overall_ok, "results": outcome}
|
|
|
|
|
|
async def action_stop() -> dict:
    """Stop each systemd unit in UNITS, in order; Docker infra keeps running.

    Every unit is attempted even if an earlier one fails. Returns
    ``{"action": "stop", "success": bool, "results": {short_name: {...}}}``.
    Each per-unit entry has ``success`` and, on failure, ``error`` holding the
    command's stderr.
    """
    per_unit: dict = {}
    for unit in UNITS:
        rc, _, err = await systemctl("stop", unit)
        per_unit[UNIT_NAMES[unit]] = (
            {"success": True} if rc == 0 else {"success": False, "error": err}
        )
    return {
        "action": "stop",
        "success": all(item["success"] for item in per_unit.values()),
        "results": per_unit,
    }
|
|
|
|
|
|
async def action_restart() -> dict:
    """Restart each systemd unit in UNITS, in order (Docker is not touched).

    Every unit is attempted even if an earlier one fails. Returns
    ``{"action": "restart", "success": bool, "results": {short_name: {...}}}``.
    Each per-unit entry has ``success`` and, on failure, ``error`` holding the
    command's stderr.
    """
    summary: dict = {}
    for unit in UNITS:
        rc, _stdout, stderr_text = await systemctl("restart", unit)
        if rc == 0:
            summary[UNIT_NAMES[unit]] = {"success": True}
        else:
            summary[UNIT_NAMES[unit]] = {"success": False, "error": stderr_text}
    everything_ok = all(entry["success"] for entry in summary.values())
    return {"action": "restart", "success": everything_ok, "results": summary}
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# HTTP Server
|
|
# ---------------------------------------------------------------------------
|
|
|
|
# HTTP reason phrases for the status codes this server emits.
STATUS_TEXT = dict(
    [
        (200, "OK"),
        (401, "Unauthorized"),
        (404, "Not Found"),
        (405, "Method Not Allowed"),
        (429, "Too Many Requests"),
        (500, "Internal Server Error"),
    ]
)
|
|
|
|
|
|
def parse_request(data: bytes) -> tuple[str, str, dict[str, str]]:
    """Parse the head of a raw HTTP request.

    Args:
        data: Raw bytes received from the client: the request line, the
            headers and possibly the start of a body. Decoded as UTF-8 with
            invalid bytes replaced.

    Returns:
        ``(method, path, headers)``.

        - ``method`` and ``path`` are the first two space-separated fields of
          the request line, or empty strings if absent. ``path`` still
          includes any query string.
        - ``headers`` maps lower-cased header names to stripped values.
          Parsing stops at the first blank line, and lines without a colon
          are skipped.

    Lines may end in CRLF or a bare LF (RFC 9112 §2.2 lets recipients accept
    a bare LF). This means LF-only clients still get their headers, such as
    Authorization, parsed.
    """
    text = data.decode("utf-8", errors="replace")
    lines = [line.rstrip("\r") for line in text.split("\n")]

    # str.split always returns at least one element, so index 0 is safe.
    request_line = lines[0].split(" ", 2)
    method = request_line[0]
    path = request_line[1] if len(request_line) >= 2 else ""

    headers: dict[str, str] = {}
    for line in lines[1:]:
        if not line:
            break  # blank line: end of headers, body follows
        if ":" in line:
            key, value = line.split(":", 1)
            headers[key.strip().lower()] = value.strip()

    return method, path, headers
|
|
|
|
|
|
def build_response(status: int, body: dict) -> bytes:
    """Serialize *body* as JSON and wrap it in a complete HTTP/1.1 response.

    The reason phrase comes from STATUS_TEXT, falling back to ``"Unknown"``
    for unlisted codes. Every response carries an exact ``Content-Length``
    and ``Connection: close``.
    """
    payload = json.dumps(body).encode("utf-8")
    reason = STATUS_TEXT.get(status, "Unknown")
    # The two trailing empty entries yield the CRLF CRLF that ends the head.
    head_lines = [
        f"HTTP/1.1 {status} {reason}",
        "Content-Type: application/json",
        f"Content-Length: {len(payload)}",
        "Connection: close",
        "",
        "",
    ]
    return "\r\n".join(head_lines).encode("utf-8") + payload
|
|
|
|
|
|
def check_auth(headers: dict[str, str]) -> bool:
    """Validate the request's bearer token against AUTH_TOKEN.

    If AUTH_TOKEN is empty, authentication is disabled and every request
    passes. Otherwise the ``authorization`` header must be ``Bearer <token>``.
    Keys are looked up lower-cased, as parse_request produces them.

    - The scheme name is matched case-insensitively (RFC 7235 §2.1).
    - The token is compared with hmac.compare_digest, so response timing does
      not reveal how much of a guessed token was correct.
    """
    if not AUTH_TOKEN:
        return True

    scheme, _, credentials = headers.get("authorization", "").partition(" ")
    if scheme.lower() != "bearer":
        return False
    # Compare as UTF-8 bytes: compare_digest raises TypeError for non-ASCII
    # str operands, and the header may contain arbitrary characters.
    return hmac.compare_digest(
        credentials.strip().encode("utf-8"), AUTH_TOKEN.encode("utf-8")
    )
|
|
|
|
|
|
# Route table: (HTTP method, path) -> name of the handler looked up in HANDLERS.
ROUTES: dict[tuple[str, str], str] = dict(
    [
        (("GET", "/health"), "handle_health"),
        (("GET", "/status"), "handle_status"),
        (("POST", "/start"), "handle_start"),
        (("POST", "/stop"), "handle_stop"),
        (("POST", "/restart"), "handle_restart"),
    ]
)

# Every path with at least one route; lets the server answer 405 instead of 404.
KNOWN_PATHS = {route[1] for route in ROUTES}
|
|
|
|
|
|
async def handle_health() -> dict:
    """Liveness probe: reply immediately without touching any managed service."""
    payload = {"status": "ok", "service": "life-manager-daemon"}
    return payload
|
|
|
|
|
|
async def handle_status() -> dict:
    """GET /status: return the aggregated service and container snapshot."""
    snapshot = await get_full_status()
    return snapshot
|
|
|
|
|
|
async def handle_start() -> dict:
    """POST /start: run the start action and return its result report."""
    report = await action_start()
    return report
|
|
|
|
|
|
async def handle_stop() -> dict:
    """POST /stop: run the stop action and return its result report."""
    report = await action_stop()
    return report
|
|
|
|
|
|
async def handle_restart() -> dict:
    """POST /restart: run the restart action and return its result report."""
    report = await action_restart()
    return report
|
|
|
|
|
|
# Handler name (as stored in ROUTES) -> coroutine function to await.
HANDLERS = dict(
    handle_health=handle_health,
    handle_status=handle_status,
    handle_start=handle_start,
    handle_stop=handle_stop,
    handle_restart=handle_restart,
)
|
|
|
|
|
|
async def _read_request_head(
    reader: asyncio.StreamReader, limit: int = 8192
) -> bytes:
    """Read until the blank line ending the HTTP head, EOF, or *limit* bytes."""
    data = b""
    while len(data) < limit:
        chunk = await reader.read(limit - len(data))
        if not chunk:  # EOF: the client closed its sending side
            break
        data += chunk
        # A blank line ends the request head; also accept a bare-LF blank
        # line from lenient clients (RFC 9112 §2.2).
        if b"\r\n\r\n" in data or b"\n\n" in data:
            break
    return data


async def handle_client(
    reader: asyncio.StreamReader, writer: asyncio.StreamWriter
) -> None:
    """Serve exactly one HTTP request on a connection, then close it.

    Reads the request head (at most 8 KiB, 5 s overall timeout), routes it
    through ROUTES and HANDLERS, and writes a single JSON response:

    - Unknown paths get 404.
    - Known paths used with the wrong method get 405.
    - POST routes must also pass check_auth (otherwise 401) and then
      check_rate_limit (otherwise 429).
    - Handler exceptions are logged and reported as 500.

    Read timeouts and client disconnects are silently ignored, and the writer
    is always closed.
    """
    try:
        # StreamReader.read() returns as soon as any bytes are available, so
        # a single read can miss the headers of a request split across TCP
        # segments (e.g. losing Authorization). Keep reading to the end of
        # the head instead.
        data = await asyncio.wait_for(_read_request_head(reader), timeout=5.0)
        if not data:
            return

        method, path, headers = parse_request(data)
        path = path.split("?", 1)[0]  # routing ignores any query string

        route_key = (method, path)

        if route_key not in ROUTES:
            if path in KNOWN_PATHS:
                response = build_response(405, {"error": "method not allowed"})
            else:
                response = build_response(404, {"error": "not found"})
        elif method == "POST" and not check_auth(headers):
            response = build_response(401, {"error": "unauthorized"})
        # Rate limiting runs only after auth succeeds, so unauthenticated
        # callers cannot consume the shared window.
        elif method == "POST" and not check_rate_limit():
            response = build_response(
                429, {"error": "rate limited", "retry_after": RATE_LIMIT_SECONDS}
            )
        else:
            try:
                handler = HANDLERS[ROUTES[route_key]]
                body = await handler()
                response = build_response(200, body)
            except Exception as exc:
                log.exception("Handler error")
                response = build_response(500, {"error": str(exc)})

        writer.write(response)
        await writer.drain()
    except (asyncio.TimeoutError, ConnectionError):
        # Slow or vanished client: nothing useful to send back.
        pass
    finally:
        writer.close()
        try:
            await writer.wait_closed()
        except ConnectionError:  # includes BrokenPipeError
            pass
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Main
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
async def main() -> None:
    """Configure logging, serve the HTTP API, and run until SIGTERM/SIGINT.

    The process chdirs into INSTALL_DIR before serving, so relative paths
    used by the actions (such as ``.env.production``) resolve there.
    """
    logging.basicConfig(
        format="%(asctime)s [%(name)s] %(levelname)s %(message)s",
        level=logging.INFO,
    )

    os.chdir(INSTALL_DIR)
    log.info("Working directory: %s", INSTALL_DIR)

    if not AUTH_TOKEN:
        log.warning("No LM_DAEMON_TOKEN set - POST endpoints are unprotected")
    else:
        log.info("Bearer token authentication enabled")

    server = await asyncio.start_server(handle_client, "0.0.0.0", PORT)
    log.info("Daemon listening on 0.0.0.0:%d", PORT)

    loop = asyncio.get_running_loop()
    shutdown = loop.create_future()

    def request_shutdown() -> None:
        # A signal may arrive more than once; only the first resolves the future.
        if not shutdown.done():
            shutdown.set_result(None)

    for signum in (signal.SIGTERM, signal.SIGINT):
        loop.add_signal_handler(signum, request_shutdown)

    async with server:
        await shutdown

    log.info("Shutting down")
|
|
|
|
|
|
# Script entry point: run main() to completion on a fresh asyncio event loop.
if __name__ == "__main__":
    asyncio.run(main())
|