/**
 * LlmAgent — Grok tool-calling loop driving the BroadcastController.
 *
 * Behavior:
 * - Sends a fixed system prompt with domain context, the caller-supplied
 *   history and the new user message to the xAI chat completions endpoint.
 * - Runs at most MAX_TOOL_ROUNDS rounds of tool use, then asks the model for a
 *   one-sentence summary.
 * - Tool failures (malformed arguments, missing arguments, controller errors,
 *   unknown tool names) are returned to the model as {error: "..."} so it can
 *   reason about them instead of aborting the chat.
 * - `history` is forwarded verbatim; no trimming happens in this module.
 * - Callers only see `runChat`; the HTTP and tool plumbing stays in here.
 */

import type { ChatMessage, ToolCallResult } from "../shared/types";
import type { BroadcastController } from "./broadcast-controller";

const SYSTEM_PROMPT = `You are the Stream Relay Operator — an expert, concise, no-nonsense assistant that controls a remote OBS instance running on a strong DigitalOcean droplet.

The physical setup:
- Performer is on hotel WiFi (weak upload).
- A modest-bitrate SRT contribution feed (usually 2500-5000 kbps) arrives at the droplet.
- That feed is bridged into OBS as a video source (v4l2 "RemoteFeed").
- OBS is used for professional production: scenes, overlays, lower thirds, browser sources for platform chat/tips, audio mixing.
- OBS produces ONE clean high-bitrate program feed (6000-12000 kbps typical).
- That feed is fanned out (copy where possible) to multiple public RTMP destinations (Twitch, YouTube, custom, OnlyFans live, and the platform's own VIP live at live.transquinnftw.com for vip.transquinnftw.com/shows/live).

Your job: translate the user's natural language into precise actions using the provided tools.

Rules:
- Be brief in replies. Report what you did and the important result (scene name, streaming status, which destinations are live).
- If the user wants to change destinations, use the destination management tools. Never hardcode platform keys in replies.
- Prefer "start broadcast" which starts OBS streaming + launches the fanouts to currently configured destinations.
- When user says things like "add lower third saying X", use set_text_source with a conventional name like "LowerThird".
- Always confirm current state after major actions (call get_status_summary or get_scenes when it helps).
- If OBS is not connected, say so clearly and suggest checking the droplet side.
- Never invent OBS scene names; discover them with tools first if unsure.

Available high-level actions (you will call the low-level tools):
- List / switch scenes
- Start / stop the main stream output (this is what triggers the high-bitrate broadcast from DO)
- Manage dynamic RTMP destination list (add/remove/list)
- Start/stop the fanout processes for the current destination list (the actual proxy to "others")
- Update text sources (lower thirds, titles, notices)
- Get overall status and OBS stats

You have full agency to call multiple tools in one step when it makes sense.`;

/** Chat completions endpoint used for every request. */
const XAI_CHAT_COMPLETIONS_URL = "https://api.x.ai/v1/chat/completions";

/** Model identifier sent with every request. */
const XAI_MODEL = "grok-4.3";

/** Maximum number of tool-use rounds before a summary is forced. */
const MAX_TOOL_ROUNDS = 4;

/** Reply used when the model returns no usable summary text. */
const FALLBACK_SUMMARY = "Actions completed.";

/** Argument keys whose values must never be written to logs (the RTMP URL embeds the stream key). */
const SENSITIVE_ARG_KEYS: ReadonlySet<string> = new Set(["url"]);

/** JSON-schema fragment for tools that take no arguments. */
const noParams = { type: "object", properties: {} } as const;

/** Tool schemas advertised to the model; names must match the cases in `executeTool`. */
const toolDefinitions = [
  {
    type: "function",
    function: {
      name: "list_scenes",
      description: "Return the list of available OBS scenes.",
      parameters: noParams,
    },
  },
  {
    type: "function",
    function: {
      name: "switch_scene",
      description: "Switch the current program scene in OBS.",
      parameters: {
        type: "object",
        properties: {
          scene_name: { type: "string", description: "Exact scene name from list_scenes" },
        },
        required: ["scene_name"],
      },
    },
  },
  {
    type: "function",
    function: {
      name: "get_stream_status",
      description: "Get whether streaming is active, bytes sent, etc.",
      parameters: noParams,
    },
  },
  {
    type: "function",
    function: {
      name: "start_broadcast",
      description:
        "Start OBS streaming output AND launch ffmpeg fanouts to all currently configured RTMP destinations. This is the main 'go live' action that uses DO bandwidth.",
      parameters: noParams,
    },
  },
  {
    type: "function",
    function: {
      name: "stop_broadcast",
      description: "Stop OBS streaming and kill all active RTMP fanouts.",
      parameters: noParams,
    },
  },
  {
    type: "function",
    function: {
      name: "list_destinations",
      description: "List the current multi-RTMP destinations that fanout will target.",
      parameters: noParams,
    },
  },
  {
    type: "function",
    function: {
      name: "add_destination",
      description: "Add a new RTMP destination for future broadcasts.",
      parameters: {
        type: "object",
        properties: {
          name: { type: "string", description: "Short label e.g. twitch, youtube, fansly, vip-live" },
          url: { type: "string", description: "Full rtmp://... or rtmps://... URL including stream key" },
        },
        required: ["name", "url"],
      },
    },
  },
  {
    type: "function",
    function: {
      name: "remove_destination",
      description: "Remove a destination by name.",
      parameters: {
        type: "object",
        properties: { name: { type: "string" } },
        required: ["name"],
      },
    },
  },
  {
    type: "function",
    function: {
      name: "set_text_source",
      description:
        "Update (or create) a text source in the current scene. Great for lower thirds, announcements, rates, etc.",
      parameters: {
        type: "object",
        properties: {
          input_name: { type: "string", description: "e.g. LowerThird or Title" },
          text: { type: "string", description: "The exact text to display" },
        },
        required: ["input_name", "text"],
      },
    },
  },
  {
    type: "function",
    function: {
      name: "get_status_summary",
      description:
        "Get a compact status: current scene, streaming state, active fanouts, number of destinations.",
      parameters: noParams,
    },
  },
] as const;

