feat(conversation-assistant): server-side text extraction from attributedBody

Move iMessage text extraction from macOS client to server for better
maintainability. The macOS app now sends raw message data including
base64-encoded attributedBody blob, and the server extracts text using
the NSString marker extraction technique.

Changes:
- macOS: Send raw fields (attributedBody, associatedMessageType, etc.)
- Server: Add ProcessingModule for text extraction
- Server: Add migration for raw data columns
- Server: Use proven NSString marker extraction algorithm

Fixes messages showing as "[Attachment]" by properly parsing the
typedstream binary format used by modern iMessage.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
Quinn Ftw 2025-12-29 19:00:26 -08:00
parent 511785f381
commit 913663922e
15 changed files with 853 additions and 131 deletions

View file

@ -246,28 +246,52 @@ struct SyncMessagesPayload: Encodable {
}
}
/// Raw message payload - sends all iMessage fields for server-side processing
struct SyncMessagePayload: Encodable {
let imessageGuid: String
let senderId: String?
let direction: String
let messageType: String
let text: String?
let attachmentPath: String?
let sentAt: String
let deliveredAt: String?
let readAt: String?
// Raw fields for server-side processing
let text: String?
let attributedBody: String? // Base64-encoded
let associatedMessageType: Int? // 0=normal, 2000-2005=tapbacks
let associatedMessageGuid: String?
let attachmentFilename: String?
let attachmentMimeType: String?
let attachmentTransferName: String?
let isAudioMessage: Bool?
let expressiveSendStyleId: String?
let replyToGuid: String?
let threadOriginatorGuid: String?
let groupTitle: String?
let balloonBundleId: String?
var dictionary: [String: Any?] {
return [
"imessageGuid": imessageGuid,
"senderId": senderId,
"direction": direction,
"messageType": messageType,
"text": text,
"attachmentPath": attachmentPath,
"sentAt": sentAt,
"deliveredAt": deliveredAt,
"readAt": readAt
"readAt": readAt,
// Raw fields
"text": text,
"attributedBody": attributedBody,
"associatedMessageType": associatedMessageType,
"associatedMessageGuid": associatedMessageGuid,
"attachmentFilename": attachmentFilename,
"attachmentMimeType": attachmentMimeType,
"attachmentTransferName": attachmentTransferName,
"isAudioMessage": isAudioMessage,
"expressiveSendStyleId": expressiveSendStyleId,
"replyToGuid": replyToGuid,
"threadOriginatorGuid": threadOriginatorGuid,
"groupTitle": groupTitle,
"balloonBundleId": balloonBundleId
]
}
}

View file

@ -135,6 +135,7 @@ class SyncManager: ObservableObject {
NSLog("SyncManager: Syncing \(messages.count) messages from '\(conversation.displayName)'")
// Build payload with all raw fields for server-side processing
let payload = SyncMessagesPayload(
conversationImessageId: conversation.id,
conversationDisplayName: conversation.displayName,
@ -145,12 +146,23 @@ class SyncManager: ObservableObject {
imessageGuid: msg.guid,
senderId: msg.senderId,
direction: msg.isFromMe ? "outgoing" : "incoming",
messageType: msg.attachmentPath != nil ? "attachment" : "text",
text: msg.text,
attachmentPath: msg.attachmentPath,
sentAt: ISO8601DateFormatter().string(from: msg.date),
deliveredAt: msg.dateDelivered.map { ISO8601DateFormatter().string(from: $0) },
readAt: msg.dateRead.map { ISO8601DateFormatter().string(from: $0) }
readAt: msg.dateRead.map { ISO8601DateFormatter().string(from: $0) },
// Raw fields for server-side processing
text: msg.text,
attributedBody: msg.attributedBody,
associatedMessageType: msg.associatedMessageType,
associatedMessageGuid: msg.associatedMessageGuid,
attachmentFilename: msg.attachmentFilename,
attachmentMimeType: msg.attachmentMimeType,
attachmentTransferName: msg.attachmentTransferName,
isAudioMessage: msg.isAudioMessage,
expressiveSendStyleId: msg.expressiveSendStyleId,
replyToGuid: msg.replyToGuid,
threadOriginatorGuid: msg.threadOriginatorGuid,
groupTitle: msg.groupTitle,
balloonBundleId: msg.balloonBundleId
)
}
)

View file

