704 lines
30 KiB
Swift
704 lines
30 KiB
Swift
import AppKit
|
|
import Combine
|
|
import Foundation
|
|
import LilithAgentCore
|
|
import LilithLogging
|
|
import MacSyncShared
|
|
|
|
// File-private logger obtained from `AppLogger` under the "IMessage.Sync" name.
private let log = AppLogger.logger(for: "IMessage.Sync")
|
|
|
|
/// Message, conversation and contact totals tracked for the sync.
/// Every counter defaults to zero.
struct SyncStats: Equatable, Sendable {
    /// Total number of messages.
    var messageCount = 0
    /// Total number of conversations.
    var conversationCount = 0
    /// Total number of contacts.
    var contactCount = 0
}
|
|
|
|
/// Outcome of pushing one chunk of messages.
struct ChunkSyncResult {
    /// Number of messages reported as synced for the chunk.
    let synced: Int
}
|
|
|
|
/// Snapshot of the most recent contact sync.
struct ContactSyncInfo {
    /// Number of contacts read from the address book.
    var loadedFromAddressBook = 0
    /// Number of contacts the server reported as synced.
    var syncedToServer = 0
    /// A few display names kept as a sample of what was loaded.
    var sampleNames = [String]()
    /// Whether loading contacts succeeded.
    var hasContactsPermission = false
}
|
|
|
|
/// Error state surfaced by the sync manager. `.none` means "no error".
enum SyncError: Equatable, Sendable {
    case none
    case fullDiskAccessRequired
    case databaseNotFound
    case connectionFailed(String)

