516 lines
20 KiB
Swift
516 lines
20 KiB
Swift
import AppKit
|
|
import Combine
|
|
import Foundation
|
|
import LilithAgentCore
|
|
import LilithLogging
|
|
import MacSyncShared
|
|
|
|
/// File-private logger obtained from `AppLogger` for the "IMessage.Sync" label.
private let log = AppLogger.logger(for: "IMessage.Sync")
|
|
|
|
/// Counters shown for the iMessage sync: messages, conversations and contacts.
///
/// All counters start at zero, so `SyncStats()` is the empty state.
struct SyncStats: Equatable, Sendable {
    /// Number of messages counted so far.
    var messageCount = 0
    /// Number of conversations counted so far.
    var conversationCount = 0
    /// Number of contacts counted so far.
    var contactCount = 0
}
|
|
|
|
/// Outcome of uploading one chunk of messages.
struct ChunkSyncResult {
    /// Count reported back for the chunk by the upload call.
    let synced: Int
}
|
|
|
|
/// Snapshot of the most recent contact sync, published for display.
struct ContactSyncInfo {
    /// How many contacts were read from the address book.
    var loadedFromAddressBook = 0
    /// Count returned by the server for the last contact upload.
    var syncedToServer = 0
    /// A few display names kept as a preview of what was loaded.
    var sampleNames: [String] = []
    /// Whether loading contacts succeeded (i.e. permission was available).
    var hasContactsPermission = false
}
|
|
|
|
/// Error state surfaced by the iMessage sync manager.
///
/// `.none` is the "no error" value handed to the base sync manager.
enum SyncError: Equatable, Sendable {
    /// No error is currently recorded.
    case none
    /// Reading the iMessage database was denied; Full Disk Access is needed.
    case fullDiskAccessRequired
    /// The iMessage database could not be located.
    case databaseNotFound
    /// Any other connection failure, carrying its description.
    case connectionFailed(String)

