macsync/@packages/imessage/Sources/IMessageSync/SyncManager.swift
Natalie 576496ca3e feat(deploy): video-projects FUSE mount over DO Spaces
Generalize the photos-originals rclone-mount pattern to a video-projects
prefix so the video studio (and imajin ETL, per storage-portability-plan
§2.3) can read/write multi-GB project sources/renders as local files while
only hot data stays resident on plum (bounded VFS LRU cache). Lets a
small-disk laptop work with large footage without filling APFS.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-28 21:10:13 -04:00

775 lines
34 KiB
Swift
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import AppKit
import Combine
import Foundation
import LilithAgentCore
import LilithLogging
import MacSyncShared
/// File-private logger for this module, created under the "IMessage.Sync" name.
private let log = AppLogger.logger(for: "IMessage.Sync")
/// Counters published as the sync manager's `stats`.
///
/// All three default to zero, so `SyncStats()` is the "nothing synced yet" value.
struct SyncStats: Equatable, Sendable {
    /// Message total.
    var messageCount = 0
    /// Conversation total.
    var conversationCount = 0
    /// Contact total.
    var contactCount = 0
}
/// Result of uploading one chunk of messages via `syncChunkWithRetry`.
struct ChunkSyncResult {
/// Count reported by the upload call for this chunk; when the chunk had to
/// be split and retried, the sum of the counts of both halves.
let synced: Int
}
/// Snapshot of the most recent contact sync, published by the sync manager.
struct ContactSyncInfo {
    /// How many contacts the local reader returned.
    var loadedFromAddressBook = 0
    /// Count reported back by the contact upload call.
    var syncedToServer = 0
    /// A few display names taken from the front of the loaded contact list.
    var sampleNames: [String] = []
    /// Result of the reader's contact-loading step.
    var hasContactsPermission = false
}
/// Error state surfaced by the iMessage sync manager.
///
/// `.none` is the "no error" value handed to the base manager.
enum SyncError: Equatable, Sendable {
    case none
    case fullDiskAccessRequired
    case databaseNotFound
    case connectionFailed(String)

