lilith-platform.live/codebase/@features/broadcast/controller/destination-store.ts
Natalie d77a63d7ff feat(broadcast): controller + destination-store + types updates
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-29 14:35:17 -04:00

184 lines
5.9 KiB
TypeScript

/**
* DestinationStore — persistent list of RTMP destinations.
*
* Simple file-backed JSON ( /data/destinations.json by default, volume mounted in prod).
* Production complete:
* - Atomic write via tmp + rename (best effort, falls back to direct write).
* - In-memory always authoritative; file is best-effort.
* - INITIAL_RTMP_TARGETS from env seed when file missing/empty.
* - Graceful degradation in dev (no /data write perms).
* - All mutations await save.
*/
import type { Destination, IDestinationStore } from "../shared/types";
const DEFAULT_FILE = "/data/destinations.json";
export class DestinationStore implements IDestinationStore {
private destinations: Destination[] = [];
private filePath: string;
private readonly intendedFilePath: string;
private loaded = false;
private usingFallback = false;
private warnedPersist = false;
constructor(filePath?: string) {
this.intendedFilePath = filePath || process.env.DESTINATIONS_FILE || DEFAULT_FILE;
this.filePath = this.intendedFilePath;
}
get path(): string {
return this.filePath;
}
get isUsingFallback(): boolean {
return this.usingFallback;
}
list(): Destination[] {
// return copy to prevent external mutation
return [...this.destinations];
}
find(name: string): Destination | undefined {
return this.destinations.find((d) => d.name === name);
}
async load(initialFromEnv: Destination[] = []): Promise<void> {
if (this.loaded) return;
// determine a writable target (may switch to fallback or memory-only)
const writable = await this.chooseWritablePath();
if (writable) {
this.filePath = writable;
} else {
this.filePath = "";
}
// 1. try file (if we have a target)
if (this.filePath) {
try {
const f = Bun.file(this.filePath);
if (await f.exists()) {
const data = await f.json();
if (Array.isArray(data)) {
this.destinations = data.filter(
(x): x is Destination => typeof x?.name === "string" && typeof x?.url === "string",
);
this.loaded = true;
return;
}
}
} catch (e) {
console.warn("[dest-store] could not read destinations file (will use env or defaults):", e);
}
}
// 2. seed from env if provided
if (initialFromEnv.length > 0) {
this.destinations = [...initialFromEnv];
this.loaded = true;
await this.save().catch(() => {});
return;
}
// 3. hard defaults (only first time; user is expected to manage via chat/API)
this.destinations = [
{ name: "twitch", url: "rtmp://live.twitch.tv/app/live_XXXX_YYYY" },
{ name: "youtube", url: "rtmp://a.rtmp.youtube.com/live2/xxxx-xxxx-xxxx-xxxx" },
{ name: "vip-live", url: "rtmp://live.transquinnftw.com/app/YOUR_VIP_SHOW_KEY" },
];
this.loaded = true;
await this.save().catch((e) => {
// save already warns on failure; keep quiet here to avoid double
});
}
async add(dest: Destination): Promise<Destination[]> {
if (!dest.name || !dest.url) {
throw new Error("Destination requires name and url");
}
// replace if name exists (idempotent add/update)
this.destinations = this.destinations.filter((d) => d.name !== dest.name);
this.destinations.push({ name: dest.name, url: dest.url });
await this.save();
return this.list();
}
async remove(name: string): Promise<Destination[]> {
const before = this.destinations.length;
this.destinations = this.destinations.filter((d) => d.name !== name);
if (this.destinations.length !== before) {
await this.save();
}
return this.list();
}
async replaceAll(newList: Destination[]): Promise<Destination[]> {
this.destinations = newList.filter(
(x): x is Destination => typeof x?.name === "string" && typeof x?.url === "string",
);
await this.save();
return this.list();
}
private async ensureDir(targetPath: string): Promise<boolean> {
const dir = targetPath.includes("/") ? targetPath.substring(0, targetPath.lastIndexOf("/")) : ".";
if (!dir || dir === ".") return true;
try {
await Bun.$`mkdir -p ${dir}`.quiet();
return true;
} catch {
return false;
}
}
private async chooseWritablePath(): Promise<string> {
if (await this.ensureDir(this.intendedFilePath)) {
return this.intendedFilePath;
}
// fallback for dev / non-root containers
const fallback = `/tmp/broadcast-destinations-${process.pid}.json`;
if (await this.ensureDir(fallback)) {
if (!this.warnedPersist) {
console.warn(`[dest-store] /data not writable — using in-memory + fallback file ${fallback} for this process`);
this.warnedPersist = true;
}
this.usingFallback = true;
return fallback;
}
// last resort: memory only
if (!this.warnedPersist) {
console.warn("[dest-store] no writable persistence location available — running fully in-memory (changes lost on restart)");
this.warnedPersist = true;
}
this.usingFallback = true;
return ""; // signals memory-only
}
private async save(): Promise<void> {
const target = this.filePath || (await this.chooseWritablePath());
if (!target) {
// memory only — already warned
return;
}
this.filePath = target; // lock in the chosen path
try {
const json = JSON.stringify(this.destinations, null, 2);
// atomic write attempt
const tmp = `${target}.tmp-${Date.now()}-${Math.random().toString(36).slice(2)}`;
await Bun.write(tmp, json);
await Bun.$`mv ${tmp} ${target}`.quiet().catch(async () => {
await Bun.write(target, json);
});
} catch (e) {
if (!this.warnedPersist) {
console.warn("[dest-store] persist failed (in-memory state remains correct):", e);
this.warnedPersist = true;
}
// do not throw
}
}
}