142 lines
3.7 KiB
TypeScript
142 lines
3.7 KiB
TypeScript
/**
 * FanoutManager — manages `ffmpeg -c copy` child processes for multi-RTMP fanout.
 *
 * One high-quality encode from OBS is fanned out (stream copy) to N public destinations
 * without re-encoding.
 * Production-ready: tracks active children, cleans up on exit, supports stopping per name
 * or all at once, and exposes live status. Uses Bun.spawn for zero-dependency process control.
 */
|
|
|
|
import { spawn, type Subprocess } from "bun";
|
|
import type { Destination, FanoutStatus } from "../shared/types";
|
|
|
|
/**
 * Error type for fanout failures.
 *
 * `name` is always `"FanoutError"` so the error can be recognised in logs and
 * by `err.name` checks. The optional second constructor argument is kept on
 * `destination` instead of being declared as a `name` parameter property:
 * that declaration clashed with `Error.name` (optional vs. required string)
 * and its value was overwritten immediately by the `"FanoutError"` assignment.
 */
export class FanoutError extends Error {
  /** Value passed as the second constructor argument, if any. */
  public readonly destination?: string;

  /**
   * @param message Human-readable description of the failure.
   * @param name Optional identifier supplied by the caller; stored on `destination`.
   */
  constructor(message: string, name?: string) {
    super(message);
    this.name = "FanoutError";
    this.destination = name;
  }
}
|
|
|
|
/**
 * Tracks one ffmpeg child process per destination name.
 *
 * Each child reads `sourceUrl` and writes it, with `-c copy -f flv`, to the
 * destination URL. Entries are removed from the registry when the child exits
 * or when it is stopped explicitly.
 */
export class FanoutManager {
  /** Running children keyed by destination name. */
  private readonly active = new Map<string, Subprocess>();

  /** Input URL handed to every ffmpeg child via `-i`. */
  private readonly sourceUrl: string;

  /**
   * @param sourceUrl Input URL for all fanouts. When omitted (or empty) the
   *   local program feed `rtmp://127.0.0.1:1935/live/produced` is used.
   */
  constructor(sourceUrl?: string) {
    // Convention: the single program feed OBS is configured to push locally.
    // `||` rather than `??` is deliberate: an empty string also selects the default.
    this.sourceUrl = sourceUrl || "rtmp://127.0.0.1:1935/live/produced";
  }

  /** Names of the destinations that currently have a tracked child process. */
  get activeNames(): string[] {
    return Array.from(this.active.keys());
  }

  /**
   * Snapshot of the tracked fanouts.
   *
   * @returns One entry per tracked child. The destination URL is not stored,
   *   so `url` is the placeholder `"(active)"`; callers that need the real
   *   URL must match the name against their own destination list.
   */
  getStatus(): FanoutStatus[] {
    const out: FanoutStatus[] = [];
    for (const name of this.active.keys()) {
      out.push({ name, url: "(active)", active: true });
    }
    return out;
  }

  /**
   * Start a fanout for the given destination. Idempotent: if a child is
   * already tracked under `target.name`, this logs and returns.
   *
   * @param target Destination whose `name` keys the registry and whose `url`
   *   is passed to ffmpeg as the output.
   * @throws Whatever `spawn` throws when the child cannot be created; the
   *   error is not caught here.
   */
  async start(target: Destination): Promise<void> {
    const { name, url } = target;

    if (this.active.has(name)) {
      console.log(`[fanout] ${name} already active`);
      return;
    }

    const proc = spawn(
      [
        "ffmpeg",
        "-nostdin",
        "-loglevel",
        "warning",
        "-i",
        this.sourceUrl,
        "-c",
        "copy",
        "-f",
        "flv",
        url,
      ],
      {
        // stdout is never read here, so do not pipe it: an unread pipe could
        // fill up and stall the child.
        stdout: "ignore",
        stderr: "pipe",
      },
    );

    this.active.set(name, proc);

    // Drop the registry entry only if it still refers to THIS child. After a
    // quick stop() + start() of the same name, the old child's exit handler
    // would otherwise remove the replacement and leave it untracked.
    const forget = (): void => {
      if (this.active.get(name) === proc) {
        this.active.delete(name);
      }
    };

    // Fire-and-forget exit handler.
    void proc.exited
      .then((code) => {
        console.log(`[fanout] ${name} exited (code=${code})`);
        forget();
      })
      .catch((e: unknown) => {
        console.warn(`[fanout] ${name} exit promise rejected`, e);
        forget();
      });

    // Drain stderr in the background so the pipe never backs up; the data is
    // read and discarded.
    const stderr = proc.stderr;
    if (stderr) {
      void (async () => {
        try {
          const reader = stderr.getReader();
          while (!(await reader.read()).done) {
            /* discard chunk */
          }
        } catch {
          /* ignore: draining is best-effort */
        }
      })();
    }

    console.log(`[fanout] started ${name} -> ${url}`);
  }

  /**
   * Stop the fanout registered under `name`. No-op when nothing is tracked
   * under that name. A failing `kill()` is logged and the entry is removed
   * regardless.
   */
  stop(name: string): void {
    const p = this.active.get(name);
    if (!p) return;

    try {
      p.kill();
    } catch (e) {
      console.warn(`[fanout] kill ${name} failed`, e);
    }

    this.active.delete(name);
    console.log(`[fanout] stopped ${name}`);
  }

  /**
   * Stop every tracked fanout. Delegates to `stop` so kill failures are
   * logged the same way; iterates a snapshot of the names because `stop`
   * mutates the registry.
   */
  stopAll(): void {
    for (const name of Array.from(this.active.keys())) {
      this.stop(name);
    }
  }

  /**
   * Start fanouts for all provided destinations (used by start-broadcast).
   * Failures on individual fanouts are logged but do not abort the set.
   *
   * @returns Names for which `start` completed without throwing, in input
   *   order. This includes destinations that were already active.
   */
  async startAll(destinations: Destination[]): Promise<string[]> {
    const started: string[] = [];
    for (const d of destinations) {
      try {
        await this.start(d);
        started.push(d.name);
      } catch (e) {
        console.error(`[fanout] failed to start ${d.name}:`, e);
      }
    }
    return started;
  }
}
|