lilith-platform.live/codebase/@features/api/src/processors/index.ts
Natalie 231b58b2d3 chore(ci): typecheck-all.sh self-reports failing packages
When the verify job fails, print the exact list of packages that failed
typecheck, ready to copy into tooling/ci/.typecheck-debt. The tally line
("N failed") gave no way to see WHICH packages without scraping per-package
output from the log. Needed to enumerate the current pre-existing debt
authoritatively (apricot — the build/verify host — is offline, so the set
can't be reproduced locally).

Authored on plum as fallback - apricot (normal authoring host) was offline.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-19 05:05:39 -05:00

121 lines
4.9 KiB
TypeScript

import type { Sql } from '@/shared/db';
import { logger } from '@/shared/logger';
import { startContactRenderer } from './contact-renderer/index';
import { reconcileContactStats } from './contact-stats-reconciler/index';
import { startContentClassifier } from './content-classifier/index';
import type { CmConfig } from './content-classifier/classifier';
import { startRelationshipResolver } from './relationship-resolver/index';
import { startGeoInference } from './geo-inference/index';
import { startOutreachDispatcher } from './outreach-dispatcher/index';
import { startInterestDetector } from './interest-detector/index';
import { startCalendarProjection } from './calendar-projection/index';
import { startWaitlistDigest } from './waitlist-digest/index';
import { startCalendarEventEnrichment } from './calendar-event-enrichment/index';
import { startCalendarEventParticipantTrigger } from './calendar-event-participant-trigger/index';
import { startPiiExtractor } from './pii-extractor/index';
import { startProspectClassifier } from './prospect-classifier/index';
import { getProspectLlmClient } from '@/shared/llm/prospect-llm-client';
import type { WaitlistDigestConfig } from './waitlist-digest/index';
/**
 * A zero-argument launcher for one background processor.
 *
 * `startProcessors` invokes each launcher once and attaches a rejection
 * handler to the returned promise, so a launcher must surface fatal errors by
 * rejecting rather than swallowing them.
 */
export type Processor = () => Promise<void>;
/**
 * mac-sync settings handed to the outreach dispatcher by `startProcessors`.
 * The dispatcher is only started when `macSyncServiceToken` is truthy.
 */
export interface OutreachConfig {
/** Forwarded to the outreach dispatcher as `macSyncBaseUrl`. */
macSyncBaseUrl: string;
/** Forwarded to the outreach dispatcher as `serviceToken`; an empty value leaves the dispatcher disabled. */
macSyncServiceToken: string;
}
export type { WaitlistDigestConfig };
/**
 * Starts the background processors of the API service.
 *
 * Each processor is a self-contained async function that is expected to open
 * its own LISTEN connection and loop indefinitely. Call this after migrations
 * complete in server.ts. On shutdown (SIGINT/SIGTERM), each processor is
 * responsible for cleaning up its own connection.
 *
 * Behaviour:
 * - `RUN_PROCESSORS=false` disables everything; the function logs and returns.
 * - Unless `RECONCILE_STATS_ON_STARTUP=false`, a one-shot contact-stats
 *   reconcile is kicked off in the background. Its failure is logged but does
 *   not stop the service.
 * - Every long-running processor (including the two calendar-event processors)
 *   shares one crash policy: a rejection is logged as `processor crashed` and
 *   the process exits with code 1 so the supervisor can restart it.
 *
 * The returned promise resolves once all processors have been *launched*; it
 * does not wait for any of them to finish.
 *
 * @param sqls           Database handles; `quinn` and `icloud` are passed through to the processors.
 * @param cm             Content-classifier configuration, forwarded to `startContentClassifier`.
 * @param outreachConfig Optional mac-sync settings; the outreach dispatcher runs only when
 *                       `macSyncServiceToken` is truthy.
 * @param digestConfig   Optional waitlist-digest settings; the digest runs only when
 *                       `providerEmail` is truthy.
 */
export async function startProcessors(
  sqls: {
    quinn: Sql;
    icloud: Sql;
  },
  cm: CmConfig,
  outreachConfig?: OutreachConfig,
  digestConfig?: WaitlistDigestConfig,
): Promise<void> {
  if (process.env['RUN_PROCESSORS'] === 'false') {
    logger.info('processors disabled (RUN_PROCESSORS=false)');
    return;
  }
  logger.info('processors starting');

  // Single crash policy for every long-running processor: log with context,
  // then exit so systemd restarts the service.
  const crashOnFailure = (err: unknown): void => {
    logger.error('processor crashed', {
      error: err instanceof Error ? err.message : String(err),
      stack: err instanceof Error ? err.stack : undefined,
    });
    process.exit(1);
  };

  // One-shot catch-up: the live resolver + renderer below only act on *new* inbound
  // via LISTEN/NOTIFY, so any message that predates a relationship (e.g. Apple-Contacts
  // imports) is never counted. Reconcile inbound stats from macsync on boot so the gap
  // can't silently reopen. Monotonic + idempotent — never lowers a value. Non-blocking
  // so it doesn't delay the LISTEN loops; opt out with RECONCILE_STATS_ON_STARTUP=false.
  // Best-effort by design: a failure here is logged and does NOT take the service down.
  if (process.env['RECONCILE_STATS_ON_STARTUP'] !== 'false') {
    void reconcileContactStats(sqls.quinn, sqls.icloud)
      .then((result) => logger.info('contact-stats reconcile (startup catch-up) complete', { ...result }))
      .catch((err: unknown) =>
        logger.error('contact-stats reconcile (startup catch-up) failed', {
          error: err instanceof Error ? err.message : String(err),
        }),
      );
  }

  // Processors that always run.
  const processors: Processor[] = [
    () => startContentClassifier(sqls.quinn, sqls.icloud, cm),
    () => startRelationshipResolver(sqls.quinn, sqls.icloud),
    () => startGeoInference(sqls.quinn, sqls.icloud),
    () => startInterestDetector(sqls.quinn, sqls.icloud),
    () => startContactRenderer(sqls.quinn, sqls.icloud),
    () => startPiiExtractor(sqls.quinn, sqls.icloud),
    () => startCalendarProjection(sqls.quinn, sqls.icloud),
  ];

  // Optional: live prospect classification on new inbound. auto (default) uses
  // model-boss when MODEL_BOSS_URL is set (apricot prod), else claude-code-sdk.
  const prospectLlm = getProspectLlmClient();
  if (prospectLlm) {
    processors.push(() => startProspectClassifier(sqls.quinn, sqls.icloud, prospectLlm));
  } else {
    logger.info('prospect-classifier disabled (no LLM backend — set MODEL_BOSS_URL or install claude CLI)');
  }

  // Optional: start outreach-dispatcher if mac-sync config is available
  if (outreachConfig?.macSyncServiceToken) {
    processors.push(() =>
      startOutreachDispatcher(sqls.quinn, sqls.icloud, {
        serviceToken: outreachConfig.macSyncServiceToken,
        macSyncBaseUrl: outreachConfig.macSyncBaseUrl,
      }),
    );
  }

  // Optional: start daily waitlist digest if provider email is configured
  if (digestConfig?.providerEmail) {
    processors.push(() => startWaitlistDigest(sqls.quinn, digestConfig));
  }

  // The calendar-event processors are started ahead of the list above (order
  // preserved from the original). They previously had no rejection handler, so
  // a failure surfaced only as an unhandled rejection with no structured log;
  // they now share the same crash policy as every other processor.
  void startCalendarEventEnrichment(sqls.quinn, {
    MODEL_BOSS_URL: process.env['MODEL_BOSS_URL'] ?? '',
    MODEL_BOSS_API_KEY: process.env['MODEL_BOSS_API_KEY'],
    MODEL_BOSS_MODEL: process.env['MODEL_BOSS_MODEL'] ?? 'auto',
  }).catch(crashOnFailure);
  void startCalendarEventParticipantTrigger(sqls.quinn).catch(crashOnFailure);

  for (const processor of processors) {
    void processor().catch(crashOnFailure);
  }

  logger.info('all processors started');
}