Stand up the HTTP surface that Prospector calls (this app previously had no listening
port). It is a Bun service with bearer auth (MRNUMBER_SERVICE_TOKEN, compared in constant
time). POST /api/screening/requests {phone, ref} → 202 {accepted, id} enqueues the request
in a durable SQLite queue; a single serial worker (one Android box) drains the queue by
invoking mr_lookup.py and records the people-service signal. GET /api/screening/requests/:id
returns the row; GET /health is open (unauthenticated). Crash-safe: stale in-flight rows are
requeued on restart. Covered by 7 bun tests plus a typecheck, and smoke-tested end to end
(auth 401, enqueue 202, drain→verdict, invalid-phone 400).
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
97 lines
3.3 KiB
TypeScript
97 lines
3.3 KiB
TypeScript
/**
 * Durable screening-request queue (SQLite via bun:sqlite). The trigger endpoint
 * enqueues; the worker claims one request at a time (a single Android box → no
 * concurrency) and records the verdict. Survives restarts: an in-flight 'running'
 * row left behind by a crash is requeued on startup (see `requeueStale`).
 */
|
|
import { Database } from 'bun:sqlite';
|
|
|
|
/**
 * Lifecycle state of a queued screening request.
 *
 * - `pending`: enqueued and waiting to be claimed
 * - `running`: claimed by the worker, lookup in progress
 * - `done`: finished, a verdict was recorded
 * - `error`: finished, an error message was recorded
 */
export type ScreeningStatus =
  | 'pending'
  | 'running'
  | 'done'
  | 'error';
|
|
|
|
/**
 * One row of the `screening_requests` table, exactly as `SELECT *` returns it.
 * All timestamps are text in the form `YYYY-MM-DDTHH:MM:SSZ`.
 */
export interface ScreeningRow {
  /** Auto-incrementing primary key; rows are claimed in ascending id order. */
  readonly id: number;
  /** Phone number to screen, stored as submitted to `enqueue`. */
  readonly phone: string;
  /** Optional caller-supplied reference; `null` when none was given. */
  readonly ref: string | null;
  /** Current lifecycle state of the request. */
  readonly status: ScreeningStatus;
  /** Value recorded by `complete`; `null` until then. */
  readonly verdict: string | null;
  /** Message recorded by `fail` (capped at 2000 characters); `null` otherwise. */
  readonly error: string | null;
  /** When the row was inserted (column default). */
  readonly created_at: string;
  /** When the row was last claimed; `null` while pending or after a requeue. */
  readonly started_at: string | null;
  /** When `complete` or `fail` was recorded; `null` until then. */
  readonly finished_at: string | null;
}
|
|
|
|
/**
 * Durable FIFO queue of screening requests, stored in the `screening_requests`
 * table of the supplied SQLite database.
 *
 * Row lifecycle: `pending` → `running` → `done` | `error`. Rows are claimed in
 * ascending `id` order. A row left in `running` (for example by a crash in the
 * middle of a lookup) is returned to `pending` by `requeueStale`.
 */
export class ScreeningQueue {
  /**
   * SQL expression producing the current time as `YYYY-MM-DDTHH:MM:SSZ`
   * (SQLite evaluates `'now'` in UTC). Kept in one place so every timestamp
   * column is written in the same format. It is a fixed constant, never user
   * input, so interpolating it into statements is safe.
   */
  private static readonly NOW_UTC = `strftime('%Y-%m-%dT%H:%M:%SZ','now')`;

  /**
   * Creates the table and its `(status, id)` index when they do not exist yet.
   *
   * @param db Open bun:sqlite database that holds (or will hold) the queue.
   */
  constructor(private readonly db: Database) {
    this.db.run(`
      CREATE TABLE IF NOT EXISTS screening_requests (
        id          INTEGER PRIMARY KEY AUTOINCREMENT,
        phone       TEXT NOT NULL,
        ref         TEXT,
        status      TEXT NOT NULL DEFAULT 'pending',
        verdict     TEXT,
        error       TEXT,
        created_at  TEXT NOT NULL DEFAULT (${ScreeningQueue.NOW_UTC}),
        started_at  TEXT,
        finished_at TEXT
      )`);
    // Serves the "oldest pending row" lookup in claimNext and the count in pendingCount.
    this.db.run(
      `CREATE INDEX IF NOT EXISTS idx_status_id ON screening_requests(status, id)`,
    );
  }

  /**
   * Appends a pending request.
   *
   * @param phone Phone number to screen.
   * @param ref Optional caller reference, or `null`.
   * @returns The id of the inserted row.
   * @throws Error if the insert unexpectedly yields no row.
   */
  enqueue(phone: string, ref: string | null): number {
    const inserted = this.db
      .query<{ id: number }, [string, string | null]>(
        `INSERT INTO screening_requests (phone, ref) VALUES (?, ?) RETURNING id`,
      )
      .get(phone, ref);
    if (inserted === null || inserted === undefined) {
      throw new Error('enqueue: INSERT … RETURNING id produced no row');
    }
    return inserted.id;
  }

  /**
   * Atomically claims the oldest pending row (pending → running).
   *
   * Selection and update happen in a single statement, so no explicit
   * transaction is needed, and the row is returned as stored, including the
   * freshly written `started_at`.
   *
   * @returns The claimed row, or `null` when nothing is pending.
   */
  claimNext(): ScreeningRow | null {
    return this.db
      .query<ScreeningRow, []>(
        `UPDATE screening_requests
            SET status='running', started_at=${ScreeningQueue.NOW_UTC}
          WHERE id = (
                  SELECT id FROM screening_requests
                   WHERE status = 'pending'
                   ORDER BY id
                   LIMIT 1
                )
        RETURNING *`,
      )
      .get();
  }

  /**
   * Marks a request as done and stores its verdict.
   *
   * @param id Row id to update.
   * @param verdict Verdict to record; may be `null`.
   */
  complete(id: number, verdict: string | null): void {
    this.db.run(
      `UPDATE screening_requests SET status='done', verdict=?, finished_at=${ScreeningQueue.NOW_UTC} WHERE id=?`,
      [verdict, id],
    );
  }

  /**
   * Marks a request as failed and stores the error message.
   *
   * @param id Row id to update.
   * @param error Error message; only the first 2000 characters are kept.
   */
  fail(id: number, error: string): void {
    this.db.run(
      `UPDATE screening_requests SET status='error', error=?, finished_at=${ScreeningQueue.NOW_UTC} WHERE id=?`,
      [error.slice(0, 2000), id],
    );
  }

  /**
   * Looks up a single request.
   *
   * @param id Row id to fetch.
   * @returns The row, or `null` if no row has that id.
   */
  get(id: number): ScreeningRow | null {
    return this.db
      .query<ScreeningRow, [number]>(`SELECT * FROM screening_requests WHERE id=?`)
      .get(id);
  }

  /**
   * Requeues rows stuck in 'running' (a crash mid-lookup) back to pending and
   * clears their `started_at`.
   *
   * @returns The number of rows that were requeued.
   */
  requeueStale(): number {
    const result = this.db.run(
      `UPDATE screening_requests SET status='pending', started_at=NULL WHERE status='running'`,
    );
    return result.changes;
  }

  /** @returns How many rows are currently in the `pending` state. */
  pendingCount(): number {
    const counted = this.db
      .query<{ n: number }, []>(
        `SELECT COUNT(*) AS n FROM screening_requests WHERE status='pending'`,
      )
      .get();
    // COUNT(*) always yields exactly one row; the fallback only avoids a non-null assertion.
    return counted?.n ?? 0;
  }
}
|