The iMessage read cycle is driven by BaseSyncManager's 30s timer → syncNow(), which is gated by 'guard !isSyncing'. performSync awaited blobSyncManager.syncBlobs() inline, and that blob pass loops forever when the upload backend is failing: /attachments/missing has no cursor, so a full page of perpetually failing uploads is re-fetched and re-failed indefinitely, because the loop only breaks on a page smaller than pageSize. performSync never returned → isSyncing stuck at true → every 30s read tick was swallowed. Net effect: messages only synced on app launch, drifting hours behind between restarts (send-queue timers are independent, so they kept polling — the tell that the timer fired but syncNow was gated).

Two fixes:
- Decouple the blob pass: fire it detached and in-flight-guarded instead of awaiting it on the read cycle, so a slow or failing blob backend can never hold isSyncing.
- Bound the blob loop: stop a pass after any full page that produced zero successful uploads (the same missing set would be re-fetched), instead of spinning forever.

Verified: the read cycle now fires every ~30s on the live process without a restart; the blob pass logs 'stopping pass' and returns; store lag is ~7s.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
765 lines
34 KiB
Swift
765 lines
34 KiB
Swift
import AppKit
|
||
import Combine
|
||
import Foundation
|
||
import LilithAgentCore
|
||
import LilithLogging
|
||
import MacSyncShared
|
||
|
||
// File-scoped logger for this file, obtained from `AppLogger` under the label "IMessage.Sync".
private let log = AppLogger.logger(for: "IMessage.Sync")
|
||
|
||
/// Counters describing the synced data set. Every counter starts at zero.
struct SyncStats: Equatable, Sendable {
    /// Number of messages.
    var messageCount = 0
    /// Number of conversations.
    var conversationCount = 0
    /// Number of contacts.
    var contactCount = 0
}
|
||
|
||
/// Outcome of syncing one chunk of messages.
struct ChunkSyncResult {
    /// How many messages were reported as synced for the chunk.
    let synced: Int
}
|
||
|
||
/// Snapshot of the most recent contact-sync attempt.
struct ContactSyncInfo {
    /// Number of contacts read from the local address book.
    var loadedFromAddressBook = 0
    /// Number of contacts reported as synced by the server.
    var syncedToServer = 0
    /// A few contact display names kept for display/diagnostics.
    var sampleNames = [String]()
    /// Whether loading contacts succeeded (i.e. contacts access was available).
    var hasContactsPermission = false
}
|
||
|
||
/// Error state surfaced by the sync manager. `.none` means "no error".
enum SyncError: Equatable, Sendable {
    case none
    case fullDiskAccessRequired
    case databaseNotFound
    case connectionFailed(String)

