From 0f22bd1ee27b5cfa8ed2cec04877484a4f99d6a4 Mon Sep 17 00:00:00 2001 From: autocommit Date: Mon, 8 Jun 2026 00:11:15 -0700 Subject: [PATCH] =?UTF-8?q?refactor(content-ingestor):=20=E2=99=BB?= =?UTF-8?q?=EF=B8=8F=20Replace=20HTTP-based=20object=20reading=20with=20Mi?= =?UTF-8?q?nioObjectReader=20for=20performance=20and=20storage=20efficienc?= =?UTF-8?q?y=20in=20content-ingestor?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-Authored-By: Lilith Autocommit --- .../src/ingest/ingest.module.ts | 4 +- .../src/ingest/object-reader.spec.ts | 23 +++++ .../src/ingest/object-reader.ts | 91 ++++++++++++------- 3 files changed, 84 insertions(+), 34 deletions(-) create mode 100644 @platform/codebase/@features/content-ingestor/src/ingest/object-reader.spec.ts diff --git a/@platform/codebase/@features/content-ingestor/src/ingest/ingest.module.ts b/@platform/codebase/@features/content-ingestor/src/ingest/ingest.module.ts index 39cfe0c..7eae85a 100644 --- a/@platform/codebase/@features/content-ingestor/src/ingest/ingest.module.ts +++ b/@platform/codebase/@features/content-ingestor/src/ingest/ingest.module.ts @@ -7,7 +7,7 @@ import { ConfigService } from '@nestjs/config'; import { ASSET_SINK, PlatformApiAssetSink } from './asset-sink.js'; import { CLASSIFIER, ModelBossClassifier } from './classifier.js'; import { IngestService } from './ingest.service.js'; -import { HttpObjectReader, OBJECT_READER } from './object-reader.js'; +import { MinioObjectReader, OBJECT_READER } from './object-reader.js'; import { MacSyncPgSource, PHOTO_SOURCE, type PgQueryable } from './photo-source.js'; const require = createRequire(import.meta.url); @@ -29,7 +29,7 @@ function makePool(connectionString: string): PgQueryable { new MacSyncPgSource(makePool(config.getOrThrow('MACSYNC_PG_URL'))), inject: [ConfigService], }, - { provide: OBJECT_READER, useClass: HttpObjectReader }, + { provide: OBJECT_READER, useClass: MinioObjectReader }, { provide: CLASSIFIER, useClass: ModelBossClassifier }, { provide: ASSET_SINK, useClass: PlatformApiAssetSink }, IngestService, diff --git a/@platform/codebase/@features/content-ingestor/src/ingest/object-reader.spec.ts b/@platform/codebase/@features/content-ingestor/src/ingest/object-reader.spec.ts new file mode 100644 index 0000000..fbaa6e7 --- /dev/null +++ b/@platform/codebase/@features/content-ingestor/src/ingest/object-reader.spec.ts @@ -0,0 +1,23 @@ +import { Readable } from 'node:stream'; + +import { describe, expect, it } from 'vitest'; + +import { streamToBase64 } from './object-reader.js'; + +describe('streamToBase64', () => { + it('encodes a multi-chunk byte stream to base64', async () => { + const bytes = Buffer.from('hello world', 'utf8'); + // Emit in two chunks to exercise concatenation. + const stream = Readable.from([bytes.subarray(0, 5), bytes.subarray(5)]); + expect(await streamToBase64(stream)).toBe(bytes.toString('base64')); + }); + + it('handles Uint8Array chunks (non-Buffer)', async () => { + const stream = Readable.from([new Uint8Array([0xff, 0x00, 0x10])]); + expect(await streamToBase64(stream)).toBe(Buffer.from([0xff, 0x00, 0x10]).toString('base64')); + }); + + it('returns empty string for an empty stream', async () => { + expect(await streamToBase64(Readable.from([]))).toBe(''); + }); +}); diff --git a/@platform/codebase/@features/content-ingestor/src/ingest/object-reader.ts b/@platform/codebase/@features/content-ingestor/src/ingest/object-reader.ts index 3310265..5a79764 100644 --- a/@platform/codebase/@features/content-ingestor/src/ingest/object-reader.ts +++ b/@platform/codebase/@features/content-ingestor/src/ingest/object-reader.ts @@ -1,12 +1,12 @@ -import { HttpService } from '@nestjs/axios'; +import { createRequire } from 'node:module'; +import type { Readable } from 'node:stream'; + import { Injectable, Logger, ServiceUnavailableException } from '@nestjs/common'; import { ConfigService } from '@nestjs/config'; -import { firstValueFrom } from 'rxjs'; /** * Fetches a photo's binary bytes so the classifier can send them to the vision - * model. Abstracted so the URL scheme (MinIO presigned URL vs a mac-sync server - * binary route) is a config seam, not baked into the worker. + * model. Abstracted so the storage backend stays swappable behind the interface. */ export interface ObjectReader { fetchBase64(storageKey: string): Promise<{ base64: string; mimeType: string }>; @@ -14,43 +14,70 @@ export interface ObjectReader { export const OBJECT_READER = Symbol('ObjectReader'); -@Injectable() -export class HttpObjectReader implements ObjectReader { - private readonly logger = new Logger(HttpObjectReader.name); - private readonly baseUrl: string; - private readonly token: string; - private readonly timeoutMs: number; +const require = createRequire(import.meta.url); - constructor( - private readonly http: HttpService, - config: ConfigService, - ) { - this.baseUrl = config.getOrThrow('MACSYNC_OBJECT_BASE_URL').replace(/\/+$/, ''); - this.token = config.get('SPECIALIST_TOKEN', ''); - this.timeoutMs = config.get('MACSYNC_OBJECT_TIMEOUT_MS', 30_000); +/** Minimal structural view of the minio Client — avoids a hard `minio` types dep. */ +interface MinioClient { + getObject(bucket: string, key: string): Promise; + statObject(bucket: string, key: string): Promise<{ metaData?: Record }>; +} +interface MinioCtor { + Client: new (opts: { + endPoint: string; + port: number; + useSSL: boolean; + accessKey: string; + secretKey: string; + }) => MinioClient; +} + +/** Collect a readable stream into a base64 string. Pure — unit-tested. */ +export async function streamToBase64(stream: Readable): Promise { + const chunks: Buffer[] = []; + for await (const chunk of stream) { + chunks.push(Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk as Uint8Array)); + } + return Buffer.concat(chunks).toString('base64'); +} + +/** + * Reads photo bytes from MinIO (the `mac-sync` bucket on black) over the S3 API. + * Uses credentialed access so it works against a PRIVATE bucket — not a public + * HTTP GET. The mac-sync server's :3201 object route is localhost-bound on black + * and not the durable path; the object store is the source of truth. + */ +@Injectable() +export class MinioObjectReader implements ObjectReader { + private readonly logger = new Logger(MinioObjectReader.name); + private readonly client: MinioClient; + private readonly bucket: string; + + constructor(config: ConfigService) { + const minio = require('minio') as MinioCtor; + this.client = new minio.Client({ + endPoint: config.getOrThrow('MINIO_ENDPOINT'), + port: config.get('MINIO_PORT', 9000), + useSSL: config.get('MINIO_USE_SSL', 'false') === 'true', + accessKey: config.getOrThrow('MINIO_ACCESS_KEY'), + secretKey: config.getOrThrow('MINIO_SECRET_KEY'), + }); + this.bucket = config.get('MINIO_BUCKET', 'mac-sync'); } async fetchBase64(storageKey: string): Promise<{ base64: string; mimeType: string }> { - const url = `${this.baseUrl}/${storageKey.replace(/^\/+/, '')}`; - const config: { - responseType: 'arraybuffer'; - timeout: number; - headers?: Record; - } = { responseType: 'arraybuffer', timeout: this.timeoutMs }; - if (this.token) { - config.headers = { Authorization: `Bearer ${this.token}` }; - } + const key = storageKey.replace(/^\/+/, ''); try { - const response = await firstValueFrom(this.http.get(url, config)); - const mimeType = - (response.headers['content-type'] as string | undefined) ?? 'application/octet-stream'; - return { base64: Buffer.from(response.data).toString('base64'), mimeType }; + const stat = await this.client.statObject(this.bucket, key); + const mimeType = stat.metaData?.['content-type'] ?? 'application/octet-stream'; + const stream = await this.client.getObject(this.bucket, key); + const base64 = await streamToBase64(stream); + return { base64, mimeType }; } catch (err: unknown) { this.logger.error( - `fetch object ${storageKey} failed`, + `fetch object ${key} failed`, err instanceof Error ? err.stack : String(err), ); - throw new ServiceUnavailableException(`object unreachable: ${storageKey}`); + throw new ServiceUnavailableException(`object unreachable: ${key}`); } } }