    /// Human-readable text for the error; empty for `.none`.
    var message: String {
        switch self {
        case .none:
            return ""
        case .fullDiskAccessRequired:
            return "Full Disk Access required"
        case .databaseNotFound:
            return "iMessage database not found"
        case let .connectionFailed(detail):
            return detail
        }
    }
}
|
|
|
|
@MainActor
|
|
final class SyncManager: BaseSyncManager<SyncStats, SyncError> {
|
|
static let shared = SyncManager()
|
|
|
|
static let syncSchemaVersion = 2
|
|
|
|
// Module-specific @Published state (base owns isSyncing/lastSyncCompletedAt/
|
|
// currentOperation/syncError/stats).
|
|
@Published var contactSyncInfo = ContactSyncInfo()
|
|
@Published var isResetting = false
|
|
@Published var isResyncInProgress = false
|
|
|
|
private let imessageReader = iMessageReader.shared
|
|
private let apiClient = APIClient.shared
|
|
private let sendService = SendService.shared
|
|
private let activityLog = ActivityLog.shared
|
|
|
|
/// First-cycle bootstrap (DB connect, schema-version reset, contacts load) is
|
|
/// done lazily inside `performSync` since the base's `startSync` is final.
|
|
private var didBootstrap = false
|
|
|
|
/// Runner for the outbound send queue.
///
/// It polls the server's legacy `icloud.send_queue` table through
/// `IMessageSendTransport`, hands each queued message to `SendService.shared`
/// (which performs the AppleScript send, rate limiting and delivery
/// tracking), and acknowledges the outcome. The client runs on its own 30s
/// timer, separate from the read cycle, so a backlog of outbound messages
/// drains promptly even when no inbound sync is running.
private lazy var sendQueueClient: SendQueueClient<IMessageSendTransport> = {
    // Bind the collaborators to locals so the handler closure does not reference `self`.
    let sender = sendService
    let activity = activityLog
    let transport = IMessageSendTransport(apiClient: apiClient)

    return SendQueueClient(label: "imessage", transport: transport, interval: 30) { message in
        let outcome = sender.send(recipient: message.phoneNumber, body: message.body)
        // Only the first four characters of the recipient appear in the activity log.
        let recipientPrefix = String(message.phoneNumber.prefix(4))

        guard outcome.success else {
            let reason = outcome.error ?? "Unknown error"
            activity.error("Failed to send to \(recipientPrefix)...: \(reason)")
            return .failed(reason: reason)
        }

        activity.success("Sent message to \(recipientPrefix)...")
        return .sent
    }
}()
|
|
|
|
/// Creates the singleton and migrates legacy watermark keys.
///
/// Earlier builds stored the iMessage watermarks under the unprefixed
/// `lastSync` / `lastSyncCompletedAt` UserDefaults keys. Each value is
/// adopted only when the manager has no value of its own yet, and the legacy
/// key is removed once adopted.
private init() {
    super.init(initialStats: SyncStats(), noError: .none, persistenceKey: "imessage", timerInterval: 30)

    let defaults = UserDefaults.standard

    // Legacy read watermark.
    if lastSync == nil, let legacyWatermark = defaults.object(forKey: "lastSync") as? Date {
        setLastSync(legacyWatermark)
        defaults.removeObject(forKey: "lastSync")
    }

    // Legacy completion timestamp; also written under the prefixed key.
    if lastSyncCompletedAt == nil,
       let legacyCompletion = defaults.object(forKey: "lastSyncCompletedAt") as? Date {
        lastSyncCompletedAt = legacyCompletion
        defaults.set(legacyCompletion, forKey: "imessage.lastSyncCompletedAt")
        defaults.removeObject(forKey: "lastSyncCompletedAt")
    }
}
|
|
|
|
/// Opens the Full Disk Access pane of the system privacy settings.
func openFullDiskAccessSettings() {
    let settingsLink = "x-apple.systempreferences:com.apple.preference.security?Privacy_AllFiles"
    guard let settingsURL = URL(string: settingsLink) else { return }
    NSWorkspace.shared.open(settingsURL)
}
|
|
|
|
/// Retries after a connection problem by starting the sync again.
func retryConnection() {
    startSync()
}
|
|
|
|
// MARK: - Sync cycle
|
|
|
|
/// Runs one sync cycle.
///
/// On the first successful cycle this also performs the one-shot bootstrap:
/// connect to the iMessage database, apply a schema-version reset if needed,
/// load and upload contacts, and fetch the initial stats. If the database
/// connection fails, `syncError` is set and the cycle ends early; bootstrap
/// is attempted again on the next cycle because `didBootstrap` stays false.
override func performSync() async {
    if !didBootstrap {
        activityLog.info("Starting sync service...")
        syncError = .none

        do {
            try imessageReader.connect()
        } catch {
            // Classify the failure from its description text.
            let description = error.localizedDescription
            let lowered = description.lowercased()
            let accessDenied = lowered.contains("authorization denied") || lowered.contains("error 23")

            if accessDenied {
                syncError = .fullDiskAccessRequired
                activityLog.error("Full Disk Access required")
            } else if lowered.contains("not found") {
                syncError = .databaseNotFound
                activityLog.error("iMessage database not found")
            } else {
                syncError = .connectionFailed(description)
                activityLog.error("Connection failed: \(description)")
            }
            return
        }

        log.info("Connected to iMessage database")
        activityLog.success("Connected to iMessage database")

        if await checkSchemaVersionAndReset() {
            activityLog.info("Schema version updated, resync required")
        }

        activityLog.info("Loading contacts...")
        let hasContactsAccess = await imessageReader.loadContacts()
        contactSyncInfo.hasContactsPermission = hasContactsAccess

        if hasContactsAccess {
            await syncContacts()
        } else {
            activityLog.warning("Contacts permission not granted - names won't sync")
        }

        await fetchStats()
        didBootstrap = true
    }

    // The outbound send queue runs on its own 30s timer via `sendQueueClient`
    // (started from `didStartSync()`). It is still flushed once at the top of
    // every read cycle so a manual `syncNow()` drains pending sends promptly.
    await sendQueueClient.drainOnce()

    await runReadCycle()
}
|
|
|
|
/// Base-class hook override: starts the outbound send-queue runner.
override func didStartSync() {
    sendQueueClient.start()
}
|
|
|
|
/// Base-class hook override: stops the outbound send-queue runner.
override func willStopSync() {
    sendQueueClient.stop()
}
|
|
|
|
/// Reads messages newer than `lastSync` from every conversation and uploads
/// them chunk by chunk.
///
/// A failure inside one conversation is logged and counted, and the loop
/// moves on to the next conversation. `stats` is bumped after every chunk
/// that synced something, then replaced by the server totals via
/// `fetchStats()` once the loop finishes. `isResyncInProgress` is cleared
/// at the end in all cases.
private func runReadCycle() async {
    log.info("performSync starting")
    activityLog.info("Starting sync...")

    // Live counters, seeded from the currently published stats.
    var liveMessageTotal = stats.messageCount
    var liveConversationTotal = stats.conversationCount
    var touchedConversationIDs = Set<String>()

    do {
        let conversations = try imessageReader.getConversations()
        log.info("Found \(conversations.count) conversations")
        activityLog.info("Found \(conversations.count) conversations")

        var syncedTotal = 0
        var failureCount = 0

        for conversation in conversations {
            do {
                let pending = try imessageReader.getMessages(
                    conversationId: conversation.id,
                    since: lastSync
                )
                guard !pending.isEmpty else { continue }

                let chunks = ChunkingStrategy.createAdaptiveChunks(pending)
                let isMultiChunk = chunks.count > 1
                var syncedInConversation = 0

                for (index, chunk) in chunks.enumerated() {
                    let outcome = try await syncChunkWithRetry(chunk, conversation: conversation)
                    syncedInConversation += outcome.synced
                    syncedTotal += outcome.synced

                    if isMultiChunk {
                        log.info("Synced chunk \(index + 1)/\(chunks.count) (\(outcome.synced) messages)")
                    }

                    // Nothing new in this chunk: leave the published stats alone.
                    guard outcome.synced > 0 else { continue }

                    liveMessageTotal += outcome.synced
                    // Count each conversation once, the first time it yields messages.
                    if touchedConversationIDs.insert(conversation.id).inserted {
                        liveConversationTotal += 1
                    }
                    stats = SyncStats(
                        messageCount: liveMessageTotal,
                        conversationCount: liveConversationTotal,
                        contactCount: stats.contactCount
                    )
                }

                if syncedInConversation > 0 {
                    activityLog.info("Synced \(syncedInConversation) messages from \(conversation.displayName)")
                }
            } catch {
                failureCount += 1
                log.warning("Failed to sync '\(conversation.displayName)': \(error.localizedDescription)")
                activityLog.warning("Failed \(conversation.displayName): \(error.localizedDescription)")
            }
        }

        // Summary line: (any failures?, anything synced?).
        switch (failureCount > 0, syncedTotal > 0) {
        case (true, true):
            activityLog.warning("Sync partial: \(syncedTotal) messages, \(failureCount) conversations failed")
        case (true, false):
            activityLog.error("Sync failed: \(failureCount) conversations had errors")
        case (false, true):
            activityLog.success("Sync complete: \(syncedTotal) new messages")
        case (false, false):
            activityLog.success("Sync complete: no new messages")
        }

        await fetchStats()
    } catch {
        log.warning("Sync failed: \(error.localizedDescription)")
        activityLog.error("Sync failed: \(error.localizedDescription)")
    }

    isResyncInProgress = false
}
|
|
|
|
/// Compares the stored sync schema version with `Self.syncSchemaVersion`.
///
/// When they differ, the current version is written to UserDefaults. A full
/// reset is performed first only if a previous version had been recorded
/// (a stored value of 0 means no version was saved, so there is nothing to wipe).
///
/// - Returns: `true` when a reset was triggered, `false` otherwise.
private func checkSchemaVersionAndReset() async -> Bool {
    let defaults = UserDefaults.standard
    let stored = defaults.integer(forKey: "syncSchemaVersion")
    let current = Self.syncSchemaVersion

    log.info("Schema version check - stored: \(stored), current: \(current)")

    guard stored != current else { return false }

    let hadRecordedVersion = stored > 0
    if hadRecordedVersion {
        // NOTE(review): `performReset()` reports no result, so the new version
        // is recorded below even if the reset itself failed.
        await performReset()
    }
    defaults.set(current, forKey: "syncSchemaVersion")
    return hadRecordedVersion
}
|
|
|
|
/// Uploads the address-book contacts held by the reader and records the
/// outcome in `contactSyncInfo`.
///
/// With no contacts available the loaded count and sample names are cleared
/// and nothing is uploaded. Upload errors are logged and reported to the
/// activity log; they are not rethrown.
private func syncContacts() async {
    activityLog.info("Syncing contacts...")

    let contacts = imessageReader.getAllContacts()
    if contacts.isEmpty {
        activityLog.info("No contacts from address book")
        contactSyncInfo.loadedFromAddressBook = 0
        contactSyncInfo.sampleNames = []
        return
    }

    contactSyncInfo.loadedFromAddressBook = contacts.count
    // Keep up to five names as a preview.
    contactSyncInfo.sampleNames = contacts.prefix(5).map { $0.displayName }

    // One formatter is shared by all birthday conversions.
    let iso8601 = ISO8601DateFormatter()
    let payloads = contacts.map { entry in
        SyncContactPayload(
            appleId: nil,
            phoneNumber: entry.phoneNumber,
            email: entry.email,
            displayName: entry.displayName,
            avatarHash: nil,
            birthday: entry.birthday.map { iso8601.string(from: $0) }
        )
    }

    do {
        let uploaded = try await apiClient.syncContacts(payloads)
        contactSyncInfo.syncedToServer = uploaded

        // Up to three of the sample names go into the success line.
        let preview = contactSyncInfo.sampleNames.prefix(3).joined(separator: ", ")
        if preview.isEmpty {
            activityLog.success("Synced \(uploaded) contacts")
        } else {
            activityLog.success("Synced \(uploaded) contacts: \(preview)...")
        }
    } catch {
        log.warning("syncContacts failed: \(error.localizedDescription)")
        activityLog.error("Contact sync failed")
    }
}
|
|
|
|
/// Replaces `stats` with the totals reported by the server.
///
/// If no local watermark exists yet, the server's `lastSyncAt` (when
/// present) is adopted. Failures are logged and otherwise ignored.
private func fetchStats() async {
    do {
        let serverStats = try await apiClient.getStats()
        stats = SyncStats(
            messageCount: serverStats.totalMessages,
            conversationCount: serverStats.totalConversations,
            contactCount: serverStats.totalContacts
        )

        // Never overwrite an existing local watermark.
        guard lastSync == nil, let serverWatermark = serverStats.lastSyncAt else { return }
        setLastSync(serverWatermark)
    } catch {
        log.warning("fetchStats failed: \(error.localizedDescription)")
    }
}
|
|
|
|
/// Uploads one chunk, splitting it in half and retrying when the server
/// rejects it as too large.
///
/// - Parameters:
///   - chunk: Messages to upload in a single request.
///   - conversation: Conversation the messages belong to.
/// - Returns: The combined synced count for the chunk (or for its halves).
/// - Throws: The original `APIError` for anything other than a
///   "Payload too large" server error; a 413 `APIError.serverError` naming the
///   offending message when a single message is still too large; any non-`APIError`
///   error from the upload unchanged.
private func syncChunkWithRetry(
    _ chunk: [iMessage],
    conversation: iMessageConversation
) async throws -> ChunkSyncResult {
    do {
        let synced = try await syncSingleChunk(chunk, conversation: conversation)
        return ChunkSyncResult(synced: synced)
    } catch let error as APIError {
        // Only the "Payload too large" server error is retried.
        guard case .serverError(_, let message) = error,
              (message ?? "").contains("Payload too large") else {
            throw error
        }

        // A chunk can only be split while it holds more than one message.
        guard chunk.count > 1 else {
            // An empty chunk cannot be made smaller; surface the server error
            // rather than recursing on two empty halves.
            guard let oversized = chunk.first else { throw error }

            let sizeInfo = oversized.attachmentsTotalSize > 0
                ? "(\(oversized.attachmentsTotalSize) bytes)"
                : "(no attachments)"
            let label = oversized.attachments.first?.filename ?? oversized.guid
            throw APIError.serverError(statusCode: 413, message: "Message too large: \(label) \(sizeInfo)")
        }

        let midpoint = chunk.count / 2
        let firstResult = try await syncChunkWithRetry(Array(chunk[..<midpoint]), conversation: conversation)
        let secondResult = try await syncChunkWithRetry(Array(chunk[midpoint...]), conversation: conversation)
        return ChunkSyncResult(synced: firstResult.synced + secondResult.synced)
    }
}
|
|
|
|
/// Builds the upload payload for one chunk of messages and sends it.
///
/// - Parameters:
///   - chunk: Messages to include in the request.
///   - conversation: Conversation metadata sent alongside the messages.
/// - Returns: The count returned by `apiClient.syncMessages`.
/// - Throws: Whatever `apiClient.syncMessages` throws.
private func syncSingleChunk(
    _ chunk: [iMessage],
    conversation: iMessageConversation
) async throws -> Int {
    // One formatter serves every timestamp in the chunk; creating a new
    // ISO8601DateFormatter per field per message is needlessly expensive.
    let iso8601 = ISO8601DateFormatter()

    let messagePayloads = chunk.map { msg in
        SyncMessagePayload(
            imessageGuid: msg.guid,
            senderId: msg.senderId,
            direction: msg.isFromMe ? "outgoing" : "incoming",
            sentAt: iso8601.string(from: msg.date),
            deliveredAt: msg.dateDelivered.map { iso8601.string(from: $0) },
            readAt: msg.dateRead.map { iso8601.string(from: $0) },
            text: msg.text,
            attributedBody: msg.attributedBody,
            associatedMessageType: msg.associatedMessageType,
            associatedMessageGuid: msg.associatedMessageGuid,
            isAudioMessage: msg.isAudioMessage,
            expressiveSendStyleId: msg.expressiveSendStyleId,
            replyToGuid: msg.replyToGuid,
            threadOriginatorGuid: msg.threadOriginatorGuid,
            groupTitle: msg.groupTitle,
            balloonBundleId: msg.balloonBundleId,
            service: msg.service,
            senderIdentifier: msg.senderIdentifier,
            senderDisplayName: msg.senderDisplayName,
            senderPhoneNumber: msg.senderPhoneNumber,
            senderEmail: msg.senderEmail,
            attachments: msg.attachments.map { att in
                SyncAttachmentPayload(
                    filename: att.filename,
                    mimeType: att.mimeType,
                    transferName: att.transferName,
                    size: att.size,
                    data: att.data
                )
            },
            attachmentsCount: msg.attachmentsCount,
            attachmentsTotalSize: msg.attachmentsTotalSize,
            attachmentsFiletypes: msg.attachmentsFiletypes
        )
    }

    let payload = SyncMessagesPayload(
        conversationImessageId: conversation.id,
        conversationDisplayName: conversation.displayName,
        isGroup: conversation.isGroup,
        participantIds: conversation.participantIds,
        messages: messagePayloads
    )

    return try await apiClient.syncMessages(payload)
}
|
|
|
|
// MARK: - Reset
|
|
|
|
/// Starts a full server-side reset followed by a complete resync.
///
/// Does nothing when a reset or a sync is already running.
func resetAndResync() {
    guard !(isResetting || isSyncing) else { return }

    // Mark the reset as running before the Task is spawned. The Task body only
    // executes later, so without this a second call made in the meantime would
    // pass the guard and start a second reset. `performReset()` sets the flag
    // again itself and clears it on every exit path.
    isResetting = true
    Task { await performReset() }
}
|
|
|
|
/// Asks the server to wipe synced data, clears the local watermark, and then
/// runs a full resync.
///
/// `isResetting` is true only while the server reset is in flight;
/// `isResyncInProgress` stays true until the resync finishes or the reset
/// fails. Errors are logged and reported to the activity log, not rethrown.
private func performReset() async {
    log.info("performReset starting")
    activityLog.info("Starting full reset...")
    isResetting = true
    isResyncInProgress = true

    // Remembered so it can be put back if the reset fails.
    let watermarkBeforeReset = lastSync

    do {
        let wiped = try await apiClient.resetSync()
        activityLog.info("Cleared \(wiped.deletedMessages) messages, \(wiped.deletedConversations) conversations")

        setLastSync(nil)
        // The reset phase is over; the resync that follows tracks its own state.
        isResetting = false
        await performSyncForResync()
    } catch {
        log.warning("Reset failed: \(error.localizedDescription)")
        activityLog.error("Reset failed: \(error.localizedDescription)")

        if let previousWatermark = watermarkBeforeReset {
            setLastSync(previousWatermark)
        }
        isResetting = false
        isResyncInProgress = false
    }
}
|
|
|
|
/// Re-uploads the full message history of every conversation after a reset.
///
/// `stats` is zeroed first and bumped after every chunk that synced
/// something, then replaced by the server totals via `fetchStats()`. A
/// failure inside one conversation is counted and reported, and the loop
/// moves on. `isSyncing` is held true for the duration, and both `isSyncing`
/// and `isResyncInProgress` are cleared at the end.
///
/// The read watermark (`lastSync`) is set to the moment the resync *started*,
/// not the moment it finished. Otherwise a message arriving in an
/// already-processed conversation while the resync is still running would be
/// dated before the watermark and skipped by the next incremental read.
/// NOTE(review): messages read during the resync window may therefore be
/// uploaded a second time on the next cycle; this assumes the server
/// de-duplicates by `imessageGuid` — confirm.
private func performSyncForResync() async {
    log.info("performSyncForResync starting")
    activityLog.info("Starting full resync...")
    isSyncing = true

    // Captured before any conversation is read; becomes the new watermark.
    let resyncStartedAt = Date()

    var runningMessageCount = 0
    var runningConversationCount = 0
    var conversationsWithNewMessages = Set<String>()
    stats = SyncStats()

    do {
        let conversations = try imessageReader.getConversations()
        activityLog.info("Found \(conversations.count) conversations")

        var totalSynced = 0
        var failedConversations = 0

        for conversation in conversations {
            do {
                // `since: nil` requests the conversation's full history.
                let messages = try imessageReader.getMessages(conversationId: conversation.id, since: nil)
                if messages.isEmpty { continue }

                let chunks = ChunkingStrategy.createAdaptiveChunks(messages)
                var conversationSynced = 0

                for (chunkIndex, chunk) in chunks.enumerated() {
                    let result = try await syncChunkWithRetry(chunk, conversation: conversation)
                    conversationSynced += result.synced
                    totalSynced += result.synced

                    if chunks.count > 1 {
                        log.info("Resynced chunk \(chunkIndex + 1)/\(chunks.count) (\(result.synced) messages)")
                    }

                    if result.synced > 0 {
                        runningMessageCount += result.synced
                        // Count each conversation once, the first time it yields messages.
                        if conversationsWithNewMessages.insert(conversation.id).inserted {
                            runningConversationCount += 1
                        }
                        stats = SyncStats(
                            messageCount: runningMessageCount,
                            conversationCount: runningConversationCount,
                            contactCount: stats.contactCount
                        )
                    }
                }

                if conversationSynced > 0 {
                    activityLog.info("Resynced \(conversationSynced) messages from \(conversation.displayName)")
                }
            } catch {
                failedConversations += 1
                activityLog.warning("Failed \(conversation.displayName): \(error.localizedDescription)")
            }
        }

        // Watermark = start of the resync; completion timestamp = now.
        setLastSync(resyncStartedAt)
        let completedAt = Date()
        lastSyncCompletedAt = completedAt
        UserDefaults.standard.set(completedAt, forKey: "imessage.lastSyncCompletedAt")

        if failedConversations > 0 {
            activityLog.warning("Resync partial: \(totalSynced) messages, \(failedConversations) failed")
        } else {
            activityLog.success("Resync complete: \(totalSynced) messages")
        }

        await fetchStats()
    } catch {
        activityLog.error("Resync failed: \(error.localizedDescription)")
    }

    isSyncing = false
    isResyncInProgress = false
}
|
|
}
|