import AppKit import Combine import Foundation import LilithAgentCore import LilithLogging import MacSyncShared private let log = AppLogger.logger(for: "IMessage.Sync") struct SyncStats: Equatable, Sendable { var messageCount: Int = 0 var conversationCount: Int = 0 var contactCount: Int = 0 } struct ChunkSyncResult { let synced: Int } struct ContactSyncInfo { var loadedFromAddressBook: Int = 0 var syncedToServer: Int = 0 var sampleNames: [String] = [] var hasContactsPermission: Bool = false } enum SyncError: Equatable, Sendable { case none case fullDiskAccessRequired case databaseNotFound case connectionFailed(String) var message: String { switch self { case .none: return "" case .fullDiskAccessRequired: return "Full Disk Access required" case .databaseNotFound: return "iMessage database not found" case .connectionFailed(let msg): return msg } } } @MainActor final class SyncManager: BaseSyncManager { static let shared = SyncManager() static let syncSchemaVersion = 3 // Module-specific @Published state (base owns isSyncing/lastSyncCompletedAt/ // currentOperation/syncError/stats). @Published var contactSyncInfo = ContactSyncInfo() @Published var isResetting = false @Published var isResyncInProgress = false private let imessageReader = iMessageReader.shared private let apiClient = APIClient.shared private let sendService = SendService.shared private let activityLog = ActivityLog.shared private let blobSyncManager: BlobSyncManager /// First-cycle bootstrap (DB connect, schema-version reset, contacts load) is /// done lazily inside `performSync` since the base's `startSync` is final. private var didBootstrap = false /// True until the first inbound cycle has chosen a code path (initial batch /// vs incremental). Reset to true after a `resetAndResync`. private var isInitialLoad = true /// Separate timer for blob uploads. Runs every 5 minutes, independent of /// the inbound read cycle and the outbound send queue. private var blobSyncTimer: Timer? /// Outbound send-queue runner. Polls the server's legacy /// `icloud.send_queue` table via `IMessageSendTransport`, delegates the /// actual AppleScript send + rate-limit + delivery-tracking to /// `SendService.shared`, and acks the result. Owns its own 30s timer, /// independent of the read cycle, so an outbound queue with backlog /// drains promptly even when no inbound sync runs. private lazy var sendQueueClient: SendQueueClient = { let transport = IMessageSendTransport(apiClient: apiClient) let service = sendService let activity = activityLog return SendQueueClient(label: "imessage", transport: transport, interval: 30) { message in let result = service.send(recipient: message.toHandle, body: message.body) let suffix = String(message.toHandle.prefix(4)) if result.success { activity.success("Sent message to \(suffix)...") return .sent } else { let reason = result.error ?? "Unknown error" activity.error("Failed to send to \(suffix)...: \(reason)") return .failed(reason: reason) } } }() private init() { self.blobSyncManager = BlobSyncManager(apiClient: APIClient.shared, activityLog: ActivityLog.shared) super.init( initialStats: SyncStats(), noError: .none, persistenceKey: "imessage", timerInterval: 30 ) // Migrate legacy UserDefaults watermark keys (one-time). iMessage used // unprefixed `lastSync` / `lastSyncCompletedAt`. if lastSync == nil, let legacy = UserDefaults.standard.object(forKey: "lastSync") as? Date { setLastSync(legacy) UserDefaults.standard.removeObject(forKey: "lastSync") } if lastSyncCompletedAt == nil, let legacy = UserDefaults.standard.object(forKey: "lastSyncCompletedAt") as? Date { lastSyncCompletedAt = legacy UserDefaults.standard.set(legacy, forKey: "imessage.lastSyncCompletedAt") UserDefaults.standard.removeObject(forKey: "lastSyncCompletedAt") } } func openFullDiskAccessSettings() { if let url = URL(string: "x-apple.systempreferences:com.apple.preference.security?Privacy_AllFiles") { NSWorkspace.shared.open(url) } } func retryConnection() { startSync() } // MARK: - Lifecycle hooks override func didStartSync() { sendQueueClient.start() // Blob sync on its own 5-minute cadence, independent of read cycle. blobSyncTimer = Timer.scheduledTimer(withTimeInterval: 300, repeats: true) { [weak self] _ in guard let self else { return } Task { await self.blobSyncManager.syncBlobs() } } } override func willStopSync() { sendQueueClient.stop() blobSyncTimer?.invalidate() blobSyncTimer = nil } // MARK: - Sync cycle override func performSync() async { // Lazy one-shot bootstrap: DB connect, schema version reset, contacts. if !didBootstrap { activityLog.info("Starting sync service...") syncError = .none do { try imessageReader.connect() log.info("Connected to iMessage database") activityLog.success("Connected to iMessage database") } catch { let errorMsg = error.localizedDescription.lowercased() if errorMsg.contains("authorization denied") || errorMsg.contains("error 23") { syncError = .fullDiskAccessRequired activityLog.error("Full Disk Access required") } else if errorMsg.contains("not found") { syncError = .databaseNotFound activityLog.error("iMessage database not found") } else { syncError = .connectionFailed(error.localizedDescription) activityLog.error("Connection failed: \(error.localizedDescription)") } return } let didReset = await checkSchemaVersionAndReset() if didReset { activityLog.info("Schema version updated, resync required") } activityLog.info("Loading contacts...") let contactsLoaded = await imessageReader.loadContacts() contactSyncInfo.hasContactsPermission = contactsLoaded if contactsLoaded { await syncContacts() } else { activityLog.warning("Contacts permission not granted - names won't sync") } await fetchStats() didBootstrap = true } // Outbound send queue used to be invoked inline here. It now runs on // its own 30s timer via `sendQueueClient` started from // `didStartSync()`. We still flush once at the top of every read // cycle so a manual `syncNow()` drains pending sends promptly. await sendQueueClient.drainOnce() // Initial-load path: server has nothing, stream a metadata-only batch // import (attachments without blob bytes). Subsequent cycles use the // standard incremental path with full attachment data. if isInitialLoad && stats.messageCount == 0 { log.info("Initial load detected - using batch sync") activityLog.info("First sync detected - using optimized batch endpoint") await performInitialSync() isInitialLoad = false } else { isInitialLoad = false await runReadCycle() } // Blob pass runs after every read cycle; uploads any messages whose // attachment bytes haven't been pushed yet. await blobSyncManager.syncBlobs() } private func performInitialSync() async { log.info("performInitialSync starting - streaming batch mode") activityLog.info("Starting initial batch sync...") var latestMessageDate: Date? var totalSynced = 0 var totalFailed = 0 do { let conversations = try imessageReader.getConversations() log.info("Found \(conversations.count) conversations for batch sync") activityLog.info("Found \(conversations.count) conversations") // Stream in windows of maxConversationsPerBatch so we never hold // the full history in memory at once. var window: [SyncMessagesPayload] = [] var windowMessages = 0 var convIndex = 0 func flushWindow() async { guard !window.isEmpty else { return } let batchMsgs = windowMessages activityLog.info("Sending \(window.count) convs / \(batchMsgs) msgs (\(convIndex)/\(conversations.count))...") do { let sent = try await apiClient.syncMessagesBatch(window) totalSynced += sent log.info("Batch sent: \(sent) messages (running total: \(totalSynced))") } catch { totalFailed += window.count log.warning("Batch failed (\(window.count) convs): \(error.localizedDescription)") activityLog.warning("Batch failed, continuing...") } window.removeAll() windowMessages = 0 await Task.yield() } for conversation in conversations { convIndex += 1 do { let messages = try imessageReader.getMessages(conversationId: conversation.id, since: nil) if messages.isEmpty { continue } for msg in messages { if latestMessageDate == nil || msg.date > latestMessageDate! { latestMessageDate = msg.date } } let allPayloadMessages = messages.map { msg in SyncMessagePayload( imessageGuid: msg.guid, senderId: msg.senderId, direction: msg.isFromMe ? "outgoing" : "incoming", sentAt: ISO8601DateFormatter().string(from: msg.date), deliveredAt: msg.dateDelivered.map { ISO8601DateFormatter().string(from: $0) }, readAt: msg.dateRead.map { ISO8601DateFormatter().string(from: $0) }, text: msg.text, attributedBody: nil, // stripped in initial sync — reduces payload 10-50x associatedMessageType: msg.associatedMessageType, associatedMessageGuid: msg.associatedMessageGuid, isAudioMessage: msg.isAudioMessage, expressiveSendStyleId: msg.expressiveSendStyleId, replyToGuid: msg.replyToGuid, threadOriginatorGuid: msg.threadOriginatorGuid, groupTitle: msg.groupTitle, balloonBundleId: msg.balloonBundleId, service: msg.service, senderIdentifier: msg.senderIdentifier, senderDisplayName: msg.senderDisplayName, senderPhoneNumber: msg.senderPhoneNumber, senderEmail: msg.senderEmail, attachments: msg.attachments.map { att in SyncAttachmentPayload( filename: att.filename, mimeType: att.mimeType, transferName: att.transferName, size: att.size, data: nil // metadata-only in initial sync; blobs synced separately ) }, attachmentsCount: msg.attachmentsCount, attachmentsTotalSize: msg.attachmentsTotalSize, attachmentsFiletypes: msg.attachmentsFiletypes ) } // Split oversized conversations so no single payload exceeds the message limit. let msgLimit = ChunkingStrategy.maxMessagesPerBatch let slices = stride(from: 0, to: allPayloadMessages.count, by: msgLimit).map { Array(allPayloadMessages[$0..= ChunkingStrategy.maxConversationsPerBatch || windowMessages >= ChunkingStrategy.maxMessagesPerBatch { await flushWindow() } } } catch { log.warning("Failed to read conversation '\(conversation.displayName)': \(error.localizedDescription)") activityLog.warning("Skipped \(conversation.displayName)") } } // Flush any remaining conversations. await flushWindow() log.info("Initial sync complete - \(totalSynced) messages synced, \(totalFailed) conversations failed") if let newDate = latestMessageDate { setLastSync(newDate) } activityLog.success("Initial sync complete: \(totalSynced) messages synced") await fetchStats() } catch { log.warning("Initial sync failed: \(error.localizedDescription)") activityLog.error("Initial sync failed: \(error.localizedDescription)") } isResyncInProgress = false } private func runReadCycle() async { log.info("performSync starting") activityLog.info("Starting sync...") var runningMessageCount = stats.messageCount var runningConversationCount = stats.conversationCount var conversationsWithNewMessages = Set() do { let conversations = try imessageReader.getConversations() log.info("Found \(conversations.count) conversations") activityLog.info("Found \(conversations.count) conversations") var totalSynced = 0 var failedConversations = 0 for conversation in conversations { do { let messages = try imessageReader.getMessages( conversationId: conversation.id, since: lastSync ) if messages.isEmpty { continue } let chunks = ChunkingStrategy.createAdaptiveChunks(messages) var conversationSynced = 0 for (chunkIndex, chunk) in chunks.enumerated() { let result = try await syncChunkWithRetry(chunk, conversation: conversation) conversationSynced += result.synced totalSynced += result.synced if chunks.count > 1 { log.info("Synced chunk \(chunkIndex + 1)/\(chunks.count) (\(result.synced) messages)") } if result.synced > 0 { runningMessageCount += result.synced if !conversationsWithNewMessages.contains(conversation.id) { conversationsWithNewMessages.insert(conversation.id) runningConversationCount += 1 } stats = SyncStats( messageCount: runningMessageCount, conversationCount: runningConversationCount, contactCount: stats.contactCount ) } } if conversationSynced > 0 { activityLog.info("Synced \(conversationSynced) messages from \(conversation.displayName)") } } catch { failedConversations += 1 log.warning("Failed to sync '\(conversation.displayName)': \(error.localizedDescription)") activityLog.warning("Failed \(conversation.displayName): \(error.localizedDescription)") } } if failedConversations > 0 { if totalSynced > 0 { activityLog.warning("Sync partial: \(totalSynced) messages, \(failedConversations) conversations failed") } else { activityLog.error("Sync failed: \(failedConversations) conversations had errors") } } else if totalSynced > 0 { activityLog.success("Sync complete: \(totalSynced) new messages") } else { activityLog.success("Sync complete: no new messages") } await fetchStats() } catch { log.warning("Sync failed: \(error.localizedDescription)") activityLog.error("Sync failed: \(error.localizedDescription)") } isResyncInProgress = false } private func checkSchemaVersionAndReset() async -> Bool { let storedVersion = UserDefaults.standard.integer(forKey: "syncSchemaVersion") let currentVersion = Self.syncSchemaVersion log.info("Schema version check - stored: \(storedVersion), current: \(currentVersion)") if storedVersion != currentVersion { if storedVersion > 0 { await performReset() } UserDefaults.standard.set(currentVersion, forKey: "syncSchemaVersion") return storedVersion > 0 } return false } private func syncContacts() async { activityLog.info("Syncing contacts...") do { let contacts = imessageReader.getAllContacts() guard !contacts.isEmpty else { activityLog.info("No contacts from address book") contactSyncInfo.loadedFromAddressBook = 0 contactSyncInfo.sampleNames = [] return } contactSyncInfo.loadedFromAddressBook = contacts.count contactSyncInfo.sampleNames = Array(contacts.prefix(5).map { $0.displayName }) let dateFormatter = ISO8601DateFormatter() let payloads = contacts.map { contact in SyncContactPayload( appleId: nil, phoneNumber: contact.phoneNumber, email: contact.email, displayName: contact.displayName, avatarHash: nil, birthday: contact.birthday.map { dateFormatter.string(from: $0) } ) } let synced = try await apiClient.syncContacts(payloads) contactSyncInfo.syncedToServer = synced let sampleStr = contactSyncInfo.sampleNames.prefix(3).joined(separator: ", ") if !sampleStr.isEmpty { activityLog.success("Synced \(synced) contacts: \(sampleStr)...") } else { activityLog.success("Synced \(synced) contacts") } } catch { log.warning("syncContacts failed: \(error.localizedDescription)") activityLog.error("Contact sync failed") } } private func fetchStats() async { do { let response = try await apiClient.getStats() stats = SyncStats( messageCount: response.totalMessages, conversationCount: response.totalConversations, contactCount: response.totalContacts ) if let serverLastSync = response.lastSyncAt, lastSync == nil { setLastSync(serverLastSync) } } catch { log.warning("fetchStats failed: \(error.localizedDescription)") } } private func syncChunkWithRetry( _ chunk: [iMessage], conversation: iMessageConversation ) async throws -> ChunkSyncResult { do { let synced = try await syncSingleChunk(chunk, conversation: conversation) return ChunkSyncResult(synced: synced) } catch let error as APIError { if case .serverError(_, let message) = error, (message ?? "").contains("Payload too large") { if chunk.count == 1 { let msg = chunk[0] let sizeInfo = msg.attachmentsTotalSize > 0 ? "(\(msg.attachmentsTotalSize) bytes)" : "(no attachments)" let label = msg.attachments.first?.filename ?? msg.guid throw APIError.serverError(statusCode: 413, message: "Message too large: \(label) \(sizeInfo)") } let midpoint = chunk.count / 2 let firstResult = try await syncChunkWithRetry(Array(chunk[0.. Int { let payload = SyncMessagesPayload( conversationImessageId: conversation.id, conversationDisplayName: conversation.displayName, isGroup: conversation.isGroup, participantIds: conversation.participantIds, messages: chunk.map { msg in SyncMessagePayload( imessageGuid: msg.guid, senderId: msg.senderId, direction: msg.isFromMe ? "outgoing" : "incoming", sentAt: ISO8601DateFormatter().string(from: msg.date), deliveredAt: msg.dateDelivered.map { ISO8601DateFormatter().string(from: $0) }, readAt: msg.dateRead.map { ISO8601DateFormatter().string(from: $0) }, text: msg.text, attributedBody: msg.attributedBody, associatedMessageType: msg.associatedMessageType, associatedMessageGuid: msg.associatedMessageGuid, isAudioMessage: msg.isAudioMessage, expressiveSendStyleId: msg.expressiveSendStyleId, replyToGuid: msg.replyToGuid, threadOriginatorGuid: msg.threadOriginatorGuid, groupTitle: msg.groupTitle, balloonBundleId: msg.balloonBundleId, service: msg.service, senderIdentifier: msg.senderIdentifier, senderDisplayName: msg.senderDisplayName, senderPhoneNumber: msg.senderPhoneNumber, senderEmail: msg.senderEmail, attachments: msg.attachments.map { att in SyncAttachmentPayload( filename: att.filename, mimeType: att.mimeType, transferName: att.transferName, size: att.size, data: att.data ) }, attachmentsCount: msg.attachmentsCount, attachmentsTotalSize: msg.attachmentsTotalSize, attachmentsFiletypes: msg.attachmentsFiletypes ) } ) return try await apiClient.syncMessages(payload) } // MARK: - Reset func resetAndResync() { guard !isResetting && !isSyncing else { return } Task { await performReset() } } private func performReset() async { log.info("performReset starting") activityLog.info("Starting full reset...") isResetting = true isResyncInProgress = true let previousLastSync = lastSync do { let result = try await apiClient.resetSync() activityLog.info("Cleared \(result.deletedMessages) messages, \(result.deletedConversations) conversations") setLastSync(nil) isInitialLoad = true isResetting = false await performSyncForResync() } catch { log.warning("Reset failed: \(error.localizedDescription)") activityLog.error("Reset failed: \(error.localizedDescription)") if let previous = previousLastSync { setLastSync(previous) } isResetting = false isResyncInProgress = false } } private func performSyncForResync() async { log.info("performSyncForResync starting") activityLog.info("Starting full resync...") isSyncing = true var runningMessageCount = 0 var runningConversationCount = 0 var conversationsWithNewMessages = Set() stats = SyncStats() do { let conversations = try imessageReader.getConversations() activityLog.info("Found \(conversations.count) conversations") var totalSynced = 0 var failedConversations = 0 for conversation in conversations { do { let messages = try imessageReader.getMessages(conversationId: conversation.id, since: nil) if messages.isEmpty { continue } let chunks = ChunkingStrategy.createAdaptiveChunks(messages) var conversationSynced = 0 for (chunkIndex, chunk) in chunks.enumerated() { let result = try await syncChunkWithRetry(chunk, conversation: conversation) conversationSynced += result.synced totalSynced += result.synced if chunks.count > 1 { log.info("Resynced chunk \(chunkIndex + 1)/\(chunks.count) (\(result.synced) messages)") } if result.synced > 0 { runningMessageCount += result.synced if !conversationsWithNewMessages.contains(conversation.id) { conversationsWithNewMessages.insert(conversation.id) runningConversationCount += 1 } stats = SyncStats( messageCount: runningMessageCount, conversationCount: runningConversationCount, contactCount: stats.contactCount ) } } if conversationSynced > 0 { activityLog.info("Resynced \(conversationSynced) messages from \(conversation.displayName)") } } catch { failedConversations += 1 activityLog.warning("Failed \(conversation.displayName): \(error.localizedDescription)") } } let newSyncTime = Date() setLastSync(newSyncTime) lastSyncCompletedAt = newSyncTime UserDefaults.standard.set(newSyncTime, forKey: "imessage.lastSyncCompletedAt") if failedConversations > 0 { activityLog.warning("Resync partial: \(totalSynced) messages, \(failedConversations) failed") } else { activityLog.success("Resync complete: \(totalSynced) messages") } await fetchStats() } catch { activityLog.error("Resync failed: \(error.localizedDescription)") } isSyncing = false isResyncInProgress = false } }