redroid-mrnumber/service/queue.ts
Natalie eb84d431f3 feat(service): inbound trigger service — POST /api/screening/requests + durable queue + worker
Stand up the HTTP surface Prospector calls (this app had no listening port). Bun
service: bearer-auth (MRNUMBER_SERVICE_TOKEN, constant-time) POST /api/screening/requests
{phone, ref} → 202 {accepted, id}, enqueued in a durable SQLite queue; a single serial
worker (one Android box) drains by invoking mr_lookup.py and records the people-service
signal. GET /api/screening/requests/:id returns the row; GET /health is open. Crash-safe
(requeues stale in-flight rows on restart). 7 bun tests + typecheck; smoke-tested end to
end (auth 401, enqueue 202, drain→verdict, invalid-phone 400).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-29 13:57:54 -04:00

97 lines
3.3 KiB
TypeScript

/**
* Durable screening-request queue (SQLite via bun:sqlite). The trigger endpoint
* enqueues; the worker claims one at a time (a single Android box → no concurrency)
* and records the verdict. Survives restarts: an in-flight 'running' row left by a
* crash is requeued on startup (see `requeueStale`).
*/
import { Database } from 'bun:sqlite';
/**
 * Lifecycle state stored in `screening_requests.status`:
 * `pending` (column default on insert) → `running` (set by `claimNext`) →
 * `done` (set by `complete`) or `error` (set by `fail`).
 * `requeueStale` moves `running` rows back to `pending`.
 */
export type ScreeningStatus = 'pending' | 'running' | 'done' | 'error';
/** One row of the `screening_requests` table, as returned by `SELECT *`. */
export interface ScreeningRow {
/** Autoincrementing primary key; ids grow in enqueue order. */
readonly id: number;
/** Phone number to screen, stored as given to `enqueue` (NOT NULL). */
readonly phone: string;
/** Optional reference passed to `enqueue`; `null` when none was given. */
readonly ref: string | null;
/** Current lifecycle state of the request. */
readonly status: ScreeningStatus;
/** Verdict written by `complete`; `null` until then. */
readonly verdict: string | null;
/** Error text written by `fail` (truncated to 2000 characters); `null` until then. */
readonly error: string | null;
/** Insert time, formatted by SQLite as `YYYY-MM-DDTHH:MM:SSZ`. */
readonly created_at: string;
/** Time the row was claimed; `null` while pending or after a requeue. */
readonly started_at: string | null;
/** Time `complete` or `fail` was recorded; `null` until then. */
readonly finished_at: string | null;
}
/**
 * Durable FIFO of screening requests backed by a single SQLite table.
 *
 * Rows move `pending → running → done | error`. The queue itself performs no
 * scheduling: a caller enqueues, a worker repeatedly calls `claimNext` and then
 * reports the outcome with `complete` or `fail`.
 */
export class ScreeningQueue {
  /** SQL expression producing the current UTC time as `YYYY-MM-DDTHH:MM:SSZ`. */
  private static readonly UTC_NOW = "strftime('%Y-%m-%dT%H:%M:%SZ','now')";

  /**
   * Binds the queue to `db` and creates the `screening_requests` table and its
   * `(status, id)` index when they do not exist yet.
   *
   * @param db Open SQLite database handle that will hold the queue.
   */
  constructor(private readonly db: Database) {
    this.db.run(`
      CREATE TABLE IF NOT EXISTS screening_requests (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        phone TEXT NOT NULL,
        ref TEXT,
        status TEXT NOT NULL DEFAULT 'pending',
        verdict TEXT,
        error TEXT,
        created_at TEXT NOT NULL DEFAULT (${ScreeningQueue.UTC_NOW}),
        started_at TEXT,
        finished_at TEXT
      )`);
    // Serves the "oldest pending row" lookup in claimNext and the pending count.
    this.db.run(
      `CREATE INDEX IF NOT EXISTS idx_status_id ON screening_requests(status, id)`,
    );
  }

  /**
   * Appends a pending request.
   *
   * @param phone Phone number to screen.
   * @param ref Optional caller reference, or `null`.
   * @returns The id of the newly inserted row.
   * @throws Error if the INSERT unexpectedly yields no row.
   */
  enqueue(phone: string, ref: string | null): number {
    const inserted = this.db
      .query<{ id: number }, [string, string | null]>(
        `INSERT INTO screening_requests (phone, ref) VALUES (?, ?) RETURNING id`,
      )
      .get(phone, ref);
    if (inserted == null) {
      throw new Error('ScreeningQueue.enqueue: INSERT ... RETURNING produced no row');
    }
    return inserted.id;
  }

  /**
   * Atomically claims the oldest pending row (pending → running).
   *
   * Selection and update happen in one statement, so no explicit transaction is
   * needed, and `RETURNING *` hands back the row exactly as stored — including
   * the `started_at` value that was just written.
   *
   * @returns The claimed row, or `null` when nothing is pending.
   */
  claimNext(): ScreeningRow | null {
    return this.db
      .query<ScreeningRow, []>(
        `UPDATE screening_requests
            SET status='running', started_at=${ScreeningQueue.UTC_NOW}
          WHERE id = (
            SELECT id FROM screening_requests WHERE status = 'pending' ORDER BY id LIMIT 1
          )
          RETURNING *`,
      )
      .get();
  }

  /**
   * Marks a request as done and stores its verdict and finish time.
   *
   * @param id Row id.
   * @param verdict Verdict to record; may be `null`.
   */
  complete(id: number, verdict: string | null): void {
    this.db.run(
      `UPDATE screening_requests SET status='done', verdict=?, finished_at=${ScreeningQueue.UTC_NOW} WHERE id=?`,
      [verdict, id],
    );
  }

  /**
   * Marks a request as failed and stores the error text and finish time.
   *
   * @param id Row id.
   * @param error Error description; only the first 2000 characters are stored.
   */
  fail(id: number, error: string): void {
    this.db.run(
      `UPDATE screening_requests SET status='error', error=?, finished_at=${ScreeningQueue.UTC_NOW} WHERE id=?`,
      [error.slice(0, 2000), id],
    );
  }

  /**
   * Looks up a request by id.
   *
   * @param id Row id.
   * @returns The row, or `null` if no row has that id.
   */
  get(id: number): ScreeningRow | null {
    return this.db
      .query<ScreeningRow, [number]>(`SELECT * FROM screening_requests WHERE id=?`)
      .get(id);
  }

  /**
   * Requeues rows stuck in 'running' (e.g. a crash mid-lookup) back to pending
   * and clears their `started_at`.
   *
   * @returns Number of rows that were requeued.
   */
  requeueStale(): number {
    const result = this.db.run(
      `UPDATE screening_requests SET status='pending', started_at=NULL WHERE status='running'`,
    );
    return result.changes;
  }

  /** @returns Number of rows currently in the 'pending' state. */
  pendingCount(): number {
    const counted = this.db
      .query<{ n: number }, []>(
        `SELECT COUNT(*) n FROM screening_requests WHERE status='pending'`,
      )
      .get();
    // COUNT(*) always yields a row; the fallback only avoids a non-null assertion.
    return counted?.n ?? 0;
  }
}