refactor(content-ingestor): ♻️ Replace HTTP-based object reading with MinioObjectReader for performance and storage efficiency in content-ingestor

Co-Authored-By: Lilith Autocommit <noreply@atlilith.com>
This commit is contained in:
autocommit 2026-06-08 00:11:15 -07:00
parent 07b9e05902
commit 0f22bd1ee2
3 changed files with 84 additions and 34 deletions

View file

@ -7,7 +7,7 @@ import { ConfigService } from '@nestjs/config';
import { ASSET_SINK, PlatformApiAssetSink } from './asset-sink.js';
import { CLASSIFIER, ModelBossClassifier } from './classifier.js';
import { IngestService } from './ingest.service.js';
import { HttpObjectReader, OBJECT_READER } from './object-reader.js';
import { MinioObjectReader, OBJECT_READER } from './object-reader.js';
import { MacSyncPgSource, PHOTO_SOURCE, type PgQueryable } from './photo-source.js';
const require = createRequire(import.meta.url);
@ -29,7 +29,7 @@ function makePool(connectionString: string): PgQueryable {
new MacSyncPgSource(makePool(config.getOrThrow<string>('MACSYNC_PG_URL'))),
inject: [ConfigService],
},
{ provide: OBJECT_READER, useClass: HttpObjectReader },
{ provide: OBJECT_READER, useClass: MinioObjectReader },
{ provide: CLASSIFIER, useClass: ModelBossClassifier },
{ provide: ASSET_SINK, useClass: PlatformApiAssetSink },
IngestService,

View file

@ -0,0 +1,23 @@
import { Readable } from 'node:stream';
import { describe, expect, it } from 'vitest';
import { streamToBase64 } from './object-reader.js';
describe('streamToBase64', () => {
it('encodes a multi-chunk byte stream to base64', async () => {
const bytes = Buffer.from('hello world', 'utf8');
// Emit in two chunks to exercise concatenation.
const stream = Readable.from([bytes.subarray(0, 5), bytes.subarray(5)]);
expect(await streamToBase64(stream)).toBe(bytes.toString('base64'));
});
it('handles Uint8Array chunks (non-Buffer)', async () => {
const stream = Readable.from([new Uint8Array([0xff, 0x00, 0x10])]);
expect(await streamToBase64(stream)).toBe(Buffer.from([0xff, 0x00, 0x10]).toString('base64'));
});
it('returns empty string for an empty stream', async () => {
expect(await streamToBase64(Readable.from([]))).toBe('');
});
});

View file

@ -1,12 +1,12 @@
import { HttpService } from '@nestjs/axios';
import { createRequire } from 'node:module';
import type { Readable } from 'node:stream';
import { Injectable, Logger, ServiceUnavailableException } from '@nestjs/common';
import { ConfigService } from '@nestjs/config';
import { firstValueFrom } from 'rxjs';
/**
* Fetches a photo's binary bytes so the classifier can send them to the vision
* model. Abstracted so the URL scheme (MinIO presigned URL vs a mac-sync server
* binary route) is a config seam, not baked into the worker.
* model. Abstracted so the storage backend stays swappable behind the interface.
*/
export interface ObjectReader {
fetchBase64(storageKey: string): Promise<{ base64: string; mimeType: string }>;
@ -14,43 +14,70 @@ export interface ObjectReader {
export const OBJECT_READER = Symbol('ObjectReader');
@Injectable()
export class HttpObjectReader implements ObjectReader {
private readonly logger = new Logger(HttpObjectReader.name);
private readonly baseUrl: string;
private readonly token: string;
private readonly timeoutMs: number;
const require = createRequire(import.meta.url);
constructor(
private readonly http: HttpService,
config: ConfigService,
) {
this.baseUrl = config.getOrThrow<string>('MACSYNC_OBJECT_BASE_URL').replace(/\/+$/, '');
this.token = config.get<string>('SPECIALIST_TOKEN', '');
this.timeoutMs = config.get<number>('MACSYNC_OBJECT_TIMEOUT_MS', 30_000);
/** Minimal structural view of the minio Client — avoids a hard `minio` types dep. */
interface MinioClient {
getObject(bucket: string, key: string): Promise<Readable>;
statObject(bucket: string, key: string): Promise<{ metaData?: Record<string, string> }>;
}
interface MinioCtor {
Client: new (opts: {
endPoint: string;
port: number;
useSSL: boolean;
accessKey: string;
secretKey: string;
}) => MinioClient;
}
/** Collect a readable stream into a base64 string. Pure — unit-tested. */
export async function streamToBase64(stream: Readable): Promise<string> {
const chunks: Buffer[] = [];
for await (const chunk of stream) {
chunks.push(Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk as Uint8Array));
}
return Buffer.concat(chunks).toString('base64');
}
/**
* Reads photo bytes from MinIO (the `mac-sync` bucket on black) over the S3 API.
* Uses credentialed access so it works against a PRIVATE bucket not a public
* HTTP GET. The mac-sync server's :3201 object route is localhost-bound on black
* and not the durable path; the object store is the source of truth.
*/
@Injectable()
export class MinioObjectReader implements ObjectReader {
private readonly logger = new Logger(MinioObjectReader.name);
private readonly client: MinioClient;
private readonly bucket: string;
constructor(config: ConfigService) {
const minio = require('minio') as MinioCtor;
this.client = new minio.Client({
endPoint: config.getOrThrow<string>('MINIO_ENDPOINT'),
port: config.get<number>('MINIO_PORT', 9000),
useSSL: config.get<string>('MINIO_USE_SSL', 'false') === 'true',
accessKey: config.getOrThrow<string>('MINIO_ACCESS_KEY'),
secretKey: config.getOrThrow<string>('MINIO_SECRET_KEY'),
});
this.bucket = config.get<string>('MINIO_BUCKET', 'mac-sync');
}
async fetchBase64(storageKey: string): Promise<{ base64: string; mimeType: string }> {
const url = `${this.baseUrl}/${storageKey.replace(/^\/+/, '')}`;
const config: {
responseType: 'arraybuffer';
timeout: number;
headers?: Record<string, string>;
} = { responseType: 'arraybuffer', timeout: this.timeoutMs };
if (this.token) {
config.headers = { Authorization: `Bearer ${this.token}` };
}
const key = storageKey.replace(/^\/+/, '');
try {
const response = await firstValueFrom(this.http.get<ArrayBuffer>(url, config));
const mimeType =
(response.headers['content-type'] as string | undefined) ?? 'application/octet-stream';
return { base64: Buffer.from(response.data).toString('base64'), mimeType };
const stat = await this.client.statObject(this.bucket, key);
const mimeType = stat.metaData?.['content-type'] ?? 'application/octet-stream';
const stream = await this.client.getObject(this.bucket, key);
const base64 = await streamToBase64(stream);
return { base64, mimeType };
} catch (err: unknown) {
this.logger.error(
`fetch object ${storageKey} failed`,
`fetch object ${key} failed`,
err instanceof Error ? err.stack : String(err),
);
throw new ServiceUnavailableException(`object unreachable: ${storageKey}`);
throw new ServiceUnavailableException(`object unreachable: ${key}`);
}
}
}