diff --git a/@packages/imessage/Sources/IMessageSync/APIClient.swift b/@packages/imessage/Sources/IMessageSync/APIClient.swift index a7f802d..836cb6d 100644 --- a/@packages/imessage/Sources/IMessageSync/APIClient.swift +++ b/@packages/imessage/Sources/IMessageSync/APIClient.swift @@ -9,7 +9,7 @@ private let log = AppLogger.logger(for: "IMessage.API") // MARK: - Protocol -protocol APIClientProtocol: AnyObject { +protocol APIClientProtocol: AnyObject, Sendable { var isAuthenticated: Bool { get } func syncMessages(_ payload: SyncMessagesPayload) async throws -> Int func syncMessagesBatch(_ payloads: [SyncMessagesPayload]) async throws -> Int @@ -269,7 +269,7 @@ struct SyncContactPayload: Encodable { } } -struct PendingSendMessage { +struct PendingSendMessage: Sendable { let id: String let phoneNumber: String let body: String diff --git a/@packages/imessage/Sources/IMessageSync/Reader.swift b/@packages/imessage/Sources/IMessageSync/Reader.swift index a6e8507..47f6f23 100644 --- a/@packages/imessage/Sources/IMessageSync/Reader.swift +++ b/@packages/imessage/Sources/IMessageSync/Reader.swift @@ -291,30 +291,36 @@ class iMessageReader: MessageReaderProtocol { let deliveredUnix: TimeInterval? = row["date_delivered_unix"] let readUnix: TimeInterval? = row["date_read_unix"] - let rawText: String? = row["text"] - let rawAttributedBody: Data? = row["attributedBody"] - var attributedBodyBase64: String? - // Prefer the text column; fall back to decoding attributedBody (NSKeyedArchiver) for - // newer messages where Apple stores the string only in the attributed blob. - var resolvedText: String? = rawText.flatMap { $0.isEmpty ? nil : $0 } - - if let bodyData = rawAttributedBody { - attributedBodyBase64 = bodyData.base64EncodedString() - if resolvedText == nil || resolvedText!.isEmpty { - if let decoded = try? NSKeyedUnarchiver.unarchivedObject( - ofClass: NSAttributedString.self, from: bodyData - ) { - let s = decoded.string.trimmingCharacters(in: .whitespacesAndNewlines) - if !s.isEmpty { resolvedText = s } - } - // Secondary heuristic fallback if NSKeyedUnarchiver fails. - if resolvedText == nil || resolvedText!.isEmpty { - resolvedText = self.extractTextFromAttributedBody(bodyData) - } + var extractedText: String? + if let attributedBodyData: Data = row["attributedBody"] { + attributedBodyBase64 = attributedBodyData.base64EncodedString() + // Try NSKeyedUnarchiver first — when the blob is a real + // archived NSAttributedString it gives us the cleanest + // text. Fall back to the heuristic typedstream scan + // (`extractTextFromAttributedBody`) when it isn't, which + // is the common case for modern URL bubbles / reactions. + if let decoded = try? NSKeyedUnarchiver.unarchivedObject( + ofClass: NSAttributedString.self, from: attributedBodyData + ) { + let s = decoded.string.trimmingCharacters(in: .whitespacesAndNewlines) + if !s.isEmpty { extractedText = s } + } + if extractedText == nil || extractedText!.isEmpty { + extractedText = self.extractTextFromAttributedBody(attributedBodyData) } } + // Prefer the chat.db `text` field; fall back to text recovered + // from attributedBody when the row's text is null or empty + // (common for URL bubbles, reactions, expressive sends). + let rawText: String? = row["text"] + let effectiveText: String? = { + if let raw = rawText, !raw.isEmpty { return raw } + if let extracted = extractedText, !extracted.isEmpty { return extracted } + return rawText + }() + let senderId: String? = row["sender_id"] var senderDisplayName: String? var senderPhoneNumber: String? @@ -340,7 +346,7 @@ class iMessageReader: MessageReaderProtocol { date: Date(timeIntervalSince1970: dateUnix), dateDelivered: deliveredUnix.map { Date(timeIntervalSince1970: $0) }, dateRead: readUnix.map { Date(timeIntervalSince1970: $0) }, - text: resolvedText, + text: effectiveText, attributedBody: attributedBodyBase64, associatedMessageType: row["associated_message_type"], associatedMessageGuid: row["associated_message_guid"], @@ -387,86 +393,139 @@ class iMessageReader: MessageReaderProtocol { } } + /// Decoder for the iMessage `attributedBody` NSKeyedArchiver typedstream + /// blob. Mirror of `src/server/src/shared/typedstream/decode.ts` — both + /// sides must agree on which messages yield non-null text. + /// + /// Strategy: + /// 1. Scan every "NSString" class-definition marker; at each, read a + /// length-prefixed UTF-8 run after the 5-byte padding. Keep the + /// longest valid extraction (handles repeated markers via + /// back-references / appended attribute dicts). + /// 2. Fallback: longest printable byte run that contains at least one + /// letter and is not a known typedstream class-name marker. + /// + /// Length prefixes: + /// - 0x00..0x7F → that byte is the length + /// - 0x81 LO HI → 16-bit little-endian + /// - 0x82 B0..B3 → 32-bit little-endian func extractTextFromAttributedBody(_ data: Data) -> String? { guard !data.isEmpty else { return nil } let bytes = [UInt8](data) - let marker: [UInt8] = [78, 83, 83, 116, 114, 105, 110, 103] // "NSString" + if let text = bestNSStringRun(in: bytes) { return text } + return longestPrintableRun(in: bytes) + } - if let markerIndex = findSubsequence(in: bytes, subsequence: marker) { - let contentStart = markerIndex + 8 + 5 + private static let nsStringMarker: [UInt8] = [78, 83, 83, 116, 114, 105, 110, 103] // "NSString" + private static let nsStringPadding: Int = 5 - if contentStart < bytes.count { - let lengthByte = bytes[contentStart] - let textStart: Int - let textLength: Int + private static let typedstreamMarkerWords: Set = [ + "NSString", "NSMutableString", + "NSAttributedString", "NSMutableAttributedString", + "NSDictionary", "NSMutableDictionary", + "NSArray", "NSMutableArray", + "NSNumber", "NSObject", "NSValue", "NSData", "NSDate", + "streamtyped" + ] - if lengthByte == 0x81 { - if contentStart + 3 <= bytes.count { - textLength = Int(bytes[contentStart + 1]) | (Int(bytes[contentStart + 2]) << 8) - textStart = contentStart + 3 - } else { - return nil - } - } else { - textLength = Int(lengthByte) - textStart = contentStart + 1 - } + private func bestNSStringRun(in bytes: [UInt8]) -> String? { + let marker = Self.nsStringMarker + var best: String? + var bestLen = 0 + var searchFrom = 0 - if textStart + textLength <= bytes.count && textLength > 0 { - let textData = Data(bytes[textStart..<(textStart + textLength)]) - if let text = String(data: textData, encoding: .utf8) { - let trimmed = text.trimmingCharacters(in: .whitespacesAndNewlines) - if !trimmed.isEmpty { return trimmed } - } - } + while searchFrom <= bytes.count - marker.count { + guard let idx = findSubsequence(in: bytes, subsequence: marker, from: searchFrom) else { break } + if let candidate = readNSStringRun(in: bytes, markerIndex: idx), candidate.count > bestLen { + best = candidate + bestLen = candidate.count } + searchFrom = idx + 1 } - var longestText: String? + return best + } + + private func readNSStringRun(in bytes: [UInt8], markerIndex: Int) -> String? { + let contentStart = markerIndex + Self.nsStringMarker.count + Self.nsStringPadding + guard contentStart < bytes.count else { return nil } + guard let len = readVarLength(in: bytes, at: contentStart), len.length > 0 else { return nil } + let end = len.nextOffset + len.length + guard end <= bytes.count else { return nil } + + let slice = Data(bytes[len.nextOffset.. VarLength? { + guard offset < bytes.count else { return nil } + let tag = bytes[offset] + if tag < 0x81 { + return VarLength(length: Int(tag), nextOffset: offset + 1) + } + if tag == 0x81 { + guard offset + 3 <= bytes.count else { return nil } + let length = Int(bytes[offset + 1]) | (Int(bytes[offset + 2]) << 8) + return VarLength(length: length, nextOffset: offset + 3) + } + if tag == 0x82 { + guard offset + 5 <= bytes.count else { return nil } + let b0 = UInt32(bytes[offset + 1]) + let b1 = UInt32(bytes[offset + 2]) + let b2 = UInt32(bytes[offset + 3]) + let b3 = UInt32(bytes[offset + 4]) + let length = b0 | (b1 << 8) | (b2 << 16) | (b3 << 24) + return VarLength(length: Int(length), nextOffset: offset + 5) + } + return nil + } + + private func longestPrintableRun(in bytes: [UInt8]) -> String? { + var longest: String? var currentRun = Data() + let flush: (inout Data, inout String?) -> Void = { run, longest in + guard run.count > 3 else { run = Data(); return } + if let str = String(data: run, encoding: .utf8), + str.rangeOfCharacter(from: .letters) != nil { + let trimmed = str.trimmingCharacters(in: .whitespacesAndNewlines) + if trimmed.count > (longest?.count ?? 0) + && !Self.typedstreamMarkerWords.contains(trimmed) + && !trimmed.hasPrefix("NS") + && !trimmed.hasPrefix("__") + && !trimmed.contains("attributedString") { + longest = trimmed + } + } + run = Data() + } + for byte in bytes { if (byte >= 0x20 && byte <= 0x7E) || byte >= 0x80 || byte == 0x0A || byte == 0x0D { currentRun.append(byte) } else { - if currentRun.count > 3 { - if let str = String(data: currentRun, encoding: .utf8), - str.rangeOfCharacter(from: .letters) != nil { - let trimmed = str.trimmingCharacters(in: .whitespacesAndNewlines) - if trimmed.count > (longestText?.count ?? 0) && - !trimmed.hasPrefix("NS") && - !trimmed.hasPrefix("__") && - !trimmed.contains("attributedString") { - longestText = trimmed - } - } - } - currentRun = Data() + flush(¤tRun, &longest) } } + flush(¤tRun, &longest) - if currentRun.count > 3 { - if let str = String(data: currentRun, encoding: .utf8), - str.rangeOfCharacter(from: .letters) != nil { - let trimmed = str.trimmingCharacters(in: .whitespacesAndNewlines) - if trimmed.count > (longestText?.count ?? 0) && - !trimmed.hasPrefix("NS") && - !trimmed.hasPrefix("__") && - !trimmed.contains("attributedString") { - longestText = trimmed - } - } - } - - return longestText + return longest } - func findSubsequence(in array: [UInt8], subsequence: [UInt8]) -> Int? { + func findSubsequence(in array: [UInt8], subsequence: [UInt8], from: Int = 0) -> Int? { guard subsequence.count <= array.count else { return nil } - return (0...(array.count - subsequence.count)) + let start = max(0, from) + let last = array.count - subsequence.count + guard start <= last else { return nil } + return (start...last) .first { Array(array[$0..<($0 + subsequence.count)]) == subsequence } } + } enum iMessageError: LocalizedError { diff --git a/@packages/imessage/Sources/IMessageSync/SyncManager.swift b/@packages/imessage/Sources/IMessageSync/SyncManager.swift index 6d59385..c08a5d3 100644 --- a/@packages/imessage/Sources/IMessageSync/SyncManager.swift +++ b/@packages/imessage/Sources/IMessageSync/SyncManager.swift @@ -7,7 +7,7 @@ import MacSyncShared private let log = AppLogger.logger(for: "IMessage.Sync") -struct SyncStats: Equatable { +struct SyncStats: Equatable, Sendable { var messageCount: Int = 0 var conversationCount: Int = 0 var contactCount: Int = 0 @@ -24,7 +24,7 @@ struct ContactSyncInfo { var hasContactsPermission: Bool = false } -enum SyncError: Equatable { +enum SyncError: Equatable, Sendable { case none case fullDiskAccessRequired case databaseNotFound @@ -41,16 +41,13 @@ enum SyncError: Equatable { } @MainActor -class SyncManager: ObservableObject { +final class SyncManager: BaseSyncManager { static let shared = SyncManager() static let syncSchemaVersion = 3 - @Published var isSyncing = false - @Published var lastSync: Date? - @Published var lastSyncCompletedAt: Date? - @Published var stats = SyncStats() - @Published var syncError: SyncError = .none + // Module-specific @Published state (base owns isSyncing/lastSyncCompletedAt/ + // currentOperation/syncError/stats). @Published var contactSyncInfo = ContactSyncInfo() @Published var isResetting = false @Published var isResyncInProgress = false @@ -60,56 +57,118 @@ class SyncManager: ObservableObject { private let sendService = SendService.shared private let activityLog = ActivityLog.shared private let blobSyncManager: BlobSyncManager - private var syncTimer: Timer? - private var blobSyncTimer: Timer? + + /// First-cycle bootstrap (DB connect, schema-version reset, contacts load) is + /// done lazily inside `performSync` since the base's `startSync` is final. + private var didBootstrap = false + + /// True until the first inbound cycle has chosen a code path (initial batch + /// vs incremental). Reset to true after a `resetAndResync`. private var isInitialLoad = true + /// Separate timer for blob uploads. Runs every 5 minutes, independent of + /// the inbound read cycle and the outbound send queue. + private var blobSyncTimer: Timer? + + /// Outbound send-queue runner. Polls the server's legacy + /// `icloud.send_queue` table via `IMessageSendTransport`, delegates the + /// actual AppleScript send + rate-limit + delivery-tracking to + /// `SendService.shared`, and acks the result. Owns its own 30s timer, + /// independent of the read cycle, so an outbound queue with backlog + /// drains promptly even when no inbound sync runs. + private lazy var sendQueueClient: SendQueueClient = { + let transport = IMessageSendTransport(apiClient: apiClient) + let service = sendService + let activity = activityLog + return SendQueueClient(label: "imessage", transport: transport, interval: 30) { message in + let result = service.send(recipient: message.phoneNumber, body: message.body) + let suffix = String(message.phoneNumber.prefix(4)) + if result.success { + activity.success("Sent message to \(suffix)...") + return .sent + } else { + let reason = result.error ?? "Unknown error" + activity.error("Failed to send to \(suffix)...: \(reason)") + return .failed(reason: reason) + } + } + }() + private init() { self.blobSyncManager = BlobSyncManager(apiClient: APIClient.shared, activityLog: ActivityLog.shared) - lastSync = UserDefaults.standard.object(forKey: "lastSync") as? Date - lastSyncCompletedAt = UserDefaults.standard.object(forKey: "lastSyncCompletedAt") as? Date - log.info("init - lastSync: \(String(describing: self.lastSync))") - } - private func checkSchemaVersionAndReset() async -> Bool { - let storedVersion = UserDefaults.standard.integer(forKey: "syncSchemaVersion") - let currentVersion = Self.syncSchemaVersion + super.init( + initialStats: SyncStats(), + noError: .none, + persistenceKey: "imessage", + timerInterval: 30 + ) - log.info("Schema version check - stored: \(storedVersion), current: \(currentVersion)") - - if storedVersion != currentVersion { - if storedVersion > 0 { await performReset() } - UserDefaults.standard.set(currentVersion, forKey: "syncSchemaVersion") - return storedVersion > 0 + // Migrate legacy UserDefaults watermark keys (one-time). iMessage used + // unprefixed `lastSync` / `lastSyncCompletedAt`. + if lastSync == nil, let legacy = UserDefaults.standard.object(forKey: "lastSync") as? Date { + setLastSync(legacy) + UserDefaults.standard.removeObject(forKey: "lastSync") + } + if lastSyncCompletedAt == nil, let legacy = UserDefaults.standard.object(forKey: "lastSyncCompletedAt") as? Date { + lastSyncCompletedAt = legacy + UserDefaults.standard.set(legacy, forKey: "imessage.lastSyncCompletedAt") + UserDefaults.standard.removeObject(forKey: "lastSyncCompletedAt") } - return false } - func startSync() { - log.info("startSync called") - activityLog.info("Starting sync service...") - syncError = .none + func openFullDiskAccessSettings() { + if let url = URL(string: "x-apple.systempreferences:com.apple.preference.security?Privacy_AllFiles") { + NSWorkspace.shared.open(url) + } + } - do { - try imessageReader.connect() - log.info("Connected to iMessage database") - activityLog.success("Connected to iMessage database") - } catch { - let errorMsg = error.localizedDescription.lowercased() - if errorMsg.contains("authorization denied") || errorMsg.contains("error 23") { - syncError = .fullDiskAccessRequired - activityLog.error("Full Disk Access required") - } else if errorMsg.contains("not found") { - syncError = .databaseNotFound - activityLog.error("iMessage database not found") - } else { - syncError = .connectionFailed(error.localizedDescription) - activityLog.error("Connection failed: \(error.localizedDescription)") + func retryConnection() { startSync() } + + // MARK: - Lifecycle hooks + + override func didStartSync() { + sendQueueClient.start() + // Blob sync on its own 5-minute cadence, independent of read cycle. + blobSyncTimer = Timer.scheduledTimer(withTimeInterval: 300, repeats: true) { [weak self] _ in + guard let self else { return } + Task { await self.blobSyncManager.syncBlobs() } + } + } + + override func willStopSync() { + sendQueueClient.stop() + blobSyncTimer?.invalidate() + blobSyncTimer = nil + } + + // MARK: - Sync cycle + + override func performSync() async { + // Lazy one-shot bootstrap: DB connect, schema version reset, contacts. + if !didBootstrap { + activityLog.info("Starting sync service...") + syncError = .none + + do { + try imessageReader.connect() + log.info("Connected to iMessage database") + activityLog.success("Connected to iMessage database") + } catch { + let errorMsg = error.localizedDescription.lowercased() + if errorMsg.contains("authorization denied") || errorMsg.contains("error 23") { + syncError = .fullDiskAccessRequired + activityLog.error("Full Disk Access required") + } else if errorMsg.contains("not found") { + syncError = .databaseNotFound + activityLog.error("iMessage database not found") + } else { + syncError = .connectionFailed(error.localizedDescription) + activityLog.error("Connection failed: \(error.localizedDescription)") + } + return } - return - } - Task { let didReset = await checkSchemaVersionAndReset() if didReset { activityLog.info("Schema version updated, resync required") } @@ -123,58 +182,37 @@ class SyncManager: ObservableObject { activityLog.warning("Contacts permission not granted - names won't sync") } - if isInitialLoad && stats.messageCount == 0 { - log.info("Initial load detected - using batch sync") - activityLog.info("First sync detected - using optimized batch endpoint") - await performInitialSync() - isInitialLoad = false - } else { - isInitialLoad = false - await performSync() - } - - // Blob pass runs whenever there are missing attachment blobs, regardless - // of whether this was an initial or incremental sync. - await blobSyncManager.syncBlobs() + await fetchStats() + didBootstrap = true } - syncTimer = Timer.scheduledTimer(withTimeInterval: 30, repeats: true) { [weak self] _ in - Task { @MainActor in self?.syncNow() } - } - blobSyncTimer = Timer.scheduledTimer(withTimeInterval: 300, repeats: true) { [weak self] _ in - guard let self else { return } - Task { await self.blobSyncManager.syncBlobs() } - } - activityLog.info("Scheduled automatic sync every 30s, blob sync every 5m") - } + // Outbound send queue used to be invoked inline here. It now runs on + // its own 30s timer via `sendQueueClient` started from + // `didStartSync()`. We still flush once at the top of every read + // cycle so a manual `syncNow()` drains pending sends promptly. + await sendQueueClient.drainOnce() - func stopSync() { - syncTimer?.invalidate() - syncTimer = nil - blobSyncTimer?.invalidate() - blobSyncTimer = nil - } - - func openFullDiskAccessSettings() { - if let url = URL(string: "x-apple.systempreferences:com.apple.preference.security?Privacy_AllFiles") { - NSWorkspace.shared.open(url) + // Initial-load path: server has nothing, stream a metadata-only batch + // import (attachments without blob bytes). Subsequent cycles use the + // standard incremental path with full attachment data. + if isInitialLoad && stats.messageCount == 0 { + log.info("Initial load detected - using batch sync") + activityLog.info("First sync detected - using optimized batch endpoint") + await performInitialSync() + isInitialLoad = false + } else { + isInitialLoad = false + await runReadCycle() } - } - func retryConnection() { startSync() } - - func syncNow() { - guard !isSyncing else { return } - Task { - await processSendQueue() - await performSync() - } + // Blob pass runs after every read cycle; uploads any messages whose + // attachment bytes haven't been pushed yet. + await blobSyncManager.syncBlobs() } private func performInitialSync() async { log.info("performInitialSync starting - streaming batch mode") activityLog.info("Starting initial batch sync...") - isSyncing = true var latestMessageDate: Date? var totalSynced = 0 @@ -186,7 +224,7 @@ class SyncManager: ObservableObject { activityLog.info("Found \(conversations.count) conversations") // Stream in windows of maxConversationsPerBatch so we never hold - // the full 48k-message history in memory at once. + // the full history in memory at once. var window: [SyncMessagesPayload] = [] var windowMessages = 0 var convIndex = 0 @@ -293,14 +331,9 @@ class SyncManager: ObservableObject { log.info("Initial sync complete - \(totalSynced) messages synced, \(totalFailed) conversations failed") if let newDate = latestMessageDate { - lastSync = newDate - UserDefaults.standard.set(newDate, forKey: "lastSync") + setLastSync(newDate) } - let now = Date() - lastSyncCompletedAt = now - UserDefaults.standard.set(now, forKey: "lastSyncCompletedAt") - activityLog.success("Initial sync complete: \(totalSynced) messages synced") await fetchStats() @@ -309,19 +342,16 @@ class SyncManager: ObservableObject { activityLog.error("Initial sync failed: \(error.localizedDescription)") } - isSyncing = false isResyncInProgress = false } - private func performSync() async { + private func runReadCycle() async { log.info("performSync starting") activityLog.info("Starting sync...") - isSyncing = true var runningMessageCount = stats.messageCount var runningConversationCount = stats.conversationCount var conversationsWithNewMessages = Set() - var latestMessageDate: Date? do { let conversations = try imessageReader.getConversations() @@ -339,12 +369,6 @@ class SyncManager: ObservableObject { if messages.isEmpty { continue } - for msg in messages { - if latestMessageDate == nil || msg.date > latestMessageDate! { - latestMessageDate = msg.date - } - } - let chunks = ChunkingStrategy.createAdaptiveChunks(messages) var conversationSynced = 0 @@ -381,16 +405,6 @@ class SyncManager: ObservableObject { } } - if totalSynced > 0 || failedConversations == 0 { - let newSyncTime = latestMessageDate ?? Date() - lastSync = newSyncTime - UserDefaults.standard.set(newSyncTime, forKey: "lastSync") - } - - let now = Date() - lastSyncCompletedAt = now - UserDefaults.standard.set(now, forKey: "lastSyncCompletedAt") - if failedConversations > 0 { if totalSynced > 0 { activityLog.warning("Sync partial: \(totalSynced) messages, \(failedConversations) conversations failed") @@ -410,10 +424,23 @@ class SyncManager: ObservableObject { activityLog.error("Sync failed: \(error.localizedDescription)") } - isSyncing = false isResyncInProgress = false } + private func checkSchemaVersionAndReset() async -> Bool { + let storedVersion = UserDefaults.standard.integer(forKey: "syncSchemaVersion") + let currentVersion = Self.syncSchemaVersion + + log.info("Schema version check - stored: \(storedVersion), current: \(currentVersion)") + + if storedVersion != currentVersion { + if storedVersion > 0 { await performReset() } + UserDefaults.standard.set(currentVersion, forKey: "syncSchemaVersion") + return storedVersion > 0 + } + return false + } + private func syncContacts() async { activityLog.info("Syncing contacts...") do { @@ -464,47 +491,13 @@ class SyncManager: ObservableObject { contactCount: response.totalContacts ) if let serverLastSync = response.lastSyncAt, lastSync == nil { - lastSync = serverLastSync - UserDefaults.standard.set(serverLastSync, forKey: "lastSync") + setLastSync(serverLastSync) } } catch { log.warning("fetchStats failed: \(error.localizedDescription)") } } - private func processSendQueue() async { - guard apiClient.isAuthenticated else { return } - - do { - let pendingMessages = try await apiClient.getPendingSends() - if pendingMessages.isEmpty { return } - - activityLog.info("Processing \(pendingMessages.count) queued message(s)...") - - for message in pendingMessages { - let result = sendService.send(recipient: message.phoneNumber, body: message.body) - - do { - try await apiClient.reportSendResult( - messageId: message.id, - status: result.success ? "sent" : "failed", - error: result.error - ) - if result.success { - activityLog.success("Sent message to \(message.phoneNumber.prefix(4))...") - } else { - activityLog.error("Failed to send to \(message.phoneNumber.prefix(4))...: \(result.error ?? "Unknown error")") - } - } catch { - activityLog.warning("Failed to report send result: \(error.localizedDescription)") - } - } - } catch { - if case APIError.missingAuthToken = error { return } - log.warning("processSendQueue failed: \(error.localizedDescription)") - } - } - private func syncChunkWithRetry( _ chunk: [iMessage], conversation: iMessageConversation @@ -600,7 +593,8 @@ class SyncManager: ObservableObject { let result = try await apiClient.resetSync() activityLog.info("Cleared \(result.deletedMessages) messages, \(result.deletedConversations) conversations") - UserDefaults.standard.removeObject(forKey: "lastSync") + setLastSync(nil) + isInitialLoad = true isResetting = false await performSyncForResync() } catch { @@ -608,7 +602,7 @@ class SyncManager: ObservableObject { activityLog.error("Reset failed: \(error.localizedDescription)") if let previous = previousLastSync { - UserDefaults.standard.set(previous, forKey: "lastSync") + setLastSync(previous) } isResetting = false isResyncInProgress = false @@ -672,10 +666,9 @@ class SyncManager: ObservableObject { } let newSyncTime = Date() - lastSync = newSyncTime + setLastSync(newSyncTime) lastSyncCompletedAt = newSyncTime - UserDefaults.standard.set(newSyncTime, forKey: "lastSync") - UserDefaults.standard.set(newSyncTime, forKey: "lastSyncCompletedAt") + UserDefaults.standard.set(newSyncTime, forKey: "imessage.lastSyncCompletedAt") if failedConversations > 0 { activityLog.warning("Resync partial: \(totalSynced) messages, \(failedConversations) failed") diff --git a/@packages/iphoto/Sources/IPhotoSync/SyncManager.swift b/@packages/iphoto/Sources/IPhotoSync/SyncManager.swift index 3cc9ec3..79c61ac 100644 --- a/@packages/iphoto/Sources/IPhotoSync/SyncManager.swift +++ b/@packages/iphoto/Sources/IPhotoSync/SyncManager.swift @@ -80,21 +80,11 @@ public enum IPhotoSyncError: Equatable, Sendable { /// Upload concurrency is capped at `maxConcurrentUploads` (4) using `withTaskGroup` /// with explicit batch windows so we never open more than 4 concurrent HTTP streams. @MainActor -public final class SyncManager: ObservableObject { +public final class SyncManager: BaseSyncManager { public static let shared = SyncManager() - // MARK: Public state (observed by MenuBar + local web server) - @Published public var isSyncing = false - @Published public var lastSync: Date? - @Published public var lastSyncCompletedAt: Date? - @Published public var stats = IPhotoSyncStats() - @Published public var syncError: IPhotoSyncError = .none - @Published public var currentOperation: String = "" - - // MARK: Private private let reader = PhotosLibraryReader.shared private let apiClient = APIClient.shared - private var syncTimer: Timer? /// Batch size for metadata sync — 100 photos per request. private let metadataBatchSize = 100 @@ -102,57 +92,43 @@ public final class SyncManager: ObservableObject { private let maxConcurrentUploads = 4 private init() { - lastSync = UserDefaults.standard.object(forKey: "iphotoLastSync") as? Date - lastSyncCompletedAt = UserDefaults.standard.object(forKey: "iphotoLastSyncCompletedAt") as? Date - log.info("init lastSync=\(String(describing: self.lastSync))") - } + super.init( + initialStats: IPhotoSyncStats(), + noError: .none, + persistenceKey: "iphoto", + timerInterval: 300 + ) - // MARK: - Lifecycle - - public func startSync() { - log.info("startSync called") - syncError = .none - - Task { - let authorized = await reader.requestAuthorization() - guard authorized else { - log.warning("Photos access denied") - syncError = .photosAccessRequired - return - } - await performSync() + // Migrate legacy UserDefaults watermark keys (one-time). + if lastSync == nil, let legacy = UserDefaults.standard.object(forKey: "iphotoLastSync") as? Date { + setLastSync(legacy) + UserDefaults.standard.removeObject(forKey: "iphotoLastSync") } - - syncTimer = Timer.scheduledTimer(withTimeInterval: 300, repeats: true) { [weak self] _ in - Task { @MainActor in self?.syncNow() } - } - log.info("Scheduled automatic sync every 5 minutes") - } - - public func stopSync() { - syncTimer?.invalidate() - syncTimer = nil - } - - public func syncNow() { - guard !isSyncing else { return } - Task { - if !reader.isAccessible { - let ok = await reader.requestAuthorization() - guard ok else { - syncError = .photosAccessRequired - return - } - } - await performSync() + if lastSyncCompletedAt == nil, let legacy = UserDefaults.standard.object(forKey: "iphotoLastSyncCompletedAt") as? Date { + lastSyncCompletedAt = legacy + UserDefaults.standard.set(legacy, forKey: "iphoto.lastSyncCompletedAt") + UserDefaults.standard.removeObject(forKey: "iphotoLastSyncCompletedAt") } } + // MARK: - Authorization hooks + + public override func isAuthorized() async -> Bool { reader.isAccessible } + + public override func requestAuthorization() async -> Bool { + await reader.requestAuthorization() + } + + public override func onAuthorizationDenied() { + syncError = .photosAccessRequired + } + + // MARK: - Module-specific entry points + public func forceFullResync() { guard !isSyncing else { return } log.info("Forcing full resync") - lastSync = nil - UserDefaults.standard.removeObject(forKey: "iphotoLastSync") + setLastSync(nil) stats.failedBatches = 0 syncNow() } @@ -160,15 +136,16 @@ public final class SyncManager: ObservableObject { /// Upload only photos that have metadata but no binary on the server. public func uploadPending() { guard !isSyncing else { return } - Task { - if !reader.isAccessible { - let ok = await reader.requestAuthorization() + Task { [weak self] in + guard let self else { return } + if !self.reader.isAccessible { + let ok = await self.reader.requestAuthorization() guard ok else { - syncError = .photosAccessRequired + self.syncError = .photosAccessRequired return } } - await performPendingUpload() + await self.performPendingUpload() } } @@ -180,9 +157,8 @@ public final class SyncManager: ObservableObject { // MARK: - Sync cycle - private func performSync() async { + public override func performSync() async { log.info("performSync starting") - isSyncing = true currentOperation = "Fetching photos…" // Background diagnostic — never blocks sync loop @@ -195,7 +171,6 @@ public final class SyncManager: ObservableObject { if photos.isEmpty { await updateStats() - isSyncing = false currentOperation = "No new photos" await performPendingUpload() return @@ -242,13 +217,8 @@ public final class SyncManager: ObservableObject { log.info("Batch \(idx + 1)/\(batches.count) synced=\(response.synced) needsUpload=\(response.needsUpload.count)") } catch { stats.failedBatches += 1 - let msg = error.localizedDescription.lowercased() - if msg.contains("network") || msg.contains("connection") || - msg.contains("timeout") || msg.contains("unreachable") { - syncError = .backendUnreachable - } else { - log.warning("Batch \(idx + 1) failed: \(error.localizedDescription)") - } + setConnectionError(error) + log.warning("Batch \(idx + 1) failed: \(error.localizedDescription)") } } @@ -261,23 +231,15 @@ public final class SyncManager: ObservableObject { currentOperation = "Syncing albums…" await syncAlbums() - let now = Date() - lastSync = now - lastSyncCompletedAt = now - UserDefaults.standard.set(now, forKey: "iphotoLastSync") - UserDefaults.standard.set(now, forKey: "iphotoLastSyncCompletedAt") - await updateStats() currentOperation = "Sync complete" log.info("performSync complete") - isSyncing = false await performPendingUpload() } private func performPendingUpload() async { log.info("Checking pending uploads") - isSyncing = true currentOperation = "Checking pending uploads…" Task.detached { self.reader.checkAccessibility() } @@ -288,19 +250,16 @@ public final class SyncManager: ObservableObject { guard !pending.isEmpty else { currentOperation = "No pending uploads" - isSyncing = false return } currentOperation = "Uploading pending…" await uploadBatch(pending, label: "Pending upload") await updateStats() - } catch { log.warning("getPendingUploads failed: \(error.localizedDescription)") } - isSyncing = false currentOperation = "" } @@ -439,4 +398,10 @@ public final class SyncManager: ObservableObject { log.warning("updateStats failed: \(error.localizedDescription)") } } + + private func setConnectionError(_ error: Error) { + if SyncConnectionErrorHeuristic.isConnectionError(error) { + syncError = .backendUnreachable + } + } }