lilith-platform.live/codebase/@features/broadcast/controller/llm-agent.ts

292 lines
10 KiB
TypeScript

/**
* LlmAgent — Grok-4.3 tool-calling loop driving the BroadcastController.
*
* Production complete:
* - Full system prompt with domain context.
* - All tools from the original skeleton, now wired to typed core methods.
* - Bounded tool-use rounds (4) then forced summary.
* - Caller-supplied history is forwarded to the model as-is (any trimming must happen upstream).
* - Errors from tools are returned to the model as {error: "..."} so it can reason.
* - No direct xAI or OBS knowledge leaks into callers.
*/
import type { ChatMessage, ToolCallResult } from "../shared/types";
import { BroadcastController } from "./broadcast-controller";
/**
 * System prompt placed as the first message of every conversation that
 * LlmAgent.runChat sends to the chat-completions endpoint.
 *
 * It describes the relay topology (contribution feed -> OBS on the droplet ->
 * RTMP fanout), the reply style expected from the model, and the high-level
 * actions that map onto the tool definitions declared in this file.
 * The template literal contains no interpolations, so it is a fixed string.
 */
const SYSTEM_PROMPT = `You are the Stream Relay Operator — an expert, concise, no-nonsense assistant that controls a remote OBS instance running on a strong DigitalOcean droplet.
The physical setup:
- Performer is on hotel WiFi (weak upload).
- A modest-bitrate SRT contribution feed (usually 2500-5000 kbps) arrives at the droplet.
- That feed is bridged into OBS as a video source (v4l2 "RemoteFeed").
- OBS is used for professional production: scenes, overlays, lower thirds, browser sources for platform chat/tips, audio mixing.
- OBS produces ONE clean high-bitrate program feed (6000-12000 kbps typical).
- That feed is fanned out (copy where possible) to multiple public RTMP destinations (Twitch, YouTube, custom, OnlyFans live, and the platform's own VIP live at live.transquinnftw.com for vip.transquinnftw.com/shows/live).
Your job: translate the user's natural language into precise actions using the provided tools.
Rules:
- Be brief in replies. Report what you did and the important result (scene name, streaming status, which destinations are live).
- If the user wants to change destinations, use the destination management tools. Never hardcode platform keys in replies.
- Prefer "start broadcast" which starts OBS streaming + launches the fanouts to currently configured destinations.
- When user says things like "add lower third saying X", use set_text_source with a conventional name like "LowerThird".
- Always confirm current state after major actions (call get_status_summary or get_scenes when it helps).
- If OBS is not connected, say so clearly and suggest checking the droplet side.
- Never invent OBS scene names; discover them with tools first if unsure.
Available high-level actions (you will call the low-level tools):
- List / switch scenes
- Start / stop the main stream output (this is what triggers the high-bitrate broadcast from DO)
- Manage dynamic RTMP destination list (add/remove/list)
- Start/stop the fanout processes for the current destination list (the actual proxy to "others")
- Update text sources (lower thirds, titles, notices)
- Get overall status and OBS stats
You have full agency to call multiple tools in one step when it makes sense.`;
/**
 * Function-tool schemas sent in the `tools` field of each tool-enabled
 * chat-completions request made by LlmAgent.runChat.
 *
 * Every `function.name` here must have a matching `case` in
 * LlmAgent.executeTool; a name without a case ends in "Unknown tool".
 * `parameters` is a JSON-Schema object describing the arguments the model is
 * expected to supply; tools that take no arguments use an empty `properties`.
 */