    /// Human-readable text for the current state; empty for `.none`, and the
    /// associated string verbatim for `.connectionFailed`.
    var message: String {
        let text: String
        switch self {
        case .connectionFailed(let detail):
            text = detail
        case .databaseNotFound:
            text = "iMessage database not found"
        case .fullDiskAccessRequired:
            text = "Full Disk Access required"
        case .none:
            text = ""
        }
        return text
    }
}
@MainActor
final class SyncManager: BaseSyncManager<SyncStats, SyncError> {
/// Process-wide instance; the initializer is private.
static let shared = SyncManager()
/// Current sync schema version. `checkSchemaVersionAndReset` compares it with
/// the value stored under the `syncSchemaVersion` UserDefaults key and runs a
/// reset when a previously stored, non-zero version differs.
static let syncSchemaVersion = 3
/// Overlap lookback for the incremental read. The watermark advances to the
/// newest synced message *date* (see `runReadCycle`), and each cycle re-reads
/// from `watermark - overlap` so messages that land below the watermark
/// (out-of-order delivery, late SMS, or a chat.db-to-Date timezone/DST skew)
/// are always re-scanned. Server upserts by external_id, so re-scanning is
/// idempotent (no duplicates). This is the fix for silent recent-message drop.
static let incrementalOverlapSeconds: TimeInterval = 2 * 60 * 60
// Module-specific @Published state (base owns isSyncing/lastSyncCompletedAt/
// currentOperation/syncError/stats).
@Published var contactSyncInfo = ContactSyncInfo()
@Published var isResetting = false
@Published var isResyncInProgress = false
// Shared collaborators, captured once from their singletons.
private let imessageReader = iMessageReader.shared
private let apiClient = APIClient.shared
private let sendService = SendService.shared
private let activityLog = ActivityLog.shared
// Assigned in `init` before `super.init`.
private let blobSyncManager: BlobSyncManager
/// Guards the out-of-band blob pass fired from `performSync` so those passes
/// never overlap. That pass is detached from the read cycle: it must never be
/// awaited there, or a failing blob backend wedges the base `isSyncing` flag.
private var blobSyncInFlight = false
/// First-cycle bootstrap (DB connect, schema-version reset, contacts load) is
/// done lazily inside `performSync` since the base's `startSync` is final.
private var didBootstrap = false
/// True until the first inbound cycle has chosen a code path (initial batch
/// vs incremental). Set back to true by `performReset`.
/// NOTE(review): in the code visible here this flag is only written, never read.
private var isInitialLoad = true
/// Separate timer for blob uploads. Runs every 5 minutes, independent of
/// the inbound read cycle and the outbound send queue.
private var blobSyncTimer: Timer?
/// Runner for the outbound send queue.
///
/// Polls the server's legacy `icloud.send_queue` table through
/// `IMessageSendTransport` and hands each queued message to `SendService`,
/// which performs the actual send; the closure's return value acks the result.
/// The client is constructed with its own `interval: 30`, independent of the
/// read cycle, so a backlog of outbound messages drains even when no inbound
/// sync runs.
private lazy var sendQueueClient: SendQueueClient<IMessageSendTransport> = {
    let queueTransport = IMessageSendTransport(apiClient: apiClient)
    // Capture the collaborators rather than `self` inside the handler.
    let sender = sendService
    let journal = activityLog
    return SendQueueClient(
        label: "imessage",
        transport: queueTransport,
        interval: 30
    ) { message in
        let outcome = await sender.send(recipient: message.toHandle, body: message.body)
        // Only the first four characters of the handle are written to the activity log.
        let handlePrefix = String(message.toHandle.prefix(4))
        guard outcome.success else {
            let reason = outcome.error ?? "Unknown error"
            journal.error("Failed to send to \(handlePrefix)...: \(reason)")
            return .failed(reason: reason)
        }
        journal.success("Sent message to \(handlePrefix)...")
        return .sent
    }
}()
/// Paced outbound outbox drain (Handoff 03). Independent of the legacy
/// `sendQueueClient`; reuses `SendService` but spaces sends to avoid bursts.
/// Built lazily on first access from this manager's `apiClient`, `sendService`
/// and `activityLog`; started in `didStartSync()` and stopped in
/// `willStopSync()`.
private lazy var outboxSendManager = OutboxSendManager(
api: apiClient,
sendService: sendService,
activityLog: activityLog
)
/// Builds the singleton and migrates legacy, unprefixed UserDefaults keys.
///
/// The migration runs at most once per key: each legacy value is copied only
/// when the corresponding current value is still unset, and the legacy key is
/// removed afterwards.
private init() {
    blobSyncManager = BlobSyncManager(apiClient: APIClient.shared, activityLog: ActivityLog.shared)
    super.init(initialStats: SyncStats(), noError: .none, persistenceKey: "imessage", timerInterval: 30)

    let defaults = UserDefaults.standard

    // Legacy read watermark, stored under the unprefixed `lastSync` key.
    if lastSync == nil, let legacyWatermark = defaults.object(forKey: "lastSync") as? Date {
        setLastSync(legacyWatermark)
        defaults.removeObject(forKey: "lastSync")
    }

    // Legacy completion stamp, stored under the unprefixed `lastSyncCompletedAt` key.
    if lastSyncCompletedAt == nil,
       let legacyCompletedAt = defaults.object(forKey: "lastSyncCompletedAt") as? Date {
        lastSyncCompletedAt = legacyCompletedAt
        defaults.set(legacyCompletedAt, forKey: "imessage.lastSyncCompletedAt")
        defaults.removeObject(forKey: "lastSyncCompletedAt")
    }
}
/// Opens the Full Disk Access pane of System Settings via its URL scheme.
/// Does nothing if the URL string fails to parse.
func openFullDiskAccessSettings() {
    let settingsLink = "x-apple.systempreferences:com.apple.preference.security?Privacy_AllFiles"
    guard let settingsURL = URL(string: settingsLink) else { return }
    NSWorkspace.shared.open(settingsURL)
}
/// Retries after a connection failure by calling `startSync()` again.
func retryConnection() {
    startSync()
}
// MARK: - Lifecycle hooks
/// Start hook: launches the two outbound runners and the periodic blob pass.
///
/// The blob timer fires every 300 seconds, independent of the read cycle.
/// Each tick goes through the same `blobSyncInFlight` guard that `performSync`
/// uses for its detached pass, so a timer-driven pass and a cycle-driven pass
/// can never run concurrently. Any timer left from an earlier start is
/// invalidated first so repeated starts do not stack timers.
override func didStartSync() {
    TraceLog.write("[imessage] IMessageSyncManager.didStartSync — starting sendQueueClient")
    sendQueueClient.start()
    outboxSendManager.start()
    TraceLog.write("[imessage] IMessageSyncManager.didStartSync — sendQueueClient.start() returned")

    // Blob sync on its own 5-minute cadence, independent of read cycle.
    blobSyncTimer?.invalidate()
    blobSyncTimer = Timer.scheduledTimer(withTimeInterval: 300, repeats: true) { [weak self] _ in
        // Hop to the main actor explicitly: the flag is main-actor state and
        // the timer block itself carries no actor isolation.
        Task { @MainActor [weak self] in
            guard let self, !self.blobSyncInFlight else { return }
            self.blobSyncInFlight = true
            defer { self.blobSyncInFlight = false }
            await self.blobSyncManager.syncBlobs()
        }
    }
}
/// Stop hook: tears down the blob timer and stops both outbound runners.
override func willStopSync() {
    if let activeTimer = blobSyncTimer {
        activeTimer.invalidate()
        blobSyncTimer = nil
    }
    sendQueueClient.stop()
    outboxSendManager.stop()
}
// MARK: - Sync cycle
/// Runs one inbound sync cycle.
///
/// Order of work:
/// 1. One-shot bootstrap (first successful cycle only): connect to the
///    iMessage database, apply a schema-version reset if needed, load and
///    sync contacts, fetch server stats. A connection failure sets
///    `syncError` and ends the cycle; bootstrap is retried next cycle.
/// 2. Flush the outbound send queue once.
/// 3. Initial batch import when the server reports zero messages, otherwise
///    the incremental read cycle.
/// 4. Kick off a detached blob pass unless one is already running.
override func performSync() async {
    if !didBootstrap {
        activityLog.info("Starting sync service...")
        syncError = .none

        do {
            try imessageReader.connect()
        } catch {
            // The failure kind is classified from the error text.
            let description = error.localizedDescription
            let lowered = description.lowercased()
            if lowered.contains("authorization denied") || lowered.contains("error 23") {
                syncError = .fullDiskAccessRequired
                activityLog.error("Full Disk Access required")
            } else if lowered.contains("not found") {
                syncError = .databaseNotFound
                activityLog.error("iMessage database not found")
            } else {
                syncError = .connectionFailed(description)
                activityLog.error("Connection failed: \(description)")
            }
            return
        }
        log.info("Connected to iMessage database")
        activityLog.success("Connected to iMessage database")

        if await checkSchemaVersionAndReset() {
            activityLog.info("Schema version updated, resync required")
        }

        activityLog.info("Loading contacts...")
        let contactsAvailable = await imessageReader.loadContacts()
        contactSyncInfo.hasContactsPermission = contactsAvailable
        if contactsAvailable {
            await syncContacts()
        } else {
            activityLog.warning("Contacts permission not granted - names won't sync")
        }

        await fetchStats()
        didBootstrap = true
    }

    // The outbound queue normally drains on its own timer (started from
    // `didStartSync()`); flushing here as well means a manual `syncNow()`
    // sends pending messages promptly.
    await sendQueueClient.drainOnce()

    // Route on the server-side message count rather than a one-time flag:
    // while the server is empty, every cycle re-enters the metadata-only
    // batch import (attachments without blob bytes). A flag flipped after the
    // first attempt regardless of success would send a partially failed
    // import down the incremental path with the server still empty, leaving
    // the missing data unrecoverable without a manual restart.
    let serverIsEmpty = stats.messageCount == 0
    if serverIsEmpty {
        log.info("Server empty — entering initial-batch sync path")
        activityLog.info("First sync detected - using optimized batch endpoint")
        await performInitialSync()
        isInitialLoad = false
    } else {
        isInitialLoad = false
        await runReadCycle()
    }

    // Blob pass runs OUT OF BAND and is never awaited on the read cycle. The
    // missing-attachments endpoint has no cursor, so a failing blob backend
    // returns the same full page every fetch; awaiting it here held the base
    // `isSyncing` flag indefinitely and starved message ingestion (every 30s
    // read-timer tick got swallowed by syncNow's guard). Fire it detached,
    // guarded so passes don't overlap.
    guard !blobSyncInFlight else { return }
    blobSyncInFlight = true
    Task { [weak self, blobSyncManager] in
        await blobSyncManager.syncBlobs()
        self?.blobSyncInFlight = false
    }
}
/// Initial import used while the server holds no messages.
///
/// Reads every conversation in full and uploads it through the batch
/// endpoint in bounded windows, so the whole history is never held in memory
/// at once. Payloads are metadata-only: `attributedBody` and attachment
/// bytes are omitted (blobs are synced separately).
///
/// The read watermark is set to the newest message date only when nothing
/// failed. Both rejected batches and conversations that could not be read
/// count as failures: either one means some history is missing on the server,
/// and advancing the watermark would hide it from the incremental path, which
/// only looks at messages near or after the watermark.
private func performInitialSync() async {
    log.info("performInitialSync starting - streaming batch mode")
    activityLog.info("Starting initial batch sync...")

    var latestMessageDate: Date?
    var totalSynced = 0
    var totalFailed = 0
    // One formatter for the whole pass instead of up to three per message.
    let timestampFormatter = ISO8601DateFormatter()

    do {
        let conversations = try imessageReader.getConversations()
        log.info("Found \(conversations.count) conversations for batch sync")
        activityLog.info("Found \(conversations.count) conversations")

        // Stream in windows of maxConversationsPerBatch so we never hold
        // the full history in memory at once.
        var window: [SyncMessagesPayload] = []
        var windowMessages = 0
        var convIndex = 0

        /// Uploads the current window (if any) and resets it. A rejected
        /// batch is counted in `totalFailed` and does not abort the run.
        func flushWindow() async {
            guard !window.isEmpty else { return }
            let batchMsgs = windowMessages
            activityLog.info("Sending \(window.count) convs / \(batchMsgs) msgs (\(convIndex)/\(conversations.count))...")
            do {
                let sent = try await apiClient.syncMessagesBatch(window)
                totalSynced += sent
                log.info("Batch sent: \(sent) messages (running total: \(totalSynced))")
            } catch {
                totalFailed += window.count
                let detail = (error as NSError).userInfo.description
                log.warning("Batch failed (\(window.count) convs, \(batchMsgs) msgs): \(error.localizedDescription) | \(detail)")
                activityLog.warning("Batch failed (\(window.count) convs): \(error.localizedDescription)")
            }
            window.removeAll()
            windowMessages = 0
            await Task.yield()
        }

        for conversation in conversations {
            convIndex += 1
            if convIndex % 50 == 0 {
                log.info("initial-sync progress: \(convIndex)/\(conversations.count) iterated, synced=\(totalSynced) failed=\(totalFailed)")
                activityLog.info("Progress: \(convIndex)/\(conversations.count) (synced \(totalSynced) msgs)")
            }

            do {
                let messages = try imessageReader.getMessages(conversationId: conversation.id, since: nil)
                if messages.isEmpty { continue }

                // Track the newest message date seen; it becomes the watermark.
                for msg in messages {
                    guard let current = latestMessageDate else {
                        latestMessageDate = msg.date
                        continue
                    }
                    if msg.date > current {
                        latestMessageDate = msg.date
                    }
                }

                let allPayloadMessages = messages.map { msg in
                    SyncMessagePayload(
                        imessageGuid: msg.guid,
                        senderId: msg.senderId,
                        direction: msg.isFromMe ? "outgoing" : "incoming",
                        sentAt: timestampFormatter.string(from: msg.date),
                        deliveredAt: msg.dateDelivered.map { timestampFormatter.string(from: $0) },
                        readAt: msg.dateRead.map { timestampFormatter.string(from: $0) },
                        text: msg.text,
                        attributedBody: nil, // stripped in initial sync; reduces payload 10-50x
                        associatedMessageType: msg.associatedMessageType,
                        associatedMessageGuid: msg.associatedMessageGuid,
                        isAudioMessage: msg.isAudioMessage,
                        expressiveSendStyleId: msg.expressiveSendStyleId,
                        replyToGuid: msg.replyToGuid,
                        threadOriginatorGuid: msg.threadOriginatorGuid,
                        groupTitle: msg.groupTitle,
                        balloonBundleId: msg.balloonBundleId,
                        service: msg.service,
                        senderIdentifier: msg.senderIdentifier,
                        senderDisplayName: msg.senderDisplayName,
                        senderPhoneNumber: msg.senderPhoneNumber,
                        senderEmail: msg.senderEmail,
                        attachments: msg.attachments.map { att in
                            SyncAttachmentPayload(
                                filename: att.filename,
                                mimeType: att.mimeType,
                                transferName: att.transferName,
                                size: att.size,
                                data: nil // metadata-only in initial sync; blobs synced separately
                            )
                        },
                        attachmentsCount: msg.attachmentsCount,
                        attachmentsTotalSize: msg.attachmentsTotalSize,
                        attachmentsFiletypes: msg.attachmentsFiletypes
                    )
                }

                // Split oversized conversations so no single payload exceeds the message limit.
                let msgLimit = ChunkingStrategy.maxMessagesPerBatch
                let slices = stride(from: 0, to: allPayloadMessages.count, by: msgLimit).map {
                    Array(allPayloadMessages[$0..<min($0 + msgLimit, allPayloadMessages.count)])
                }

                for slice in slices {
                    let payload = SyncMessagesPayload(
                        conversationImessageId: conversation.id,
                        conversationDisplayName: conversation.displayName,
                        isGroup: conversation.isGroup,
                        participantIds: conversation.participantIds,
                        messages: slice
                    )
                    window.append(payload)
                    windowMessages += slice.count

                    if window.count >= ChunkingStrategy.maxConversationsPerBatch ||
                        windowMessages >= ChunkingStrategy.maxMessagesPerBatch {
                        await flushWindow()
                    }
                }
            } catch {
                // A conversation that could not be read is missing on the
                // server just like a rejected batch, so it must block the
                // watermark too; otherwise its history is never retried.
                totalFailed += 1
                log.warning("Failed to read conversation '\(conversation.displayName)': \(error.localizedDescription)")
                activityLog.warning("Skipped \(conversation.displayName)")
            }
        }

        // Flush any remaining conversations.
        await flushWindow()

        log.info("Initial sync complete - \(totalSynced) messages synced, \(totalFailed) conversations failed")

        // Only advance the watermark when nothing failed. A partial initial
        // sync that advanced it would mask the missing conversations: the
        // incremental path only reads messages near or after the watermark
        // and would never go back for them. Leaving it unset lets a later
        // run retry.
        if totalFailed == 0, let newDate = latestMessageDate {
            setLastSync(newDate)
            activityLog.success("Initial sync complete: \(totalSynced) messages synced")
        } else if totalFailed > 0 {
            log.warning("Initial sync partial — watermark NOT advanced (\(totalFailed) convs failed). Re-run will retry.")
            activityLog.warning("Initial sync partial: \(totalSynced) synced, \(totalFailed) failed — will retry")
        } else {
            // Nothing failed and there were no messages at all.
            activityLog.success("Initial sync complete: \(totalSynced) messages synced")
        }
        await fetchStats()
    } catch {
        log.warning("Initial sync failed: \(error.localizedDescription)")
        activityLog.error("Initial sync failed: \(error.localizedDescription)")
    }

    isResyncInProgress = false
}
/// Incremental read cycle.
///
/// Reads each conversation from `watermark - incrementalOverlapSeconds`
/// (or from the beginning when no watermark exists), uploads the messages in
/// adaptive chunks, keeps the published stats moving while it works, and
/// finally advances the watermark to the newest message date seen, but only
/// when no conversation failed and only forwards.
private func runReadCycle() async {
    log.info("performSync starting")
    activityLog.info("Starting sync...")

    var runningMessageCount = stats.messageCount
    var runningConversationCount = stats.conversationCount
    var conversationsWithNewMessages = Set<String>()

    do {
        let conversations = try imessageReader.getConversations()
        log.info("Found \(conversations.count) conversations")
        activityLog.info("Found \(conversations.count) conversations")

        var totalSynced = 0
        var failedConversations = 0
        // The watermark tracks message *date* (data time), never wall-clock;
        // start from the current watermark.
        var latestMessageDate: Date? = lastSync
        // Re-read from `watermark - overlap` so messages below the watermark
        // (out-of-order / late / skewed) are always re-scanned; upsert dedupes.
        let readSince = lastSync.map { $0.addingTimeInterval(-Self.incrementalOverlapSeconds) }

        for conversation in conversations {
            do {
                let messages = try imessageReader.getMessages(
                    conversationId: conversation.id,
                    since: readSince
                )
                if messages.isEmpty { continue }

                for message in messages {
                    guard let newestSoFar = latestMessageDate else {
                        latestMessageDate = message.date
                        continue
                    }
                    if message.date > newestSoFar {
                        latestMessageDate = message.date
                    }
                }

                let chunks = ChunkingStrategy.createAdaptiveChunks(messages)
                let isMultiChunk = chunks.count > 1
                var conversationSynced = 0

                for (offset, chunk) in chunks.enumerated() {
                    let outcome = try await syncChunkWithRetry(chunk, conversation: conversation)
                    conversationSynced += outcome.synced
                    totalSynced += outcome.synced

                    if isMultiChunk {
                        log.info("Synced chunk \(offset + 1)/\(chunks.count) (\(outcome.synced) messages)")
                    }

                    guard outcome.synced > 0 else { continue }
                    runningMessageCount += outcome.synced
                    // Count each conversation once, the first time it yields messages.
                    if conversationsWithNewMessages.insert(conversation.id).inserted {
                        runningConversationCount += 1
                    }
                    stats = SyncStats(
                        messageCount: runningMessageCount,
                        conversationCount: runningConversationCount,
                        contactCount: stats.contactCount
                    )
                }

                if conversationSynced > 0 {
                    activityLog.info("Synced \(conversationSynced) messages from \(conversation.displayName)")
                }
            } catch {
                failedConversations += 1
                log.warning("Failed to sync '\(conversation.displayName)': \(error.localizedDescription)")
                activityLog.warning("Failed \(conversation.displayName): \(error.localizedDescription)")
            }
        }

        switch (failedConversations > 0, totalSynced > 0) {
        case (true, true):
            activityLog.warning("Sync partial: \(totalSynced) messages, \(failedConversations) conversations failed")
        case (true, false):
            activityLog.error("Sync failed: \(failedConversations) conversations had errors")
        case (false, true):
            activityLog.success("Sync complete: \(totalSynced) new messages")
        case (false, false):
            activityLog.success("Sync complete: no new messages")
        }

        // Advance the watermark to the newest synced message date (data
        // time, not wall-clock) and only on a clean cycle, so a partial
        // failure retries the gap next time instead of skipping past it.
        // Monotonic: never moves the watermark backwards.
        if failedConversations == 0, let newDate = latestMessageDate {
            let movesForward = lastSync.map { newDate > $0 } ?? true
            if movesForward {
                setLastSync(newDate)
            }
        }
        await fetchStats()
    } catch {
        log.warning("Sync failed: \(error.localizedDescription)")
        activityLog.error("Sync failed: \(error.localizedDescription)")
    }

    isResyncInProgress = false
}
/// Compares the stored sync schema version with `Self.syncSchemaVersion`.
///
/// On a mismatch the stored version is updated. If a version had been stored
/// before (value greater than zero), a full reset runs first.
///
/// - Returns: `true` when a reset was triggered; `false` when the versions
///   match or nothing had been stored yet.
private func checkSchemaVersionAndReset() async -> Bool {
    let defaults = UserDefaults.standard
    let currentVersion = Self.syncSchemaVersion
    // `integer(forKey:)` yields 0 when the key has never been written.
    let storedVersion = defaults.integer(forKey: "syncSchemaVersion")
    log.info("Schema version check - stored: \(storedVersion), current: \(currentVersion)")

    guard storedVersion != currentVersion else { return false }

    let hadStoredVersion = storedVersion > 0
    if hadStoredVersion {
        await performReset()
    }
    defaults.set(currentVersion, forKey: "syncSchemaVersion")
    return hadStoredVersion
}
/// Uploads the contacts returned by the reader and records the outcome in
/// `contactSyncInfo`. Upload failures are logged and otherwise ignored.
private func syncContacts() async {
    activityLog.info("Syncing contacts...")

    let addressBook = imessageReader.getAllContacts()
    if addressBook.isEmpty {
        activityLog.info("No contacts from address book")
        contactSyncInfo.loadedFromAddressBook = 0
        contactSyncInfo.sampleNames = []
        return
    }

    contactSyncInfo.loadedFromAddressBook = addressBook.count
    // Keep the first five names as a sample.
    contactSyncInfo.sampleNames = addressBook.prefix(5).map { $0.displayName }

    let birthdayFormatter = ISO8601DateFormatter()
    var payloads: [SyncContactPayload] = []
    payloads.reserveCapacity(addressBook.count)
    for entry in addressBook {
        payloads.append(
            SyncContactPayload(
                appleId: nil,
                phoneNumber: entry.phoneNumber,
                email: entry.email,
                displayName: entry.displayName,
                avatarHash: nil,
                birthday: entry.birthday.map { birthdayFormatter.string(from: $0) }
            )
        )
    }

    do {
        let synced = try await apiClient.syncContacts(payloads)
        contactSyncInfo.syncedToServer = synced

        // The activity log shows at most three of the sample names.
        let preview = contactSyncInfo.sampleNames.prefix(3).joined(separator: ", ")
        if preview.isEmpty {
            activityLog.success("Synced \(synced) contacts")
        } else {
            activityLog.success("Synced \(synced) contacts: \(preview)...")
        }
    } catch {
        log.warning("syncContacts failed: \(error.localizedDescription)")
        activityLog.error("Contact sync failed")
    }
}
/// Replaces the published stats with the server's totals.
///
/// When no local watermark exists yet, it is seeded from the server's
/// `lastSyncAt` if present. A failed request is logged and leaves the
/// current stats untouched.
private func fetchStats() async {
    do {
        let serverStats = try await apiClient.getStats()

        var refreshed = SyncStats()
        refreshed.messageCount = serverStats.totalMessages
        refreshed.conversationCount = serverStats.totalConversations
        refreshed.contactCount = serverStats.totalContacts
        stats = refreshed

        if lastSync == nil, let serverWatermark = serverStats.lastSyncAt {
            setLastSync(serverWatermark)
        }
    } catch {
        log.warning("fetchStats failed: \(error.localizedDescription)")
    }
}
/// Uploads a chunk, bisecting it when the server rejects it as too large.
///
/// Only an `APIError.serverError` whose message contains "Payload too large"
/// triggers the split; every other error is rethrown unchanged. A
/// single-message chunk that is still too large is reported as a 413 server
/// error naming the first attachment's filename (or the message guid).
///
/// - Returns: The number synced, summed over both halves when a split occurred.
private func syncChunkWithRetry(
    _ chunk: [iMessage],
    conversation: iMessageConversation
) async throws -> ChunkSyncResult {
    do {
        return ChunkSyncResult(synced: try await syncSingleChunk(chunk, conversation: conversation))
    } catch let error as APIError {
        guard case .serverError(_, let message) = error,
              (message ?? "").contains("Payload too large") else {
            throw error
        }

        if chunk.count == 1 {
            // Cannot split any further: surface which message is oversized.
            let oversized = chunk[0]
            let label = oversized.attachments.first?.filename ?? oversized.guid
            let sizeInfo: String
            if oversized.attachmentsTotalSize > 0 {
                sizeInfo = "(\(oversized.attachmentsTotalSize) bytes)"
            } else {
                sizeInfo = "(no attachments)"
            }
            throw APIError.serverError(statusCode: 413, message: "Message too large: \(label) \(sizeInfo)")
        }

        let midpoint = chunk.count / 2
        let lowerHalf = try await syncChunkWithRetry(Array(chunk[..<midpoint]), conversation: conversation)
        let upperHalf = try await syncChunkWithRetry(Array(chunk[midpoint...]), conversation: conversation)
        return ChunkSyncResult(synced: lowerHalf.synced + upperHalf.synced)
    }
}
/// Uploads one chunk of messages for a conversation, including
/// `attributedBody` and attachment data.
///
/// - Parameters:
///   - chunk: Messages to send in a single request.
///   - conversation: Conversation the messages belong to; its id, display
///     name, group flag and participants are sent alongside.
/// - Returns: The count returned by the upload call.
/// - Throws: Whatever `apiClient.syncMessages` throws.
private func syncSingleChunk(
    _ chunk: [iMessage],
    conversation: iMessageConversation
) async throws -> Int {
    // One formatter per chunk instead of up to three allocations per message.
    let timestampFormatter = ISO8601DateFormatter()
    let payload = SyncMessagesPayload(
        conversationImessageId: conversation.id,
        conversationDisplayName: conversation.displayName,
        isGroup: conversation.isGroup,
        participantIds: conversation.participantIds,
        messages: chunk.map { msg in
            SyncMessagePayload(
                imessageGuid: msg.guid,
                senderId: msg.senderId,
                direction: msg.isFromMe ? "outgoing" : "incoming",
                sentAt: timestampFormatter.string(from: msg.date),
                deliveredAt: msg.dateDelivered.map { timestampFormatter.string(from: $0) },
                readAt: msg.dateRead.map { timestampFormatter.string(from: $0) },
                text: msg.text,
                attributedBody: msg.attributedBody,
                associatedMessageType: msg.associatedMessageType,
                associatedMessageGuid: msg.associatedMessageGuid,
                isAudioMessage: msg.isAudioMessage,
                expressiveSendStyleId: msg.expressiveSendStyleId,
                replyToGuid: msg.replyToGuid,
                threadOriginatorGuid: msg.threadOriginatorGuid,
                groupTitle: msg.groupTitle,
                balloonBundleId: msg.balloonBundleId,
                service: msg.service,
                senderIdentifier: msg.senderIdentifier,
                senderDisplayName: msg.senderDisplayName,
                senderPhoneNumber: msg.senderPhoneNumber,
                senderEmail: msg.senderEmail,
                attachments: msg.attachments.map { att in
                    SyncAttachmentPayload(
                        filename: att.filename,
                        mimeType: att.mimeType,
                        transferName: att.transferName,
                        size: att.size,
                        data: att.data
                    )
                },
                attachmentsCount: msg.attachmentsCount,
                attachmentsTotalSize: msg.attachmentsTotalSize,
                attachmentsFiletypes: msg.attachmentsFiletypes
            )
        }
    )
    return try await apiClient.syncMessages(payload)
}
// MARK: - Reset
/// Starts a full reset followed by a resync, unless a reset or a sync is
/// already in progress.
func resetAndResync() {
    if isResetting || isSyncing {
        return
    }
    Task {
        await performReset()
    }
}
/// Clears server-side sync data, drops the local watermark, and runs a full
/// resync.
///
/// If the server reset fails, the watermark captured at entry is written
/// back (when there was one) and both progress flags are cleared.
private func performReset() async {
    log.info("performReset starting")
    activityLog.info("Starting full reset...")

    isResetting = true
    isResyncInProgress = true
    let watermarkBeforeReset = lastSync

    do {
        let cleared = try await apiClient.resetSync()
        activityLog.info("Cleared \(cleared.deletedMessages) messages, \(cleared.deletedConversations) conversations")
    } catch {
        log.warning("Reset failed: \(error.localizedDescription)")
        activityLog.error("Reset failed: \(error.localizedDescription)")
        if let restored = watermarkBeforeReset {
            setLastSync(restored)
        }
        isResetting = false
        isResyncInProgress = false
        return
    }

    // The server is empty now: forget the watermark and re-upload everything.
    setLastSync(nil)
    isInitialLoad = true
    isResetting = false
    await performSyncForResync()
}
/// Full re-upload used after a reset.
///
/// Zeroes the published stats, reads every conversation from the beginning,
/// uploads it in adaptive chunks with full message data, then stamps the
/// watermark and the completion time. Drives `isSyncing` itself and clears
/// `isResyncInProgress` when done.
private func performSyncForResync() async {
    log.info("performSyncForResync starting")
    activityLog.info("Starting full resync...")

    isSyncing = true
    defer {
        isSyncing = false
        isResyncInProgress = false
    }

    var runningMessageCount = 0
    var runningConversationCount = 0
    var conversationsWithNewMessages = Set<String>()
    stats = SyncStats()

    do {
        let conversations = try imessageReader.getConversations()
        activityLog.info("Found \(conversations.count) conversations")

        var totalSynced = 0
        var failedConversations = 0
        var latestMessageDate: Date?

        for conversation in conversations {
            do {
                let messages = try imessageReader.getMessages(conversationId: conversation.id, since: nil)
                if messages.isEmpty { continue }

                for message in messages {
                    guard let newestSoFar = latestMessageDate else {
                        latestMessageDate = message.date
                        continue
                    }
                    if message.date > newestSoFar {
                        latestMessageDate = message.date
                    }
                }

                let chunks = ChunkingStrategy.createAdaptiveChunks(messages)
                let isMultiChunk = chunks.count > 1
                var conversationSynced = 0

                for (offset, chunk) in chunks.enumerated() {
                    let outcome = try await syncChunkWithRetry(chunk, conversation: conversation)
                    conversationSynced += outcome.synced
                    totalSynced += outcome.synced

                    if isMultiChunk {
                        log.info("Resynced chunk \(offset + 1)/\(chunks.count) (\(outcome.synced) messages)")
                    }

                    guard outcome.synced > 0 else { continue }
                    runningMessageCount += outcome.synced
                    // Count each conversation once, the first time it yields messages.
                    if conversationsWithNewMessages.insert(conversation.id).inserted {
                        runningConversationCount += 1
                    }
                    stats = SyncStats(
                        messageCount: runningMessageCount,
                        conversationCount: runningConversationCount,
                        contactCount: stats.contactCount
                    )
                }

                if conversationSynced > 0 {
                    activityLog.info("Resynced \(conversationSynced) messages from \(conversation.displayName)")
                }
            } catch {
                failedConversations += 1
                activityLog.warning("Failed \(conversation.displayName): \(error.localizedDescription)")
            }
        }

        // Read watermark = newest synced message date (data time), never
        // wall-clock; otherwise messages older than the resync clock are
        // filtered out on the next incremental cycle and silently lost.
        // `lastSyncCompletedAt` is a UI "when did we last run" stamp, so
        // wall-clock is correct there.
        // NOTE(review): unlike `runReadCycle`, the watermark is advanced here
        // even when some conversations failed — confirm this is intended.
        let completedAt = Date()
        if let newDate = latestMessageDate {
            setLastSync(newDate)
        }
        lastSyncCompletedAt = completedAt
        UserDefaults.standard.set(completedAt, forKey: "imessage.lastSyncCompletedAt")

        if failedConversations > 0 {
            activityLog.warning("Resync partial: \(totalSynced) messages, \(failedConversations) failed")
        } else {
            activityLog.success("Resync complete: \(totalSynced) messages")
        }
        await fetchStats()
    } catch {
        activityLog.error("Resync failed: \(error.localizedDescription)")
    }
}
}