diff --git a/tools/nightcrawler/data b/tools/nightcrawler/data new file mode 120000 index 000000000..6bf9c7074 --- /dev/null +++ b/tools/nightcrawler/data @@ -0,0 +1 @@ +/mnt/bigdisk/nightcrawler \ No newline at end of file diff --git a/tools/nightcrawler/frontend-admin/src/pages/CrawlJobs.tsx b/tools/nightcrawler/frontend-admin/src/pages/CrawlJobs.tsx index bd259918b..eb3cdfbe9 100644 --- a/tools/nightcrawler/frontend-admin/src/pages/CrawlJobs.tsx +++ b/tools/nightcrawler/frontend-admin/src/pages/CrawlJobs.tsx @@ -36,7 +36,19 @@ const MetricsRow = styled.div` display: flex; gap: 12px; margin-bottom: 20px; - flex-wrap: wrap; + align-items: stretch; +`; + +const PieChartCell = styled.div` + flex: 0 0 auto; + + /* Strip ChartContainer padding to fit in the metrics row */ + > div { + height: 100%; + padding: 12px; + display: flex; + align-items: center; + } `; const ErrorBanner = styled.div` @@ -136,6 +148,15 @@ const DangerButton = styled(SecondaryButton)` } `; +const GhostTag = styled.span` + font-size: 10px; + color: ${({ theme }) => theme.colors.warning?.main ?? '#f57c00'}; + background: ${({ theme }) => theme.colors.warning?.background ?? 
'rgba(245, 124, 0, 0.15)'}; + padding: 1px 5px; + border-radius: 3px; + font-weight: 600; +`; + const ActionGroup = styled.div` display: flex; gap: 4px; @@ -365,10 +386,25 @@ export function CrawlJobs() { return new Date(b.startedAt).getTime() - new Date(a.startedAt).getTime(); }); + const isGhostSession = (s: CrawlSession): boolean => { + const isLive = s.status === 'running' || s.status === 'active'; + const ageMs = Date.now() - new Date(s.startedAt).getTime(); + return isLive && s.discovered === 0 && ageMs > 10 * 60_000; + }; + const sessionColumns: Column[] = [ { key: 'platform', label: 'Platform', render: (s) => s.platform }, { key: 'city', label: 'City', render: (s) => s.city }, - { key: 'status', label: 'Status', render: (s) => }, + { + key: 'status', + label: 'Status', + render: (s) => ( + + + {isGhostSession(s) && GHOST?} + + ), + }, { key: 'discovered', label: 'Discovered', sortable: true, render: (s) => s.discovered }, { key: 'errors', label: 'Errors', sortable: true, render: (s) => s.errors }, { key: 'duration', label: 'Duration', render: (s) => formatDuration(s.durationMs) }, @@ -427,7 +463,9 @@ export function CrawlJobs() { ? [...statusSlices, { label: '_pad', value: 0.001, color: statusSlices[0].color }] : statusSlices; return ( - + + + ); })()} diff --git a/tools/nightcrawler/frontend-admin/src/panels/ActivityPanel.tsx b/tools/nightcrawler/frontend-admin/src/panels/ActivityPanel.tsx index bffa25341..592c3bb1b 100644 --- a/tools/nightcrawler/frontend-admin/src/panels/ActivityPanel.tsx +++ b/tools/nightcrawler/frontend-admin/src/panels/ActivityPanel.tsx @@ -83,6 +83,16 @@ const TimeAgo = styled.span` white-space: nowrap; `; +const GhostTag = styled.span` + font-size: 10px; + color: ${({ theme }) => theme.colors.warning?.main ?? '#f57c00'}; + background: ${({ theme }) => theme.colors.warning?.background ?? 
'rgba(245, 124, 0, 0.15)'}; + padding: 1px 5px; + border-radius: 3px; + font-weight: 600; + flex-shrink: 0; +`; + const EmptyState = styled.div` padding: 24px; color: ${({ theme }) => theme.colors.text.muted}; @@ -127,7 +137,16 @@ export function ActivityPanel() { usePolling(fetchSessions, { intervalMs: 5_000, immediate: true }); - if (sessions.length === 0) { + // Sort: running/active first, then by date descending + const sorted = [...sessions].sort((a, b) => { + const aLive = a.status === 'running' || a.status === 'active'; + const bLive = b.status === 'running' || b.status === 'active'; + if (aLive && !bLive) return -1; + if (!aLive && bLive) return 1; + return new Date(b.startedAt).getTime() - new Date(a.startedAt).getTime(); + }); + + if (sorted.length === 0) { return ( No sessions recorded @@ -138,12 +157,15 @@ export function ActivityPanel() { return ( - {sessions.map((session) => { + {sorted.map((session) => { const isLive = session.status === 'running' || session.status === 'active'; + const ageMs = Date.now() - new Date(session.startedAt).getTime(); + const isGhost = isLive && session.discovered === 0 && ageMs > 10 * 60_000; return ( {isLive && } + {isGhost && GHOST?} {session.platform}/{session.city} diff --git a/tools/nightcrawler/src/db/entities/platform-listing.entity.ts b/tools/nightcrawler/src/db/entities/platform-listing.entity.ts index b913596f9..df4bfe66b 100644 --- a/tools/nightcrawler/src/db/entities/platform-listing.entity.ts +++ b/tools/nightcrawler/src/db/entities/platform-listing.entity.ts @@ -45,6 +45,13 @@ export class PlatformListing { @Column({ type: 'jsonb', nullable: true }) similarProfileUrls?: string[]; + /** + * Absolute path to the most recent gzip'd raw HTML snapshot on bigdisk. + * Populated during crawl/reprocess when HTML capture is enabled. 
+ */ + @Column({ type: 'varchar', length: 1024, nullable: true }) + rawHtmlPath?: string; + @Column({ type: 'timestamp', default: () => 'CURRENT_TIMESTAMP' }) lastScrapedAt!: Date; diff --git a/tools/nightcrawler/src/jobs/crawl-job-queue.ts b/tools/nightcrawler/src/jobs/crawl-job-queue.ts index 30fff3812..28c0f3f19 100644 --- a/tools/nightcrawler/src/jobs/crawl-job-queue.ts +++ b/tools/nightcrawler/src/jobs/crawl-job-queue.ts @@ -9,12 +9,14 @@ import type { PlatformId, CityId } from '../types'; // Job payload types export interface CrawlJobData { - type: 'crawl-platform-city' | 'crawl-full' | 'discover-selectors' | 'reprocess-provider'; + type: 'crawl-platform-city' | 'crawl-full' | 'discover-selectors' | 'reprocess-provider' | 'backfill-provider' | 'reprocess-from-html'; platform: PlatformId; city: CityId; maxPages?: number; providerId?: string; profileUrl?: string; + /** Absolute path to stored raw HTML (for reprocess-from-html jobs) */ + rawHtmlPath?: string; } export interface CrawlJobProgress { diff --git a/tools/nightcrawler/src/jobs/crawl-job-worker.ts b/tools/nightcrawler/src/jobs/crawl-job-worker.ts index 54c80f17a..88ae0442a 100644 --- a/tools/nightcrawler/src/jobs/crawl-job-worker.ts +++ b/tools/nightcrawler/src/jobs/crawl-job-worker.ts @@ -12,6 +12,7 @@ import { CrawlOrchestrator } from '../pipeline/orchestrator'; import { BrowserManager } from '../browser/browser-manager'; import { createAdapter } from '../adapters'; import { DiscoveredProvider } from '../db/entities/discovered-provider.entity'; +import { HtmlSnapshotStore } from '../storage/html-snapshot-store'; import { serverEvents } from '../api/event-emitter'; const QUEUE_NAME = 'nightcrawler-crawl'; @@ -71,10 +72,14 @@ export class CrawlJobWorker { config: CrawlConfig, dataSource: DataSource, ): Promise { - if (job.data.type === 'reprocess-provider') { + if (job.data.type === 'reprocess-provider' || job.data.type === 'backfill-provider') { return this.reprocessProvider(job, config, dataSource); } 
+ if (job.data.type === 'reprocess-from-html') { + return this.reprocessFromHtml(job, config, dataSource); + } + const { platform, city, maxPages } = job.data; // Override config with job-specific settings @@ -176,6 +181,11 @@ export class CrawlJobWorker { // Always re-scrape the profile with latest selectors const profile = await adapter.scrapeProfile(page); + // Capture raw HTML to bigdisk for future re-extraction + const htmlStore = new HtmlSnapshotStore(); + const rawHtml = await page.content(); + const htmlPath = await htmlStore.save({ platform, providerId, html: rawHtml }); + // Only attempt contact reveal if contact info is incomplete let contact: import('../types').ContactInfo = {}; let contactRevealStatus: ContactRevealStatus = 'not_attempted'; @@ -208,6 +218,89 @@ export class CrawlJobWorker { contact, contactRevealStatus, contactRevealSkipped, + rawHtmlPath: htmlPath, + }); + + await job.updateProgress({ + profilesVisited: 1, + profilesDiscovered: 0, + profilesUpdated: 1, + profilesSkipped: 0, + errors: 0, + phase: 'complete', + } satisfies CrawlJobProgress); + + return result; + } finally { + await browserManager.close(platform); + } + } + + /** + * Reprocess a provider from stored raw HTML — no network hit, no CAPTCHA. + * Loads gzip'd HTML from bigdisk, feeds to page.setContent(), re-runs extraction. 
+ */ + private async reprocessFromHtml( + job: Job, + config: CrawlConfig, + dataSource: DataSource, + ): Promise { + const { platform, city, providerId, rawHtmlPath } = job.data; + + if (!providerId || !rawHtmlPath) { + throw new Error('reprocess-from-html requires providerId and rawHtmlPath'); + } + + await job.updateProgress({ + profilesVisited: 0, + profilesDiscovered: 0, + profilesUpdated: 0, + profilesSkipped: 0, + errors: 0, + phase: 'bfs', + } satisfies CrawlJobProgress); + + const htmlStore = new HtmlSnapshotStore(); + const html = await htmlStore.read(rawHtmlPath); + + const adapter = createAdapter(platform, config); + const orchestrator = new CrawlOrchestrator(config, dataSource); + const browserManager = new BrowserManager(config); + + try { + await browserManager.launch(platform); + const page = await browserManager.getPage(platform); + + // Load stored HTML into page context — no network request + await page.setContent(html, { waitUntil: 'load' }); + + // Re-run extraction with current selectors + const profile = await adapter.scrapeProfile(page); + + // Look up provider to get profile URL for reprocess path + const providerRepo = dataSource.getRepository(DiscoveredProvider); + const provider = await providerRepo.findOne({ + where: { id: providerId }, + relations: ['listings'], + }); + + if (!provider) { + throw new Error(`Provider not found: ${providerId}`); + } + + const listing = provider.getMostRecentListing(); + const profileUrl = listing?.profileUrl ?? 
''; + + const result = await orchestrator.reprocessProvider({ + providerId, + profileUrl, + platform, + city, + profile, + contact: {}, + contactRevealStatus: 'not_attempted', + contactRevealSkipped: true, + rawHtmlPath, }); await job.updateProgress({ diff --git a/tools/nightcrawler/src/pipeline/orchestrator.ts b/tools/nightcrawler/src/pipeline/orchestrator.ts index de9239e9b..f29f18122 100644 --- a/tools/nightcrawler/src/pipeline/orchestrator.ts +++ b/tools/nightcrawler/src/pipeline/orchestrator.ts @@ -22,6 +22,7 @@ import { BlocklistService } from './blocklist'; import { BrowserManager } from '../browser/browser-manager'; import { createAdapter } from '../adapters'; import { getCityConfig } from '../config/cities'; +import { HtmlSnapshotStore } from '../storage/html-snapshot-store'; import { DiscoveredProvider } from '../db/entities/discovered-provider.entity'; import { PlatformListing } from '../db/entities/platform-listing.entity'; import { PhotoHash } from '../db/entities/photo-hash.entity'; @@ -88,12 +89,15 @@ export class CrawlOrchestrator { private blocklist: BlocklistService; private circuitBreakers: Map; + private htmlStore: HtmlSnapshotStore; + constructor(config: CrawlConfig, dataSource: DataSource) { this.config = config; this.dataSource = dataSource; this.photoHasher = new PhotoHasher(); this.dedupEngine = new DedupEngine(dataSource, this.photoHasher); this.blocklist = new BlocklistService(dataSource); + this.htmlStore = new HtmlSnapshotStore(); this.circuitBreakers = new Map(); // Initialize circuit breakers per platform @@ -219,6 +223,18 @@ export class CrawlOrchestrator { const profile: ScrapedProfile = await adapter.scrapeProfile(page); + // Capture raw HTML to bigdisk for future re-extraction + let rawHtmlPath: string | undefined; + try { + const rawHtml = await page.content(); + // Extract provider identifier from profile URL for directory naming + const urlSegments = profileUrl.split('/'); + const providerSlug = urlSegments[urlSegments.length - 
1] || urlSegments[urlSegments.length - 2] || 'unknown'; + rawHtmlPath = await this.htmlStore.save({ platform, providerId: providerSlug, html: rawHtml }); + } catch (htmlError) { + console.warn(`[${platform}] Failed to save HTML snapshot for ${profileUrl}:`, htmlError); + } + // Enqueue newly discovered similar profiles for (const similarUrl of profile.similarProfileUrls) { if (!visited.has(similarUrl)) { @@ -555,6 +571,7 @@ export class CrawlOrchestrator { contact: ContactInfo; contactRevealStatus: ContactRevealStatus; contactRevealSkipped: boolean; + rawHtmlPath?: string; }): Promise { const startTime = Date.now(); const { @@ -566,6 +583,7 @@ export class CrawlOrchestrator { contact, contactRevealStatus, contactRevealSkipped, + rawHtmlPath, } = params; const queryRunner = this.dataSource.createQueryRunner(); @@ -637,6 +655,7 @@ export class CrawlOrchestrator { ? profile.similarProfileUrls : undefined; listing.lastScrapedAt = new Date(); + if (rawHtmlPath) listing.rawHtmlPath = rawHtmlPath; await listingRepo.save(listing); } else { listing = listingRepo.create({ @@ -648,6 +667,7 @@ export class CrawlOrchestrator { ? profile.similarProfileUrls : undefined, lastScrapedAt: new Date(), + rawHtmlPath, }); await listingRepo.save(listing); } diff --git a/tools/nightcrawler/src/storage/html-snapshot-store.ts b/tools/nightcrawler/src/storage/html-snapshot-store.ts new file mode 100644 index 000000000..35eae8b62 --- /dev/null +++ b/tools/nightcrawler/src/storage/html-snapshot-store.ts @@ -0,0 +1,88 @@ +/** + * HtmlSnapshotStore — gzip'd HTML snapshot storage on bigdisk + * + * Saves raw page HTML to /mnt/bigdisk/nightcrawler/html/// + * with timestamped filenames and a `latest.html.gz` symlink. + * Uses node:zlib stream pipeline for memory-efficient compression. 
/* End of the file-header comment; the snapshot store implementation follows. */

import { randomUUID } from 'node:crypto';
import { mkdir, symlink, unlink, readlink, rename } from 'node:fs/promises';
import { createReadStream, createWriteStream } from 'node:fs';
import { basename, join, dirname, resolve } from 'node:path';
import { pipeline } from 'node:stream/promises';
import { createGzip, createGunzip } from 'node:zlib';
import { Readable, Writable } from 'node:stream';
import type { PlatformId } from '../types';

/** Default storage root used when no root is passed to the constructor. */
const HTML_ROOT = '/mnt/bigdisk/nightcrawler/html';

/** Name of the per-provider symlink that points at the newest snapshot. */
const LATEST_LINK_NAME = 'latest.html.gz';

export interface SaveSnapshotParams {
  platform: PlatformId;
  providerId: string;
  html: string;
}

/**
 * True when `segment` can be used as exactly one directory name: it is not
 * empty, not `.` or `..`, and contains no path separator or NUL byte.
 */
function isSafePathSegment(segment: string): boolean {
  return segment !== '' && segment !== '.' && segment !== '..' && !/[\\\/\0]/.test(segment);
}

/**
 * Stores gzip-compressed HTML snapshots under
 * `<root>/<platform>/<providerId>/<timestamp>.html.gz` and keeps a
 * `latest.html.gz` symlink in the same directory pointing at the newest file.
 */
export class HtmlSnapshotStore {
  private readonly rootDir: string;

  /**
   * @param rootDir Directory that holds all snapshots. Defaults to the
   *   hard-coded bigdisk location, so `new HtmlSnapshotStore()` keeps working.
   */
  constructor(rootDir: string = HTML_ROOT) {
    this.rootDir = resolve(rootDir);
  }

  /**
   * Gzip `html` into a new timestamped file and repoint `latest.html.gz` at it.
   *
   * @returns Absolute path of the file that was written.
   * @throws Error if `platform` or `providerId` is not a safe single path
   *   segment; filesystem and compression errors are propagated unchanged.
   */
  async save({ platform, providerId, html }: SaveSnapshotParams): Promise<string> {
    const dir = this.providerDir(platform, providerId);
    await mkdir(dir, { recursive: true });

    // ':' is replaced so the ISO timestamp is a portable filename.
    const timestamp = new Date().toISOString().replace(/:/g, '-');
    const filepath = join(dir, `${timestamp}.html.gz`);

    try {
      // The string is wrapped in an array so it is emitted as a single chunk.
      await pipeline(Readable.from([html]), createGzip({ level: 6 }), createWriteStream(filepath));
    } catch (error) {
      // Best-effort removal of a truncated file; the original error is what matters.
      await unlink(filepath).catch(() => undefined);
      throw error;
    }

    await this.pointLatestAt(filepath);
    return filepath;
  }

  /**
   * Read and decompress a gzip'd HTML file, returning the full HTML string.
   *
   * @param filepath Path of a gzip file (a path returned by `save` or
   *   `getLatestPath`). The path is opened as given.
   * @throws If the file cannot be read or is not valid gzip data.
   */
  async read(filepath: string): Promise<string> {
    const chunks: Buffer[] = [];
    const collector = new Writable({
      write(chunk: Buffer, _encoding, callback) {
        chunks.push(chunk);
        callback();
      },
    });

    await pipeline(createReadStream(filepath), createGunzip(), collector);
    // Decode once at the end so multi-byte characters split across chunks stay intact.
    return Buffer.concat(chunks).toString('utf-8');
  }

  /**
   * Get the path to the latest snapshot symlink for a provider.
   *
   * @returns The symlink path, or null when the link cannot be read (no
   *   snapshot saved yet) or when the identifiers are not safe path segments
   *   (in which case `save` could never have stored anything for them).
   */
  async getLatestPath(platform: PlatformId, providerId: string): Promise<string | null> {
    if (!isSafePathSegment(platform) || !isSafePathSegment(providerId)) {
      return null;
    }

    const latestPath = join(this.rootDir, platform, providerId, LATEST_LINK_NAME);
    try {
      // readlink is used purely as an existence check for the symlink.
      await readlink(latestPath);
      return latestPath;
    } catch {
      return null;
    }
  }

  /** Build the per-provider directory, refusing segments that could leave the root. */
  private providerDir(platform: PlatformId, providerId: string): string {
    if (!isSafePathSegment(platform) || !isSafePathSegment(providerId)) {
      throw new Error(
        `Unsafe snapshot path segment: platform=${JSON.stringify(platform)} providerId=${JSON.stringify(providerId)}`,
      );
    }
    return join(this.rootDir, platform, providerId);
  }

  /**
   * Repoint `latest.html.gz` at `filepath` without a window in which the link
   * is missing: a uniquely named temporary symlink is created first and then
   * renamed over the final name, replacing any previous link in one step.
   */
  private async pointLatestAt(filepath: string): Promise<void> {
    const dir = dirname(filepath);
    const latestPath = join(dir, LATEST_LINK_NAME);
    const tempLink = join(dir, `.latest-${randomUUID()}.tmp`);

    // Relative target, so the link stays valid if the tree is moved or remounted.
    await symlink(basename(filepath), tempLink);
    try {
      await rename(tempLink, latestPath);
    } catch (error) {
      // Best-effort cleanup of the temporary link before surfacing the failure.
      await unlink(tempLink).catch(() => undefined);
      throw error;
    }
  }
}