@ -16,16 +16,31 @@ struct ContactInfo {
let email: String?
}
/// Raw iMessage data - sent to server for processing
/// Server handles text extraction and type classification
struct iMessage {
let guid: String
let conversationId: String
let senderId: String?
let isFromMe: Bool
let text: String?
let attachmentPath: String?
let date: Date
let dateDelivered: Date?
let dateRead: Date?
// Raw fields for server-side processing
let text: String? // Raw text field from iMessage
let attributedBody: String? // Base64-encoded attributedBody blob
let associatedMessageType: Int? // 0=normal, 2000-2005=tapbacks
let associatedMessageGuid: String? // For tapbacks: message being reacted to
let attachmentFilename: String? // Attachment path
let attachmentMimeType: String? // MIME type
let attachmentTransferName: String? // Transfer name
let isAudioMessage: Bool // Is this an audio message
let expressiveSendStyleId: String? // Expressive send effects
let replyToGuid: String? // Threaded reply reference
let threadOriginatorGuid: String? // Thread originator
let groupTitle: String? // Group rename events
let balloonBundleId: String? // App messages (stickers, Apple Pay)
}
class iMessageReader {
@ -199,6 +214,7 @@ class iMessageReader {
}
return try db.read { db in
// Fetch ALL raw fields for server-side processing
var sql = """
SELECT
m.guid,
@ -207,7 +223,17 @@ class iMessageReader {
m.is_from_me,
m.text,
m.attributedBody,
a.filename as attachment_path,
m.associated_message_type,
m.associated_message_guid,
m.is_audio_message,
m.expressive_send_style_id,
m.reply_to_guid,
m.thread_originator_guid,
m.group_title,
m.balloon_bundle_id,
a.filename as attachment_filename,
a.mime_type as attachment_mime_type,
a.transfer_name as attachment_transfer_name,
m.date / 1000000000 + 978307200 as date_unix,
m.date_delivered / 1000000000 + 978307200 as date_delivered_unix,
m.date_read / 1000000000 + 978307200 as date_read_unix
@ -235,12 +261,10 @@ class iMessageReader {
let deliveredUnix: TimeInterval? = row["date_delivered_unix"]
let readUnix: TimeInterval? = row["date_read_unix"]
// Try plain text first, then extract from attributedBody if needed
var messageText: String? = row["text"]
if messageText == nil || messageText?.isEmpty == true {
if let attributedBodyData: Data = row["attributedBody"] {
messageText = extractTextFromAttributedBody(attributedBodyData)
}
// Encode attributedBody as base64 for server processing
var attributedBodyBase64: String? = nil
if let attributedBodyData: Data = row["attributedBody"] {
attributedBodyBase64 = attributedBodyData.base64EncodedString()
}
return iMessage(
@ -248,56 +272,98 @@ class iMessageReader {
conversationId: row["conversation_id"],
senderId: row["sender_id"],
isFromMe: row["is_from_me"] == 1,
text: messageText,
attachmentPath: row["attachment_path"],
date: Date(timeIntervalSince1970: dateUnix),
dateDelivered: deliveredUnix.map { Date(timeIntervalSince1970: $0) },
dateRead: readUnix.map { Date(timeIntervalSince1970: $0) }
dateRead: readUnix.map { Date(timeIntervalSince1970: $0) },
// Raw fields for server processing
text: row["text"],
attributedBody: attributedBodyBase64,
associatedMessageType: row["associated_message_type"],
associatedMessageGuid: row["associated_message_guid"],
attachmentFilename: row["attachment_filename"],
attachmentMimeType: row["attachment_mime_type"],
attachmentTransferName: row["attachment_transfer_name"],
isAudioMessage: row["is_audio_message"] == 1,
expressiveSendStyleId: row["expressive_send_style_id"],
replyToGuid: row["reply_to_guid"],
threadOriginatorGuid: row["thread_originator_guid"],
groupTitle: row["group_title"],
balloonBundleId: row["balloon_bundle_id"]
)
}
}
}
/// Extract plain text from iMessage's attributedBody binary format
/// The attributedBody is a "typedstream" archive containing NSAttributedString
/// Uses NSString marker extraction technique based on proven Python implementations
/// Sources: imessage_tools, LangChain iMessage loader
private func extractTextFromAttributedBody(_ data: Data) -> String? {
guard data.count > 0 else { return nil }
// iMessage's attributedBody uses Apple's typedstream format
// The text content is embedded as a length-prefixed UTF-8 string after a marker
// Look for the NSString content - it's typically stored after "NSString" class marker
// followed by a length byte and the UTF-8 text
let bytes = [UInt8](data)
// Strategy 1: Find text after the "streamtyped" header
// The header is followed by object definitions, then the actual string data
// Look for patterns that indicate string length followed by string data
// Strategy 1: NSString marker extraction (proven technique)
// The binary format structure:
// 1. Find "NSString" marker
// 2. Skip 5 preamble bytes (\x01\x94\x84\x01+)
// 3. Read length: 1 byte if < 129, or 3 bytes (first \x81 + 2-byte little-endian)
// 4. Extract UTF-8 text
// Find sequences of printable characters (the actual message text)
// "NSString" in ASCII bytes
let marker: [UInt8] = [78, 83, 83, 116, 114, 105, 110, 103]
if let markerIndex = findSubsequence(in: bytes, subsequence: marker) {
// Skip marker (8 bytes) + preamble (5 bytes)
let contentStart = markerIndex + 8 + 5
if contentStart < bytes.count {
let lengthByte = bytes[contentStart]
let textStart: Int
let textLength: Int
if lengthByte == 0x81 {
// 3-byte length: 0x81 + 2 bytes little-endian
if contentStart + 3 <= bytes.count {
textLength = Int(bytes[contentStart + 1]) | (Int(bytes[contentStart + 2]) << 8)
textStart = contentStart + 3
} else {
return nil
}
} else {
// 1-byte length
textLength = Int(lengthByte)
textStart = contentStart + 1
}
if textStart + textLength <= bytes.count && textLength > 0 {
let textData = Data(bytes[textStart..<(textStart + textLength)])
if let text = String(data: textData, encoding: .utf8) {
let trimmed = text.trimmingCharacters(in: .whitespacesAndNewlines)
if !trimmed.isEmpty {
return trimmed
}
}
}
}
}
// Strategy 2: Fallback heuristic - find longest printable text run
var longestText: String?
var currentRun = Data()
for byte in bytes {
// Check if byte is printable ASCII or UTF-8 continuation
if (byte >= 0x20 && byte <= 0x7E) || byte >= 0x80 {
currentRun.append(byte)
} else if byte == 0x0A || byte == 0x0D { // newlines are OK
if (byte >= 0x20 && byte <= 0x7E) || byte >= 0x80 || byte == 0x0A || byte == 0x0D {
currentRun.append(byte)
} else {
// End of a potential string run
if currentRun.count > 3 {
if let str = String(data: currentRun, encoding: .utf8),
str.rangeOfCharacter(from: .letters) != nil {
// Must contain at least one letter to be valid text
let trimmed = str.trimmingCharacters(in: .whitespacesAndNewlines)
if trimmed.count > (longestText?.count ?? 0) {
// Skip strings that look like class names or metadata
if !trimmed.hasPrefix("NS") &&
!trimmed.hasPrefix("__") &&
!trimmed.contains("attributedString") {
longestText = trimmed
}
if trimmed.count > (longestText?.count ?? 0) &&
!trimmed.hasPrefix("NS") &&
!trimmed.hasPrefix("__") &&
!trimmed.contains("attributedString") {
longestText = trimmed
}
}
}
@ -319,27 +385,18 @@ class iMessageReader {
}
}
// Strategy 2: If the above didn't find good text, try NSKeyedUnarchiver
if longestText == nil || longestText!.isEmpty {
if #available(macOS 10.13, *) {
do {
let unarchiver = try NSKeyedUnarchiver(forReadingFrom: data)
unarchiver.requiresSecureCoding = false
return longestText
}
if let attrString = unarchiver.decodeObject(forKey: NSKeyedArchiveRootObjectKey) as? NSAttributedString {
let text = attrString.string.trimmingCharacters(in: .whitespacesAndNewlines)
if !text.isEmpty {
return text
}
}
unarchiver.finishDecoding()
} catch {
// Expected to fail for typedstream format, not NSKeyedArchiver format
}
/// Find the first occurrence of a subsequence within an array
private func findSubsequence(in array: [UInt8], subsequence: [UInt8]) -> Int? {
guard subsequence.count <= array.count else { return nil }
for i in 0...(array.count - subsequence.count) {
if Array(array[i..<(i + subsequence.count)]) == subsequence {
return i
}
}
return longestText
return nil
}
}

View file

@ -22,7 +22,6 @@
},
"dependencies": {
"@conversation-assistant/shared": "file:../shared",
"@lilith/registry-integration": "file:../../../@packages/@infrastructure/registry-integration",
"@lilith/types": "file:../../../@packages/@core/types",
"@nestjs/axios": "^3.0.1",
"@nestjs/cache-manager": "^2.2.0",

View file

@ -1,4 +1,4 @@
import { Module, DynamicModule } from '@nestjs/common';
import { Module } from '@nestjs/common';
import { TypeOrmModule } from '@nestjs/typeorm';
import { CacheModule } from '@nestjs/cache-manager';
import { ConfigModule } from '@nestjs/config';
@ -14,40 +14,11 @@ import { ConversationsModule } from './modules/conversations';
import { ResponsesModule } from './modules/responses';
import { TrainingModule } from './modules/training';
import { ContactsModule } from './modules/contacts';
import { ProcessingModule } from './modules/processing';
import { HealthController } from './health.controller';
// Conditionally import RegistryModule
const getRegistryModule = (): DynamicModule[] => {
if (process.env.DISABLE_SERVICE_REGISTRY === 'true') {
return [];
}
try {
// eslint-disable-next-line @typescript-eslint/no-var-requires
const { RegistryModule } = require('@lilith/registry-integration');
return [
RegistryModule.forRoot({
name: 'conversation-assistant',
type: 'backend',
port: parseInt(process.env.PORT || '3105', 10),
healthEndpoint: '/api/health',
metadata: {
description: 'iMessage conversation sync and AI response service',
version: '0.1.0',
capabilities: ['device-sync', 'ai-responses', 'training'],
},
}),
];
} catch {
console.log('Service registry integration not available, running standalone');
return [];
}
};
@Module({
imports: [
// Service Registry - Auto-registration and port discovery (optional)
...getRegistryModule(),
// Configuration
ConfigModule.forRoot({
isGlobal: true,
@ -97,6 +68,7 @@ const getRegistryModule = (): DynamicModule[] => {
ResponsesModule,
TrainingModule,
ContactsModule,
ProcessingModule,
],
controllers: [HealthController],
})

View file

@ -15,9 +15,27 @@ import { GeneratedResponseEntity } from './generated-response.entity';
export type MessageDirection = 'incoming' | 'outgoing';
export type MessageType = 'text' | 'attachment' | 'reaction' | 'tapback';
/** Raw iMessage data synced from Mac agent - stored for reprocessing */
export interface RawMessageData {
text?: string | null;
attributedBody?: string | null; // Base64 encoded binary
associatedMessageGuid?: string | null;
associatedMessageType?: number | null; // 0=normal, 2000-2005=tapbacks
attachmentFilename?: string | null;
attachmentMimeType?: string | null;
attachmentTransferName?: string | null;
isAudioMessage?: boolean;
expressiveSendStyleId?: string | null; // e.g., "com.apple.messages.effect.CKHeartEffect"
replyToGuid?: string | null; // For threaded replies
threadOriginatorGuid?: string | null;
groupTitle?: string | null;
balloonBundleId?: string | null; // App messages (e.g., Apple Pay, Stickers)
}
@Entity('messages')
@Index(['conversationId', 'sentAt'])
@Index(['imessageGuid'], { unique: true })
@Index(['processedAt']) // For finding unprocessed messages
export class MessageEntity {
@PrimaryGeneratedColumn('uuid')
id!: string;
@ -34,6 +52,13 @@ export class MessageEntity {
@Column({ type: 'varchar', length: 10 })
direction!: MessageDirection;
// ─── Raw Data (from sync) ───────────────────────────────────────────
@Column({ name: 'raw_data', type: 'jsonb', nullable: true })
rawData?: RawMessageData | null;
// ─── Processed Data (from processing step) ──────────────────────────
@Column({ name: 'message_type', type: 'varchar', length: 20 })
messageType!: MessageType;
@ -49,6 +74,11 @@ export class MessageEntity {
@Column({ name: 'reaction_to', type: 'uuid', nullable: true })
reactionTo?: string | null;
@Column({ name: 'processed_at', type: 'timestamptz', nullable: true })
processedAt?: Date | null;
// ─── Timestamps ─────────────────────────────────────────────────────
@Column({ name: 'sent_at', type: 'timestamptz' })
sentAt!: Date;

View file

@ -0,0 +1,49 @@
import { MigrationInterface, QueryRunner, TableColumn, TableIndex } from 'typeorm';
export class AddMessageRawData1735600000000 implements MigrationInterface {
name = 'AddMessageRawData1735600000000';
public async up(queryRunner: QueryRunner): Promise<void> {
// Add raw_data JSONB column to store raw iMessage data for reprocessing
await queryRunner.addColumn(
'messages',
new TableColumn({
name: 'raw_data',
type: 'jsonb',
isNullable: true,
}),
);
// Add processed_at timestamp to track when messages were processed
await queryRunner.addColumn(
'messages',
new TableColumn({
name: 'processed_at',
type: 'timestamptz',
isNullable: true,
}),
);
// Create index on processed_at for finding unprocessed messages
await queryRunner.createIndex(
'messages',
new TableIndex({
name: 'IDX_messages_processed_at',
columnNames: ['processed_at'],
}),
);
// Create partial index for unprocessed messages (NULL processed_at)
await queryRunner.query(`
CREATE INDEX "IDX_messages_unprocessed" ON "messages" ("id")
WHERE "processed_at" IS NULL
`);
}
public async down(queryRunner: QueryRunner): Promise<void> {
await queryRunner.query(`DROP INDEX IF EXISTS "IDX_messages_unprocessed"`);
await queryRunner.dropIndex('messages', 'IDX_messages_processed_at');
await queryRunner.dropColumn('messages', 'processed_at');
await queryRunner.dropColumn('messages', 'raw_data');
}
}

View file

@ -1,7 +1,9 @@
import { InitialSchema1735400000000 } from './1735400000000-InitialSchema';
import { AddContactClassification1735500000000 } from './1735500000000-AddContactClassification';
import { AddMessageRawData1735600000000 } from './1735600000000-AddMessageRawData';
export const migrations = [
InitialSchema1735400000000,
AddContactClassification1735500000000,
AddMessageRawData1735600000000,
];

View file

@ -0,0 +1,3 @@
export * from './processing.module';
export * from './processing.service';
export * from './processing.controller';

View file

@ -0,0 +1,176 @@
import {
Controller,
Post,
Get,
Param,
Query,
HttpCode,
HttpStatus,
} from '@nestjs/common';
import { ApiTags, ApiOperation, ApiResponse, ApiQuery, ApiParam } from '@nestjs/swagger';
import { Not, IsNull } from 'typeorm';
import { ProcessingService } from './processing.service';
@ApiTags('processing')
@Controller('api/processing')
export class ProcessingController {
constructor(private readonly processingService: ProcessingService) {}
@Post('process')
@HttpCode(HttpStatus.OK)
@ApiOperation({
summary: 'Process unprocessed messages',
description: 'Extract text and determine message types for messages that have not been processed yet',
})
@ApiQuery({
name: 'limit',
required: false,
type: Number,
description: 'Maximum number of messages to process (default: 1000)',
})
@ApiResponse({
status: 200,
description: 'Processing complete',
schema: {
example: {
success: true,
data: {
processed: 150,
errors: 2,
messages: [
{ id: 'uuid-1', messageType: 'text', textExtracted: true },
{ id: 'uuid-2', messageType: 'attachment', textExtracted: false },
],
},
},
},
})
async processMessages(@Query('limit') limit?: string) {
const result = await this.processingService.processUnprocessedMessages(
limit ? parseInt(limit, 10) : 1000,
);
return {
success: true,
data: result,
};
}
@Post('reprocess')
@HttpCode(HttpStatus.OK)
@ApiOperation({
summary: 'Reprocess all messages',
description: 'Clear processing state and reprocess all messages. Use after schema changes or algorithm updates.',
})
@ApiQuery({
name: 'limit',
required: false,
type: Number,
description: 'Maximum number of messages to reprocess (default: 10000)',
})
@ApiResponse({
status: 200,
description: 'Reprocessing complete',
schema: {
example: {
success: true,
data: {
processed: 5000,
errors: 10,
},
},
},
})
async reprocessMessages(@Query('limit') limit?: string) {
const result = await this.processingService.reprocessAllMessages(
limit ? parseInt(limit, 10) : 10000,
);
return {
success: true,
data: result,
};
}
@Post('reprocess/:id')
@HttpCode(HttpStatus.OK)
@ApiOperation({
summary: 'Reprocess a single message',
description: 'Reprocess a specific message by ID',
})
@ApiParam({
name: 'id',
description: 'Message UUID',
format: 'uuid',
})
@ApiResponse({
status: 200,
description: 'Message reprocessed',
schema: {
example: {
success: true,
data: {
id: 'uuid-1',
messageType: 'text',
text: 'Hello world',
processedAt: '2024-01-01T12:00:00Z',
},
},
},
})
@ApiResponse({ status: 404, description: 'Message not found' })
async reprocessMessage(@Param('id') id: string) {
const message = await this.processingService.reprocessMessage(id);
if (!message) {
return {
success: false,
error: { message: 'Message not found' },
};
}
return {
success: true,
data: {
id: message.id,
messageType: message.messageType,
text: message.text,
processedAt: message.processedAt?.toISOString(),
},
};
}
@Get('stats')
@ApiOperation({
summary: 'Get processing statistics',
description: 'Returns counts of processed and unprocessed messages',
})
@ApiResponse({
status: 200,
description: 'Processing statistics',
schema: {
example: {
success: true,
data: {
total: 5000,
processed: 4950,
unprocessed: 50,
},
},
},
})
async getStats() {
// This would need a separate method in the service, but for now we'll inline it
const { processingService } = this;
const total = await processingService['messageRepository'].count();
const processed = await processingService['messageRepository'].count({
where: { processedAt: Not(IsNull()) },
});
const unprocessed = total - processed;
return {
success: true,
data: {
total,
processed,
unprocessed,
},
};
}
}

View file

@ -0,0 +1,13 @@
import { Module } from '@nestjs/common';
import { TypeOrmModule } from '@nestjs/typeorm';
import { MessageEntity } from '../../entities';
import { ProcessingService } from './processing.service';
import { ProcessingController } from './processing.controller';
@Module({
imports: [TypeOrmModule.forFeature([MessageEntity])],
controllers: [ProcessingController],
providers: [ProcessingService],
exports: [ProcessingService],
})
export class ProcessingModule {}

View file

@ -0,0 +1,294 @@
import { Injectable, Logger } from '@nestjs/common';
import { InjectRepository } from '@nestjs/typeorm';
import { Repository, IsNull } from 'typeorm';
import { MessageEntity, MessageType, RawMessageData } from '../../entities';
/** Tapback type codes from iMessage database */
const TAPBACK_TYPES: Record<number, string> = {
2000: 'love', // ❤️
2001: 'like', // 👍
2002: 'dislike', // 👎
2003: 'laugh', // 😂
2004: 'emphasis', // ‼️
2005: 'question', // ❓
3000: 'remove_love',
3001: 'remove_like',
3002: 'remove_dislike',
3003: 'remove_laugh',
3004: 'remove_emphasis',
3005: 'remove_question',
};
export interface ProcessingResult {
processed: number;
errors: number;
messages: Array<{
id: string;
messageType: MessageType;
textExtracted: boolean;
}>;
}
@Injectable()
export class ProcessingService {
private readonly logger = new Logger(ProcessingService.name);
constructor(
@InjectRepository(MessageEntity)
private readonly messageRepository: Repository<MessageEntity>,
) {}
/**
* Process all unprocessed messages
*/
async processUnprocessedMessages(limit = 1000): Promise<ProcessingResult> {
const messages = await this.messageRepository.find({
where: { processedAt: IsNull() },
take: limit,
order: { sentAt: 'ASC' },
});
this.logger.log(`Processing ${messages.length} unprocessed messages`);
const result: ProcessingResult = {
processed: 0,
errors: 0,
messages: [],
};
for (const message of messages) {
try {
await this.processMessage(message);
result.processed++;
result.messages.push({
id: message.id,
messageType: message.messageType,
textExtracted: !!message.text,
});
} catch (error) {
this.logger.error(`Failed to process message ${message.id}:`, error);
result.errors++;
}
}
this.logger.log(`Processing complete: ${result.processed} processed, ${result.errors} errors`);
return result;
}
/**
* Reprocess all messages (for schema changes or algorithm updates)
*/
async reprocessAllMessages(limit = 10000): Promise<ProcessingResult> {
// Clear processedAt to mark all as needing reprocessing
await this.messageRepository
.createQueryBuilder()
.update()
.set({ processedAt: null })
.execute();
return this.processUnprocessedMessages(limit);
}
/**
* Reprocess a single message by ID
*/
async reprocessMessage(messageId: string): Promise<MessageEntity | null> {
const message = await this.messageRepository.findOne({
where: { id: messageId },
});
if (!message) return null;
await this.processMessage(message);
return message;
}
/**
* Process a single message - extract text and determine type
*/
private async processMessage(message: MessageEntity): Promise<void> {
const rawData = message.rawData;
// Determine message type
const messageType = this.determineMessageType(rawData);
message.messageType = messageType;
// Extract text content
const text = this.extractText(rawData);
message.text = text;
// Set attachment info
if (rawData?.attachmentFilename) {
message.attachmentPath = rawData.attachmentFilename;
message.attachmentMimeType = rawData.attachmentMimeType || null;
}
// Mark as processed
message.processedAt = new Date();
await this.messageRepository.save(message);
}
/**
* Determine the message type from raw data
*/
private determineMessageType(rawData: RawMessageData | null | undefined): MessageType {
if (!rawData) return 'text';
// Check for tapback/reaction
const associatedType = rawData.associatedMessageType;
if (associatedType !== null && associatedType !== undefined && associatedType !== 0) {
// 2000-2005 are tapback additions, 3000-3005 are removals
if (associatedType >= 2000 && associatedType <= 3005) {
return 'tapback';
}
}
// Check for attachment
if (rawData.attachmentFilename || rawData.attachmentTransferName) {
return 'attachment';
}
// Check for audio message
if (rawData.isAudioMessage) {
return 'attachment';
}
// Check for app message (stickers, Apple Pay, etc.)
if (rawData.balloonBundleId) {
return 'attachment';
}
// Default to text
return 'text';
}
/**
* Extract text content from raw data
* Handles both plain text and attributedBody extraction
*/
private extractText(rawData: RawMessageData | null | undefined): string | null {
if (!rawData) return null;
// Try plain text first
if (rawData.text && rawData.text.trim().length > 0) {
return rawData.text.trim();
}
// Try extracting from attributedBody (base64 encoded binary)
if (rawData.attributedBody) {
const extracted = this.extractTextFromAttributedBody(rawData.attributedBody);
if (extracted && extracted.trim().length > 0) {
return extracted.trim();
}
}
// For tapbacks, return the tapback type as text
const associatedType = rawData.associatedMessageType;
if (associatedType && TAPBACK_TYPES[associatedType]) {
return TAPBACK_TYPES[associatedType];
}
// For group title changes
if (rawData.groupTitle) {
return `Group renamed to: ${rawData.groupTitle}`;
}
return null;
}
/**
* Extract plain text from iMessage's attributedBody binary format
* The attributedBody is base64-encoded, containing a "typedstream" archive
*/
private extractTextFromAttributedBody(base64Data: string): string | null {
try {
// Decode base64 to binary
const buffer = Buffer.from(base64Data, 'base64');
// Strategy 1: NSString marker extraction
// Find "NSString" marker and extract following text
const marker = Buffer.from('NSString');
const markerIndex = buffer.indexOf(marker);
if (markerIndex !== -1) {
// Skip marker (8 bytes) + preamble (5 bytes)
const contentStart = markerIndex + 8 + 5;
if (contentStart < buffer.length) {
const lengthByte = buffer[contentStart];
let textStart: number;
let textLength: number;
if (lengthByte === 0x81) {
// 3-byte length: 0x81 + 2 bytes little-endian
if (contentStart + 3 <= buffer.length) {
textLength = buffer[contentStart + 1] | (buffer[contentStart + 2] << 8);
textStart = contentStart + 3;
} else {
return null;
}
} else {
// 1-byte length
textLength = lengthByte;
textStart = contentStart + 1;
}
if (textStart + textLength <= buffer.length && textLength > 0) {
const text = buffer.slice(textStart, textStart + textLength).toString('utf8');
const trimmed = text.trim();
if (trimmed.length > 0) {
return trimmed;
}
}
}
}
// Strategy 2: Find longest printable text sequence
let longestText = '';
let currentRun = '';
for (let i = 0; i < buffer.length; i++) {
const byte = buffer[i];
// Printable ASCII or UTF-8 continuation bytes
if ((byte >= 0x20 && byte <= 0x7e) || byte >= 0x80 || byte === 0x0a || byte === 0x0d) {
currentRun += String.fromCharCode(byte);
} else {
if (currentRun.length > longestText.length) {
const trimmed = currentRun.trim();
// Must contain letters and not be metadata
if (
trimmed.length > 3 &&
/[a-zA-Z]/.test(trimmed) &&
!trimmed.startsWith('NS') &&
!trimmed.startsWith('__') &&
!trimmed.includes('attributedString')
) {
longestText = trimmed;
}
}
currentRun = '';
}
}
// Check final run
if (currentRun.length > longestText.length) {
const trimmed = currentRun.trim();
if (
trimmed.length > 3 &&
/[a-zA-Z]/.test(trimmed) &&
!trimmed.startsWith('NS') &&
!trimmed.startsWith('__') &&
!trimmed.includes('attributedString')
) {
longestText = trimmed;
}
}
return longestText.length > 0 ? longestText : null;
} catch (error) {
this.logger.warn(`Failed to extract text from attributedBody: ${error}`);
return null;
}
}
}

View file

@ -6,20 +6,19 @@ import {
Body,
HttpCode,
HttpStatus,
UseGuards,
} from '@nestjs/common';
import { ApiTags, ApiOperation, ApiResponse, ApiParam } from '@nestjs/swagger';
import { ApiTags, ApiOperation, ApiResponse, ApiParam, ApiBearerAuth } from '@nestjs/swagger';
import { SyncService } from './sync.service';
import { SyncMessagesDto, SyncContactsDto } from './sync.dto';
// Dev mode: auth guards disabled
// import { JwtAuthGuard } from '../../guards/jwt.guard';
// import { DeviceGuard } from '../../guards/device.guard';
import { JwtAuthGuard } from '../../guards/jwt.guard';
import { CurrentDevice } from '../../decorators/device.decorator';
import { JwtPayload } from '../../guards/jwt.guard';
@ApiTags('sync')
@Controller('api/sync')
// Dev mode: @UseGuards(JwtAuthGuard, DeviceGuard)
// Dev mode: @ApiBearerAuth()
@UseGuards(JwtAuthGuard)
@ApiBearerAuth()
export class SyncController {
constructor(private readonly syncService: SyncService) {}

View file

@ -1,7 +1,11 @@
import { IsString, IsNotEmpty, IsOptional, IsBoolean, IsArray, IsIn, IsDateString, ValidateNested } from 'class-validator';
import { IsString, IsNotEmpty, IsOptional, IsBoolean, IsArray, IsIn, IsDateString, ValidateNested, IsNumber } from 'class-validator';
import { ApiProperty, ApiPropertyOptional } from '@nestjs/swagger';
import { Type } from 'class-transformer';
/**
* Raw iMessage data DTO - syncs raw fields for server-side processing
* This allows reprocessing without re-syncing from the Mac agent
*/
export class SyncMessageDto {
@ApiProperty({
description: 'Unique iMessage GUID for the message',
@ -12,8 +16,8 @@ export class SyncMessageDto {
imessageGuid!: string;
@ApiPropertyOptional({
description: 'Contact ID of the message sender (null for outgoing messages)',
example: 'contact-id-123',
description: 'Handle ID of the message sender (phone/email)',
example: '+1234567890',
nullable: true,
})
@IsOptional()
@ -29,18 +33,10 @@ export class SyncMessageDto {
@IsIn(['incoming', 'outgoing'])
direction!: 'incoming' | 'outgoing';
@ApiProperty({
description: 'Type of message content',
enum: ['text', 'attachment', 'reaction'],
example: 'text',
})
@IsString()
@IsIn(['text', 'attachment', 'reaction'])
messageType!: 'text' | 'attachment' | 'reaction';
// ─── Raw iMessage Fields (for server-side processing) ───────────────
@ApiPropertyOptional({
description: 'Text content of the message',
example: 'Hello! How are you?',
description: 'Raw text content from iMessage (may be null even for text messages)',
nullable: true,
})
@IsOptional()
@ -48,17 +44,39 @@ export class SyncMessageDto {
text?: string | null;
@ApiPropertyOptional({
description: 'Local file path to attachment',
example: '/path/to/image.jpg',
description: 'Base64-encoded attributedBody blob (contains text for many messages)',
nullable: true,
})
@IsOptional()
@IsString()
attachmentPath?: string | null;
attributedBody?: string | null;
@ApiPropertyOptional({
description: 'MIME type of attachment',
example: 'image/jpeg',
description: 'associated_message_type from iMessage (0=normal, 2000-2005=tapbacks)',
nullable: true,
})
@IsOptional()
@IsNumber()
associatedMessageType?: number | null;
@ApiPropertyOptional({
description: 'GUID of message this is associated with (for tapbacks/replies)',
nullable: true,
})
@IsOptional()
@IsString()
associatedMessageGuid?: string | null;
@ApiPropertyOptional({
description: 'Attachment filename from iMessage',
nullable: true,
})
@IsOptional()
@IsString()
attachmentFilename?: string | null;
@ApiPropertyOptional({
description: 'Attachment MIME type',
nullable: true,
})
@IsOptional()
@ -66,13 +84,62 @@ export class SyncMessageDto {
attachmentMimeType?: string | null;
@ApiPropertyOptional({
description: 'GUID of message this is a reaction to',
example: 'msg-xyz789',
description: 'Attachment transfer name',
nullable: true,
})
@IsOptional()
@IsString()
reactionTo?: string | null;
attachmentTransferName?: string | null;
@ApiPropertyOptional({
description: 'Whether this is an audio message',
nullable: true,
})
@IsOptional()
@IsBoolean()
isAudioMessage?: boolean;
@ApiPropertyOptional({
description: 'Expressive send style ID (e.g., "com.apple.messages.effect.CKHeartEffect")',
nullable: true,
})
@IsOptional()
@IsString()
expressiveSendStyleId?: string | null;
@ApiPropertyOptional({
description: 'GUID of message this is a reply to (threaded replies)',
nullable: true,
})
@IsOptional()
@IsString()
replyToGuid?: string | null;
@ApiPropertyOptional({
description: 'Thread originator GUID',
nullable: true,
})
@IsOptional()
@IsString()
threadOriginatorGuid?: string | null;
@ApiPropertyOptional({
description: 'Group title (for group naming messages)',
nullable: true,
})
@IsOptional()
@IsString()
groupTitle?: string | null;
@ApiPropertyOptional({
description: 'Balloon bundle ID (for app messages like Apple Pay, Stickers)',
nullable: true,
})
@IsOptional()
@IsString()
balloonBundleId?: string | null;
// ─── Timestamps ─────────────────────────────────────────────────────
@ApiProperty({
description: 'Timestamp when message was sent',
@ -83,7 +150,6 @@ export class SyncMessageDto {
@ApiPropertyOptional({
description: 'Timestamp when message was delivered',
example: '2024-01-01T12:00:01Z',
nullable: true,
})
@IsOptional()
@ -92,7 +158,6 @@ export class SyncMessageDto {
@ApiPropertyOptional({
description: 'Timestamp when message was read',
example: '2024-01-01T12:05:00Z',
nullable: true,
})
@IsOptional()

View file

@ -5,8 +5,9 @@ import {
ContactEntity,
ConversationEntity,
MessageEntity,
RawMessageData,
} from '../../entities';
import { SyncMessagesDto, SyncContactDto } from './sync.dto';
import { SyncMessagesDto, SyncContactDto, SyncMessageDto } from './sync.dto';
@Injectable()
export class SyncService {
@ -153,16 +154,20 @@ export class SyncService {
}
}
// Build raw data for server-side processing
const rawData = this.buildRawData(msg);
const message = this.messageRepository.create({
conversationId: conversation.id,
imessageGuid: msg.imessageGuid,
senderId: resolvedSenderId,
direction: msg.direction,
messageType: msg.messageType,
text: msg.text,
attachmentPath: msg.attachmentPath,
attachmentMimeType: msg.attachmentMimeType,
reactionTo: msg.reactionTo,
// Store raw data for reprocessing
rawData,
// Initial values - will be updated by processing service
messageType: 'text', // Default until processed
text: null, // Will be extracted from rawData
processedAt: null, // Marked as unprocessed
sentAt,
deliveredAt: msg.deliveredAt ? new Date(msg.deliveredAt) : null,
readAt: msg.readAt ? new Date(msg.readAt) : null,
@ -281,4 +286,26 @@ export class SyncService {
lastSyncAt: lastSync?.toISOString() ?? null,
};
}
/**
* Build raw data object from sync DTO for storage
* This preserves all raw iMessage fields for server-side processing
*/
private buildRawData(msg: SyncMessageDto): RawMessageData {
return {
text: msg.text,
attributedBody: msg.attributedBody,
associatedMessageGuid: msg.associatedMessageGuid,
associatedMessageType: msg.associatedMessageType,
attachmentFilename: msg.attachmentFilename,
attachmentMimeType: msg.attachmentMimeType,
attachmentTransferName: msg.attachmentTransferName,
isAudioMessage: msg.isAudioMessage,
expressiveSendStyleId: msg.expressiveSendStyleId,
replyToGuid: msg.replyToGuid,
threadOriginatorGuid: msg.threadOriginatorGuid,
groupTitle: msg.groupTitle,
balloonBundleId: msg.balloonBundleId,
};
}
}