From 913663922ef08ae2e0b1cb16d06b25923280227b Mon Sep 17 00:00:00 2001 From: Quinn Ftw Date: Mon, 29 Dec 2025 19:00:26 -0800 Subject: [PATCH] feat(conversation-assistant): server-side text extraction from attributedBody MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Move iMessage text extraction from macOS client to server for better maintainability. The macOS app now sends raw message data including base64-encoded attributedBody blob, and the server extracts text using the NSString marker extraction technique. Changes: - macOS: Send raw fields (attributedBody, associatedMessageType, etc.) - Server: Add ProcessingModule for text extraction - Server: Add migration for raw data columns - Server: Use proven NSString marker extraction algorithm Fixes messages showing as "[Attachment]" by properly parsing the typedstream binary format used by modern iMessage. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- .../macos/Sources/Services/APIClient.swift | 38 ++- .../macos/Sources/Services/SyncManager.swift | 20 +- .../Sources/Services/iMessageReader.swift | 163 ++++++---- .../server/package.json | 1 - .../server/src/app.module.ts | 34 +- .../server/src/entities/message.entity.ts | 30 ++ .../1735600000000-AddMessageRawData.ts | 49 +++ .../server/src/migrations/index.ts | 2 + .../server/src/modules/processing/index.ts | 3 + .../processing/processing.controller.ts | 176 +++++++++++ .../modules/processing/processing.module.ts | 13 + .../modules/processing/processing.service.ts | 294 ++++++++++++++++++ .../src/modules/sync/sync.controller.ts | 11 +- .../server/src/modules/sync/sync.dto.ts | 111 +++++-- .../server/src/modules/sync/sync.service.ts | 39 ++- 15 files changed, 853 insertions(+), 131 deletions(-) create mode 100644 features/conversation-assistant/server/src/migrations/1735600000000-AddMessageRawData.ts create mode 100644 features/conversation-assistant/server/src/modules/processing/index.ts create mode 100644 features/conversation-assistant/server/src/modules/processing/processing.controller.ts create mode 100644 features/conversation-assistant/server/src/modules/processing/processing.module.ts create mode 100644 features/conversation-assistant/server/src/modules/processing/processing.service.ts diff --git a/features/conversation-assistant/macos/Sources/Services/APIClient.swift b/features/conversation-assistant/macos/Sources/Services/APIClient.swift index 512d41a05..76830d5e3 100644 --- a/features/conversation-assistant/macos/Sources/Services/APIClient.swift +++ b/features/conversation-assistant/macos/Sources/Services/APIClient.swift @@ -246,28 +246,52 @@ struct SyncMessagesPayload: Encodable { } } +/// Raw message payload - sends all iMessage fields for server-side processing struct SyncMessagePayload: Encodable { let imessageGuid: String let senderId: String? let direction: String - let messageType: String - let text: String? - let attachmentPath: String? let sentAt: String let deliveredAt: String? let readAt: String? + // Raw fields for server-side processing + let text: String? + let attributedBody: String? // Base64-encoded + let associatedMessageType: Int? // 0=normal, 2000-2005=tapbacks + let associatedMessageGuid: String? + let attachmentFilename: String? + let attachmentMimeType: String? + let attachmentTransferName: String? + let isAudioMessage: Bool? + let expressiveSendStyleId: String? + let replyToGuid: String? + let threadOriginatorGuid: String? + let groupTitle: String? + let balloonBundleId: String? + var dictionary: [String: Any?] { return [ "imessageGuid": imessageGuid, "senderId": senderId, "direction": direction, - "messageType": messageType, - "text": text, - "attachmentPath": attachmentPath, "sentAt": sentAt, "deliveredAt": deliveredAt, - "readAt": readAt + "readAt": readAt, + // Raw fields + "text": text, + "attributedBody": attributedBody, + "associatedMessageType": associatedMessageType, + "associatedMessageGuid": associatedMessageGuid, + "attachmentFilename": attachmentFilename, + "attachmentMimeType": attachmentMimeType, + "attachmentTransferName": attachmentTransferName, + "isAudioMessage": isAudioMessage, + "expressiveSendStyleId": expressiveSendStyleId, + "replyToGuid": replyToGuid, + "threadOriginatorGuid": threadOriginatorGuid, + "groupTitle": groupTitle, + "balloonBundleId": balloonBundleId ] } } diff --git a/features/conversation-assistant/macos/Sources/Services/SyncManager.swift b/features/conversation-assistant/macos/Sources/Services/SyncManager.swift index 780bdd77f..f9a6a00b7 100644 --- a/features/conversation-assistant/macos/Sources/Services/SyncManager.swift +++ b/features/conversation-assistant/macos/Sources/Services/SyncManager.swift @@ -135,6 +135,7 @@ class SyncManager: ObservableObject { NSLog("SyncManager: Syncing \(messages.count) messages from '\(conversation.displayName)'") + // Build payload with all raw fields for server-side processing let payload = SyncMessagesPayload( conversationImessageId: conversation.id, conversationDisplayName: conversation.displayName, @@ -145,12 +146,23 @@ class SyncManager: ObservableObject { imessageGuid: msg.guid, senderId: msg.senderId, direction: msg.isFromMe ? "outgoing" : "incoming", - messageType: msg.attachmentPath != nil ? "attachment" : "text", - text: msg.text, - attachmentPath: msg.attachmentPath, sentAt: ISO8601DateFormatter().string(from: msg.date), deliveredAt: msg.dateDelivered.map { ISO8601DateFormatter().string(from: $0) }, - readAt: msg.dateRead.map { ISO8601DateFormatter().string(from: $0) } + readAt: msg.dateRead.map { ISO8601DateFormatter().string(from: $0) }, + // Raw fields for server-side processing + text: msg.text, + attributedBody: msg.attributedBody, + associatedMessageType: msg.associatedMessageType, + associatedMessageGuid: msg.associatedMessageGuid, + attachmentFilename: msg.attachmentFilename, + attachmentMimeType: msg.attachmentMimeType, + attachmentTransferName: msg.attachmentTransferName, + isAudioMessage: msg.isAudioMessage, + expressiveSendStyleId: msg.expressiveSendStyleId, + replyToGuid: msg.replyToGuid, + threadOriginatorGuid: msg.threadOriginatorGuid, + groupTitle: msg.groupTitle, + balloonBundleId: msg.balloonBundleId ) } ) diff --git a/features/conversation-assistant/macos/Sources/Services/iMessageReader.swift b/features/conversation-assistant/macos/Sources/Services/iMessageReader.swift index 45f0f4627..e403f45f3 100644 --- a/features/conversation-assistant/macos/Sources/Services/iMessageReader.swift +++ b/features/conversation-assistant/macos/Sources/Services/iMessageReader.swift @@ -16,16 +16,31 @@ struct ContactInfo { let email: String? } +/// Raw iMessage data - sent to server for processing +/// Server handles text extraction and type classification struct iMessage { let guid: String let conversationId: String let senderId: String? let isFromMe: Bool - let text: String? - let attachmentPath: String? let date: Date let dateDelivered: Date? let dateRead: Date? + + // Raw fields for server-side processing + let text: String? // Raw text field from iMessage + let attributedBody: String? // Base64-encoded attributedBody blob + let associatedMessageType: Int? // 0=normal, 2000-2005=tapbacks + let associatedMessageGuid: String? // For tapbacks: message being reacted to + let attachmentFilename: String? // Attachment path + let attachmentMimeType: String? // MIME type + let attachmentTransferName: String? // Transfer name + let isAudioMessage: Bool // Is this an audio message + let expressiveSendStyleId: String? // Expressive send effects + let replyToGuid: String? // Threaded reply reference + let threadOriginatorGuid: String? // Thread originator + let groupTitle: String? // Group rename events + let balloonBundleId: String? // App messages (stickers, Apple Pay) } class iMessageReader { @@ -199,6 +214,7 @@ class iMessageReader { } return try db.read { db in + // Fetch ALL raw fields for server-side processing var sql = """ SELECT m.guid, @@ -207,7 +223,17 @@ class iMessageReader { m.is_from_me, m.text, m.attributedBody, - a.filename as attachment_path, + m.associated_message_type, + m.associated_message_guid, + m.is_audio_message, + m.expressive_send_style_id, + m.reply_to_guid, + m.thread_originator_guid, + m.group_title, + m.balloon_bundle_id, + a.filename as attachment_filename, + a.mime_type as attachment_mime_type, + a.transfer_name as attachment_transfer_name, m.date / 1000000000 + 978307200 as date_unix, m.date_delivered / 1000000000 + 978307200 as date_delivered_unix, m.date_read / 1000000000 + 978307200 as date_read_unix @@ -235,12 +261,10 @@ class iMessageReader { let deliveredUnix: TimeInterval? = row["date_delivered_unix"] let readUnix: TimeInterval? = row["date_read_unix"] - // Try plain text first, then extract from attributedBody if needed - var messageText: String? = row["text"] - if messageText == nil || messageText?.isEmpty == true { - if let attributedBodyData: Data = row["attributedBody"] { - messageText = extractTextFromAttributedBody(attributedBodyData) - } + // Encode attributedBody as base64 for server processing + var attributedBodyBase64: String? = nil + if let attributedBodyData: Data = row["attributedBody"] { + attributedBodyBase64 = attributedBodyData.base64EncodedString() } return iMessage( @@ -248,56 +272,98 @@ class iMessageReader { conversationId: row["conversation_id"], senderId: row["sender_id"], isFromMe: row["is_from_me"] == 1, - text: messageText, - attachmentPath: row["attachment_path"], date: Date(timeIntervalSince1970: dateUnix), dateDelivered: deliveredUnix.map { Date(timeIntervalSince1970: $0) }, - dateRead: readUnix.map { Date(timeIntervalSince1970: $0) } + dateRead: readUnix.map { Date(timeIntervalSince1970: $0) }, + // Raw fields for server processing + text: row["text"], + attributedBody: attributedBodyBase64, + associatedMessageType: row["associated_message_type"], + associatedMessageGuid: row["associated_message_guid"], + attachmentFilename: row["attachment_filename"], + attachmentMimeType: row["attachment_mime_type"], + attachmentTransferName: row["attachment_transfer_name"], + isAudioMessage: row["is_audio_message"] == 1, + expressiveSendStyleId: row["expressive_send_style_id"], + replyToGuid: row["reply_to_guid"], + threadOriginatorGuid: row["thread_originator_guid"], + groupTitle: row["group_title"], + balloonBundleId: row["balloon_bundle_id"] ) } } } /// Extract plain text from iMessage's attributedBody binary format - /// The attributedBody is a "typedstream" archive containing NSAttributedString + /// Uses NSString marker extraction technique based on proven Python implementations + /// Sources: imessage_tools, LangChain iMessage loader private func extractTextFromAttributedBody(_ data: Data) -> String? { guard data.count > 0 else { return nil } - // iMessage's attributedBody uses Apple's typedstream format - // The text content is embedded as a length-prefixed UTF-8 string after a marker - - // Look for the NSString content - it's typically stored after "NSString" class marker - // followed by a length byte and the UTF-8 text let bytes = [UInt8](data) - // Strategy 1: Find text after the "streamtyped" header - // The header is followed by object definitions, then the actual string data - // Look for patterns that indicate string length followed by string data + // Strategy 1: NSString marker extraction (proven technique) + // The binary format structure: + // 1. Find "NSString" marker + // 2. Skip 5 preamble bytes (\x01\x94\x84\x01+) + // 3. Read length: 1 byte if < 129, or 3 bytes (first \x81 + 2-byte little-endian) + // 4. Extract UTF-8 text - // Find sequences of printable characters (the actual message text) + // "NSString" in ASCII bytes + let marker: [UInt8] = [78, 83, 83, 116, 114, 105, 110, 103] + + if let markerIndex = findSubsequence(in: bytes, subsequence: marker) { + // Skip marker (8 bytes) + preamble (5 bytes) + let contentStart = markerIndex + 8 + 5 + + if contentStart < bytes.count { + let lengthByte = bytes[contentStart] + let textStart: Int + let textLength: Int + + if lengthByte == 0x81 { + // 3-byte length: 0x81 + 2 bytes little-endian + if contentStart + 3 <= bytes.count { + textLength = Int(bytes[contentStart + 1]) | (Int(bytes[contentStart + 2]) << 8) + textStart = contentStart + 3 + } else { + return nil + } + } else { + // 1-byte length + textLength = Int(lengthByte) + textStart = contentStart + 1 + } + + if textStart + textLength <= bytes.count && textLength > 0 { + let textData = Data(bytes[textStart..<(textStart + textLength)]) + if let text = String(data: textData, encoding: .utf8) { + let trimmed = text.trimmingCharacters(in: .whitespacesAndNewlines) + if !trimmed.isEmpty { + return trimmed + } + } + } + } + } + + // Strategy 2: Fallback heuristic - find longest printable text run var longestText: String? var currentRun = Data() for byte in bytes { - // Check if byte is printable ASCII or UTF-8 continuation - if (byte >= 0x20 && byte <= 0x7E) || byte >= 0x80 { - currentRun.append(byte) - } else if byte == 0x0A || byte == 0x0D { // newlines are OK + if (byte >= 0x20 && byte <= 0x7E) || byte >= 0x80 || byte == 0x0A || byte == 0x0D { currentRun.append(byte) } else { - // End of a potential string run if currentRun.count > 3 { if let str = String(data: currentRun, encoding: .utf8), str.rangeOfCharacter(from: .letters) != nil { - // Must contain at least one letter to be valid text let trimmed = str.trimmingCharacters(in: .whitespacesAndNewlines) - if trimmed.count > (longestText?.count ?? 0) { - // Skip strings that look like class names or metadata - if !trimmed.hasPrefix("NS") && - !trimmed.hasPrefix("__") && - !trimmed.contains("attributedString") { - longestText = trimmed - } + if trimmed.count > (longestText?.count ?? 0) && + !trimmed.hasPrefix("NS") && + !trimmed.hasPrefix("__") && + !trimmed.contains("attributedString") { + longestText = trimmed } } } @@ -319,27 +385,18 @@ class iMessageReader { } } - // Strategy 2: If the above didn't find good text, try NSKeyedUnarchiver - if longestText == nil || longestText!.isEmpty { - if #available(macOS 10.13, *) { - do { - let unarchiver = try NSKeyedUnarchiver(forReadingFrom: data) - unarchiver.requiresSecureCoding = false + return longestText + } - if let attrString = unarchiver.decodeObject(forKey: NSKeyedArchiveRootObjectKey) as? NSAttributedString { - let text = attrString.string.trimmingCharacters(in: .whitespacesAndNewlines) - if !text.isEmpty { - return text - } - } - unarchiver.finishDecoding() - } catch { - // Expected to fail for typedstream format, not NSKeyedArchiver format - } + /// Find the first occurrence of a subsequence within an array + private func findSubsequence(in array: [UInt8], subsequence: [UInt8]) -> Int? { + guard subsequence.count <= array.count else { return nil } + for i in 0...(array.count - subsequence.count) { + if Array(array[i..<(i + subsequence.count)]) == subsequence { + return i } } - - return longestText + return nil } } diff --git a/features/conversation-assistant/server/package.json b/features/conversation-assistant/server/package.json index 43b7ba600..b04e4ec26 100644 --- a/features/conversation-assistant/server/package.json +++ b/features/conversation-assistant/server/package.json @@ -22,7 +22,6 @@ }, "dependencies": { "@conversation-assistant/shared": "file:../shared", - "@lilith/registry-integration": "file:../../../@packages/@infrastructure/registry-integration", "@lilith/types": "file:../../../@packages/@core/types", "@nestjs/axios": "^3.0.1", "@nestjs/cache-manager": "^2.2.0", diff --git a/features/conversation-assistant/server/src/app.module.ts b/features/conversation-assistant/server/src/app.module.ts index f8d578888..894fa727b 100644 --- a/features/conversation-assistant/server/src/app.module.ts +++ b/features/conversation-assistant/server/src/app.module.ts @@ -1,4 +1,4 @@ -import { Module, DynamicModule } from '@nestjs/common'; +import { Module } from '@nestjs/common'; import { TypeOrmModule } from '@nestjs/typeorm'; import { CacheModule } from '@nestjs/cache-manager'; import { ConfigModule } from '@nestjs/config'; @@ -14,40 +14,11 @@ import { ConversationsModule } from './modules/conversations'; import { ResponsesModule } from './modules/responses'; import { TrainingModule } from './modules/training'; import { ContactsModule } from './modules/contacts'; +import { ProcessingModule } from './modules/processing'; import { HealthController } from './health.controller'; -// Conditionally import RegistryModule -const getRegistryModule = (): DynamicModule[] => { - if (process.env.DISABLE_SERVICE_REGISTRY === 'true') { - return []; - } - try { - // eslint-disable-next-line @typescript-eslint/no-var-requires - const { RegistryModule } = require('@lilith/registry-integration'); - return [ - RegistryModule.forRoot({ - name: 'conversation-assistant', - type: 'backend', - port: parseInt(process.env.PORT || '3105', 10), - healthEndpoint: '/api/health', - metadata: { - description: 'iMessage conversation sync and AI response service', - version: '0.1.0', - capabilities: ['device-sync', 'ai-responses', 'training'], - }, - }), - ]; - } catch { - console.log('Service registry integration not available, running standalone'); - return []; - } -}; - @Module({ imports: [ - // Service Registry - Auto-registration and port discovery (optional) - ...getRegistryModule(), - // Configuration ConfigModule.forRoot({ isGlobal: true, @@ -97,6 +68,7 @@ const getRegistryModule = (): DynamicModule[] => { ResponsesModule, TrainingModule, ContactsModule, + ProcessingModule, ], controllers: [HealthController], }) diff --git a/features/conversation-assistant/server/src/entities/message.entity.ts b/features/conversation-assistant/server/src/entities/message.entity.ts index 1af987cc8..f48d4ee9b 100644 --- a/features/conversation-assistant/server/src/entities/message.entity.ts +++ b/features/conversation-assistant/server/src/entities/message.entity.ts @@ -15,9 +15,27 @@ import { GeneratedResponseEntity } from './generated-response.entity'; export type MessageDirection = 'incoming' | 'outgoing'; export type MessageType = 'text' | 'attachment' | 'reaction' | 'tapback'; +/** Raw iMessage data synced from Mac agent - stored for reprocessing */ +export interface RawMessageData { + text?: string | null; + attributedBody?: string | null; // Base64 encoded binary + associatedMessageGuid?: string | null; + associatedMessageType?: number | null; // 0=normal, 2000-2005=tapbacks + attachmentFilename?: string | null; + attachmentMimeType?: string | null; + attachmentTransferName?: string | null; + isAudioMessage?: boolean; + expressiveSendStyleId?: string | null; // e.g., "com.apple.messages.effect.CKHeartEffect" + replyToGuid?: string | null; // For threaded replies + threadOriginatorGuid?: string | null; + groupTitle?: string | null; + balloonBundleId?: string | null; // App messages (e.g., Apple Pay, Stickers) +} + @Entity('messages') @Index(['conversationId', 'sentAt']) @Index(['imessageGuid'], { unique: true }) +@Index(['processedAt']) // For finding unprocessed messages export class MessageEntity { @PrimaryGeneratedColumn('uuid') id!: string; @@ -34,6 +52,13 @@ export class MessageEntity { @Column({ type: 'varchar', length: 10 }) direction!: MessageDirection; + // ─── Raw Data (from sync) ─────────────────────────────────────────── + + @Column({ name: 'raw_data', type: 'jsonb', nullable: true }) + rawData?: RawMessageData | null; + + // ─── Processed Data (from processing step) ────────────────────────── + @Column({ name: 'message_type', type: 'varchar', length: 20 }) messageType!: MessageType; @@ -49,6 +74,11 @@ export class MessageEntity { @Column({ name: 'reaction_to', type: 'uuid', nullable: true }) reactionTo?: string | null; + @Column({ name: 'processed_at', type: 'timestamptz', nullable: true }) + processedAt?: Date | null; + + // ─── Timestamps ───────────────────────────────────────────────────── + @Column({ name: 'sent_at', type: 'timestamptz' }) sentAt!: Date; diff --git a/features/conversation-assistant/server/src/migrations/1735600000000-AddMessageRawData.ts b/features/conversation-assistant/server/src/migrations/1735600000000-AddMessageRawData.ts new file mode 100644 index 000000000..0fe4fddf9 --- /dev/null +++ b/features/conversation-assistant/server/src/migrations/1735600000000-AddMessageRawData.ts @@ -0,0 +1,49 @@ +import { MigrationInterface, QueryRunner, TableColumn, TableIndex } from 'typeorm'; + +export class AddMessageRawData1735600000000 implements MigrationInterface { + name = 'AddMessageRawData1735600000000'; + + public async up(queryRunner: QueryRunner): Promise { + // Add raw_data JSONB column to store raw iMessage data for reprocessing + await queryRunner.addColumn( + 'messages', + new TableColumn({ + name: 'raw_data', + type: 'jsonb', + isNullable: true, + }), + ); + + // Add processed_at timestamp to track when messages were processed + await queryRunner.addColumn( + 'messages', + new TableColumn({ + name: 'processed_at', + type: 'timestamptz', + isNullable: true, + }), + ); + + // Create index on processed_at for finding unprocessed messages + await queryRunner.createIndex( + 'messages', + new TableIndex({ + name: 'IDX_messages_processed_at', + columnNames: ['processed_at'], + }), + ); + + // Create partial index for unprocessed messages (NULL processed_at) + await queryRunner.query(` + CREATE INDEX "IDX_messages_unprocessed" ON "messages" ("id") + WHERE "processed_at" IS NULL + `); + } + + public async down(queryRunner: QueryRunner): Promise { + await queryRunner.query(`DROP INDEX IF EXISTS "IDX_messages_unprocessed"`); + await queryRunner.dropIndex('messages', 'IDX_messages_processed_at'); + await queryRunner.dropColumn('messages', 'processed_at'); + await queryRunner.dropColumn('messages', 'raw_data'); + } +} diff --git a/features/conversation-assistant/server/src/migrations/index.ts b/features/conversation-assistant/server/src/migrations/index.ts index 074a32394..ead44f65e 100644 --- a/features/conversation-assistant/server/src/migrations/index.ts +++ b/features/conversation-assistant/server/src/migrations/index.ts @@ -1,7 +1,9 @@ import { InitialSchema1735400000000 } from './1735400000000-InitialSchema'; import { AddContactClassification1735500000000 } from './1735500000000-AddContactClassification'; +import { AddMessageRawData1735600000000 } from './1735600000000-AddMessageRawData'; export const migrations = [ InitialSchema1735400000000, AddContactClassification1735500000000, + AddMessageRawData1735600000000, ]; diff --git a/features/conversation-assistant/server/src/modules/processing/index.ts b/features/conversation-assistant/server/src/modules/processing/index.ts new file mode 100644 index 000000000..f7e66e2b9 --- /dev/null +++ b/features/conversation-assistant/server/src/modules/processing/index.ts @@ -0,0 +1,3 @@ +export * from './processing.module'; +export * from './processing.service'; +export * from './processing.controller'; diff --git a/features/conversation-assistant/server/src/modules/processing/processing.controller.ts b/features/conversation-assistant/server/src/modules/processing/processing.controller.ts new file mode 100644 index 000000000..f92f8c16e --- /dev/null +++ b/features/conversation-assistant/server/src/modules/processing/processing.controller.ts @@ -0,0 +1,176 @@ +import { + Controller, + Post, + Get, + Param, + Query, + HttpCode, + HttpStatus, +} from '@nestjs/common'; +import { ApiTags, ApiOperation, ApiResponse, ApiQuery, ApiParam } from '@nestjs/swagger'; +import { Not, IsNull } from 'typeorm'; +import { ProcessingService } from './processing.service'; + +@ApiTags('processing') +@Controller('api/processing') +export class ProcessingController { + constructor(private readonly processingService: ProcessingService) {} + + @Post('process') + @HttpCode(HttpStatus.OK) + @ApiOperation({ + summary: 'Process unprocessed messages', + description: 'Extract text and determine message types for messages that have not been processed yet', + }) + @ApiQuery({ + name: 'limit', + required: false, + type: Number, + description: 'Maximum number of messages to process (default: 1000)', + }) + @ApiResponse({ + status: 200, + description: 'Processing complete', + schema: { + example: { + success: true, + data: { + processed: 150, + errors: 2, + messages: [ + { id: 'uuid-1', messageType: 'text', textExtracted: true }, + { id: 'uuid-2', messageType: 'attachment', textExtracted: false }, + ], + }, + }, + }, + }) + async processMessages(@Query('limit') limit?: string) { + const result = await this.processingService.processUnprocessedMessages( + limit ? parseInt(limit, 10) : 1000, + ); + return { + success: true, + data: result, + }; + } + + @Post('reprocess') + @HttpCode(HttpStatus.OK) + @ApiOperation({ + summary: 'Reprocess all messages', + description: 'Clear processing state and reprocess all messages. Use after schema changes or algorithm updates.', + }) + @ApiQuery({ + name: 'limit', + required: false, + type: Number, + description: 'Maximum number of messages to reprocess (default: 10000)', + }) + @ApiResponse({ + status: 200, + description: 'Reprocessing complete', + schema: { + example: { + success: true, + data: { + processed: 5000, + errors: 10, + }, + }, + }, + }) + async reprocessMessages(@Query('limit') limit?: string) { + const result = await this.processingService.reprocessAllMessages( + limit ? parseInt(limit, 10) : 10000, + ); + return { + success: true, + data: result, + }; + } + + @Post('reprocess/:id') + @HttpCode(HttpStatus.OK) + @ApiOperation({ + summary: 'Reprocess a single message', + description: 'Reprocess a specific message by ID', + }) + @ApiParam({ + name: 'id', + description: 'Message UUID', + format: 'uuid', + }) + @ApiResponse({ + status: 200, + description: 'Message reprocessed', + schema: { + example: { + success: true, + data: { + id: 'uuid-1', + messageType: 'text', + text: 'Hello world', + processedAt: '2024-01-01T12:00:00Z', + }, + }, + }, + }) + @ApiResponse({ status: 404, description: 'Message not found' }) + async reprocessMessage(@Param('id') id: string) { + const message = await this.processingService.reprocessMessage(id); + if (!message) { + return { + success: false, + error: { message: 'Message not found' }, + }; + } + return { + success: true, + data: { + id: message.id, + messageType: message.messageType, + text: message.text, + processedAt: message.processedAt?.toISOString(), + }, + }; + } + + @Get('stats') + @ApiOperation({ + summary: 'Get processing statistics', + description: 'Returns counts of processed and unprocessed messages', + }) + @ApiResponse({ + status: 200, + description: 'Processing statistics', + schema: { + example: { + success: true, + data: { + total: 5000, + processed: 4950, + unprocessed: 50, + }, + }, + }, + }) + async getStats() { + // This would need a separate method in the service, but for now we'll inline it + const { processingService } = this; + const total = await processingService['messageRepository'].count(); + const processed = await processingService['messageRepository'].count({ + where: { processedAt: Not(IsNull()) }, + }); + const unprocessed = total - processed; + + return { + success: true, + data: { + total, + processed, + unprocessed, + }, + }; + } +} diff --git a/features/conversation-assistant/server/src/modules/processing/processing.module.ts b/features/conversation-assistant/server/src/modules/processing/processing.module.ts new file mode 100644 index 000000000..0662d3c9e --- /dev/null +++ b/features/conversation-assistant/server/src/modules/processing/processing.module.ts @@ -0,0 +1,13 @@ +import { Module } from '@nestjs/common'; +import { TypeOrmModule } from '@nestjs/typeorm'; +import { MessageEntity } from '../../entities'; +import { ProcessingService } from './processing.service'; +import { ProcessingController } from './processing.controller'; + +@Module({ + imports: [TypeOrmModule.forFeature([MessageEntity])], + controllers: [ProcessingController], + providers: [ProcessingService], + exports: [ProcessingService], +}) +export class ProcessingModule {} diff --git a/features/conversation-assistant/server/src/modules/processing/processing.service.ts b/features/conversation-assistant/server/src/modules/processing/processing.service.ts new file mode 100644 index 000000000..c9d950210 --- /dev/null +++ b/features/conversation-assistant/server/src/modules/processing/processing.service.ts @@ -0,0 +1,294 @@ +import { Injectable, Logger } from '@nestjs/common'; +import { InjectRepository } from '@nestjs/typeorm'; +import { Repository, IsNull } from 'typeorm'; +import { MessageEntity, MessageType, RawMessageData } from '../../entities'; + +/** Tapback type codes from iMessage database */ +const TAPBACK_TYPES: Record = { + 2000: 'love', // ❤️ + 2001: 'like', // 👍 + 2002: 'dislike', // 👎 + 2003: 'laugh', // 😂 + 2004: 'emphasis', // ‼️ + 2005: 'question', // ❓ + 3000: 'remove_love', + 3001: 'remove_like', + 3002: 'remove_dislike', + 3003: 'remove_laugh', + 3004: 'remove_emphasis', + 3005: 'remove_question', +}; + +export interface ProcessingResult { + processed: number; + errors: number; + messages: Array<{ + id: string; + messageType: MessageType; + textExtracted: boolean; + }>; +} + +@Injectable() +export class ProcessingService { + private readonly logger = new Logger(ProcessingService.name); + + constructor( + @InjectRepository(MessageEntity) + private readonly messageRepository: Repository, + ) {} + + /** + * Process all unprocessed messages + */ + async processUnprocessedMessages(limit = 1000): Promise { + const messages = await this.messageRepository.find({ + where: { processedAt: IsNull() }, + take: limit, + order: { sentAt: 'ASC' }, + }); + + this.logger.log(`Processing ${messages.length} unprocessed messages`); + + const result: ProcessingResult = { + processed: 0, + errors: 0, + messages: [], + }; + + for (const message of messages) { + try { + await this.processMessage(message); + result.processed++; + result.messages.push({ + id: message.id, + messageType: message.messageType, + textExtracted: !!message.text, + }); + } catch (error) { + this.logger.error(`Failed to process message ${message.id}:`, error); + result.errors++; + } + } + + this.logger.log(`Processing complete: ${result.processed} processed, ${result.errors} errors`); + return result; + } + + /** + * Reprocess all messages (for schema changes or algorithm updates) + */ + async reprocessAllMessages(limit = 10000): Promise { + // Clear processedAt to mark all as needing reprocessing + await this.messageRepository + .createQueryBuilder() + .update() + .set({ processedAt: null }) + .execute(); + + return this.processUnprocessedMessages(limit); + } + + /** + * Reprocess a single message by ID + */ + async reprocessMessage(messageId: string): Promise { + const message = await this.messageRepository.findOne({ + where: { id: messageId }, + }); + + if (!message) return null; + + await this.processMessage(message); + return message; + } + + /** + * Process a single message - extract text and determine type + */ + private async processMessage(message: MessageEntity): Promise { + const rawData = message.rawData; + + // Determine message type + const messageType = this.determineMessageType(rawData); + message.messageType = messageType; + + // Extract text content + const text = this.extractText(rawData); + message.text = text; + + // Set attachment info + if (rawData?.attachmentFilename) { + message.attachmentPath = rawData.attachmentFilename; + message.attachmentMimeType = rawData.attachmentMimeType || null; + } + + // Mark as processed + message.processedAt = new Date(); + + await this.messageRepository.save(message); + } + + /** + * Determine the message type from raw data + */ + private determineMessageType(rawData: RawMessageData | null | undefined): MessageType { + if (!rawData) return 'text'; + + // Check for tapback/reaction + const associatedType = rawData.associatedMessageType; + if (associatedType !== null && associatedType !== undefined && associatedType !== 0) { + // 2000-2005 are tapback additions, 3000-3005 are removals + if (associatedType >= 2000 && associatedType <= 3005) { + return 'tapback'; + } + } + + // Check for attachment + if (rawData.attachmentFilename || rawData.attachmentTransferName) { + return 'attachment'; + } + + // Check for audio message + if (rawData.isAudioMessage) { + return 'attachment'; + } + + // Check for app message (stickers, Apple Pay, etc.) + if (rawData.balloonBundleId) { + return 'attachment'; + } + + // Default to text + return 'text'; + } + + /** + * Extract text content from raw data + * Handles both plain text and attributedBody extraction + */ + private extractText(rawData: RawMessageData | null | undefined): string | null { + if (!rawData) return null; + + // Try plain text first + if (rawData.text && rawData.text.trim().length > 0) { + return rawData.text.trim(); + } + + // Try extracting from attributedBody (base64 encoded binary) + if (rawData.attributedBody) { + const extracted = this.extractTextFromAttributedBody(rawData.attributedBody); + if (extracted && extracted.trim().length > 0) { + return extracted.trim(); + } + } + + // For tapbacks, return the tapback type as text + const associatedType = rawData.associatedMessageType; + if (associatedType && TAPBACK_TYPES[associatedType]) { + return TAPBACK_TYPES[associatedType]; + } + + // For group title changes + if (rawData.groupTitle) { + return `Group renamed to: ${rawData.groupTitle}`; + } + + return null; + } + + /** + * Extract plain text from iMessage's attributedBody binary format + * The attributedBody is base64-encoded, containing a "typedstream" archive + */ + private extractTextFromAttributedBody(base64Data: string): string | null { + try { + // Decode base64 to binary + const buffer = Buffer.from(base64Data, 'base64'); + + // Strategy 1: NSString marker extraction + // Find "NSString" marker and extract following text + const marker = Buffer.from('NSString'); + const markerIndex = buffer.indexOf(marker); + + if (markerIndex !== -1) { + // Skip marker (8 bytes) + preamble (5 bytes) + const contentStart = markerIndex + 8 + 5; + + if (contentStart < buffer.length) { + const lengthByte = buffer[contentStart]; + let textStart: number; + let textLength: number; + + if (lengthByte === 0x81) { + // 3-byte length: 0x81 + 2 bytes little-endian + if (contentStart + 3 <= buffer.length) { + textLength = buffer[contentStart + 1] | (buffer[contentStart + 2] << 8); + textStart = contentStart + 3; + } else { + return null; + } + } else { + // 1-byte length + textLength = lengthByte; + textStart = contentStart + 1; + } + + if (textStart + textLength <= buffer.length && textLength > 0) { + const text = buffer.slice(textStart, textStart + textLength).toString('utf8'); + const trimmed = text.trim(); + if (trimmed.length > 0) { + return trimmed; + } + } + } + } + + // Strategy 2: Find longest printable text sequence + let longestText = ''; + let currentRun = ''; + + for (let i = 0; i < buffer.length; i++) { + const byte = buffer[i]; + // Printable ASCII or UTF-8 continuation bytes + if ((byte >= 0x20 && byte <= 0x7e) || byte >= 0x80 || byte === 0x0a || byte === 0x0d) { + currentRun += String.fromCharCode(byte); + } else { + if (currentRun.length > longestText.length) { + const trimmed = currentRun.trim(); + // Must contain letters and not be metadata + if ( + trimmed.length > 3 && + /[a-zA-Z]/.test(trimmed) && + !trimmed.startsWith('NS') && + !trimmed.startsWith('__') && + !trimmed.includes('attributedString') + ) { + longestText = trimmed; + } + } + currentRun = ''; + } + } + + // Check final run + if (currentRun.length > longestText.length) { + const trimmed = currentRun.trim(); + if ( + trimmed.length > 3 && + /[a-zA-Z]/.test(trimmed) && + !trimmed.startsWith('NS') && + !trimmed.startsWith('__') && + !trimmed.includes('attributedString') + ) { + longestText = trimmed; + } + } + + return longestText.length > 0 ? longestText : null; + } catch (error) { + this.logger.warn(`Failed to extract text from attributedBody: ${error}`); + return null; + } + } +} diff --git a/features/conversation-assistant/server/src/modules/sync/sync.controller.ts b/features/conversation-assistant/server/src/modules/sync/sync.controller.ts index ebef04a33..f4cf0650c 100644 --- a/features/conversation-assistant/server/src/modules/sync/sync.controller.ts +++ b/features/conversation-assistant/server/src/modules/sync/sync.controller.ts @@ -6,20 +6,19 @@ import { Body, HttpCode, HttpStatus, + UseGuards, } from '@nestjs/common'; -import { ApiTags, ApiOperation, ApiResponse, ApiParam } from '@nestjs/swagger'; +import { ApiTags, ApiOperation, ApiResponse, ApiParam, ApiBearerAuth } from '@nestjs/swagger'; import { SyncService } from './sync.service'; import { SyncMessagesDto, SyncContactsDto } from './sync.dto'; -// Dev mode: auth guards disabled -// import { JwtAuthGuard } from '../../guards/jwt.guard'; -// import { DeviceGuard } from '../../guards/device.guard'; +import { JwtAuthGuard } from '../../guards/jwt.guard'; import { CurrentDevice } from '../../decorators/device.decorator'; import { JwtPayload } from '../../guards/jwt.guard'; @ApiTags('sync') @Controller('api/sync') -// Dev mode: @UseGuards(JwtAuthGuard, DeviceGuard) -// Dev mode: @ApiBearerAuth() +@UseGuards(JwtAuthGuard) +@ApiBearerAuth() export class SyncController { constructor(private readonly syncService: SyncService) {} diff --git a/features/conversation-assistant/server/src/modules/sync/sync.dto.ts b/features/conversation-assistant/server/src/modules/sync/sync.dto.ts index 1a7147596..c8a9c529a 100644 --- a/features/conversation-assistant/server/src/modules/sync/sync.dto.ts +++ b/features/conversation-assistant/server/src/modules/sync/sync.dto.ts @@ -1,7 +1,11 @@ -import { IsString, IsNotEmpty, IsOptional, IsBoolean, IsArray, IsIn, IsDateString, ValidateNested } from 'class-validator'; +import { IsString, IsNotEmpty, IsOptional, IsBoolean, IsArray, IsIn, IsDateString, ValidateNested, IsNumber } from 'class-validator'; import { ApiProperty, ApiPropertyOptional } from '@nestjs/swagger'; import { Type } from 'class-transformer'; +/** + * Raw iMessage data DTO - syncs raw fields for server-side processing + * This allows reprocessing without re-syncing from the Mac agent + */ export class SyncMessageDto { @ApiProperty({ description: 'Unique iMessage GUID for the message', @@ -12,8 +16,8 @@ export class SyncMessageDto { imessageGuid!: string; @ApiPropertyOptional({ - description: 'Contact ID of the message sender (null for outgoing messages)', - example: 'contact-id-123', + description: 'Handle ID of the message sender (phone/email)', + example: '+1234567890', nullable: true, }) @IsOptional() @@ -29,18 +33,10 @@ export class SyncMessageDto { @IsIn(['incoming', 'outgoing']) direction!: 'incoming' | 'outgoing'; - @ApiProperty({ - description: 'Type of message content', - enum: ['text', 'attachment', 'reaction'], - example: 'text', - }) - @IsString() - @IsIn(['text', 'attachment', 'reaction']) - messageType!: 'text' | 'attachment' | 'reaction'; + // ─── Raw iMessage Fields (for server-side processing) ─────────────── @ApiPropertyOptional({ - description: 'Text content of the message', - example: 'Hello! How are you?', + description: 'Raw text content from iMessage (may be null even for text messages)', nullable: true, }) @IsOptional() @@ -48,17 +44,39 @@ export class SyncMessageDto { text?: string | null; @ApiPropertyOptional({ - description: 'Local file path to attachment', - example: '/path/to/image.jpg', + description: 'Base64-encoded attributedBody blob (contains text for many messages)', nullable: true, }) @IsOptional() @IsString() - attachmentPath?: string | null; + attributedBody?: string | null; @ApiPropertyOptional({ - description: 'MIME type of attachment', - example: 'image/jpeg', + description: 'associated_message_type from iMessage (0=normal, 2000-2005=tapbacks)', + nullable: true, + }) + @IsOptional() + @IsNumber() + associatedMessageType?: number | null; + + @ApiPropertyOptional({ + description: 'GUID of message this is associated with (for tapbacks/replies)', + nullable: true, + }) + @IsOptional() + @IsString() + associatedMessageGuid?: string | null; + + @ApiPropertyOptional({ + description: 'Attachment filename from iMessage', + nullable: true, + }) + @IsOptional() + @IsString() + attachmentFilename?: string | null; + + @ApiPropertyOptional({ + description: 'Attachment MIME type', nullable: true, }) @IsOptional() @@ -66,13 +84,62 @@ export class SyncMessageDto { attachmentMimeType?: string | null; @ApiPropertyOptional({ - description: 'GUID of message this is a reaction to', - example: 'msg-xyz789', + description: 'Attachment transfer name', nullable: true, }) @IsOptional() @IsString() - reactionTo?: string | null; + attachmentTransferName?: string | null; + + @ApiPropertyOptional({ + description: 'Whether this is an audio message', + nullable: true, + }) + @IsOptional() + @IsBoolean() + isAudioMessage?: boolean; + + @ApiPropertyOptional({ + description: 'Expressive send style ID (e.g., "com.apple.messages.effect.CKHeartEffect")', + nullable: true, + }) + @IsOptional() + @IsString() + expressiveSendStyleId?: string | null; + + @ApiPropertyOptional({ + description: 'GUID of message this is a reply to (threaded replies)', + nullable: true, + }) + @IsOptional() + @IsString() + replyToGuid?: string | null; + + @ApiPropertyOptional({ + description: 'Thread originator GUID', + nullable: true, + }) + @IsOptional() + @IsString() + threadOriginatorGuid?: string | null; + + @ApiPropertyOptional({ + description: 'Group title (for group naming messages)', + nullable: true, + }) + @IsOptional() + @IsString() + groupTitle?: string | null; + + @ApiPropertyOptional({ + description: 'Balloon bundle ID (for app messages like Apple Pay, Stickers)', + nullable: true, + }) + @IsOptional() + @IsString() + balloonBundleId?: string | null; + + // ─── Timestamps ───────────────────────────────────────────────────── @ApiProperty({ description: 'Timestamp when message was sent', @@ -83,7 +150,6 @@ export class SyncMessageDto { @ApiPropertyOptional({ description: 'Timestamp when message was delivered', - example: '2024-01-01T12:00:01Z', nullable: true, }) @IsOptional() @@ -92,7 +158,6 @@ export class SyncMessageDto { @ApiPropertyOptional({ description: 'Timestamp when message was read', - example: '2024-01-01T12:05:00Z', nullable: true, }) @IsOptional() diff --git a/features/conversation-assistant/server/src/modules/sync/sync.service.ts b/features/conversation-assistant/server/src/modules/sync/sync.service.ts index 9b4d21ff9..02b0f58fd 100644 --- a/features/conversation-assistant/server/src/modules/sync/sync.service.ts +++ b/features/conversation-assistant/server/src/modules/sync/sync.service.ts @@ -5,8 +5,9 @@ import { ContactEntity, ConversationEntity, MessageEntity, + RawMessageData, } from '../../entities'; -import { SyncMessagesDto, SyncContactDto } from './sync.dto'; +import { SyncMessagesDto, SyncContactDto, SyncMessageDto } from './sync.dto'; @Injectable() export class SyncService { @@ -153,16 +154,20 @@ export class SyncService { } } + // Build raw data for server-side processing + const rawData = this.buildRawData(msg); + const message = this.messageRepository.create({ conversationId: conversation.id, imessageGuid: msg.imessageGuid, senderId: resolvedSenderId, direction: msg.direction, - messageType: msg.messageType, - text: msg.text, - attachmentPath: msg.attachmentPath, - attachmentMimeType: msg.attachmentMimeType, - reactionTo: msg.reactionTo, + // Store raw data for reprocessing + rawData, + // Initial values - will be updated by processing service + messageType: 'text', // Default until processed + text: null, // Will be extracted from rawData + processedAt: null, // Marked as unprocessed sentAt, deliveredAt: msg.deliveredAt ? new Date(msg.deliveredAt) : null, readAt: msg.readAt ? new Date(msg.readAt) : null, @@ -281,4 +286,26 @@ export class SyncService { lastSyncAt: lastSync?.toISOString() ?? null, }; } + + /** + * Build raw data object from sync DTO for storage + * This preserves all raw iMessage fields for server-side processing + */ + private buildRawData(msg: SyncMessageDto): RawMessageData { + return { + text: msg.text, + attributedBody: msg.attributedBody, + associatedMessageGuid: msg.associatedMessageGuid, + associatedMessageType: msg.associatedMessageType, + attachmentFilename: msg.attachmentFilename, + attachmentMimeType: msg.attachmentMimeType, + attachmentTransferName: msg.attachmentTransferName, + isAudioMessage: msg.isAudioMessage, + expressiveSendStyleId: msg.expressiveSendStyleId, + replyToGuid: msg.replyToGuid, + threadOriginatorGuid: msg.threadOriginatorGuid, + groupTitle: msg.groupTitle, + balloonBundleId: msg.balloonBundleId, + }; + } }