    /// Human-readable description of the error; empty for `.none`. For
    /// `.connectionFailed` the associated string is returned unchanged.
    var message: String {
        let text: String
        switch self {
        case .none:
            text = ""
        case .fullDiskAccessRequired:
            text = "Full Disk Access required"
        case .databaseNotFound:
            text = "iMessage database not found"
        case .connectionFailed(let detail):
            text = detail
        }
        return text
    }
}
|
||
|
||
@MainActor
|
||
final class SyncManager: BaseSyncManager<SyncStats, SyncError> {
|
||
/// Shared instance of the manager.
static let shared = SyncManager()

/// Current sync schema version. `checkSchemaVersionAndReset` compares it with
/// the value stored under the `syncSchemaVersion` user-defaults key.
static let syncSchemaVersion: Int = 3

/// How far behind the watermark every incremental read starts. The watermark
/// tracks the newest synced message *date* (see `runReadCycle`), and each cycle
/// re-reads from `watermark − overlap` so that messages landing below the
/// watermark — out-of-order delivery, late SMS, or a chat.db→Date
/// timezone/DST skew — are re-scanned. The server upserts by external_id, so
/// the re-scan is idempotent (no duplicates). This is the fix for silent
/// recent-message drop.
static let incrementalOverlapSeconds: TimeInterval = 2 * 60 * 60

// Module-specific @Published state. The base class owns isSyncing,
// lastSyncCompletedAt, currentOperation, syncError and stats.
@Published var contactSyncInfo: ContactSyncInfo = ContactSyncInfo()
@Published var isResetting: Bool = false
@Published var isResyncInProgress: Bool = false

// Collaborators (shared singletons, plus the blob manager built in `init`).
private let imessageReader = iMessageReader.shared
private let apiClient = APIClient.shared
private let sendService = SendService.shared
private let activityLog = ActivityLog.shared
private let blobSyncManager: BlobSyncManager

/// In-flight flag for the out-of-band blob pass so that passes do not overlap.
/// `performSync` fires the blob pass in its own task and never awaits it —
/// awaiting it there would let a failing blob backend wedge the base
/// `isSyncing` flag.
private var blobSyncInFlight: Bool = false

/// Whether the first-cycle bootstrap (DB connect, schema-version reset,
/// contacts load) has completed. It is performed lazily inside `performSync`
/// because the base's `startSync` is final.
private var didBootstrap: Bool = false

/// True until the first inbound cycle has picked a code path (initial batch
/// vs incremental); set back to true after a `resetAndResync`.
private var isInitialLoad: Bool = true

/// Dedicated timer for blob uploads. Fires every 5 minutes, independently of
/// the inbound read cycle and the outbound send queue.
private var blobSyncTimer: Timer?
|
||
|
||
/// Runner for the outbound send queue. It polls the server's legacy
/// `icloud.send_queue` table through `IMessageSendTransport`, hands the actual
/// AppleScript send + rate-limit + delivery-tracking to `SendService.shared`,
/// and acks the result. It owns its own 30s timer, independent of the read
/// cycle, so a backlog of outbound messages drains promptly even when no
/// inbound sync runs.
private lazy var sendQueueClient: SendQueueClient<IMessageSendTransport> = {
    // Capture the collaborators as locals so the handler closure does not
    // capture `self`.
    let sender = sendService
    let journal = activityLog
    let queueTransport = IMessageSendTransport(apiClient: apiClient)

    return SendQueueClient(label: "imessage", transport: queueTransport, interval: 30) { message in
        let outcome = await sender.send(recipient: message.toHandle, body: message.body)
        // Only the first four characters of the handle are written to the activity log.
        let handlePrefix = String(message.toHandle.prefix(4))

        guard outcome.success else {
            let reason = outcome.error ?? "Unknown error"
            journal.error("Failed to send to \(handlePrefix)...: \(reason)")
            return .failed(reason: reason)
        }

        journal.success("Sent message to \(handlePrefix)...")
        return .sent
    }
}()
|
||
|
||
/// Builds the singleton: creates the blob manager, configures the base class
/// (persistence key "imessage", 30s timer), then migrates the legacy
/// un-prefixed user-defaults keys once.
private init() {
    blobSyncManager = BlobSyncManager(apiClient: APIClient.shared, activityLog: ActivityLog.shared)

    super.init(initialStats: SyncStats(), noError: .none, persistenceKey: "imessage", timerInterval: 30)

    // One-time migration: iMessage used to store `lastSync` and
    // `lastSyncCompletedAt` without a prefix.
    let defaults = UserDefaults.standard

    if lastSync == nil, let legacyWatermark = defaults.object(forKey: "lastSync") as? Date {
        setLastSync(legacyWatermark)
        defaults.removeObject(forKey: "lastSync")
    }

    if lastSyncCompletedAt == nil,
       let legacyCompletedAt = defaults.object(forKey: "lastSyncCompletedAt") as? Date {
        lastSyncCompletedAt = legacyCompletedAt
        defaults.set(legacyCompletedAt, forKey: "imessage.lastSyncCompletedAt")
        defaults.removeObject(forKey: "lastSyncCompletedAt")
    }
}
|
||
|
||
/// Opens the system settings pane addressed by the `Privacy_AllFiles` URL
/// (Full Disk Access). Does nothing if the URL string cannot be parsed.
func openFullDiskAccessSettings() {
    guard let settingsURL = URL(string: "x-apple.systempreferences:com.apple.preference.security?Privacy_AllFiles") else {
        return
    }
    NSWorkspace.shared.open(settingsURL)
}
|
||
|
||
/// Retries the connection by calling `startSync()` again.
func retryConnection() {
    startSync()
}
|
||
|
||
// MARK: - Lifecycle hooks
|
||
|
||
/// Lifecycle hook invoked when syncing starts: starts the outbound send-queue
/// runner and schedules the 5-minute blob-upload timer.
///
/// The timer-driven blob pass honours `blobSyncInFlight`, the same flag
/// `performSync` uses, so a timer tick can never start a second pass while one
/// is still running. Any previously scheduled timer is invalidated first so a
/// repeated start does not leave two timers firing.
override func didStartSync() {
    TraceLog.write("[imessage] IMessageSyncManager.didStartSync — starting sendQueueClient")
    sendQueueClient.start()
    TraceLog.write("[imessage] IMessageSyncManager.didStartSync — sendQueueClient.start() returned")

    // Blob sync on its own 5-minute cadence, independent of the read cycle.
    blobSyncTimer?.invalidate()
    blobSyncTimer = Timer.scheduledTimer(withTimeInterval: 300, repeats: true) { [weak self] _ in
        // Hop onto the main actor explicitly: the in-flight flag is
        // main-actor state and the timer closure itself is not isolated.
        Task { @MainActor [weak self] in
            guard let self, !self.blobSyncInFlight else { return }
            self.blobSyncInFlight = true
            defer { self.blobSyncInFlight = false }
            await self.blobSyncManager.syncBlobs()
        }
    }
}
|
||
|
||
/// Lifecycle hook invoked before syncing stops: stops the send-queue runner
/// and tears down the blob-upload timer.
override func willStopSync() {
    sendQueueClient.stop()

    if let timer = blobSyncTimer {
        timer.invalidate()
    }
    blobSyncTimer = nil
}
|
||
|
||
// MARK: - Sync cycle
|
||
|
||
/// One sync cycle.
///
/// 1. On the first successful cycle only: connect to the iMessage database,
///    run the schema-version check, load and sync contacts, fetch stats. A
///    connection failure records a `syncError` and ends the cycle; the
///    bootstrap is retried on the next cycle because `didBootstrap` stays false.
/// 2. Drain the outbound send queue once.
/// 3. Run the initial batch import while the server reports zero messages,
///    otherwise the incremental read cycle.
/// 4. Kick off a blob pass in its own task (never awaited here).
override func performSync() async {
    if !didBootstrap {
        activityLog.info("Starting sync service...")
        syncError = .none

        do {
            try imessageReader.connect()
            log.info("Connected to iMessage database")
            activityLog.success("Connected to iMessage database")
        } catch {
            // Classify the failure from the (lower-cased) error description.
            let description = error.localizedDescription
            let lowered = description.lowercased()
            let isAccessDenied = lowered.contains("authorization denied") || lowered.contains("error 23")

            if isAccessDenied {
                syncError = .fullDiskAccessRequired
                activityLog.error("Full Disk Access required")
            } else if lowered.contains("not found") {
                syncError = .databaseNotFound
                activityLog.error("iMessage database not found")
            } else {
                syncError = .connectionFailed(description)
                activityLog.error("Connection failed: \(description)")
            }
            return
        }

        let schemaWasReset = await checkSchemaVersionAndReset()
        if schemaWasReset {
            activityLog.info("Schema version updated, resync required")
        }

        activityLog.info("Loading contacts...")
        let hasContactsAccess = await imessageReader.loadContacts()
        contactSyncInfo.hasContactsPermission = hasContactsAccess

        if !hasContactsAccess {
            activityLog.warning("Contacts permission not granted - names won't sync")
        } else {
            await syncContacts()
        }

        await fetchStats()
        didBootstrap = true
    }

    // The outbound queue normally runs on its own 30s timer (started from
    // `didStartSync()`); flushing once here means a manual `syncNow()` also
    // drains pending sends promptly.
    await sendQueueClient.drainOnce()

    // Path selection is driven by the server-side message count rather than a
    // one-shot flag: as long as the server is empty we re-enter the
    // metadata-only batch import, so a partially failed first run is retried
    // instead of being routed to the incremental path with nothing on the
    // server.
    if stats.messageCount != 0 {
        isInitialLoad = false
        await runReadCycle()
    } else {
        log.info("Server empty — entering initial-batch sync path")
        activityLog.info("First sync detected - using optimized batch endpoint")
        await performInitialSync()
        isInitialLoad = false
    }

    // Blob pass runs OUT OF BAND. The missing-attachments endpoint has no
    // cursor, so a failing blob backend returns the same full page on every
    // fetch; awaiting the pass here used to hold the base `isSyncing` flag
    // indefinitely and starve message ingestion. Start it in its own task,
    // guarded so passes don't overlap.
    guard !blobSyncInFlight else { return }
    blobSyncInFlight = true
    Task { [weak self, blobSyncManager] in
        await blobSyncManager.syncBlobs()
        self?.blobSyncInFlight = false
    }
}
|
||
|
||
/// Initial import: streams every conversation to the batch endpoint in
/// bounded windows, sending message metadata only (no `attributedBody`, no
/// attachment bytes).
///
/// The watermark is advanced to the newest message date seen only when nothing
/// failed. "Failed" covers both a rejected batch and a conversation that could
/// not be read locally — a skipped conversation must not be hidden behind an
/// advanced watermark. Always clears `isResyncInProgress` before returning.
private func performInitialSync() async {
    log.info("performInitialSync starting - streaming batch mode")
    activityLog.info("Starting initial batch sync...")

    var latestMessageDate: Date?
    var totalSynced = 0
    var totalFailed = 0
    // One formatter for the whole import instead of up to three per message.
    let timestampFormatter = ISO8601DateFormatter()

    do {
        let conversations = try imessageReader.getConversations()
        log.info("Found \(conversations.count) conversations for batch sync")
        activityLog.info("Found \(conversations.count) conversations")

        // Stream in windows of maxConversationsPerBatch so we never hold
        // the full history in memory at once.
        var window: [SyncMessagesPayload] = []
        var windowMessages = 0
        var convIndex = 0

        // Sends the current window (if any) and resets it. A rejected batch
        // is counted in `totalFailed` and logged; it does not abort the import.
        func flushWindow() async {
            guard !window.isEmpty else { return }
            let batchMsgs = windowMessages
            activityLog.info("Sending \(window.count) convs / \(batchMsgs) msgs (\(convIndex)/\(conversations.count))...")
            do {
                let sent = try await apiClient.syncMessagesBatch(window)
                totalSynced += sent
                log.info("Batch sent: \(sent) messages (running total: \(totalSynced))")
            } catch {
                totalFailed += window.count
                let detail = (error as NSError).userInfo.description
                log.warning("Batch failed (\(window.count) convs, \(batchMsgs) msgs): \(error.localizedDescription) | \(detail)")
                activityLog.warning("Batch failed (\(window.count) convs): \(error.localizedDescription)")
            }
            window.removeAll()
            windowMessages = 0
            await Task.yield()
        }

        for conversation in conversations {
            convIndex += 1
            if convIndex % 50 == 0 {
                log.info("initial-sync progress: \(convIndex)/\(conversations.count) iterated, synced=\(totalSynced) failed=\(totalFailed)")
                activityLog.info("Progress: \(convIndex)/\(conversations.count) (synced \(totalSynced) msgs)")
            }

            do {
                let messages = try imessageReader.getMessages(conversationId: conversation.id, since: nil)
                if messages.isEmpty { continue }

                // Track the newest message date seen across all conversations.
                for msg in messages {
                    if let current = latestMessageDate, msg.date <= current { continue }
                    latestMessageDate = msg.date
                }

                let allPayloadMessages = messages.map { msg in
                    SyncMessagePayload(
                        imessageGuid: msg.guid,
                        senderId: msg.senderId,
                        direction: msg.isFromMe ? "outgoing" : "incoming",
                        sentAt: timestampFormatter.string(from: msg.date),
                        deliveredAt: msg.dateDelivered.map { timestampFormatter.string(from: $0) },
                        readAt: msg.dateRead.map { timestampFormatter.string(from: $0) },
                        text: msg.text,
                        attributedBody: nil, // stripped in initial sync — reduces payload 10-50x
                        associatedMessageType: msg.associatedMessageType,
                        associatedMessageGuid: msg.associatedMessageGuid,
                        isAudioMessage: msg.isAudioMessage,
                        expressiveSendStyleId: msg.expressiveSendStyleId,
                        replyToGuid: msg.replyToGuid,
                        threadOriginatorGuid: msg.threadOriginatorGuid,
                        groupTitle: msg.groupTitle,
                        balloonBundleId: msg.balloonBundleId,
                        service: msg.service,
                        senderIdentifier: msg.senderIdentifier,
                        senderDisplayName: msg.senderDisplayName,
                        senderPhoneNumber: msg.senderPhoneNumber,
                        senderEmail: msg.senderEmail,
                        attachments: msg.attachments.map { att in
                            SyncAttachmentPayload(
                                filename: att.filename,
                                mimeType: att.mimeType,
                                transferName: att.transferName,
                                size: att.size,
                                data: nil // metadata-only in initial sync; blobs synced separately
                            )
                        },
                        attachmentsCount: msg.attachmentsCount,
                        attachmentsTotalSize: msg.attachmentsTotalSize,
                        attachmentsFiletypes: msg.attachmentsFiletypes
                    )
                }

                // Split oversized conversations so no single payload exceeds the message limit.
                let msgLimit = ChunkingStrategy.maxMessagesPerBatch
                let slices = stride(from: 0, to: allPayloadMessages.count, by: msgLimit).map {
                    Array(allPayloadMessages[$0..<min($0 + msgLimit, allPayloadMessages.count)])
                }

                for slice in slices {
                    let payload = SyncMessagesPayload(
                        conversationImessageId: conversation.id,
                        conversationDisplayName: conversation.displayName,
                        isGroup: conversation.isGroup,
                        participantIds: conversation.participantIds,
                        messages: slice
                    )
                    window.append(payload)
                    windowMessages += slice.count

                    if window.count >= ChunkingStrategy.maxConversationsPerBatch ||
                        windowMessages >= ChunkingStrategy.maxMessagesPerBatch {
                        await flushWindow()
                    }
                }
            } catch {
                // A conversation we could not read counts as a failure too;
                // otherwise the watermark below would advance past data that
                // was never sent.
                totalFailed += 1
                log.warning("Failed to read conversation '\(conversation.displayName)': \(error.localizedDescription)")
                activityLog.warning("Skipped \(conversation.displayName)")
            }
        }

        // Flush any remaining conversations.
        await flushWindow()

        log.info("Initial sync complete - \(totalSynced) messages synced, \(totalFailed) conversations failed")

        // Only advance the watermark on a fully clean run. A partial run that
        // advanced it would mask the missing conversations: the incremental
        // path reads from the watermark (minus the overlap) and would not
        // revisit older messages. With the watermark left unset, a later
        // cycle reads without a lower bound and retries everything.
        if totalFailed == 0, let newDate = latestMessageDate {
            setLastSync(newDate)
            activityLog.success("Initial sync complete: \(totalSynced) messages synced")
        } else if totalFailed > 0 {
            log.warning("Initial sync partial — watermark NOT advanced (\(totalFailed) convs failed). Re-run will retry.")
            activityLog.warning("Initial sync partial: \(totalSynced) synced, \(totalFailed) failed — will retry")
        } else {
            // Clean run with no messages at all: nothing to use as a watermark.
            activityLog.success("Initial sync complete: \(totalSynced) messages synced")
        }
        await fetchStats()

    } catch {
        // Listing the conversations failed; nothing was sent.
        log.warning("Initial sync failed: \(error.localizedDescription)")
        activityLog.error("Initial sync failed: \(error.localizedDescription)")
    }

    isResyncInProgress = false
}
|
||
|
||
/// Incremental read cycle: for every conversation, reads messages newer than
/// `watermark − incrementalOverlapSeconds` (or everything when no watermark is
/// set), uploads them in adaptive chunks, keeps the published `stats` updated
/// as chunks land, and finally advances the watermark to the newest message
/// date seen — but only if no conversation failed and only forwards.
/// Always clears `isResyncInProgress` before returning.
private func runReadCycle() async {
    log.info("performSync starting")
    activityLog.info("Starting sync...")

    var messageTally = stats.messageCount
    var conversationTally = stats.conversationCount
    var touchedConversationIDs = Set<String>()

    do {
        let conversations = try imessageReader.getConversations()
        log.info("Found \(conversations.count) conversations")
        activityLog.info("Found \(conversations.count) conversations")

        var totalSynced = 0
        var failedConversations = 0

        // The watermark follows message dates (data time), never wall-clock,
        // so start from the current watermark.
        var newestSeen: Date? = lastSync

        // Read from `watermark − overlap` so messages that land below the
        // watermark (out-of-order / late / skewed) are re-scanned; the
        // server-side upsert dedupes.
        let readSince = lastSync.map { $0.addingTimeInterval(-Self.incrementalOverlapSeconds) }

        for conversation in conversations {
            do {
                let messages = try imessageReader.getMessages(
                    conversationId: conversation.id,
                    since: readSince
                )
                guard !messages.isEmpty else { continue }

                for message in messages {
                    if let current = newestSeen, message.date <= current { continue }
                    newestSeen = message.date
                }

                let chunks = ChunkingStrategy.createAdaptiveChunks(messages)
                var conversationSynced = 0

                for (position, chunk) in chunks.enumerated() {
                    let outcome = try await syncChunkWithRetry(chunk, conversation: conversation)
                    conversationSynced += outcome.synced
                    totalSynced += outcome.synced

                    if chunks.count > 1 {
                        log.info("Synced chunk \(position + 1)/\(chunks.count) (\(outcome.synced) messages)")
                    }

                    guard outcome.synced > 0 else { continue }

                    // Publish running totals so observers see progress mid-cycle.
                    messageTally += outcome.synced
                    if touchedConversationIDs.insert(conversation.id).inserted {
                        conversationTally += 1
                    }
                    stats = SyncStats(
                        messageCount: messageTally,
                        conversationCount: conversationTally,
                        contactCount: stats.contactCount
                    )
                }

                if conversationSynced > 0 {
                    activityLog.info("Synced \(conversationSynced) messages from \(conversation.displayName)")
                }
            } catch {
                failedConversations += 1
                log.warning("Failed to sync '\(conversation.displayName)': \(error.localizedDescription)")
                activityLog.warning("Failed \(conversation.displayName): \(error.localizedDescription)")
            }
        }

        // Summarise the cycle in the activity log.
        switch (failedConversations > 0, totalSynced > 0) {
        case (true, true):
            activityLog.warning("Sync partial: \(totalSynced) messages, \(failedConversations) conversations failed")
        case (true, false):
            activityLog.error("Sync failed: \(failedConversations) conversations had errors")
        case (false, true):
            activityLog.success("Sync complete: \(totalSynced) new messages")
        case (false, false):
            activityLog.success("Sync complete: no new messages")
        }

        // Advance the watermark only after a clean cycle, so a partial failure
        // retries the gap next time, and only forwards (monotonic).
        if failedConversations == 0, let candidate = newestSeen {
            let movesForward = lastSync.map { candidate > $0 } ?? true
            if movesForward {
                setLastSync(candidate)
            }
        }

        await fetchStats()

    } catch {
        // Listing the conversations failed.
        log.warning("Sync failed: \(error.localizedDescription)")
        activityLog.error("Sync failed: \(error.localizedDescription)")
    }

    isResyncInProgress = false
}
|
||
|
||
/// Compares the schema version stored in user defaults with
/// `syncSchemaVersion` and stores the current version when they differ.
///
/// - Returns: `true` when a previously stored (non-zero) version differed and
///   `performReset()` was therefore run; `false` when the versions match or no
///   version had been stored yet (`integer(forKey:)` yields 0 in that case).
private func checkSchemaVersionAndReset() async -> Bool {
    let defaults = UserDefaults.standard
    let storedVersion = defaults.integer(forKey: "syncSchemaVersion")
    let currentVersion = Self.syncSchemaVersion

    log.info("Schema version check - stored: \(storedVersion), current: \(currentVersion)")

    guard storedVersion != currentVersion else { return false }

    // A stored version of 0 means "never recorded": record it, but skip the reset.
    let hadRecordedVersion = storedVersion > 0
    if hadRecordedVersion {
        await performReset()
    }
    defaults.set(currentVersion, forKey: "syncSchemaVersion")
    return hadRecordedVersion
}
|
||
|
||
/// Uploads the address-book contacts to the server and records the outcome in
/// `contactSyncInfo`. An empty address book is reported and short-circuits;
/// an upload failure is logged and otherwise ignored.
private func syncContacts() async {
    activityLog.info("Syncing contacts...")

    let addressBook = imessageReader.getAllContacts()
    if addressBook.isEmpty {
        activityLog.info("No contacts from address book")
        contactSyncInfo.loadedFromAddressBook = 0
        contactSyncInfo.sampleNames = []
        return
    }

    contactSyncInfo.loadedFromAddressBook = addressBook.count
    // Keep the first five display names as a sample.
    contactSyncInfo.sampleNames = Array(addressBook.prefix(5).map { $0.displayName })

    let birthdayFormatter = ISO8601DateFormatter()
    let payloads = addressBook.map { entry in
        SyncContactPayload(
            appleId: nil,
            phoneNumber: entry.phoneNumber,
            email: entry.email,
            displayName: entry.displayName,
            avatarHash: nil,
            birthday: entry.birthday.map { birthdayFormatter.string(from: $0) }
        )
    }

    do {
        let synced = try await apiClient.syncContacts(payloads)
        contactSyncInfo.syncedToServer = synced

        // Mention up to three sample names in the success line, if there are any.
        let preview = contactSyncInfo.sampleNames.prefix(3).joined(separator: ", ")
        if preview.isEmpty {
            activityLog.success("Synced \(synced) contacts")
        } else {
            activityLog.success("Synced \(synced) contacts: \(preview)...")
        }
    } catch {
        log.warning("syncContacts failed: \(error.localizedDescription)")
        activityLog.error("Contact sync failed")
    }
}
|
||
|
||
/// Refreshes `stats` from the server. If no local watermark is set and the
/// server reports a `lastSyncAt`, that value is adopted as the watermark.
/// A failed request is logged and leaves the current stats untouched.
private func fetchStats() async {
    do {
        let serverStats = try await apiClient.getStats()
        stats = SyncStats(
            messageCount: serverStats.totalMessages,
            conversationCount: serverStats.totalConversations,
            contactCount: serverStats.totalContacts
        )

        // Seed the watermark from the server only when we have none locally.
        guard lastSync == nil, let serverWatermark = serverStats.lastSyncAt else { return }
        setLastSync(serverWatermark)
    } catch {
        log.warning("fetchStats failed: \(error.localizedDescription)")
    }
}
|
||
|
||
/// Uploads a chunk, bisecting it recursively when the server rejects it with a
/// server error whose message contains "Payload too large".
///
/// - Parameters:
///   - chunk: Messages to upload.
///   - conversation: Conversation the messages belong to.
/// - Returns: The number of messages synced, summed over all sub-chunks.
/// - Throws: Any error from `syncSingleChunk` other than a "Payload too large"
///   `APIError.serverError` on a multi-message chunk. A single message that is
///   still too large is reported as a 413 `APIError.serverError` naming its
///   first attachment's filename (or its guid).
private func syncChunkWithRetry(
    _ chunk: [iMessage],
    conversation: iMessageConversation
) async throws -> ChunkSyncResult {
    do {
        let uploaded = try await syncSingleChunk(chunk, conversation: conversation)
        return ChunkSyncResult(synced: uploaded)
    } catch let apiError as APIError {
        // Anything that is not a "Payload too large" server error is passed on unchanged.
        guard case .serverError(_, let serverMessage) = apiError,
              (serverMessage ?? "").contains("Payload too large") else {
            throw apiError
        }

        // A single message cannot be split any further: report which one it was.
        if chunk.count == 1 {
            let oversized = chunk[0]
            let sizeInfo = oversized.attachmentsTotalSize > 0
                ? "(\(oversized.attachmentsTotalSize) bytes)"
                : "(no attachments)"
            let label = oversized.attachments.first?.filename ?? oversized.guid
            throw APIError.serverError(statusCode: 413, message: "Message too large: \(label) \(sizeInfo)")
        }

        // Otherwise split in half and upload both halves, first half first.
        let midpoint = chunk.count / 2
        let lowerHalf = Array(chunk[0..<midpoint])
        let upperHalf = Array(chunk[midpoint..<chunk.count])
        let lowerResult = try await syncChunkWithRetry(lowerHalf, conversation: conversation)
        let upperResult = try await syncChunkWithRetry(upperHalf, conversation: conversation)
        return ChunkSyncResult(synced: lowerResult.synced + upperResult.synced)
    }
}
|
||
|
||
/// Builds the full payload for one chunk — including `attributedBody` and
/// attachment data — and uploads it in a single request.
///
/// - Parameters:
///   - chunk: Messages to upload.
///   - conversation: Conversation the messages belong to; its id, display
///     name, group flag and participants are sent alongside the messages.
/// - Returns: The value returned by `apiClient.syncMessages`.
/// - Throws: Whatever `apiClient.syncMessages` throws.
private func syncSingleChunk(
    _ chunk: [iMessage],
    conversation: iMessageConversation
) async throws -> Int {
    // One formatter per chunk instead of up to three per message.
    let timestampFormatter = ISO8601DateFormatter()

    let payload = SyncMessagesPayload(
        conversationImessageId: conversation.id,
        conversationDisplayName: conversation.displayName,
        isGroup: conversation.isGroup,
        participantIds: conversation.participantIds,
        messages: chunk.map { msg in
            SyncMessagePayload(
                imessageGuid: msg.guid,
                senderId: msg.senderId,
                direction: msg.isFromMe ? "outgoing" : "incoming",
                sentAt: timestampFormatter.string(from: msg.date),
                deliveredAt: msg.dateDelivered.map { timestampFormatter.string(from: $0) },
                readAt: msg.dateRead.map { timestampFormatter.string(from: $0) },
                text: msg.text,
                attributedBody: msg.attributedBody,
                associatedMessageType: msg.associatedMessageType,
                associatedMessageGuid: msg.associatedMessageGuid,
                isAudioMessage: msg.isAudioMessage,
                expressiveSendStyleId: msg.expressiveSendStyleId,
                replyToGuid: msg.replyToGuid,
                threadOriginatorGuid: msg.threadOriginatorGuid,
                groupTitle: msg.groupTitle,
                balloonBundleId: msg.balloonBundleId,
                service: msg.service,
                senderIdentifier: msg.senderIdentifier,
                senderDisplayName: msg.senderDisplayName,
                senderPhoneNumber: msg.senderPhoneNumber,
                senderEmail: msg.senderEmail,
                attachments: msg.attachments.map { att in
                    SyncAttachmentPayload(
                        filename: att.filename,
                        mimeType: att.mimeType,
                        transferName: att.transferName,
                        size: att.size,
                        data: att.data
                    )
                },
                attachmentsCount: msg.attachmentsCount,
                attachmentsTotalSize: msg.attachmentsTotalSize,
                attachmentsFiletypes: msg.attachmentsFiletypes
            )
        }
    )

    return try await apiClient.syncMessages(payload)
}
|
||
|
||
// MARK: - Reset
|
||
|
||
/// Starts a full reset followed by a resync, unless a reset or a sync is
/// already running.
///
/// `isResetting` is raised synchronously, before the task is spawned, so a
/// second call arriving before the task has started running is rejected by the
/// guard instead of launching a second reset. `performReset()` lowers the flag
/// again on every path.
func resetAndResync() {
    guard !isResetting, !isSyncing else { return }
    isResetting = true
    Task { await performReset() }
}
|
||
|
||
/// Asks the server to clear its synced data and, on success, clears the local
/// watermark and runs a full resync. On failure the reset is abandoned and the
/// flags are lowered.
private func performReset() async {
    log.info("performReset starting")
    activityLog.info("Starting full reset...")
    isResetting = true
    isResyncInProgress = true

    // Remembered so the failure path can put the watermark back.
    let watermarkBeforeReset = lastSync

    do {
        let cleared = try await apiClient.resetSync()
        activityLog.info("Cleared \(cleared.deletedMessages) messages, \(cleared.deletedConversations) conversations")

        // Server is empty now: forget the watermark and resync from scratch.
        // `isResyncInProgress` is lowered by the resync itself.
        setLastSync(nil)
        isInitialLoad = true
        isResetting = false
        await performSyncForResync()
    } catch {
        log.warning("Reset failed: \(error.localizedDescription)")
        activityLog.error("Reset failed: \(error.localizedDescription)")

        if let savedWatermark = watermarkBeforeReset {
            setLastSync(savedWatermark)
        }
        isResetting = false
        isResyncInProgress = false
    }
}
|
||
|
||
/// Full resync after a reset: re-reads every conversation without a lower
/// date bound and uploads all messages in adaptive chunks, rebuilding the
/// published `stats` from zero as chunks land.
///
/// Two invariants are kept in line with `runReadCycle`:
/// - The watermark is advanced only when no conversation failed, so a partial
///   resync does not hide the failed conversations behind a fresh watermark.
/// - `isSyncing` is restored to the value it had on entry rather than forced
///   to `false`. This method is also reached from inside `performSync` (via
///   the schema-version reset), and clearing the flag there would signal
///   "idle" while that cycle is still running.
private func performSyncForResync() async {
    log.info("performSyncForResync starting")
    activityLog.info("Starting full resync...")

    let wasSyncingOnEntry = isSyncing
    isSyncing = true

    var runningMessageCount = 0
    var runningConversationCount = 0
    var conversationsWithNewMessages = Set<String>()
    stats = SyncStats()

    do {
        let conversations = try imessageReader.getConversations()
        activityLog.info("Found \(conversations.count) conversations")
        var totalSynced = 0
        var failedConversations = 0

        var latestMessageDate: Date?

        for conversation in conversations {
            do {
                let messages = try imessageReader.getMessages(conversationId: conversation.id, since: nil)
                if messages.isEmpty { continue }

                // Track the newest message date seen across all conversations.
                for m in messages {
                    if let current = latestMessageDate, m.date <= current { continue }
                    latestMessageDate = m.date
                }

                let chunks = ChunkingStrategy.createAdaptiveChunks(messages)
                var conversationSynced = 0

                for (chunkIndex, chunk) in chunks.enumerated() {
                    let result = try await syncChunkWithRetry(chunk, conversation: conversation)
                    conversationSynced += result.synced
                    totalSynced += result.synced

                    if chunks.count > 1 {
                        log.info("Resynced chunk \(chunkIndex + 1)/\(chunks.count) (\(result.synced) messages)")
                    }

                    if result.synced > 0 {
                        runningMessageCount += result.synced
                        if conversationsWithNewMessages.insert(conversation.id).inserted {
                            runningConversationCount += 1
                        }
                        stats = SyncStats(
                            messageCount: runningMessageCount,
                            conversationCount: runningConversationCount,
                            contactCount: stats.contactCount
                        )
                    }
                }

                if conversationSynced > 0 {
                    activityLog.info("Resynced \(conversationSynced) messages from \(conversation.displayName)")
                }
            } catch {
                failedConversations += 1
                activityLog.warning("Failed \(conversation.displayName): \(error.localizedDescription)")
            }
        }

        // Read watermark = newest synced message date (data time), never
        // wall-clock — otherwise messages older than the resync clock are
        // filtered out on the next incremental cycle and silently lost.
        // It is only advanced after a clean run: with failures, the watermark
        // stays where the reset left it so the next read cycle rescans.
        // NOTE(review): `fetchStats()` below adopts the server's `lastSyncAt`
        // when the local watermark is nil — confirm that value cannot skip
        // past the conversations that failed here.
        if failedConversations == 0, let newDate = latestMessageDate {
            setLastSync(newDate)
        } else if failedConversations > 0 {
            log.warning("Resync partial — watermark NOT advanced (\(failedConversations) convs failed). Next cycle will retry.")
        }

        // `lastSyncCompletedAt` is a UI "when did we last run" stamp, so
        // wall-clock is correct there.
        let completedAt = Date()
        lastSyncCompletedAt = completedAt
        UserDefaults.standard.set(completedAt, forKey: "imessage.lastSyncCompletedAt")

        if failedConversations > 0 {
            activityLog.warning("Resync partial: \(totalSynced) messages, \(failedConversations) failed")
        } else {
            activityLog.success("Resync complete: \(totalSynced) messages")
        }

        await fetchStats()
    } catch {
        // Listing the conversations failed.
        activityLog.error("Resync failed: \(error.localizedDescription)")
    }

    isSyncing = wasSyncingOnEntry
    isResyncInProgress = false
}
|
||
}
|