    /// Human-readable text for the error; empty for `.none`, and the
    /// associated string for `.connectionFailed`.
    var message: String {
        let text: String
        switch self {
        case .none:
            text = ""
        case .fullDiskAccessRequired:
            text = "Full Disk Access required"
        case .databaseNotFound:
            text = "iMessage database not found"
        case let .connectionFailed(detail):
            text = detail
        }
        return text
    }
}
|
|
|
|
@MainActor
|
|
final class SyncManager: BaseSyncManager<SyncStats, SyncError> {
|
|
/// Shared singleton instance.
static let shared: SyncManager = SyncManager()

/// Schema version compared against the value stored in UserDefaults under
/// `syncSchemaVersion`.
static let syncSchemaVersion: Int = 3

// Module-specific @Published state. The base class owns isSyncing,
// lastSyncCompletedAt, currentOperation, syncError and stats.
@Published var contactSyncInfo: ContactSyncInfo = ContactSyncInfo()
@Published var isResetting: Bool = false
@Published var isResyncInProgress: Bool = false

// Collaborators, all taken from their shared instances except the blob
// manager, which is built in `init`.
private let imessageReader = iMessageReader.shared
private let apiClient = APIClient.shared
private let sendService = SendService.shared
private let activityLog = ActivityLog.shared
private let blobSyncManager: BlobSyncManager

/// Set once the first-cycle bootstrap (DB connect, schema-version reset,
/// contacts load) has finished. The bootstrap runs lazily inside
/// `performSync` because the base's `startSync` is final.
private var didBootstrap: Bool = false

/// Stays true until the first inbound cycle has picked its code path
/// (initial batch vs incremental); set back to true by a reset.
private var isInitialLoad: Bool = true

/// Timer dedicated to blob uploads. Fires every 5 minutes, independent of
/// the inbound read cycle and the outbound send queue.
private var blobSyncTimer: Timer?
|
|
|
|
/// Outbound send-queue runner, built on first use.
///
/// Polls the server's legacy `icloud.send_queue` table through
/// `IMessageSendTransport`, hands each queued message to `SendService.shared`
/// (which performs the AppleScript send, rate limiting and delivery
/// tracking), and reports the outcome back. It runs on its own 30s interval,
/// separate from the read cycle, so a backlog drains even when no inbound
/// sync is running.
private lazy var sendQueueClient: SendQueueClient<IMessageSendTransport> = {
    let queueTransport = IMessageSendTransport(apiClient: apiClient)
    let sender = sendService
    let journal = activityLog
    return SendQueueClient(label: "imessage", transport: queueTransport, interval: 30) { message in
        let outcome = sender.send(recipient: message.toHandle, body: message.body)
        // Only the first four characters of the handle reach the activity log.
        let handlePrefix = String(message.toHandle.prefix(4))
        guard outcome.success else {
            let reason = outcome.error ?? "Unknown error"
            journal.error("Failed to send to \(handlePrefix)...: \(reason)")
            return .failed(reason: reason)
        }
        journal.success("Sent message to \(handlePrefix)...")
        return .sent
    }
}()
|
|
|
|
/// Builds the singleton: creates the blob sync manager, initialises the base
/// with the "imessage" persistence key and a 30s timer interval, then
/// migrates the legacy unprefixed UserDefaults keys once.
private init() {
    blobSyncManager = BlobSyncManager(apiClient: APIClient.shared, activityLog: ActivityLog.shared)

    super.init(initialStats: SyncStats(), noError: .none, persistenceKey: "imessage", timerInterval: 30)

    // iMessage used to store its watermarks under the unprefixed keys
    // `lastSync` / `lastSyncCompletedAt`. Each is adopted only when the
    // current value is still unset, and the legacy key is removed afterwards.
    let defaults = UserDefaults.standard

    if lastSync == nil, let legacyWatermark = defaults.object(forKey: "lastSync") as? Date {
        setLastSync(legacyWatermark)
        defaults.removeObject(forKey: "lastSync")
    }

    if lastSyncCompletedAt == nil,
       let legacyCompletion = defaults.object(forKey: "lastSyncCompletedAt") as? Date {
        lastSyncCompletedAt = legacyCompletion
        defaults.set(legacyCompletion, forKey: "imessage.lastSyncCompletedAt")
        defaults.removeObject(forKey: "lastSyncCompletedAt")
    }
}
|
|
|
|
/// Opens the `Privacy_AllFiles` pane of the system security preferences via
/// its `x-apple.systempreferences:` URL.
func openFullDiskAccessSettings() {
    let pane = "x-apple.systempreferences:com.apple.preference.security?Privacy_AllFiles"
    guard let settingsURL = URL(string: pane) else { return }
    NSWorkspace.shared.open(settingsURL)
}
|
|
|
|
/// Retries the connection by calling `startSync()`.
func retryConnection() {
    startSync()
}
|
|
|
|
// MARK: - Lifecycle hooks
|
|
|
|
/// Lifecycle hook: starts the outbound send-queue client and schedules the
/// repeating blob-upload timer.
override func didStartSync() {
    sendQueueClient.start()

    // Invalidate any timer left over from an earlier start. Without this, a
    // second call would overwrite the reference while the old repeating timer
    // kept firing, doubling the blob-sync cadence and leaking the timer.
    blobSyncTimer?.invalidate()

    // Blob sync on its own 5-minute cadence, independent of the read cycle.
    blobSyncTimer = Timer.scheduledTimer(withTimeInterval: 300, repeats: true) { [weak self] _ in
        guard let self else { return }
        Task { await self.blobSyncManager.syncBlobs() }
    }
}
|
|
|
|
/// Lifecycle hook: stops the outbound send-queue client and tears down the
/// blob-upload timer.
override func willStopSync() {
    sendQueueClient.stop()

    if let timer = blobSyncTimer {
        timer.invalidate()
        blobSyncTimer = nil
    }
}
|
|
|
|
// MARK: - Sync cycle
|
|
|
|
/// One sync cycle: lazy bootstrap, a send-queue flush, the inbound pass
/// (initial batch import or incremental read), then a blob upload pass.
override func performSync() async {
    // Nothing else runs until the one-shot bootstrap has succeeded.
    guard await bootstrapIfNeeded() else { return }

    // The outbound queue normally runs on its own 30s timer through
    // `sendQueueClient` (started in `didStartSync()`). Flushing here as well
    // lets a manual `syncNow()` drain pending sends promptly.
    await sendQueueClient.drainOnce()

    // When the server holds no messages on the first cycle, stream a
    // metadata-only batch import (attachments without blob bytes). Every
    // other cycle takes the incremental path with full attachment data.
    let useBatchImport = isInitialLoad && stats.messageCount == 0
    if useBatchImport {
        log.info("Initial load detected - using batch sync")
        activityLog.info("First sync detected - using optimized batch endpoint")
        await performInitialSync()
        isInitialLoad = false
    } else {
        isInitialLoad = false
        await runReadCycle()
    }

    // Blob pass after every read cycle: uploads attachment bytes that have
    // not been pushed yet.
    await blobSyncManager.syncBlobs()
}

/// Runs the first-cycle bootstrap (DB connect, schema-version check,
/// contacts load, stats fetch) unless it already completed.
///
/// - Returns: `true` when the bootstrap is done (now or previously), `false`
///   when connecting to the iMessage database failed; `syncError` is then
///   set and the bootstrap is attempted again on the next cycle.
private func bootstrapIfNeeded() async -> Bool {
    if didBootstrap { return true }

    activityLog.info("Starting sync service...")
    syncError = .none

    do {
        try imessageReader.connect()
    } catch {
        // Classify the failure from the lowercased error description.
        let description = error.localizedDescription
        let lowered = description.lowercased()
        if lowered.contains("authorization denied") || lowered.contains("error 23") {
            syncError = .fullDiskAccessRequired
            activityLog.error("Full Disk Access required")
        } else if lowered.contains("not found") {
            syncError = .databaseNotFound
            activityLog.error("iMessage database not found")
        } else {
            syncError = .connectionFailed(description)
            activityLog.error("Connection failed: \(description)")
        }
        return false
    }

    log.info("Connected to iMessage database")
    activityLog.success("Connected to iMessage database")

    if await checkSchemaVersionAndReset() {
        activityLog.info("Schema version updated, resync required")
    }

    activityLog.info("Loading contacts...")
    let contactsAvailable = await imessageReader.loadContacts()
    contactSyncInfo.hasContactsPermission = contactsAvailable

    if contactsAvailable {
        await syncContacts()
    } else {
        activityLog.warning("Contacts permission not granted - names won't sync")
    }

    await fetchStats()
    didBootstrap = true
    return true
}
|
|
|
|
/// Initial import: streams every conversation to the batch endpoint in
/// bounded windows, sending message metadata only (no attributed bodies, no
/// attachment bytes).
///
/// The watermark is advanced to the newest message date only when nothing
/// failed. A conversation that could not be read counts as a failure just
/// like a rejected batch; otherwise the watermark would move past messages
/// that were never uploaded and the incremental path would never retry them.
/// Clears `isResyncInProgress` on exit.
private func performInitialSync() async {
    log.info("performInitialSync starting - streaming batch mode")
    activityLog.info("Starting initial batch sync...")

    var latestMessageDate: Date?
    var totalSynced = 0
    var totalFailed = 0

    // One formatter for the whole run instead of up to three per message.
    let timestampFormatter = ISO8601DateFormatter()

    do {
        let conversations = try imessageReader.getConversations()
        log.info("Found \(conversations.count) conversations for batch sync")
        activityLog.info("Found \(conversations.count) conversations")

        // Stream in windows bounded by maxConversationsPerBatch /
        // maxMessagesPerBatch so the full history is never held in memory.
        var window: [SyncMessagesPayload] = []
        var windowMessages = 0
        var convIndex = 0

        // Sends the current window (if any), records the outcome, and empties it.
        func flushWindow() async {
            guard !window.isEmpty else { return }
            let batchMsgs = windowMessages
            activityLog.info("Sending \(window.count) convs / \(batchMsgs) msgs (\(convIndex)/\(conversations.count))...")
            do {
                let sent = try await apiClient.syncMessagesBatch(window)
                totalSynced += sent
                log.info("Batch sent: \(sent) messages (running total: \(totalSynced))")
            } catch {
                totalFailed += window.count
                let detail = (error as NSError).userInfo.description
                log.warning("Batch failed (\(window.count) convs, \(batchMsgs) msgs): \(error.localizedDescription) | \(detail)")
                activityLog.warning("Batch failed (\(window.count) convs): \(error.localizedDescription)")
            }
            window.removeAll()
            windowMessages = 0
            await Task.yield()
        }

        for conversation in conversations {
            convIndex += 1
            if convIndex % 50 == 0 {
                log.info("initial-sync progress: \(convIndex)/\(conversations.count) iterated, synced=\(totalSynced) failed=\(totalFailed)")
                activityLog.info("Progress: \(convIndex)/\(conversations.count) (synced \(totalSynced) msgs)")
            }

            do {
                let messages = try imessageReader.getMessages(conversationId: conversation.id, since: nil)
                if messages.isEmpty { continue }

                // Track the newest message date seen across all conversations.
                if let newest = messages.map(\.date).max() {
                    latestMessageDate = latestMessageDate.map { max($0, newest) } ?? newest
                }

                let allPayloadMessages = messages.map { msg in
                    SyncMessagePayload(
                        imessageGuid: msg.guid,
                        senderId: msg.senderId,
                        direction: msg.isFromMe ? "outgoing" : "incoming",
                        sentAt: timestampFormatter.string(from: msg.date),
                        deliveredAt: msg.dateDelivered.map { timestampFormatter.string(from: $0) },
                        readAt: msg.dateRead.map { timestampFormatter.string(from: $0) },
                        text: msg.text,
                        attributedBody: nil, // stripped in initial sync — reduces payload 10-50x
                        associatedMessageType: msg.associatedMessageType,
                        associatedMessageGuid: msg.associatedMessageGuid,
                        isAudioMessage: msg.isAudioMessage,
                        expressiveSendStyleId: msg.expressiveSendStyleId,
                        replyToGuid: msg.replyToGuid,
                        threadOriginatorGuid: msg.threadOriginatorGuid,
                        groupTitle: msg.groupTitle,
                        balloonBundleId: msg.balloonBundleId,
                        service: msg.service,
                        senderIdentifier: msg.senderIdentifier,
                        senderDisplayName: msg.senderDisplayName,
                        senderPhoneNumber: msg.senderPhoneNumber,
                        senderEmail: msg.senderEmail,
                        attachments: msg.attachments.map { att in
                            SyncAttachmentPayload(
                                filename: att.filename,
                                mimeType: att.mimeType,
                                transferName: att.transferName,
                                size: att.size,
                                data: nil // metadata-only in initial sync; blobs synced separately
                            )
                        },
                        attachmentsCount: msg.attachmentsCount,
                        attachmentsTotalSize: msg.attachmentsTotalSize,
                        attachmentsFiletypes: msg.attachmentsFiletypes
                    )
                }

                // Split oversized conversations so no single payload exceeds the message limit.
                let msgLimit = ChunkingStrategy.maxMessagesPerBatch
                let slices = stride(from: 0, to: allPayloadMessages.count, by: msgLimit).map {
                    Array(allPayloadMessages[$0..<min($0 + msgLimit, allPayloadMessages.count)])
                }

                for slice in slices {
                    let payload = SyncMessagesPayload(
                        conversationImessageId: conversation.id,
                        conversationDisplayName: conversation.displayName,
                        isGroup: conversation.isGroup,
                        participantIds: conversation.participantIds,
                        messages: slice
                    )
                    window.append(payload)
                    windowMessages += slice.count

                    if window.count >= ChunkingStrategy.maxConversationsPerBatch ||
                        windowMessages >= ChunkingStrategy.maxMessagesPerBatch {
                        await flushWindow()
                    }
                }
            } catch {
                // A conversation that could not be read was never uploaded, so
                // it must hold the watermark back exactly like a failed batch.
                totalFailed += 1
                log.warning("Failed to read conversation '\(conversation.displayName)': \(error.localizedDescription)")
                activityLog.warning("Skipped \(conversation.displayName)")
            }
        }

        // Flush any remaining conversations.
        await flushWindow()

        log.info("Initial sync complete - \(totalSynced) messages synced, \(totalFailed) conversations failed")

        // Advance the watermark only when nothing failed. The incremental
        // path filters by `date > lastSync`, so advancing after a partial run
        // would hide the missing conversations for good. Leaving it unset
        // keeps them eligible for a later retry.
        if totalFailed == 0, let newDate = latestMessageDate {
            setLastSync(newDate)
            activityLog.success("Initial sync complete: \(totalSynced) messages synced")
        } else if totalFailed > 0 {
            log.warning("Initial sync partial — watermark NOT advanced (\(totalFailed) convs failed). Re-run will retry.")
            activityLog.warning("Initial sync partial: \(totalSynced) synced, \(totalFailed) failed — will retry")
        } else {
            // No failures and no messages at all: nothing to set the watermark to.
            activityLog.success("Initial sync complete: \(totalSynced) messages synced")
        }
        await fetchStats()

    } catch {
        log.warning("Initial sync failed: \(error.localizedDescription)")
        activityLog.error("Initial sync failed: \(error.localizedDescription)")
    }

    isResyncInProgress = false
}
|
|
|
|
/// Incremental inbound pass: for every conversation, reads the messages
/// newer than `lastSync`, uploads them chunk by chunk, and keeps the
/// published stats moving while it goes. Ends by refreshing stats from the
/// server and clearing `isResyncInProgress`.
private func runReadCycle() async {
    log.info("performSync starting")
    activityLog.info("Starting sync...")

    // Running totals start from the currently published stats.
    var messageTally = stats.messageCount
    var conversationTally = stats.conversationCount
    var touchedConversations = Set<String>()

    do {
        let conversations = try imessageReader.getConversations()
        log.info("Found \(conversations.count) conversations")
        activityLog.info("Found \(conversations.count) conversations")

        var totalSynced = 0
        var failedConversations = 0

        for conversation in conversations {
            do {
                let messages = try imessageReader.getMessages(
                    conversationId: conversation.id,
                    since: lastSync
                )
                guard !messages.isEmpty else { continue }

                let chunks = ChunkingStrategy.createAdaptiveChunks(messages)
                let isMultiChunk = chunks.count > 1
                var conversationSynced = 0

                for (position, chunk) in chunks.enumerated() {
                    let outcome = try await syncChunkWithRetry(chunk, conversation: conversation)
                    conversationSynced += outcome.synced
                    totalSynced += outcome.synced

                    if isMultiChunk {
                        log.info("Synced chunk \(position + 1)/\(chunks.count) (\(outcome.synced) messages)")
                    }

                    guard outcome.synced > 0 else { continue }

                    messageTally += outcome.synced
                    // Each conversation is counted once, the first time it
                    // contributes a synced message.
                    if touchedConversations.insert(conversation.id).inserted {
                        conversationTally += 1
                    }
                    stats = SyncStats(
                        messageCount: messageTally,
                        conversationCount: conversationTally,
                        contactCount: stats.contactCount
                    )
                }

                if conversationSynced > 0 {
                    activityLog.info("Synced \(conversationSynced) messages from \(conversation.displayName)")
                }
            } catch {
                failedConversations += 1
                log.warning("Failed to sync '\(conversation.displayName)': \(error.localizedDescription)")
                activityLog.warning("Failed \(conversation.displayName): \(error.localizedDescription)")
            }
        }

        // Summary line, keyed on (any failures, anything synced).
        switch (failedConversations > 0, totalSynced > 0) {
        case (true, true):
            activityLog.warning("Sync partial: \(totalSynced) messages, \(failedConversations) conversations failed")
        case (true, false):
            activityLog.error("Sync failed: \(failedConversations) conversations had errors")
        case (false, true):
            activityLog.success("Sync complete: \(totalSynced) new messages")
        case (false, false):
            activityLog.success("Sync complete: no new messages")
        }

        await fetchStats()

    } catch {
        log.warning("Sync failed: \(error.localizedDescription)")
        activityLog.error("Sync failed: \(error.localizedDescription)")
    }

    isResyncInProgress = false
}
|
|
|
|
/// Compares the schema version stored in UserDefaults with
/// `syncSchemaVersion`. On a mismatch the current version is stored; when a
/// previous version existed (stored value > 0) a full reset is run first.
///
/// - Returns: `true` only when a reset was triggered.
private func checkSchemaVersionAndReset() async -> Bool {
    let defaults = UserDefaults.standard
    let storedVersion = defaults.integer(forKey: "syncSchemaVersion")
    let currentVersion = Self.syncSchemaVersion

    log.info("Schema version check - stored: \(storedVersion), current: \(currentVersion)")

    guard storedVersion != currentVersion else { return false }

    // `integer(forKey:)` yields 0 when the key was never written, so a
    // first run records the version without resetting anything.
    let hadPreviousVersion = storedVersion > 0
    if hadPreviousVersion {
        await performReset()
    }
    defaults.set(currentVersion, forKey: "syncSchemaVersion")
    return hadPreviousVersion
}
|
|
|
|
/// Pushes the address-book contacts to the server and records counts and a
/// few sample names in `contactSyncInfo`. A failed upload is logged and
/// otherwise ignored.
private func syncContacts() async {
    activityLog.info("Syncing contacts...")

    let contacts = imessageReader.getAllContacts()
    if contacts.isEmpty {
        activityLog.info("No contacts from address book")
        contactSyncInfo.loadedFromAddressBook = 0
        contactSyncInfo.sampleNames = []
        return
    }

    contactSyncInfo.loadedFromAddressBook = contacts.count
    contactSyncInfo.sampleNames = Array(contacts.prefix(5)).map { $0.displayName }

    let birthdayFormatter = ISO8601DateFormatter()
    let payloads = contacts.map { entry in
        SyncContactPayload(
            appleId: nil,
            phoneNumber: entry.phoneNumber,
            email: entry.email,
            displayName: entry.displayName,
            avatarHash: nil,
            birthday: entry.birthday.map { birthdayFormatter.string(from: $0) }
        )
    }

    do {
        let synced = try await apiClient.syncContacts(payloads)
        contactSyncInfo.syncedToServer = synced

        // Up to three of the sample names are appended to the success line.
        let preview = contactSyncInfo.sampleNames.prefix(3).joined(separator: ", ")
        let summary = preview.isEmpty
            ? "Synced \(synced) contacts"
            : "Synced \(synced) contacts: \(preview)..."
        activityLog.success(summary)
    } catch {
        log.warning("syncContacts failed: \(error.localizedDescription)")
        activityLog.error("Contact sync failed")
    }
}
|
|
|
|
/// Replaces the published stats with the server's totals. When no local
/// watermark exists yet, the server's `lastSyncAt` (if any) is adopted.
/// A failed request is only logged.
private func fetchStats() async {
    do {
        let serverStats = try await apiClient.getStats()
        stats = SyncStats(
            messageCount: serverStats.totalMessages,
            conversationCount: serverStats.totalConversations,
            contactCount: serverStats.totalContacts
        )

        // Never overwrite an existing local watermark.
        guard lastSync == nil, let serverWatermark = serverStats.lastSyncAt else { return }
        setLastSync(serverWatermark)
    } catch {
        log.warning("fetchStats failed: \(error.localizedDescription)")
    }
}
|
|
|
|
/// Uploads a chunk. When the server answers with a "Payload too large"
/// server error, the chunk is halved and each half is retried recursively.
///
/// - Parameters:
///   - chunk: Messages to upload.
///   - conversation: Conversation the messages belong to.
/// - Returns: The number of messages synced across all sub-chunks.
/// - Throws: Any error from `syncSingleChunk` other than a splittable
///   "Payload too large"; for a single-message chunk that is still too
///   large, an `APIError.serverError` with status 413 naming the message.
private func syncChunkWithRetry(
    _ chunk: [iMessage],
    conversation: iMessageConversation
) async throws -> ChunkSyncResult {
    do {
        return ChunkSyncResult(synced: try await syncSingleChunk(chunk, conversation: conversation))
    } catch let error as APIError {
        // Only a "Payload too large" server error is handled; the rest propagates.
        guard case .serverError(_, let message) = error,
              message?.contains("Payload too large") == true else {
            throw error
        }

        // A lone message cannot be split further: report which one it was.
        if chunk.count == 1, let oversized = chunk.first {
            let sizeInfo = oversized.attachmentsTotalSize > 0
                ? "(\(oversized.attachmentsTotalSize) bytes)"
                : "(no attachments)"
            let label = oversized.attachments.first?.filename ?? oversized.guid
            throw APIError.serverError(statusCode: 413, message: "Message too large: \(label) \(sizeInfo)")
        }

        let half = chunk.count / 2
        let front = try await syncChunkWithRetry(Array(chunk.prefix(half)), conversation: conversation)
        let back = try await syncChunkWithRetry(Array(chunk.dropFirst(half)), conversation: conversation)
        return ChunkSyncResult(synced: front.synced + back.synced)
    }
}
|
|
|
|
/// Builds the upload payload for one chunk, including attributed bodies and
/// attachment data, and sends it to the server.
///
/// - Parameters:
///   - chunk: Messages to upload.
///   - conversation: Conversation the messages belong to.
/// - Returns: The value returned by `apiClient.syncMessages`.
/// - Throws: Whatever `apiClient.syncMessages` throws.
private func syncSingleChunk(
    _ chunk: [iMessage],
    conversation: iMessageConversation
) async throws -> Int {
    // One formatter per call instead of up to three per message; creating an
    // ISO8601DateFormatter is comparatively expensive.
    let timestampFormatter = ISO8601DateFormatter()

    let payload = SyncMessagesPayload(
        conversationImessageId: conversation.id,
        conversationDisplayName: conversation.displayName,
        isGroup: conversation.isGroup,
        participantIds: conversation.participantIds,
        messages: chunk.map { msg in
            SyncMessagePayload(
                imessageGuid: msg.guid,
                senderId: msg.senderId,
                direction: msg.isFromMe ? "outgoing" : "incoming",
                sentAt: timestampFormatter.string(from: msg.date),
                deliveredAt: msg.dateDelivered.map { timestampFormatter.string(from: $0) },
                readAt: msg.dateRead.map { timestampFormatter.string(from: $0) },
                text: msg.text,
                attributedBody: msg.attributedBody,
                associatedMessageType: msg.associatedMessageType,
                associatedMessageGuid: msg.associatedMessageGuid,
                isAudioMessage: msg.isAudioMessage,
                expressiveSendStyleId: msg.expressiveSendStyleId,
                replyToGuid: msg.replyToGuid,
                threadOriginatorGuid: msg.threadOriginatorGuid,
                groupTitle: msg.groupTitle,
                balloonBundleId: msg.balloonBundleId,
                service: msg.service,
                senderIdentifier: msg.senderIdentifier,
                senderDisplayName: msg.senderDisplayName,
                senderPhoneNumber: msg.senderPhoneNumber,
                senderEmail: msg.senderEmail,
                attachments: msg.attachments.map { att in
                    SyncAttachmentPayload(
                        filename: att.filename,
                        mimeType: att.mimeType,
                        transferName: att.transferName,
                        size: att.size,
                        data: att.data
                    )
                },
                attachmentsCount: msg.attachmentsCount,
                attachmentsTotalSize: msg.attachmentsTotalSize,
                attachmentsFiletypes: msg.attachmentsFiletypes
            )
        }
    )

    return try await apiClient.syncMessages(payload)
}
|
|
|
|
// MARK: - Reset
|
|
|
|
/// Starts a full reset followed by a resync in a new task. Does nothing
/// while a reset or a sync is already in progress.
func resetAndResync() {
    if isResetting || isSyncing { return }
    Task {
        await performReset()
    }
}
|
|
|
|
/// Asks the server to clear its synced data, then clears the local
/// watermark and runs a full resync. If the server call fails, the previous
/// watermark is put back and both progress flags are cleared.
private func performReset() async {
    log.info("performReset starting")
    activityLog.info("Starting full reset...")
    isResetting = true
    isResyncInProgress = true

    let watermarkBeforeReset = lastSync

    do {
        let cleared = try await apiClient.resetSync()
        activityLog.info("Cleared \(cleared.deletedMessages) messages, \(cleared.deletedConversations) conversations")
    } catch {
        log.warning("Reset failed: \(error.localizedDescription)")
        activityLog.error("Reset failed: \(error.localizedDescription)")

        if let watermarkBeforeReset {
            setLastSync(watermarkBeforeReset)
        }
        isResetting = false
        isResyncInProgress = false
        return
    }

    // Server side is empty now: drop the watermark and re-arm the
    // initial-load flag before running the resync.
    setLastSync(nil)
    isInitialLoad = true
    isResetting = false
    await performSyncForResync()
}
|
|
|
|
/// Full resync after a reset: uploads every message of every conversation
/// with the chunked path, rebuilding the published stats from zero.
///
/// The watermark is set to the moment the resync started, not the moment it
/// finished. A message that lands in an already-processed conversation while
/// the resync is still running has a date earlier than the finish time, so a
/// finish-time watermark would exclude it from every later incremental read.
/// Using the start time can re-send a few messages on the next cycle.
/// NOTE(review): this relies on the server tolerating re-sent messages, as
/// the retry-after-partial-failure paths in this file already do; confirm.
/// Conversations that fail are still skipped, as before.
private func performSyncForResync() async {
    log.info("performSyncForResync starting")
    activityLog.info("Starting full resync...")
    isSyncing = true

    // Taken before any conversation is read; becomes the new watermark.
    let resyncStartedAt = Date()

    var runningMessageCount = 0
    var runningConversationCount = 0
    var conversationsWithNewMessages = Set<String>()
    stats = SyncStats()

    do {
        let conversations = try imessageReader.getConversations()
        activityLog.info("Found \(conversations.count) conversations")
        var totalSynced = 0
        var failedConversations = 0

        for conversation in conversations {
            do {
                let messages = try imessageReader.getMessages(conversationId: conversation.id, since: nil)
                if messages.isEmpty { continue }

                let chunks = ChunkingStrategy.createAdaptiveChunks(messages)
                var conversationSynced = 0

                for (chunkIndex, chunk) in chunks.enumerated() {
                    let result = try await syncChunkWithRetry(chunk, conversation: conversation)
                    conversationSynced += result.synced
                    totalSynced += result.synced

                    if chunks.count > 1 {
                        log.info("Resynced chunk \(chunkIndex + 1)/\(chunks.count) (\(result.synced) messages)")
                    }

                    if result.synced > 0 {
                        runningMessageCount += result.synced
                        // Count each conversation once, on its first synced message.
                        if conversationsWithNewMessages.insert(conversation.id).inserted {
                            runningConversationCount += 1
                        }
                        stats = SyncStats(
                            messageCount: runningMessageCount,
                            conversationCount: runningConversationCount,
                            contactCount: stats.contactCount
                        )
                    }
                }

                if conversationSynced > 0 {
                    activityLog.info("Resynced \(conversationSynced) messages from \(conversation.displayName)")
                }
            } catch {
                failedConversations += 1
                activityLog.warning("Failed \(conversation.displayName): \(error.localizedDescription)")
            }
        }

        // Watermark = start of the run; completion time = now.
        let completedAt = Date()
        setLastSync(resyncStartedAt)
        lastSyncCompletedAt = completedAt
        UserDefaults.standard.set(completedAt, forKey: "imessage.lastSyncCompletedAt")

        if failedConversations > 0 {
            activityLog.warning("Resync partial: \(totalSynced) messages, \(failedConversations) failed")
        } else {
            activityLog.success("Resync complete: \(totalSynced) messages")
        }

        await fetchStats()
    } catch {
        activityLog.error("Resync failed: \(error.localizedDescription)")
    }

    isSyncing = false
    isResyncInProgress = false
}
|
|
}
|