lilith-platform.live/codebase/@features/broadcast/controller/fanout-manager.ts

142 lines
3.7 KiB
TypeScript

/**
 * FanoutManager — manages `ffmpeg -c copy` child processes for multi-RTMP fanout.
 *
 * A single high-quality encode from OBS is stream-copied to N public destinations
 * without re-encoding. Production-complete: tracks active children, removes them
 * when they exit, supports stopping one destination by name or all at once, and
 * exposes live status. Uses Bun.spawn so no extra dependencies are needed.
 */
import { spawn, type Subprocess } from "bun";
import type { Destination, FanoutStatus } from "../shared/types";
/**
 * Error type for fanout failures.
 *
 * `name` is always the string "FanoutError". The optional second constructor
 * argument is kept on `destination` rather than being written to `name`, where
 * it would clash with `Error.name` and be overwritten straight away.
 */
export class FanoutError extends Error {
  /** Value of the optional second constructor argument, or `undefined` if omitted. */
  public readonly destination: string | undefined;

  /**
   * @param message Human-readable description of the failure.
   * @param name Optional identifier supplied by the caller; stored on `destination`.
   */
  constructor(message: string, name?: string) {
    super(message);
    this.name = "FanoutError";
    this.destination = name;
  }
}
/**
 * Owns one `ffmpeg -c copy` child process per destination and keeps a registry
 * of the ones that are currently running, keyed by destination name.
 */
export class FanoutManager {
  /** Running child processes, keyed by destination name. */
  private readonly active = new Map<string, Subprocess>();
  /** Input URL every ffmpeg child reads from. */
  private readonly sourceUrl: string;

  /**
   * @param sourceUrl Input URL for ffmpeg. When omitted (or empty) the local
   *   feed URL below is used.
   */
  constructor(sourceUrl?: string) {
    // Convention: the single program feed OBS is configured to push locally.
    // `||` (not `??`) is deliberate: an empty string also falls back to the default.
    this.sourceUrl = sourceUrl || "rtmp://127.0.0.1:1935/live/produced";
  }

  /** Names of all destinations that currently have a registered child process. */
  get activeNames(): string[] {
    return Array.from(this.active.keys());
  }

  /**
   * One status entry per registered child process.
   *
   * The destination URL is not tracked, so `url` is always the placeholder
   * "(active)"; callers that need the real URL must supply it themselves.
   */
  getStatus(): FanoutStatus[] {
    return this.activeNames.map((name) => ({ name, url: "(active)", active: true }));
  }

  /**
   * Start a fanout for the given destination. Idempotent: if a process is
   * already registered under `target.name`, this is a no-op.
   *
   * Errors thrown by `spawn` propagate to the caller as a rejected promise.
   */
  async start(target: Destination): Promise<void> {
    const { name, url } = target;
    if (this.active.has(name)) {
      console.log(`[fanout] ${name} already active`);
      return;
    }

    const proc = spawn(
      ["ffmpeg", "-nostdin", "-loglevel", "warning", "-i", this.sourceUrl, "-c", "copy", "-f", "flv", url],
      {
        // Nothing reads the child's stdout, so do not create a pipe for it.
        stdout: "ignore",
        stderr: "pipe",
      },
    );
    this.active.set(name, proc);

    // Remove the registry entry only if it still refers to THIS process. After
    // stop(name) + start(name) the old child's exit arrives late and must not
    // evict the replacement that now lives under the same name.
    const unregister = (): void => {
      if (this.active.get(name) === proc) {
        this.active.delete(name);
      }
    };

    // Fire-and-forget exit handler.
    void proc.exited
      .then((code) => {
        console.log(`[fanout] ${name} exited (code=${code})`);
        unregister();
      })
      .catch((e: unknown) => {
        console.warn(`[fanout] ${name} exit promise rejected`, e);
        unregister();
      });

    // Drain stderr in the background so the pipe never fills up; the data
    // itself is discarded.
    const errStream = proc.stderr;
    if (errStream) {
      void (async () => {
        try {
          const reader = errStream.getReader();
          for (;;) {
            const { done } = await reader.read();
            if (done) break;
          }
        } catch {
          /* best-effort drain: a read failure here is not actionable */
        }
      })();
    }

    // NOTE(review): this prints the full destination URL; if URLs embed stream
    // keys they end up in the logs — confirm whether that is acceptable.
    console.log(`[fanout] started ${name} -> ${url}`);
  }

  /**
   * Kill and unregister the child process for `name`. Unknown names are ignored.
   * A failing `kill()` is logged and the entry is removed regardless.
   */
  stop(name: string): void {
    const proc = this.active.get(name);
    if (!proc) return;
    this.active.delete(name);
    try {
      proc.kill();
    } catch (e) {
      console.warn(`[fanout] kill ${name} failed`, e);
    }
    console.log(`[fanout] stopped ${name}`);
  }

  /** Kill and unregister every child process, with the same handling as `stop`. */
  stopAll(): void {
    // `activeNames` is a snapshot array, so deleting while iterating is safe.
    for (const name of this.activeNames) {
      this.stop(name);
    }
  }

  /**
   * Start fanouts for all provided destinations (used by start-broadcast).
   * Failures on individual fanouts are logged but do not abort the set.
   *
   * @returns Names of the destinations whose `start` call did not throw, in
   *   input order (this includes destinations that were already active).
   */
  async startAll(destinations: Destination[]): Promise<string[]> {
    const started: string[] = [];
    for (const d of destinations) {
      try {
        await this.start(d);
        started.push(d.name);
      } catch (e) {
        console.error(`[fanout] failed to start ${d.name}:`, e);
      }
    }
    return started;
  }
}