const toolDefinitions = [
// --- Scene discovery and switching ---
{
type: "function" as const,
function: {
name: "list_scenes",
description: "Return the list of available OBS scenes.",
parameters: { type: "object", properties: {} },
},
},
{
type: "function" as const,
function: {
name: "switch_scene",
description: "Switch the current program scene in OBS.",
parameters: {
type: "object",
properties: { scene_name: { type: "string", description: "Exact scene name from list_scenes" } },
required: ["scene_name"],
},
},
},
// --- Stream output control ---
{
type: "function" as const,
function: {
name: "get_stream_status",
description: "Get whether streaming is active, bytes sent, etc.",
parameters: { type: "object", properties: {} },
},
},
{
type: "function" as const,
function: {
name: "start_broadcast",
description: "Start OBS streaming output AND launch ffmpeg fanouts to all currently configured RTMP destinations. This is the main 'go live' action that uses DO bandwidth.",
parameters: { type: "object", properties: {} },
},
},
{
type: "function" as const,
function: {
name: "stop_broadcast",
description: "Stop OBS streaming and kill all active RTMP fanouts.",
parameters: { type: "object", properties: {} },
},
},
// --- RTMP destination management ---
{
type: "function" as const,
function: {
name: "list_destinations",
description: "List the current multi-RTMP destinations that fanout will target.",
parameters: { type: "object", properties: {} },
},
},
{
type: "function" as const,
function: {
name: "add_destination",
description: "Add a new RTMP destination for future broadcasts.",
parameters: {
type: "object",
properties: {
name: { type: "string", description: "Short label e.g. twitch, youtube, fansly, vip-live" },
// NOTE: this value carries the stream key, so treat it as a secret.
url: { type: "string", description: "Full rtmp://... or rtmps://... URL including stream key" },
},
required: ["name", "url"],
},
},
},
{
type: "function" as const,
function: {
name: "remove_destination",
description: "Remove a destination by name.",
parameters: {
type: "object",
properties: { name: { type: "string" } },
required: ["name"],
},
},
},
// --- On-screen text ---
{
type: "function" as const,
function: {
name: "set_text_source",
description: "Update (or create) a text source in the current scene. Great for lower thirds, announcements, rates, etc.",
parameters: {
type: "object",
properties: {
input_name: { type: "string", description: "e.g. LowerThird or Title" },
text: { type: "string", description: "The exact text to display" },
},
required: ["input_name", "text"],
},
},
},
// --- Aggregate status ---
{
type: "function" as const,
function: {
name: "get_status_summary",
description: "Get a compact status: current scene, streaming state, active fanouts, number of destinations.",
parameters: { type: "object", properties: {} },
},
},
] as const;
/** Construction options for {@link LlmAgent}. */
export interface LlmAgentOptions {
/** Controller that every tool call is dispatched to. */
controller: BroadcastController;
/**
 * xAI API key, sent as the Bearer token on each chat-completions request.
 * An empty string is accepted at construction time, but `runChat` throws.
 */
xaiApiKey: string;
}
/**
 * LlmAgent — runs the tool-calling conversation between the xAI
 * chat-completions API and a BroadcastController.
 *
 * A chat turn sends the system prompt, the caller-supplied history and the
 * user message, executes whatever tools the model requests against the
 * controller, feeds the results back, and repeats for a bounded number of
 * rounds. If the model is still requesting tools after the last round, a
 * final tool-less request asks it for a one-sentence summary.
 *
 * Tool failures (including malformed or missing arguments) never abort the
 * turn: they are reported back to the model as `{ error: "..." }`.
 */
export class LlmAgent {
  /** Chat-completions endpoint used for every request. */
  private static readonly API_URL = "https://api.x.ai/v1/chat/completions";
  /** Model identifier sent with every request. */
  private static readonly MODEL = "grok-4.3";
  /** Maximum number of tool-use rounds before a summary is forced. */
  private static readonly MAX_TOOL_ROUNDS = 4;
  /** Reply used when the summary request yields no usable text. */
  private static readonly FALLBACK_SUMMARY = "Actions completed.";

  private readonly controller: BroadcastController;
  private readonly xaiApiKey: string;

  constructor(opts: LlmAgentOptions) {
    this.controller = opts.controller;
    this.xaiApiKey = opts.xaiApiKey;
  }

  /** The tool schemas advertised to the model. */
  get tools() {
    return toolDefinitions;
  }

  /**
   * Runs one user turn through the model, executing requested tools.
   *
   * @param userMessage - The user's natural-language instruction.
   * @param history - Earlier messages, forwarded to the model unchanged.
   * @returns The model's final text plus every tool call made during the turn
   *   (in execution order, including calls that failed).
   * @throws Error if the API key is missing, if a tool-round request returns a
   *   non-2xx status, or if the response carries no assistant message.
   */
  async runChat(userMessage: string, history: ChatMessage[] = []): Promise<{ reply: string; toolResults: ToolCallResult[] }> {
    if (!this.xaiApiKey) {
      throw new Error("XAI_API_KEY is not configured on the controller");
    }

    const messages: unknown[] = [
      { role: "system", content: SYSTEM_PROMPT },
      ...history,
      { role: "user", content: userMessage },
    ];
    const toolResults: ToolCallResult[] = [];

    for (let round = 0; round < LlmAgent.MAX_TOOL_ROUNDS; round++) {
      const resp = await this.postCompletion({
        model: LlmAgent.MODEL,
        messages,
        tools: toolDefinitions,
        tool_choice: "auto",
        temperature: 0.2,
        max_tokens: 800,
      });
      if (!resp.ok) {
        const txt = await resp.text();
        throw new Error(`xAI error ${resp.status}: ${txt}`);
      }

      const data = await resp.json();
      const assistantMsg = data.choices?.[0]?.message;
      if (!assistantMsg) throw new Error("No message from Grok");
      // The assistant message must precede its tool results in the transcript.
      messages.push(assistantMsg);

      const toolCalls = Array.isArray(assistantMsg.tool_calls) ? assistantMsg.tool_calls : [];
      if (toolCalls.length === 0) {
        return { reply: assistantMsg.content || "(no content)", toolResults };
      }

      for (const tc of toolCalls) {
        const name: string = tc?.function?.name ?? "";
        const { args, result } = await this.runToolCall(name, tc?.function?.arguments);
        const tr: ToolCallResult = { tool: name, args, result };
        toolResults.push(tr);
        messages.push({
          role: "tool",
          tool_call_id: tc?.id,
          name,
          // JSON.stringify(undefined) is undefined, which would drop `content`
          // from the request body; send an explicit JSON null instead.
          content: JSON.stringify(result) ?? "null",
        });
      }
    }

    // Rounds exhausted — ask for a summary without offering tools again.
    const final = await this.postCompletion({
      model: LlmAgent.MODEL,
      messages: [
        ...messages,
        { role: "user", content: "Summarize what you just did in one short sentence for the user." },
      ],
      max_tokens: 120,
    });
    if (!final.ok) {
      // The tools have already run; losing their results over a failed
      // summary would be worse than returning a generic reply.
      const detail = await final.text().catch(() => "");
      console.warn(`[llm] summary request failed with status ${final.status}: ${detail}`);
      return { reply: LlmAgent.FALLBACK_SUMMARY, toolResults };
    }
    const fdata = await final.json();
    const summary = fdata.choices?.[0]?.message?.content || LlmAgent.FALLBACK_SUMMARY;
    return { reply: summary, toolResults };
  }

