macsync/@packages/imessage/Sources/IMessageSync/SyncManager.swift

776 lines
34 KiB
Swift
Raw Permalink Normal View History

import AppKit
import Combine
import Foundation
import LilithAgentCore
import LilithLogging
import MacSyncShared
private let log = AppLogger.logger(for: "IMessage.Sync")
struct SyncStats: Equatable, Sendable {
var messageCount: Int = 0
var conversationCount: Int = 0
var contactCount: Int = 0
}
struct ChunkSyncResult {
let synced: Int
}
struct ContactSyncInfo {
var loadedFromAddressBook: Int = 0
var syncedToServer: Int = 0
var sampleNames: [String] = []
var hasContactsPermission: Bool = false
}
enum SyncError: Equatable, Sendable {
case none
case fullDiskAccessRequired
case databaseNotFound
case connectionFailed(String)
var message: String {
switch self {
case .none: return ""
case .fullDiskAccessRequired: return "Full Disk Access required"
case .databaseNotFound: return "iMessage database not found"
case .connectionFailed(let msg): return msg
}
}
}
@MainActor
final class SyncManager: BaseSyncManager<SyncStats, SyncError> {
static let shared = SyncManager()
static let syncSchemaVersion = 3
/// Overlap lookback for the incremental read. The watermark advances to the
/// newest synced message *date* (see `runReadCycle`), and each cycle re-reads
/// from `watermark overlap` so messages that land below the watermark
/// out-of-order delivery, late SMS, or a chat.dbDate timezone/DST skew are
/// always re-scanned. Server upserts by external_id, so re-scanning is
/// idempotent (no duplicates). This is the fix for silent recent-message drop.
static let incrementalOverlapSeconds: TimeInterval = 2 * 60 * 60
// Module-specific @Published state (base owns isSyncing/lastSyncCompletedAt/
// currentOperation/syncError/stats).
@Published var contactSyncInfo = ContactSyncInfo()
@Published var isResetting = false
@Published var isResyncInProgress = false
private let imessageReader = iMessageReader.shared
private let apiClient = APIClient.shared
private let sendService = SendService.shared
private let activityLog = ActivityLog.shared
private let blobSyncManager: BlobSyncManager
/// Guards the out-of-band blob pass so passes never overlap. The blob pass is
/// fired detached from the read cycle (see performSync) it must never be
/// awaited there, or a failing blob backend wedges the base `isSyncing` flag.
private var blobSyncInFlight = false
/// First-cycle bootstrap (DB connect, schema-version reset, contacts load) is
/// done lazily inside `performSync` since the base's `startSync` is final.
private var didBootstrap = false
/// True until the first inbound cycle has chosen a code path (initial batch
/// vs incremental). Reset to true after a `resetAndResync`.
private var isInitialLoad = true
/// Separate timer for blob uploads. Runs every 5 minutes, independent of
/// the inbound read cycle and the outbound send queue.
private var blobSyncTimer: Timer?
/// Outbound send-queue runner. Polls the server's legacy
/// `icloud.send_queue` table via `IMessageSendTransport`, delegates the
/// actual AppleScript send + rate-limit + delivery-tracking to
/// `SendService.shared`, and acks the result. Owns its own 30s timer,
/// independent of the read cycle, so an outbound queue with backlog
/// drains promptly even when no inbound sync runs.
private lazy var sendQueueClient: SendQueueClient<IMessageSendTransport> = {
let transport = IMessageSendTransport(apiClient: apiClient)
let service = sendService
let activity = activityLog
return SendQueueClient(label: "imessage", transport: transport, interval: 30) { message in
let result = await service.send(recipient: message.toHandle, body: message.body)
let suffix = String(message.toHandle.prefix(4))
if result.success {
activity.success("Sent message to \(suffix)...")
return .sent
} else {
let reason = result.error ?? "Unknown error"
activity.error("Failed to send to \(suffix)...: \(reason)")
return .failed(reason: reason)
}
}
}()
/// Paced outbound outbox drain (Handoff 03). Independent of the legacy
/// `sendQueueClient`; reuses `SendService` but spaces sends to avoid bursts.
private lazy var outboxSendManager = OutboxSendManager(
api: apiClient,
sendService: sendService,
activityLog: activityLog
)
private init() {
self.blobSyncManager = BlobSyncManager(apiClient: APIClient.shared, activityLog: ActivityLog.shared)
super.init(
initialStats: SyncStats(),
noError: .none,
persistenceKey: "imessage",
timerInterval: 30
)
// Migrate legacy UserDefaults watermark keys (one-time). iMessage used
// unprefixed `lastSync` / `lastSyncCompletedAt`.
if lastSync == nil, let legacy = UserDefaults.standard.object(forKey: "lastSync") as? Date {
setLastSync(legacy)
UserDefaults.standard.removeObject(forKey: "lastSync")
}
if lastSyncCompletedAt == nil, let legacy = UserDefaults.standard.object(forKey: "lastSyncCompletedAt") as? Date {
lastSyncCompletedAt = legacy
UserDefaults.standard.set(legacy, forKey: "imessage.lastSyncCompletedAt")
UserDefaults.standard.removeObject(forKey: "lastSyncCompletedAt")
}
}
func openFullDiskAccessSettings() {
if let url = URL(string: "x-apple.systempreferences:com.apple.preference.security?Privacy_AllFiles") {
NSWorkspace.shared.open(url)
}
}
func retryConnection() { startSync() }
// MARK: - Lifecycle hooks
override func didStartSync() {
TraceLog.write("[imessage] IMessageSyncManager.didStartSync — starting sendQueueClient")
sendQueueClient.start()
outboxSendManager.start()
TraceLog.write("[imessage] IMessageSyncManager.didStartSync — sendQueueClient.start() returned")
// Blob sync on its own 5-minute cadence, independent of read cycle.
blobSyncTimer = Timer.scheduledTimer(withTimeInterval: 300, repeats: true) { [weak self] _ in
guard let self else { return }
Task { await self.blobSyncManager.syncBlobs() }
}
}
override func willStopSync() {
sendQueueClient.stop()
outboxSendManager.stop()
blobSyncTimer?.invalidate()
blobSyncTimer = nil
}
// MARK: - Sync cycle
override func performSync() async {
// Lazy one-shot bootstrap: DB connect, schema version reset, contacts.
if !didBootstrap {
activityLog.info("Starting sync service...")
syncError = .none
do {
try imessageReader.connect()
log.info("Connected to iMessage database")
activityLog.success("Connected to iMessage database")
} catch {
let errorMsg = error.localizedDescription.lowercased()
if errorMsg.contains("authorization denied") || errorMsg.contains("error 23") {
syncError = .fullDiskAccessRequired
activityLog.error("Full Disk Access required")
} else if errorMsg.contains("not found") {
syncError = .databaseNotFound
activityLog.error("iMessage database not found")
} else {
syncError = .connectionFailed(error.localizedDescription)
activityLog.error("Connection failed: \(error.localizedDescription)")
}
return
}
let didReset = await checkSchemaVersionAndReset()
if didReset { activityLog.info("Schema version updated, resync required") }
activityLog.info("Loading contacts...")
let contactsLoaded = await imessageReader.loadContacts()
contactSyncInfo.hasContactsPermission = contactsLoaded
if contactsLoaded {
await syncContacts()
} else {
activityLog.warning("Contacts permission not granted - names won't sync")
}
await fetchStats()
didBootstrap = true
}
// Outbound send queue used to be invoked inline here. It now runs on
// its own 30s timer via `sendQueueClient` started from
// `didStartSync()`. We still flush once at the top of every read
// cycle so a manual `syncNow()` drains pending sends promptly.
await sendQueueClient.drainOnce()
// Initial-load path: server has nothing, stream a metadata-only batch
// import (attachments without blob bytes). Subsequent cycles use the
// standard incremental path with full attachment data.
// Re-enter performInitialSync whenever the server is empty. The previous
// gate flipped `isInitialLoad = false` after the first run regardless of
// success a partial-failure run would then route to the incremental
// path on the next cycle even though the server still had zero messages,
// making the missing data unrecoverable without a manual restart.
if stats.messageCount == 0 {
log.info("Server empty — entering initial-batch sync path")
activityLog.info("First sync detected - using optimized batch endpoint")
await performInitialSync()
isInitialLoad = false
} else {
isInitialLoad = false
await runReadCycle()
}
// Blob pass runs OUT OF BAND never awaited on the read cycle. The
// missing-attachments endpoint has no cursor, so a failing blob backend
// returns the same full page every fetch; awaiting that here held the
// base `isSyncing` flag indefinitely and starved message ingestion (every
// 30s read-timer tick got swallowed by syncNow's guard). Fire it detached,
// guarded so passes don't overlap.
if !blobSyncInFlight {
blobSyncInFlight = true
Task { [weak self, blobSyncManager] in
await blobSyncManager.syncBlobs()
self?.blobSyncInFlight = false
}
}
}
private func performInitialSync() async {
log.info("performInitialSync starting - streaming batch mode")
activityLog.info("Starting initial batch sync...")
var latestMessageDate: Date?
var totalSynced = 0
var totalFailed = 0
do {
let conversations = try imessageReader.getConversations()
log.info("Found \(conversations.count) conversations for batch sync")
activityLog.info("Found \(conversations.count) conversations")
// Stream in windows of maxConversationsPerBatch so we never hold
// the full history in memory at once.
var window: [SyncMessagesPayload] = []
var windowMessages = 0
var convIndex = 0
func flushWindow() async {
guard !window.isEmpty else { return }
let batchMsgs = windowMessages
activityLog.info("Sending \(window.count) convs / \(batchMsgs) msgs (\(convIndex)/\(conversations.count))...")
do {
let sent = try await apiClient.syncMessagesBatch(window)
totalSynced += sent
log.info("Batch sent: \(sent) messages (running total: \(totalSynced))")
} catch {
totalFailed += window.count
let detail = (error as NSError).userInfo.description
log.warning("Batch failed (\(window.count) convs, \(batchMsgs) msgs): \(error.localizedDescription) | \(detail)")
activityLog.warning("Batch failed (\(window.count) convs): \(error.localizedDescription)")
}
window.removeAll()
windowMessages = 0
await Task.yield()
}
for conversation in conversations {
convIndex += 1
if convIndex % 50 == 0 {
log.info("initial-sync progress: \(convIndex)/\(conversations.count) iterated, synced=\(totalSynced) failed=\(totalFailed)")
activityLog.info("Progress: \(convIndex)/\(conversations.count) (synced \(totalSynced) msgs)")
}
do {
let messages = try imessageReader.getMessages(conversationId: conversation.id, since: nil)
if messages.isEmpty { continue }
for msg in messages {
if latestMessageDate == nil || msg.date > latestMessageDate! {
latestMessageDate = msg.date
}
}
let allPayloadMessages = messages.map { msg in
SyncMessagePayload(
imessageGuid: msg.guid,
senderId: msg.senderId,
direction: msg.isFromMe ? "outgoing" : "incoming",
sentAt: ISO8601DateFormatter().string(from: msg.date),
deliveredAt: msg.dateDelivered.map { ISO8601DateFormatter().string(from: $0) },
readAt: msg.dateRead.map { ISO8601DateFormatter().string(from: $0) },
text: msg.text,
attributedBody: nil, // stripped in initial sync reduces payload 10-50x
associatedMessageType: msg.associatedMessageType,
associatedMessageGuid: msg.associatedMessageGuid,
isAudioMessage: msg.isAudioMessage,
expressiveSendStyleId: msg.expressiveSendStyleId,
replyToGuid: msg.replyToGuid,
threadOriginatorGuid: msg.threadOriginatorGuid,
groupTitle: msg.groupTitle,
balloonBundleId: msg.balloonBundleId,
service: msg.service,
senderIdentifier: msg.senderIdentifier,
senderDisplayName: msg.senderDisplayName,
senderPhoneNumber: msg.senderPhoneNumber,
senderEmail: msg.senderEmail,
attachments: msg.attachments.map { att in
SyncAttachmentPayload(
filename: att.filename,
mimeType: att.mimeType,
transferName: att.transferName,
size: att.size,
data: nil // metadata-only in initial sync; blobs synced separately
)
},
attachmentsCount: msg.attachmentsCount,
attachmentsTotalSize: msg.attachmentsTotalSize,
attachmentsFiletypes: msg.attachmentsFiletypes
)
}
// Split oversized conversations so no single payload exceeds the message limit.
let msgLimit = ChunkingStrategy.maxMessagesPerBatch
let slices = stride(from: 0, to: allPayloadMessages.count, by: msgLimit).map {
Array(allPayloadMessages[$0..<min($0 + msgLimit, allPayloadMessages.count)])
}
for slice in slices {
let payload = SyncMessagesPayload(
conversationImessageId: conversation.id,
conversationDisplayName: conversation.displayName,
isGroup: conversation.isGroup,
participantIds: conversation.participantIds,
messages: slice
)
window.append(payload)
windowMessages += slice.count
if window.count >= ChunkingStrategy.maxConversationsPerBatch ||
windowMessages >= ChunkingStrategy.maxMessagesPerBatch {
await flushWindow()
}
}
} catch {
log.warning("Failed to read conversation '\(conversation.displayName)': \(error.localizedDescription)")
activityLog.warning("Skipped \(conversation.displayName)")
}
}
// Flush any remaining conversations.
await flushWindow()
log.info("Initial sync complete - \(totalSynced) messages synced, \(totalFailed) conversations failed")
// Only advance the watermark when every batch was accepted. A partial
// initial sync that advances the watermark would mask the missing
// conversations forever the regular incremental path filters by
// `date > lastSync` and would never retry them. Leaving the watermark
// unset means the next cycle re-enters performInitialSync (gated on
// server-side message count == 0) only after the operator truncates;
// for partial-failure mid-run we want the user to clear and retry,
// or for the loop to retry on the next syncNow.
if totalFailed == 0, let newDate = latestMessageDate {
setLastSync(newDate)
activityLog.success("Initial sync complete: \(totalSynced) messages synced")
} else if totalFailed > 0 {
log.warning("Initial sync partial — watermark NOT advanced (\(totalFailed) convs failed). Re-run will retry.")
activityLog.warning("Initial sync partial: \(totalSynced) synced, \(totalFailed) failed — will retry")
} else {
activityLog.success("Initial sync complete: \(totalSynced) messages synced")
}
await fetchStats()
} catch {
log.warning("Initial sync failed: \(error.localizedDescription)")
activityLog.error("Initial sync failed: \(error.localizedDescription)")
}
isResyncInProgress = false
}
private func runReadCycle() async {
log.info("performSync starting")
activityLog.info("Starting sync...")
var runningMessageCount = stats.messageCount
var runningConversationCount = stats.conversationCount
var conversationsWithNewMessages = Set<String>()
do {
let conversations = try imessageReader.getConversations()
log.info("Found \(conversations.count) conversations")
activityLog.info("Found \(conversations.count) conversations")
var totalSynced = 0
var failedConversations = 0
// Watermark advances to the newest synced message *date* (data time),
// never wall-clock start from the current watermark.
var latestMessageDate: Date? = lastSync
// Re-read from `watermark overlap` so messages below the watermark
// (out-of-order / late / skewed) are always re-scanned; upsert dedupes.
let readSince = lastSync.map { $0.addingTimeInterval(-Self.incrementalOverlapSeconds) }
for conversation in conversations {
do {
let messages = try imessageReader.getMessages(
conversationId: conversation.id,
since: readSince
)
if messages.isEmpty { continue }
for m in messages where latestMessageDate == nil || m.date > latestMessageDate! {
latestMessageDate = m.date
}
let chunks = ChunkingStrategy.createAdaptiveChunks(messages)
var conversationSynced = 0
for (chunkIndex, chunk) in chunks.enumerated() {
let result = try await syncChunkWithRetry(chunk, conversation: conversation)
conversationSynced += result.synced
totalSynced += result.synced
if chunks.count > 1 {
log.info("Synced chunk \(chunkIndex + 1)/\(chunks.count) (\(result.synced) messages)")
}
if result.synced > 0 {
runningMessageCount += result.synced
if !conversationsWithNewMessages.contains(conversation.id) {
conversationsWithNewMessages.insert(conversation.id)
runningConversationCount += 1
}
stats = SyncStats(
messageCount: runningMessageCount,
conversationCount: runningConversationCount,
contactCount: stats.contactCount
)
}
}
if conversationSynced > 0 {
activityLog.info("Synced \(conversationSynced) messages from \(conversation.displayName)")
}
} catch {
failedConversations += 1
log.warning("Failed to sync '\(conversation.displayName)': \(error.localizedDescription)")
activityLog.warning("Failed \(conversation.displayName): \(error.localizedDescription)")
}
}
if failedConversations > 0 {
if totalSynced > 0 {
activityLog.warning("Sync partial: \(totalSynced) messages, \(failedConversations) conversations failed")
} else {
activityLog.error("Sync failed: \(failedConversations) conversations had errors")
}
} else if totalSynced > 0 {
activityLog.success("Sync complete: \(totalSynced) new messages")
} else {
activityLog.success("Sync complete: no new messages")
}
// Advance the watermark to the newest synced message date data time,
// not wall-clock and only on a clean cycle so a partial failure
// retries the gap next time instead of skipping past it. Monotonic:
// never moves the watermark backwards.
if failedConversations == 0, let newDate = latestMessageDate,
lastSync == nil || newDate > lastSync! {
setLastSync(newDate)
}
await fetchStats()
} catch {
log.warning("Sync failed: \(error.localizedDescription)")
activityLog.error("Sync failed: \(error.localizedDescription)")
}
isResyncInProgress = false
}
private func checkSchemaVersionAndReset() async -> Bool {
let storedVersion = UserDefaults.standard.integer(forKey: "syncSchemaVersion")
let currentVersion = Self.syncSchemaVersion
log.info("Schema version check - stored: \(storedVersion), current: \(currentVersion)")
if storedVersion != currentVersion {
if storedVersion > 0 { await performReset() }
UserDefaults.standard.set(currentVersion, forKey: "syncSchemaVersion")
return storedVersion > 0
}
return false
}
private func syncContacts() async {
activityLog.info("Syncing contacts...")
do {
let contacts = imessageReader.getAllContacts()
guard !contacts.isEmpty else {
activityLog.info("No contacts from address book")
contactSyncInfo.loadedFromAddressBook = 0
contactSyncInfo.sampleNames = []
return
}
contactSyncInfo.loadedFromAddressBook = contacts.count
contactSyncInfo.sampleNames = Array(contacts.prefix(5).map { $0.displayName })
let dateFormatter = ISO8601DateFormatter()
let payloads = contacts.map { contact in
SyncContactPayload(
appleId: nil,
phoneNumber: contact.phoneNumber,
email: contact.email,
displayName: contact.displayName,
avatarHash: nil,
birthday: contact.birthday.map { dateFormatter.string(from: $0) }
)
}
let synced = try await apiClient.syncContacts(payloads)
contactSyncInfo.syncedToServer = synced
let sampleStr = contactSyncInfo.sampleNames.prefix(3).joined(separator: ", ")
if !sampleStr.isEmpty {
activityLog.success("Synced \(synced) contacts: \(sampleStr)...")
} else {
activityLog.success("Synced \(synced) contacts")
}
} catch {
log.warning("syncContacts failed: \(error.localizedDescription)")
activityLog.error("Contact sync failed")
}
}
private func fetchStats() async {
do {
let response = try await apiClient.getStats()
stats = SyncStats(
messageCount: response.totalMessages,
conversationCount: response.totalConversations,
contactCount: response.totalContacts
)
if let serverLastSync = response.lastSyncAt, lastSync == nil {
setLastSync(serverLastSync)
}
} catch {
log.warning("fetchStats failed: \(error.localizedDescription)")
}
}
private func syncChunkWithRetry(
_ chunk: [iMessage],
conversation: iMessageConversation
) async throws -> ChunkSyncResult {
do {
let synced = try await syncSingleChunk(chunk, conversation: conversation)
return ChunkSyncResult(synced: synced)
} catch let error as APIError {
if case .serverError(_, let message) = error, (message ?? "").contains("Payload too large") {
if chunk.count == 1 {
let msg = chunk[0]
let sizeInfo = msg.attachmentsTotalSize > 0 ? "(\(msg.attachmentsTotalSize) bytes)" : "(no attachments)"
let label = msg.attachments.first?.filename ?? msg.guid
throw APIError.serverError(statusCode: 413, message: "Message too large: \(label) \(sizeInfo)")
}
let midpoint = chunk.count / 2
let firstResult = try await syncChunkWithRetry(Array(chunk[0..<midpoint]), conversation: conversation)
let secondResult = try await syncChunkWithRetry(Array(chunk[midpoint..<chunk.count]), conversation: conversation)
return ChunkSyncResult(synced: firstResult.synced + secondResult.synced)
}
throw error
}
}
private func syncSingleChunk(
_ chunk: [iMessage],
conversation: iMessageConversation
) async throws -> Int {
let payload = SyncMessagesPayload(
conversationImessageId: conversation.id,
conversationDisplayName: conversation.displayName,
isGroup: conversation.isGroup,
participantIds: conversation.participantIds,
messages: chunk.map { msg in
SyncMessagePayload(
imessageGuid: msg.guid,
senderId: msg.senderId,
direction: msg.isFromMe ? "outgoing" : "incoming",
sentAt: ISO8601DateFormatter().string(from: msg.date),
deliveredAt: msg.dateDelivered.map { ISO8601DateFormatter().string(from: $0) },
readAt: msg.dateRead.map { ISO8601DateFormatter().string(from: $0) },
text: msg.text,
attributedBody: msg.attributedBody,
associatedMessageType: msg.associatedMessageType,
associatedMessageGuid: msg.associatedMessageGuid,
isAudioMessage: msg.isAudioMessage,
expressiveSendStyleId: msg.expressiveSendStyleId,
replyToGuid: msg.replyToGuid,
threadOriginatorGuid: msg.threadOriginatorGuid,
groupTitle: msg.groupTitle,
balloonBundleId: msg.balloonBundleId,
service: msg.service,
senderIdentifier: msg.senderIdentifier,
senderDisplayName: msg.senderDisplayName,
senderPhoneNumber: msg.senderPhoneNumber,
senderEmail: msg.senderEmail,
attachments: msg.attachments.map { att in
SyncAttachmentPayload(
filename: att.filename,
mimeType: att.mimeType,
transferName: att.transferName,
size: att.size,
data: att.data
)
},
attachmentsCount: msg.attachmentsCount,
attachmentsTotalSize: msg.attachmentsTotalSize,
attachmentsFiletypes: msg.attachmentsFiletypes
)
}
)
return try await apiClient.syncMessages(payload)
}
// MARK: - Reset
func resetAndResync() {
guard !isResetting && !isSyncing else { return }
Task { await performReset() }
}
private func performReset() async {
log.info("performReset starting")
activityLog.info("Starting full reset...")
isResetting = true
isResyncInProgress = true
let previousLastSync = lastSync
do {
let result = try await apiClient.resetSync()
activityLog.info("Cleared \(result.deletedMessages) messages, \(result.deletedConversations) conversations")
setLastSync(nil)
isInitialLoad = true
isResetting = false
await performSyncForResync()
} catch {
log.warning("Reset failed: \(error.localizedDescription)")
activityLog.error("Reset failed: \(error.localizedDescription)")
if let previous = previousLastSync {
setLastSync(previous)
}
isResetting = false
isResyncInProgress = false
}
}
private func performSyncForResync() async {
log.info("performSyncForResync starting")
activityLog.info("Starting full resync...")
isSyncing = true
var runningMessageCount = 0
var runningConversationCount = 0
var conversationsWithNewMessages = Set<String>()
stats = SyncStats()
do {
let conversations = try imessageReader.getConversations()
activityLog.info("Found \(conversations.count) conversations")
var totalSynced = 0
var failedConversations = 0
var latestMessageDate: Date?
for conversation in conversations {
do {
let messages = try imessageReader.getMessages(conversationId: conversation.id, since: nil)
if messages.isEmpty { continue }
for m in messages where latestMessageDate == nil || m.date > latestMessageDate! {
latestMessageDate = m.date
}
let chunks = ChunkingStrategy.createAdaptiveChunks(messages)
var conversationSynced = 0
for (chunkIndex, chunk) in chunks.enumerated() {
let result = try await syncChunkWithRetry(chunk, conversation: conversation)
conversationSynced += result.synced
totalSynced += result.synced
if chunks.count > 1 {
log.info("Resynced chunk \(chunkIndex + 1)/\(chunks.count) (\(result.synced) messages)")
}
if result.synced > 0 {
runningMessageCount += result.synced
if !conversationsWithNewMessages.contains(conversation.id) {
conversationsWithNewMessages.insert(conversation.id)
runningConversationCount += 1
}
stats = SyncStats(
messageCount: runningMessageCount,
conversationCount: runningConversationCount,
contactCount: stats.contactCount
)
}
}
if conversationSynced > 0 {
activityLog.info("Resynced \(conversationSynced) messages from \(conversation.displayName)")
}
} catch {
failedConversations += 1
activityLog.warning("Failed \(conversation.displayName): \(error.localizedDescription)")
}
}
// Read watermark = newest synced message date (data time), never
// wall-clock otherwise messages older than the resync clock are
// filtered out on the next incremental cycle and silently lost.
// `lastSyncCompletedAt` is a UI "when did we last run" stamp, so
// wall-clock is correct there.
let completedAt = Date()
if let newDate = latestMessageDate {
setLastSync(newDate)
}
lastSyncCompletedAt = completedAt
UserDefaults.standard.set(completedAt, forKey: "imessage.lastSyncCompletedAt")
if failedConversations > 0 {
activityLog.warning("Resync partial: \(totalSynced) messages, \(failedConversations) failed")
} else {
activityLog.success("Resync complete: \(totalSynced) messages")
}
await fetchStats()
} catch {
activityLog.error("Resync failed: \(error.localizedDescription)")
}
isSyncing = false
isResyncInProgress = false
}
}