/** Minimal shape of a tool call as read from the completion response. */
interface ToolCall {
  id: string;
  function: { name: string; arguments?: string };
}

/** Minimal shape of the assistant message as read from the completion response. */
interface AssistantMessage {
  role?: string;
  content?: string | null;
  tool_calls?: ToolCall[] | null;
}

/** Minimal shape of the completion response; only the fields this module reads. */
interface ChatCompletionResponse {
  choices?: Array<{ message?: AssistantMessage }>;
}

/** Extracts a human-readable message from an arbitrary thrown value. */
function describeError(err: unknown): string {
  if (err instanceof Error && err.message) return err.message;
  return String(err);
}

/**
 * Parses the JSON-encoded argument string of a tool call.
 *
 * @returns The parsed object, or `{}` when the model sent no arguments.
 * @throws SyntaxError on malformed JSON; Error when the JSON is not an object.
 */
function parseToolArguments(raw: string | undefined): Record<string, unknown> {
  if (!raw) return {};
  const parsed: unknown = JSON.parse(raw);
  if (parsed === null || typeof parsed !== "object" || Array.isArray(parsed)) {
    throw new Error("Tool arguments must be a JSON object");
  }
  return parsed as Record<string, unknown>;
}

/**
 * Reads a required string argument.
 *
 * Numbers and booleans are coerced with `String()`; missing, null and
 * object values are rejected instead of being turned into text such as
 * "undefined" and passed on to the controller.
 *
 * @throws Error naming the offending argument.
 */
function requireStringArg(
  args: Record<string, unknown>,
  key: string,
  { allowEmpty = false }: { allowEmpty?: boolean } = {},
): string {
  const raw = args[key];
  const value = typeof raw === "number" || typeof raw === "boolean" ? String(raw) : raw;
  if (typeof value !== "string") {
    throw new Error(`Missing or invalid required string argument "${key}"`);
  }
  if (!allowEmpty && value.trim() === "") {
    throw new Error(`Argument "${key}" must not be empty`);
  }
  return value;
}

/** Returns a shallow copy of `args` that is safe to log (sensitive values replaced). */
function redactArgsForLog(args: Record<string, unknown>): Record<string, unknown> {
  return Object.fromEntries(
    Object.entries(args).map(([key, value]) => [key, SENSITIVE_ARG_KEYS.has(key) ? "[redacted]" : value]),
  );
}

export interface LlmAgentOptions {
  controller: BroadcastController;
  xaiApiKey: string;
}

/** Translates chat messages into BroadcastController calls via model tool-calling. */
export class LlmAgent {
  private readonly controller: BroadcastController;
  private readonly xaiApiKey: string;

  constructor(opts: LlmAgentOptions) {
    this.controller = opts.controller;
    this.xaiApiKey = opts.xaiApiKey;
  }

  /** Tool schemas sent to the model with every tool-enabled request. */
  get tools() {
    return toolDefinitions;
  }

