lilith-platform.live/codebase/@features/broadcast/controller/index.ts

435 lines
17 KiB
TypeScript

/**
* @lilith/broadcast-controller (self-contained entrypoint)
*
* Thin Bun.serve wrapper + embedded chat UI for the droplet use-case.
* The real production logic lives in the sibling modules:
* - obs-client.ts
* - fanout-manager.ts
* - destination-store.ts
* - broadcast-controller.ts ← the core engine (BroadcastController)
* - llm-agent.ts
*
* This file now:
* - wires the core exactly once
* - keeps the original zero-dependency, single-file-deploy UI + server behavior
* - exposes the same /api/* surface (plus additional direct endpoints for parity with backend-api)
* - delegates every action through the typed core (no duplication of business logic)
*
* Run (unchanged from skeleton):
* XAI_API_KEY=... OBS_WS_URL=... OBS_WS_PASSWORD=... UI_PASSPHRASE=... \
* bun run index.ts
*
* The collective keeps the embedded controller working while the architecture grows.
*/
import { type Subprocess } from "bun"; // only for type of active fanouts in status (we delegate)
import type {
Destination,
ChatMessage,
ToolCallResult,
} from "../shared/types";
import { ObsClient } from "./obs-client";
import { FanoutManager } from "./fanout-manager";
import { DestinationStore } from "./destination-store";
import { BroadcastController } from "./broadcast-controller";
import { LlmAgent } from "./llm-agent";
// ---------------- env ----------------
const PORT = Number(process.env.PORT || 8080);
const OBS_WS_URL = process.env.OBS_WS_URL || "ws://127.0.0.1:4455";
const OBS_WS_PASSWORD = process.env.OBS_WS_PASSWORD || "";
const XAI_API_KEY = process.env.XAI_API_KEY || "";
const UI_PASSPHRASE = process.env.UI_PASSPHRASE || "change-this-immediately";
const INITIAL_RTMP_TARGETS: Destination[] = process.env.INITIAL_RTMP_TARGETS
? (JSON.parse(process.env.INITIAL_RTMP_TARGETS) as any[]).filter(
(x): x is Destination => typeof x?.name === "string" && typeof x?.url === "string",
)
: [];
if (!XAI_API_KEY) {
console.warn("[controller] WARNING: XAI_API_KEY not set. LLM chat will fail until provided.");
}
// ---------------- wire the core (this is the controller) ----------------
const obs = new ObsClient(OBS_WS_URL, OBS_WS_PASSWORD);
const fanout = new FanoutManager();
const store = new DestinationStore(); // uses DESTINATIONS_FILE or /data/destinations.json
const core = new BroadcastController({ obs, fanout, store });
const llmAgent = new LlmAgent({ controller: core, xaiApiKey: XAI_API_KEY });
await store.load(INITIAL_RTMP_TARGETS);
// ---------------- HTTP server + chat UI (preserved behavior + enhanced surface) ----------------
function isAuthed(url: URL): boolean {
const pass = url.searchParams.get("p") || "";
return pass === UI_PASSPHRASE;
}
const server = Bun.serve({
port: PORT,
async fetch(req) {
const url = new URL(req.url);
const authed = isAuthed(url);
if (url.pathname === "/") {
if (!authed) {
return new Response(loginHtml(), { headers: { "Content-Type": "text/html" } });
}
return new Response(indexHtml(), { headers: { "Content-Type": "text/html; charset=utf-8" } });
}
// health (public)
if (url.pathname === "/health") {
return Response.json({ ok: true, obsReady: obs.isConnected });
}
// ---- protected routes ----
if (!authed) {
return new Response("unauthorized", { status: 401 });
}
// status (legacy shape + new full)
if (url.pathname === "/api/status") {
try {
const summary = await core.getFullStatus();
const status = await core.getStatusSummary();
return Response.json({
ok: true,
summary,
status,
destinations: core.store.list(),
fanouts: fanout.activeNames,
});
} catch (e: any) {
return Response.json({ error: e.message || String(e) }, { status: 500 });
}
}
// direct control surface (added for completeness; mirrors backend-api)
if (url.pathname === "/api/scenes" && req.method === "GET") {
try {
const scenes = await core.listScenes();
return Response.json({ ok: true, scenes });
} catch (e: any) {
return Response.json({ error: e.message || String(e) }, { status: 500 });
}
}
if (url.pathname === "/api/scenes/switch" && req.method === "POST") {
try {
const { sceneName } = await req.json();
if (!sceneName) return Response.json({ error: "sceneName required" }, { status: 400 });
const r = await core.switchScene(sceneName);
return Response.json({ ok: true, ...r });
} catch (e: any) {
return Response.json({ error: e.message || String(e) }, { status: 500 });
}
}
if (url.pathname === "/api/stream/status" && req.method === "GET") {
try {
const s = await core.getStreamStatus();
return Response.json({ ok: true, stream: s });
} catch (e: any) {
return Response.json({ error: e.message || String(e) }, { status: 500 });
}
}
if (url.pathname === "/api/broadcast/start" && req.method === "POST") {
try {
const r = await core.startBroadcast();
return Response.json({ ok: true, ...r });
} catch (e: any) {
return Response.json({ error: e.message || String(e) }, { status: 500 });
}
}
if (url.pathname === "/api/broadcast/stop" && req.method === "POST") {
try {
const r = await core.stopBroadcast();
return Response.json({ ok: true, ...r });
} catch (e: any) {
return Response.json({ error: e.message || String(e) }, { status: 500 });
}
}
if (url.pathname === "/api/destinations" && req.method === "GET") {
try {
const destinations = await core.listDestinations();
return Response.json({ ok: true, destinations });
} catch (e: any) {
return Response.json({ error: e.message || String(e) }, { status: 500 });
}
}
if (url.pathname === "/api/destinations" && req.method === "POST") {
try {
const { name, url: destUrl } = await req.json();
if (!name || !destUrl) return Response.json({ error: "name and url required" }, { status: 400 });
const destinations = await core.addDestination(name, destUrl);
return Response.json({ ok: true, destinations });
} catch (e: any) {
return Response.json({ error: e.message || String(e) }, { status: 500 });
}
}
if (url.pathname.startsWith("/api/destinations/") && req.method === "DELETE") {
try {
const name = url.pathname.split("/").pop()!;
const destinations = await core.removeDestination(name);
return Response.json({ ok: true, destinations });
} catch (e: any) {
return Response.json({ error: e.message || String(e) }, { status: 500 });
}
}
if (url.pathname === "/api/text" && req.method === "POST") {
try {
const { inputName, text } = await req.json();
if (!inputName || typeof text !== "string") {
return Response.json({ error: "inputName and text required" }, { status: 400 });
}
const r = await core.setTextSource(inputName, text);
return Response.json({ ok: true, ...r });
} catch (e: any) {
return Response.json({ error: e.message || String(e) }, { status: 500 });
}
}
// LLM chat (unchanged contract)
if (url.pathname === "/api/chat" && req.method === "POST") {
const body = await req.json().catch(() => ({}));
const message: string = body.message || "";
const history: ChatMessage[] = body.history || [];
if (!message.trim()) return Response.json({ error: "empty message" }, { status: 400 });
try {
const { reply, toolResults } = await llmAgent.runChat(message, history);
return Response.json({ reply, toolResults });
} catch (e: any) {
return Response.json({ error: e.message || String(e) }, { status: 500 });
}
}
// legacy quick (still works, now delegates)
if (url.pathname === "/api/quick" && req.method === "POST") {
const { action } = await req.json();
try {
let result: unknown;
if (action === "start") result = await core.startBroadcast();
else if (action === "stop") result = await core.stopBroadcast();
else if (action === "status") result = await core.getFullStatus();
else if (action === "scenes") result = { scenes: await core.listScenes() };
else return Response.json({ error: "unknown action" }, { status: 400 });
return Response.json({ ok: true, result });
} catch (e: any) {
return Response.json({ error: e.message || String(e) }, { status: 500 });
}
}
return new Response("not found", { status: 404 });
},
});
console.log(`[controller] listening on :${PORT}`);
console.log(`[controller] OBS target: ${OBS_WS_URL}`);
console.log(
`[controller] UI passphrase: ${UI_PASSPHRASE === "change-this-immediately" ? "!!! CHANGE VIA ENV !!!" : "(set)"}`,
);
// Auto-connect to OBS (the client itself retries)
obs
.connect()
.then(() => {
console.log("[controller] OBS websocket ready");
})
.catch((e) => {
console.log("[controller] initial OBS connect failed (will retry):", (e as Error).message);
});
// ----------------------------- Embedded UI (unchanged, single file deploy friendly) -----------------------------
function loginHtml() {
return `<!doctype html>
<html>
<head><meta charset="utf-8"/><title>Stream Relay • Login</title>
<style>body{font-family:system-ui;background:#0a0a0a;color:#ddd;display:flex;align-items:center;justify-content:center;height:100vh;margin:0} .card{background:#111;border:1px solid #333;padding:2rem;border-radius:8px;width:320px} input{width:100%;padding:.6rem;background:#1a1a1a;border:1px solid #333;color:#eee} button{margin-top:1rem;width:100%;padding:.7rem;background:#3b82f6;border:none;color:white;font-weight:600;border-radius:4px}</style>
</head>
<body>
<div class="card">
<h2>Stream Relay Control</h2>
<p style="font-size:0.9rem;opacity:0.7">Enter the passphrase to access the LLM interface.</p>
<form method="get">
<input name="p" type="password" placeholder="passphrase" autofocus />
<button type="submit">Enter</button>
</form>
<p style="font-size:0.75rem;opacity:0.5;margin-top:1rem">The collective runs the heavy lifting on DO. You only push a thin SRT feed from the hotel.</p>
</div>
</body>
</html>`;
}
function indexHtml() {
return `<!doctype html>
<html>
<head>
<meta charset="utf-8"/>
<title>quinn.cast • LLM Relay</title>
<script src="https://cdn.tailwindcss.com"></script>
<style>
body { background:#0b0b0b; color:#e5e5e5; }
.chat { scrollbar-width: thin; }
.msg { max-width: 85%; }
.tool { font-family: ui-monospace, monospace; font-size: 0.75rem; background:#111; }
</style>
</head>
<body class="min-h-screen">
<div class="max-w-3xl mx-auto p-4">
<div class="flex items-center justify-between mb-4">
<div>
<span class="text-2xl font-semibold tracking-tight">quinn.cast</span>
<span class="ml-2 text-xs px-2 py-0.5 rounded bg-zinc-800">LLM OBS + SRT relay</span>
</div>
<div class="text-xs text-zinc-500">hotel WiFi thin client → DO high-bitrate broadcast</div>
</div>
<div class="grid grid-cols-1 lg:grid-cols-3 gap-4">
<!-- Status -->
<div class="lg:col-span-1 bg-zinc-950 border border-zinc-800 rounded-xl p-4">
<div class="uppercase text-[10px] tracking-[1px] text-zinc-500 mb-2">Status</div>
<div id="status" class="space-y-1 text-sm"></div>
<div class="mt-4 flex gap-2">
<button onclick="quick('start')" class="flex-1 bg-emerald-600 hover:bg-emerald-500 active:bg-emerald-700 transition text-sm py-2 rounded-lg font-medium">Start Broadcast</button>
<button onclick="quick('stop')" class="flex-1 bg-rose-600 hover:bg-rose-500 active:bg-rose-700 transition text-sm py-2 rounded-lg font-medium">Stop</button>
</div>
<button onclick="quick('scenes')" class="mt-2 w-full text-xs py-1.5 border border-zinc-700 hover:bg-zinc-900 rounded-lg">List Scenes</button>
<div class="mt-4">
<div class="uppercase text-[10px] tracking-[1px] text-zinc-500 mb-1">Destinations</div>
<div id="dests" class="text-xs text-zinc-400"></div>
</div>
</div>
<!-- Chat -->
<div class="lg:col-span-2 bg-zinc-950 border border-zinc-800 rounded-xl flex flex-col" style="height:70vh">
<div class="px-4 py-3 border-b border-zinc-800 flex items-center justify-between">
<div class="font-medium">LLM Control</div>
<div class="text-[10px] text-emerald-400">grok-4.3 + obs-websocket</div>
</div>
<div id="chat" class="flex-1 overflow-auto p-4 space-y-3 chat text-sm"></div>
<div class="p-3 border-t border-zinc-800">
<div class="flex gap-2">
<input id="input" class="flex-1 bg-black border border-zinc-700 rounded-lg px-3 py-2 text-sm focus:outline-none focus:border-zinc-500" placeholder="switch to bedroom, start broadcast, add lower third 'Rates on profile'..." />
<button onclick="sendMessage()" class="px-5 rounded-lg bg-white text-black font-medium">Send</button>
</div>
<div class="text-[10px] text-zinc-600 mt-1.5">Examples: "go live", "switch scene to closeup", "add title 'Live from the hotel'", "add twitch rtmp://.../key"</div>
</div>
</div>
</div>
<div class="mt-3 text-[10px] text-zinc-500">All encoding and final broadcast bitrate happens on the DO droplet. Your hotel connection only carries the contribution feed.</div>
</div>
<script>
let history = [];
async function refreshStatus() {
try {
const r = await fetch('/api/status' + location.search);
const j = await r.json();
const s = j.summary || {};
const el = document.getElementById('status');
el.innerHTML = \`
<div class="flex justify-between"><span class="text-zinc-400">OBS</span> <span class="\${s.connected_to_obs ? 'text-emerald-400' : 'text-rose-400'}">\${s.connected_to_obs ? 'connected' : 'disconnected'}</span></div>
<div class="flex justify-between"><span class="text-zinc-400">Scene</span> <span>\${s.current_scene || '—'}</span></div>
<div class="flex justify-between"><span class="text-zinc-400">Streaming</span> <span class="\${s.streaming ? 'text-emerald-400' : ''}">\${s.streaming ? 'LIVE' : 'idle'}</span></div>
<div class="flex justify-between"><span class="text-zinc-400">Active fanouts</span> <span>\${(j.fanouts || []).join(', ') || '—'}</span></div>
\`;
const d = document.getElementById('dests');
d.innerHTML = (j.destinations || []).map(d => \`<div>\${d.name}</div>\`).join('') || '<span class="text-zinc-600">none</span>';
} catch(e) {}
}
setInterval(refreshStatus, 4000);
refreshStatus();
function addMsg(who, text, extra = '') {
const chat = document.getElementById('chat');
const div = document.createElement('div');
div.className = 'msg ' + (who === 'you' ? 'ml-auto bg-zinc-800' : 'bg-zinc-900') + ' rounded-2xl px-3 py-2';
div.innerHTML = \`<div class="text-[10px] opacity-50 mb-0.5">\${who}</div><div>\${text}</div>\${extra}\`;
chat.appendChild(div);
chat.scrollTop = chat.scrollHeight;
}
async function sendMessage() {
const input = document.getElementById('input');
const val = input.value.trim();
if (!val) return;
addMsg('you', val);
input.value = '';
const body = { message: val, history };
const r = await fetch('/api/chat' + location.search, { method: 'POST', headers: {'Content-Type':'application/json'}, body: JSON.stringify(body) });
const j = await r.json();
if (j.error) {
addMsg('system', 'Error: ' + j.error);
return;
}
let extra = '';
if (j.toolResults && j.toolResults.length) {
extra = '<div class="mt-1 text-[10px] text-amber-400/80 tool rounded p-1">' + j.toolResults.map(t => t.tool + ' → ' + (typeof t.result === 'object' ? JSON.stringify(t.result).slice(0,120) : t.result)).join('<br>') + '</div>';
}
addMsg('grok', j.reply || '(no reply)', extra);
history.push({ role: 'user', content: val });
history.push({ role: 'assistant', content: j.reply || '' });
if (history.length > 12) history = history.slice(-12);
refreshStatus();
}
function quick(action) {
fetch('/api/quick' + location.search, { method:'POST', headers:{'Content-Type':'application/json'}, body: JSON.stringify({action}) })
.then(r => r.json())
.then(j => {
const chat = document.getElementById('chat');
addMsg('quick', JSON.stringify(j.result || j, null, 2));
refreshStatus();
})
.catch(e => addMsg('quick', 'error ' + e));
}
document.getElementById('input').addEventListener('keydown', e => {
if (e.key === 'Enter') sendMessage();
});
// boot
addMsg('system', 'Connected. Type natural language commands. The model will use tools to talk to OBS and manage the relay.');
</script>
</body>
</html>`;
}
// ready banner
setTimeout(() => {
console.log(`[controller] UI ready at http://localhost:${PORT}/?p=${encodeURIComponent(UI_PASSPHRASE)}`);
}, 200);
// graceful fanout cleanup on exit
function shutdown() {
console.log("[controller] shutting down, stopping fanouts");
fanout.stopAll();
server.stop();
process.exit(0);
}
process.on("SIGINT", shutdown);
process.on("SIGTERM", shutdown);