  /** POSTs one JSON body to the chat-completions endpoint with the API key. */
  private postCompletion(body: Record<string, unknown>): Promise<Response> {
    return fetch(LlmAgent.API_URL, {
      method: "POST",
      headers: {
        "Content-Type": "application/json",
        Authorization: `Bearer ${this.xaiApiKey}`,
      },
      body: JSON.stringify(body),
    });
  }

  /**
   * Parses the arguments of one tool call and executes it, converting any
   * failure (bad JSON, bad arguments, controller error) into `{ error }`.
   */
  private async runToolCall(name: string, rawArgs: unknown): Promise<{ args: Record<string, unknown>; result: unknown }> {
    let args: Record<string, unknown> = {};
    let result: unknown;
    try {
      args = LlmAgent.parseToolArgs(rawArgs);
      console.log(`[llm] tool call ${name}`, LlmAgent.redactForLog(args));
      result = await this.executeTool(name, args);
    } catch (e: unknown) {
      result = { error: LlmAgent.describeError(e) };
    }
    return { args, result };
  }

  /** Decodes the model-supplied argument payload into a plain object. */
  private static parseToolArgs(raw: unknown): Record<string, unknown> {
    if (raw === undefined || raw === null || raw === "") return {};
    let parsed: unknown = raw;
    if (typeof raw === "string") {
      try {
        parsed = JSON.parse(raw);
      } catch (e: unknown) {
        throw new Error(`Invalid JSON in tool arguments: ${LlmAgent.describeError(e)}`);
      }
    }
    if (parsed === null) return {};
    if (typeof parsed !== "object" || Array.isArray(parsed)) {
      throw new Error("Tool arguments must be a JSON object");
    }
    return parsed as Record<string, unknown>;
  }

  /**
   * Reads a required textual argument. Numbers and booleans are stringified
   * (the model occasionally sends `42` instead of `"42"`); anything else,
   * including a missing key, is rejected rather than coerced to "undefined".
   */
  private static requireText(args: Record<string, unknown>, key: string): string {
    const value = args[key];
    if (typeof value === "string") return value;
    if (typeof value === "number" || typeof value === "boolean") return String(value);
    throw new Error(`Missing or invalid argument "${key}"`);
  }

  /** Copy of `args` safe for logging: destination URLs embed the stream key. */
  private static redactForLog(args: Record<string, unknown>): Record<string, unknown> {
    return "url" in args ? { ...args, url: "[redacted]" } : args;
  }

  /** Best-effort human-readable message for a thrown value. */
  private static describeError(e: unknown): string {
    const message = typeof e === "object" && e !== null && "message" in e ? (e as { message: unknown }).message : undefined;
    return typeof message === "string" && message ? message : String(e);
  }

  /**
   * Dispatches a tool name to the matching controller method.
   *
   * @throws Error for an unknown tool name or a missing/invalid argument;
   *   controller errors propagate unchanged.
   */
  private async executeTool(name: string, args: Record<string, unknown>): Promise<unknown> {
    const c = this.controller;
    switch (name) {
      case "list_scenes":
        return { scenes: await c.listScenes() };
      case "switch_scene":
        return await c.switchScene(LlmAgent.requireText(args, "scene_name"));
      case "get_stream_status":
        return await c.getStreamStatus();
      case "start_broadcast":
        return await c.startBroadcast();
      case "stop_broadcast":
        return await c.stopBroadcast();
      case "list_destinations":
        return { destinations: await c.listDestinations() };
      case "add_destination": {
        const label = LlmAgent.requireText(args, "name");
        const url = LlmAgent.requireText(args, "url");
        return { ok: true, destinations: await c.addDestination(label, url) };
      }
      case "remove_destination":
        return { ok: true, destinations: await c.removeDestination(LlmAgent.requireText(args, "name")) };
      case "set_text_source":
        return await c.setTextSource(LlmAgent.requireText(args, "input_name"), LlmAgent.requireText(args, "text"));
      case "get_status_summary":
        return await c.getFullStatus();
      default:
        throw new Error(`Unknown tool ${name}`);
    }
  }
}