  /**
   * Runs one user turn: lets the model call tools for up to MAX_TOOL_ROUNDS
   * rounds and returns its final reply plus every tool invocation made.
   *
   * @param userMessage The new user message.
   * @param history Prior conversation, inserted between the system prompt and `userMessage`.
   * @returns The model's reply and the tool calls executed, in call order.
   *   `toolResults[].args` holds the arguments as sent by the model (unredacted).
   * @throws Error when the API key is missing, when a tool-round request
   *   returns a non-OK status, or when the response carries no message.
   */
  async runChat(
    userMessage: string,
    history: ChatMessage[] = [],
  ): Promise<{ reply: string; toolResults: ToolCallResult[] }> {
    if (!this.xaiApiKey) {
      throw new Error("XAI_API_KEY is not configured on the controller");
    }

    const messages: unknown[] = [
      { role: "system", content: SYSTEM_PROMPT },
      ...history,
      { role: "user", content: userMessage },
    ];
    const toolResults: ToolCallResult[] = [];

    // Up to MAX_TOOL_ROUNDS rounds of tool use before forcing a final answer.
    for (let round = 0; round < MAX_TOOL_ROUNDS; round++) {
      const resp = await this.postChat({
        messages,
        tools: toolDefinitions,
        tool_choice: "auto",
        temperature: 0.2,
        max_tokens: 800,
      });
      if (!resp.ok) {
        const txt = await resp.text();
        throw new Error(`xAI error ${resp.status}: ${txt}`);
      }

      const data = (await resp.json()) as ChatCompletionResponse;
      const assistantMsg = data.choices?.[0]?.message;
      if (!assistantMsg) throw new Error("No message from Grok");
      messages.push(assistantMsg);

      const toolCalls = assistantMsg.tool_calls ?? [];
      if (toolCalls.length === 0) {
        // `||` on purpose: an empty string reply also gets the placeholder.
        return { reply: assistantMsg.content || "(no content)", toolResults };
      }

      for (const call of toolCalls) {
        const fn = call.function;
        let args: Record<string, unknown> = {};
        let result: unknown;
        try {
          // Parsing happens inside the try so malformed arguments are
          // reported to the model rather than aborting the whole chat.
          args = parseToolArguments(fn.arguments);
          console.log(`[llm] tool call ${fn.name}`, redactArgsForLog(args));
          result = await this.executeTool(fn.name, args);
        } catch (err: unknown) {
          result = { error: describeError(err) };
        }

        toolResults.push({ tool: fn.name, args, result });
        messages.push({
          role: "tool",
          tool_call_id: call.id,
          name: fn.name,
          content: JSON.stringify(result),
        });
      }
    }

    // Rounds exhausted — ask for a summary.
    const reply = await this.requestSummary(messages);
    return { reply, toolResults };
  }

  /** POSTs a chat completion request; `body` is merged after the fixed model name. */
  private postChat(body: Record<string, unknown>): Promise<Response> {
    return fetch(XAI_CHAT_COMPLETIONS_URL, {
      method: "POST",
      headers: {
        "Content-Type": "application/json",
        Authorization: `Bearer ${this.xaiApiKey}`,
      },
      body: JSON.stringify({ model: XAI_MODEL, ...body }),
    });
  }

  /**
   * Asks the model for a one-sentence summary of the conversation so far.
   *
   * Best-effort: by the time this runs the tools have already executed, so a
   * failed summary request is logged and replaced by FALLBACK_SUMMARY instead
   * of discarding the collected tool results.
   */
  private async requestSummary(messages: readonly unknown[]): Promise<string> {
    try {
      const resp = await this.postChat({
        messages: [
          ...messages,
          { role: "user", content: "Summarize what you just did in one short sentence for the user." },
        ],
        max_tokens: 120,
      });
      if (!resp.ok) {
        console.warn(`[llm] summary request failed with status ${resp.status}`);
        return FALLBACK_SUMMARY;
      }
      const data = (await resp.json()) as ChatCompletionResponse;
      return data.choices?.[0]?.message?.content || FALLBACK_SUMMARY;
    } catch (err: unknown) {
      console.warn(`[llm] summary request failed: ${describeError(err)}`);
      return FALLBACK_SUMMARY;
    }
  }

  /**
   * Dispatches a tool call to the matching controller method.
   *
   * @throws Error for unknown tool names and for missing/invalid arguments;
   *   controller rejections propagate unchanged. The caller converts all of
   *   these into an {error} result for the model.
   */
  private async executeTool(name: string, args: Record<string, unknown>): Promise<unknown> {
    const c = this.controller;
    switch (name) {
      case "list_scenes":
        return { scenes: await c.listScenes() };
      case "switch_scene":
        return await c.switchScene(requireStringArg(args, "scene_name"));
      case "get_stream_status":
        return await c.getStreamStatus();
      case "start_broadcast":
        return await c.startBroadcast();
      case "stop_broadcast":
        return await c.stopBroadcast();
      case "list_destinations":
        return { destinations: await c.listDestinations() };
      case "add_destination": {
        // Validate both arguments before touching the controller.
        const destName = requireStringArg(args, "name");
        const destUrl = requireStringArg(args, "url");
        return { ok: true, destinations: await c.addDestination(destName, destUrl) };
      }
      case "remove_destination":
        return { ok: true, destinations: await c.removeDestination(requireStringArg(args, "name")) };
      case "set_text_source": {
        const inputName = requireStringArg(args, "input_name");
        // Empty text is allowed so a source can be blanked.
        const text = requireStringArg(args, "text", { allowEmpty: true });
        return await c.setTextSource(inputName, text);
      }
      case "get_status_summary":
        return await c.getFullStatus();
      default:
        throw new Error(`Unknown tool ${name}`);
    }
  }
}