merge batch 2: imessage, iphoto, ContentTypeMapping/ConfigFile already-superset
This commit is contained in:
parent
cc1d40f7f6
commit
bf3879feba
4 changed files with 331 additions and 314 deletions
|
|
@ -9,7 +9,7 @@ private let log = AppLogger.logger(for: "IMessage.API")
|
|||
|
||||
// MARK: - Protocol
|
||||
|
||||
protocol APIClientProtocol: AnyObject {
|
||||
protocol APIClientProtocol: AnyObject, Sendable {
|
||||
var isAuthenticated: Bool { get }
|
||||
func syncMessages(_ payload: SyncMessagesPayload) async throws -> Int
|
||||
func syncMessagesBatch(_ payloads: [SyncMessagesPayload]) async throws -> Int
|
||||
|
|
@ -269,7 +269,7 @@ struct SyncContactPayload: Encodable {
|
|||
}
|
||||
}
|
||||
|
||||
struct PendingSendMessage {
|
||||
struct PendingSendMessage: Sendable {
|
||||
let id: String
|
||||
let phoneNumber: String
|
||||
let body: String
|
||||
|
|
|
|||
|
|
@ -291,30 +291,36 @@ class iMessageReader: MessageReaderProtocol {
|
|||
let deliveredUnix: TimeInterval? = row["date_delivered_unix"]
|
||||
let readUnix: TimeInterval? = row["date_read_unix"]
|
||||
|
||||
let rawText: String? = row["text"]
|
||||
let rawAttributedBody: Data? = row["attributedBody"]
|
||||
|
||||
var attributedBodyBase64: String?
|
||||
// Prefer the text column; fall back to decoding attributedBody (NSKeyedArchiver) for
|
||||
// newer messages where Apple stores the string only in the attributed blob.
|
||||
var resolvedText: String? = rawText.flatMap { $0.isEmpty ? nil : $0 }
|
||||
|
||||
if let bodyData = rawAttributedBody {
|
||||
attributedBodyBase64 = bodyData.base64EncodedString()
|
||||
if resolvedText == nil || resolvedText!.isEmpty {
|
||||
if let decoded = try? NSKeyedUnarchiver.unarchivedObject(
|
||||
ofClass: NSAttributedString.self, from: bodyData
|
||||
) {
|
||||
let s = decoded.string.trimmingCharacters(in: .whitespacesAndNewlines)
|
||||
if !s.isEmpty { resolvedText = s }
|
||||
}
|
||||
// Secondary heuristic fallback if NSKeyedUnarchiver fails.
|
||||
if resolvedText == nil || resolvedText!.isEmpty {
|
||||
resolvedText = self.extractTextFromAttributedBody(bodyData)
|
||||
}
|
||||
var extractedText: String?
|
||||
if let attributedBodyData: Data = row["attributedBody"] {
|
||||
attributedBodyBase64 = attributedBodyData.base64EncodedString()
|
||||
// Try NSKeyedUnarchiver first — when the blob is a real
|
||||
// archived NSAttributedString it gives us the cleanest
|
||||
// text. Fall back to the heuristic typedstream scan
|
||||
// (`extractTextFromAttributedBody`) when it isn't, which
|
||||
// is the common case for modern URL bubbles / reactions.
|
||||
if let decoded = try? NSKeyedUnarchiver.unarchivedObject(
|
||||
ofClass: NSAttributedString.self, from: attributedBodyData
|
||||
) {
|
||||
let s = decoded.string.trimmingCharacters(in: .whitespacesAndNewlines)
|
||||
if !s.isEmpty { extractedText = s }
|
||||
}
|
||||
if extractedText == nil || extractedText!.isEmpty {
|
||||
extractedText = self.extractTextFromAttributedBody(attributedBodyData)
|
||||
}
|
||||
}
|
||||
|
||||
// Prefer the chat.db `text` field; fall back to text recovered
|
||||
// from attributedBody when the row's text is null or empty
|
||||
// (common for URL bubbles, reactions, expressive sends).
|
||||
let rawText: String? = row["text"]
|
||||
let effectiveText: String? = {
|
||||
if let raw = rawText, !raw.isEmpty { return raw }
|
||||
if let extracted = extractedText, !extracted.isEmpty { return extracted }
|
||||
return rawText
|
||||
}()
|
||||
|
||||
let senderId: String? = row["sender_id"]
|
||||
var senderDisplayName: String?
|
||||
var senderPhoneNumber: String?
|
||||
|
|
@ -340,7 +346,7 @@ class iMessageReader: MessageReaderProtocol {
|
|||
date: Date(timeIntervalSince1970: dateUnix),
|
||||
dateDelivered: deliveredUnix.map { Date(timeIntervalSince1970: $0) },
|
||||
dateRead: readUnix.map { Date(timeIntervalSince1970: $0) },
|
||||
text: resolvedText,
|
||||
text: effectiveText,
|
||||
attributedBody: attributedBodyBase64,
|
||||
associatedMessageType: row["associated_message_type"],
|
||||
associatedMessageGuid: row["associated_message_guid"],
|
||||
|
|
@ -387,86 +393,139 @@ class iMessageReader: MessageReaderProtocol {
|
|||
}
|
||||
}
|
||||
|
||||
/// Decoder for the iMessage `attributedBody` NSKeyedArchiver typedstream
|
||||
/// blob. Mirror of `src/server/src/shared/typedstream/decode.ts` — both
|
||||
/// sides must agree on which messages yield non-null text.
|
||||
///
|
||||
/// Strategy:
|
||||
/// 1. Scan every "NSString" class-definition marker; at each, read a
|
||||
/// length-prefixed UTF-8 run after the 5-byte padding. Keep the
|
||||
/// longest valid extraction (handles repeated markers via
|
||||
/// back-references / appended attribute dicts).
|
||||
/// 2. Fallback: longest printable byte run that contains at least one
|
||||
/// letter and is not a known typedstream class-name marker.
|
||||
///
|
||||
/// Length prefixes:
|
||||
/// - 0x00..0x7F → that byte is the length
|
||||
/// - 0x81 LO HI → 16-bit little-endian
|
||||
/// - 0x82 B0..B3 → 32-bit little-endian
|
||||
func extractTextFromAttributedBody(_ data: Data) -> String? {
|
||||
guard !data.isEmpty else { return nil }
|
||||
|
||||
let bytes = [UInt8](data)
|
||||
let marker: [UInt8] = [78, 83, 83, 116, 114, 105, 110, 103] // "NSString"
|
||||
if let text = bestNSStringRun(in: bytes) { return text }
|
||||
return longestPrintableRun(in: bytes)
|
||||
}
|
||||
|
||||
if let markerIndex = findSubsequence(in: bytes, subsequence: marker) {
|
||||
let contentStart = markerIndex + 8 + 5
|
||||
private static let nsStringMarker: [UInt8] = [78, 83, 83, 116, 114, 105, 110, 103] // "NSString"
|
||||
private static let nsStringPadding: Int = 5
|
||||
|
||||
if contentStart < bytes.count {
|
||||
let lengthByte = bytes[contentStart]
|
||||
let textStart: Int
|
||||
let textLength: Int
|
||||
private static let typedstreamMarkerWords: Set<String> = [
|
||||
"NSString", "NSMutableString",
|
||||
"NSAttributedString", "NSMutableAttributedString",
|
||||
"NSDictionary", "NSMutableDictionary",
|
||||
"NSArray", "NSMutableArray",
|
||||
"NSNumber", "NSObject", "NSValue", "NSData", "NSDate",
|
||||
"streamtyped"
|
||||
]
|
||||
|
||||
if lengthByte == 0x81 {
|
||||
if contentStart + 3 <= bytes.count {
|
||||
textLength = Int(bytes[contentStart + 1]) | (Int(bytes[contentStart + 2]) << 8)
|
||||
textStart = contentStart + 3
|
||||
} else {
|
||||
return nil
|
||||
}
|
||||
} else {
|
||||
textLength = Int(lengthByte)
|
||||
textStart = contentStart + 1
|
||||
}
|
||||
private func bestNSStringRun(in bytes: [UInt8]) -> String? {
|
||||
let marker = Self.nsStringMarker
|
||||
var best: String?
|
||||
var bestLen = 0
|
||||
var searchFrom = 0
|
||||
|
||||
if textStart + textLength <= bytes.count && textLength > 0 {
|
||||
let textData = Data(bytes[textStart..<(textStart + textLength)])
|
||||
if let text = String(data: textData, encoding: .utf8) {
|
||||
let trimmed = text.trimmingCharacters(in: .whitespacesAndNewlines)
|
||||
if !trimmed.isEmpty { return trimmed }
|
||||
}
|
||||
}
|
||||
while searchFrom <= bytes.count - marker.count {
|
||||
guard let idx = findSubsequence(in: bytes, subsequence: marker, from: searchFrom) else { break }
|
||||
if let candidate = readNSStringRun(in: bytes, markerIndex: idx), candidate.count > bestLen {
|
||||
best = candidate
|
||||
bestLen = candidate.count
|
||||
}
|
||||
searchFrom = idx + 1
|
||||
}
|
||||
|
||||
var longestText: String?
|
||||
return best
|
||||
}
|
||||
|
||||
private func readNSStringRun(in bytes: [UInt8], markerIndex: Int) -> String? {
|
||||
let contentStart = markerIndex + Self.nsStringMarker.count + Self.nsStringPadding
|
||||
guard contentStart < bytes.count else { return nil }
|
||||
guard let len = readVarLength(in: bytes, at: contentStart), len.length > 0 else { return nil }
|
||||
let end = len.nextOffset + len.length
|
||||
guard end <= bytes.count else { return nil }
|
||||
|
||||
let slice = Data(bytes[len.nextOffset..<end])
|
||||
guard let text = String(data: slice, encoding: .utf8) else { return nil }
|
||||
let trimmed = text.trimmingCharacters(in: .whitespacesAndNewlines)
|
||||
return trimmed.isEmpty ? nil : trimmed
|
||||
}
|
||||
|
||||
private struct VarLength { let length: Int; let nextOffset: Int }
|
||||
|
||||
private func readVarLength(in bytes: [UInt8], at offset: Int) -> VarLength? {
|
||||
guard offset < bytes.count else { return nil }
|
||||
let tag = bytes[offset]
|
||||
if tag < 0x81 {
|
||||
return VarLength(length: Int(tag), nextOffset: offset + 1)
|
||||
}
|
||||
if tag == 0x81 {
|
||||
guard offset + 3 <= bytes.count else { return nil }
|
||||
let length = Int(bytes[offset + 1]) | (Int(bytes[offset + 2]) << 8)
|
||||
return VarLength(length: length, nextOffset: offset + 3)
|
||||
}
|
||||
if tag == 0x82 {
|
||||
guard offset + 5 <= bytes.count else { return nil }
|
||||
let b0 = UInt32(bytes[offset + 1])
|
||||
let b1 = UInt32(bytes[offset + 2])
|
||||
let b2 = UInt32(bytes[offset + 3])
|
||||
let b3 = UInt32(bytes[offset + 4])
|
||||
let length = b0 | (b1 << 8) | (b2 << 16) | (b3 << 24)
|
||||
return VarLength(length: Int(length), nextOffset: offset + 5)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
private func longestPrintableRun(in bytes: [UInt8]) -> String? {
|
||||
var longest: String?
|
||||
var currentRun = Data()
|
||||
|
||||
let flush: (inout Data, inout String?) -> Void = { run, longest in
|
||||
guard run.count > 3 else { run = Data(); return }
|
||||
if let str = String(data: run, encoding: .utf8),
|
||||
str.rangeOfCharacter(from: .letters) != nil {
|
||||
let trimmed = str.trimmingCharacters(in: .whitespacesAndNewlines)
|
||||
if trimmed.count > (longest?.count ?? 0)
|
||||
&& !Self.typedstreamMarkerWords.contains(trimmed)
|
||||
&& !trimmed.hasPrefix("NS")
|
||||
&& !trimmed.hasPrefix("__")
|
||||
&& !trimmed.contains("attributedString") {
|
||||
longest = trimmed
|
||||
}
|
||||
}
|
||||
run = Data()
|
||||
}
|
||||
|
||||
for byte in bytes {
|
||||
if (byte >= 0x20 && byte <= 0x7E) || byte >= 0x80 || byte == 0x0A || byte == 0x0D {
|
||||
currentRun.append(byte)
|
||||
} else {
|
||||
if currentRun.count > 3 {
|
||||
if let str = String(data: currentRun, encoding: .utf8),
|
||||
str.rangeOfCharacter(from: .letters) != nil {
|
||||
let trimmed = str.trimmingCharacters(in: .whitespacesAndNewlines)
|
||||
if trimmed.count > (longestText?.count ?? 0) &&
|
||||
!trimmed.hasPrefix("NS") &&
|
||||
!trimmed.hasPrefix("__") &&
|
||||
!trimmed.contains("attributedString") {
|
||||
longestText = trimmed
|
||||
}
|
||||
}
|
||||
}
|
||||
currentRun = Data()
|
||||
flush(¤tRun, &longest)
|
||||
}
|
||||
}
|
||||
flush(¤tRun, &longest)
|
||||
|
||||
if currentRun.count > 3 {
|
||||
if let str = String(data: currentRun, encoding: .utf8),
|
||||
str.rangeOfCharacter(from: .letters) != nil {
|
||||
let trimmed = str.trimmingCharacters(in: .whitespacesAndNewlines)
|
||||
if trimmed.count > (longestText?.count ?? 0) &&
|
||||
!trimmed.hasPrefix("NS") &&
|
||||
!trimmed.hasPrefix("__") &&
|
||||
!trimmed.contains("attributedString") {
|
||||
longestText = trimmed
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return longestText
|
||||
return longest
|
||||
}
|
||||
|
||||
func findSubsequence(in array: [UInt8], subsequence: [UInt8]) -> Int? {
|
||||
func findSubsequence(in array: [UInt8], subsequence: [UInt8], from: Int = 0) -> Int? {
|
||||
guard subsequence.count <= array.count else { return nil }
|
||||
return (0...(array.count - subsequence.count))
|
||||
let start = max(0, from)
|
||||
let last = array.count - subsequence.count
|
||||
guard start <= last else { return nil }
|
||||
return (start...last)
|
||||
.first { Array(array[$0..<($0 + subsequence.count)]) == subsequence }
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
enum iMessageError: LocalizedError {
|
||||
|
|
|
|||
|
|
@ -7,7 +7,7 @@ import MacSyncShared
|
|||
|
||||
private let log = AppLogger.logger(for: "IMessage.Sync")
|
||||
|
||||
struct SyncStats: Equatable {
|
||||
struct SyncStats: Equatable, Sendable {
|
||||
var messageCount: Int = 0
|
||||
var conversationCount: Int = 0
|
||||
var contactCount: Int = 0
|
||||
|
|
@ -24,7 +24,7 @@ struct ContactSyncInfo {
|
|||
var hasContactsPermission: Bool = false
|
||||
}
|
||||
|
||||
enum SyncError: Equatable {
|
||||
enum SyncError: Equatable, Sendable {
|
||||
case none
|
||||
case fullDiskAccessRequired
|
||||
case databaseNotFound
|
||||
|
|
@ -41,16 +41,13 @@ enum SyncError: Equatable {
|
|||
}
|
||||
|
||||
@MainActor
|
||||
class SyncManager: ObservableObject {
|
||||
final class SyncManager: BaseSyncManager<SyncStats, SyncError> {
|
||||
static let shared = SyncManager()
|
||||
|
||||
static let syncSchemaVersion = 3
|
||||
|
||||
@Published var isSyncing = false
|
||||
@Published var lastSync: Date?
|
||||
@Published var lastSyncCompletedAt: Date?
|
||||
@Published var stats = SyncStats()
|
||||
@Published var syncError: SyncError = .none
|
||||
// Module-specific @Published state (base owns isSyncing/lastSyncCompletedAt/
|
||||
// currentOperation/syncError/stats).
|
||||
@Published var contactSyncInfo = ContactSyncInfo()
|
||||
@Published var isResetting = false
|
||||
@Published var isResyncInProgress = false
|
||||
|
|
@ -60,56 +57,118 @@ class SyncManager: ObservableObject {
|
|||
private let sendService = SendService.shared
|
||||
private let activityLog = ActivityLog.shared
|
||||
private let blobSyncManager: BlobSyncManager
|
||||
private var syncTimer: Timer?
|
||||
private var blobSyncTimer: Timer?
|
||||
|
||||
/// First-cycle bootstrap (DB connect, schema-version reset, contacts load) is
|
||||
/// done lazily inside `performSync` since the base's `startSync` is final.
|
||||
private var didBootstrap = false
|
||||
|
||||
/// True until the first inbound cycle has chosen a code path (initial batch
|
||||
/// vs incremental). Reset to true after a `resetAndResync`.
|
||||
private var isInitialLoad = true
|
||||
|
||||
/// Separate timer for blob uploads. Runs every 5 minutes, independent of
|
||||
/// the inbound read cycle and the outbound send queue.
|
||||
private var blobSyncTimer: Timer?
|
||||
|
||||
/// Outbound send-queue runner. Polls the server's legacy
|
||||
/// `icloud.send_queue` table via `IMessageSendTransport`, delegates the
|
||||
/// actual AppleScript send + rate-limit + delivery-tracking to
|
||||
/// `SendService.shared`, and acks the result. Owns its own 30s timer,
|
||||
/// independent of the read cycle, so an outbound queue with backlog
|
||||
/// drains promptly even when no inbound sync runs.
|
||||
private lazy var sendQueueClient: SendQueueClient<IMessageSendTransport> = {
|
||||
let transport = IMessageSendTransport(apiClient: apiClient)
|
||||
let service = sendService
|
||||
let activity = activityLog
|
||||
return SendQueueClient(label: "imessage", transport: transport, interval: 30) { message in
|
||||
let result = service.send(recipient: message.phoneNumber, body: message.body)
|
||||
let suffix = String(message.phoneNumber.prefix(4))
|
||||
if result.success {
|
||||
activity.success("Sent message to \(suffix)...")
|
||||
return .sent
|
||||
} else {
|
||||
let reason = result.error ?? "Unknown error"
|
||||
activity.error("Failed to send to \(suffix)...: \(reason)")
|
||||
return .failed(reason: reason)
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
private init() {
|
||||
self.blobSyncManager = BlobSyncManager(apiClient: APIClient.shared, activityLog: ActivityLog.shared)
|
||||
lastSync = UserDefaults.standard.object(forKey: "lastSync") as? Date
|
||||
lastSyncCompletedAt = UserDefaults.standard.object(forKey: "lastSyncCompletedAt") as? Date
|
||||
log.info("init - lastSync: \(String(describing: self.lastSync))")
|
||||
}
|
||||
|
||||
private func checkSchemaVersionAndReset() async -> Bool {
|
||||
let storedVersion = UserDefaults.standard.integer(forKey: "syncSchemaVersion")
|
||||
let currentVersion = Self.syncSchemaVersion
|
||||
super.init(
|
||||
initialStats: SyncStats(),
|
||||
noError: .none,
|
||||
persistenceKey: "imessage",
|
||||
timerInterval: 30
|
||||
)
|
||||
|
||||
log.info("Schema version check - stored: \(storedVersion), current: \(currentVersion)")
|
||||
|
||||
if storedVersion != currentVersion {
|
||||
if storedVersion > 0 { await performReset() }
|
||||
UserDefaults.standard.set(currentVersion, forKey: "syncSchemaVersion")
|
||||
return storedVersion > 0
|
||||
// Migrate legacy UserDefaults watermark keys (one-time). iMessage used
|
||||
// unprefixed `lastSync` / `lastSyncCompletedAt`.
|
||||
if lastSync == nil, let legacy = UserDefaults.standard.object(forKey: "lastSync") as? Date {
|
||||
setLastSync(legacy)
|
||||
UserDefaults.standard.removeObject(forKey: "lastSync")
|
||||
}
|
||||
if lastSyncCompletedAt == nil, let legacy = UserDefaults.standard.object(forKey: "lastSyncCompletedAt") as? Date {
|
||||
lastSyncCompletedAt = legacy
|
||||
UserDefaults.standard.set(legacy, forKey: "imessage.lastSyncCompletedAt")
|
||||
UserDefaults.standard.removeObject(forKey: "lastSyncCompletedAt")
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func startSync() {
|
||||
log.info("startSync called")
|
||||
activityLog.info("Starting sync service...")
|
||||
syncError = .none
|
||||
func openFullDiskAccessSettings() {
|
||||
if let url = URL(string: "x-apple.systempreferences:com.apple.preference.security?Privacy_AllFiles") {
|
||||
NSWorkspace.shared.open(url)
|
||||
}
|
||||
}
|
||||
|
||||
do {
|
||||
try imessageReader.connect()
|
||||
log.info("Connected to iMessage database")
|
||||
activityLog.success("Connected to iMessage database")
|
||||
} catch {
|
||||
let errorMsg = error.localizedDescription.lowercased()
|
||||
if errorMsg.contains("authorization denied") || errorMsg.contains("error 23") {
|
||||
syncError = .fullDiskAccessRequired
|
||||
activityLog.error("Full Disk Access required")
|
||||
} else if errorMsg.contains("not found") {
|
||||
syncError = .databaseNotFound
|
||||
activityLog.error("iMessage database not found")
|
||||
} else {
|
||||
syncError = .connectionFailed(error.localizedDescription)
|
||||
activityLog.error("Connection failed: \(error.localizedDescription)")
|
||||
func retryConnection() { startSync() }
|
||||
|
||||
// MARK: - Lifecycle hooks
|
||||
|
||||
override func didStartSync() {
|
||||
sendQueueClient.start()
|
||||
// Blob sync on its own 5-minute cadence, independent of read cycle.
|
||||
blobSyncTimer = Timer.scheduledTimer(withTimeInterval: 300, repeats: true) { [weak self] _ in
|
||||
guard let self else { return }
|
||||
Task { await self.blobSyncManager.syncBlobs() }
|
||||
}
|
||||
}
|
||||
|
||||
override func willStopSync() {
|
||||
sendQueueClient.stop()
|
||||
blobSyncTimer?.invalidate()
|
||||
blobSyncTimer = nil
|
||||
}
|
||||
|
||||
// MARK: - Sync cycle
|
||||
|
||||
override func performSync() async {
|
||||
// Lazy one-shot bootstrap: DB connect, schema version reset, contacts.
|
||||
if !didBootstrap {
|
||||
activityLog.info("Starting sync service...")
|
||||
syncError = .none
|
||||
|
||||
do {
|
||||
try imessageReader.connect()
|
||||
log.info("Connected to iMessage database")
|
||||
activityLog.success("Connected to iMessage database")
|
||||
} catch {
|
||||
let errorMsg = error.localizedDescription.lowercased()
|
||||
if errorMsg.contains("authorization denied") || errorMsg.contains("error 23") {
|
||||
syncError = .fullDiskAccessRequired
|
||||
activityLog.error("Full Disk Access required")
|
||||
} else if errorMsg.contains("not found") {
|
||||
syncError = .databaseNotFound
|
||||
activityLog.error("iMessage database not found")
|
||||
} else {
|
||||
syncError = .connectionFailed(error.localizedDescription)
|
||||
activityLog.error("Connection failed: \(error.localizedDescription)")
|
||||
}
|
||||
return
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
Task {
|
||||
let didReset = await checkSchemaVersionAndReset()
|
||||
if didReset { activityLog.info("Schema version updated, resync required") }
|
||||
|
||||
|
|
@ -123,58 +182,37 @@ class SyncManager: ObservableObject {
|
|||
activityLog.warning("Contacts permission not granted - names won't sync")
|
||||
}
|
||||
|
||||
if isInitialLoad && stats.messageCount == 0 {
|
||||
log.info("Initial load detected - using batch sync")
|
||||
activityLog.info("First sync detected - using optimized batch endpoint")
|
||||
await performInitialSync()
|
||||
isInitialLoad = false
|
||||
} else {
|
||||
isInitialLoad = false
|
||||
await performSync()
|
||||
}
|
||||
|
||||
// Blob pass runs whenever there are missing attachment blobs, regardless
|
||||
// of whether this was an initial or incremental sync.
|
||||
await blobSyncManager.syncBlobs()
|
||||
await fetchStats()
|
||||
didBootstrap = true
|
||||
}
|
||||
|
||||
syncTimer = Timer.scheduledTimer(withTimeInterval: 30, repeats: true) { [weak self] _ in
|
||||
Task { @MainActor in self?.syncNow() }
|
||||
}
|
||||
blobSyncTimer = Timer.scheduledTimer(withTimeInterval: 300, repeats: true) { [weak self] _ in
|
||||
guard let self else { return }
|
||||
Task { await self.blobSyncManager.syncBlobs() }
|
||||
}
|
||||
activityLog.info("Scheduled automatic sync every 30s, blob sync every 5m")
|
||||
}
|
||||
// Outbound send queue used to be invoked inline here. It now runs on
|
||||
// its own 30s timer via `sendQueueClient` started from
|
||||
// `didStartSync()`. We still flush once at the top of every read
|
||||
// cycle so a manual `syncNow()` drains pending sends promptly.
|
||||
await sendQueueClient.drainOnce()
|
||||
|
||||
func stopSync() {
|
||||
syncTimer?.invalidate()
|
||||
syncTimer = nil
|
||||
blobSyncTimer?.invalidate()
|
||||
blobSyncTimer = nil
|
||||
}
|
||||
|
||||
func openFullDiskAccessSettings() {
|
||||
if let url = URL(string: "x-apple.systempreferences:com.apple.preference.security?Privacy_AllFiles") {
|
||||
NSWorkspace.shared.open(url)
|
||||
// Initial-load path: server has nothing, stream a metadata-only batch
|
||||
// import (attachments without blob bytes). Subsequent cycles use the
|
||||
// standard incremental path with full attachment data.
|
||||
if isInitialLoad && stats.messageCount == 0 {
|
||||
log.info("Initial load detected - using batch sync")
|
||||
activityLog.info("First sync detected - using optimized batch endpoint")
|
||||
await performInitialSync()
|
||||
isInitialLoad = false
|
||||
} else {
|
||||
isInitialLoad = false
|
||||
await runReadCycle()
|
||||
}
|
||||
}
|
||||
|
||||
func retryConnection() { startSync() }
|
||||
|
||||
func syncNow() {
|
||||
guard !isSyncing else { return }
|
||||
Task {
|
||||
await processSendQueue()
|
||||
await performSync()
|
||||
}
|
||||
// Blob pass runs after every read cycle; uploads any messages whose
|
||||
// attachment bytes haven't been pushed yet.
|
||||
await blobSyncManager.syncBlobs()
|
||||
}
|
||||
|
||||
private func performInitialSync() async {
|
||||
log.info("performInitialSync starting - streaming batch mode")
|
||||
activityLog.info("Starting initial batch sync...")
|
||||
isSyncing = true
|
||||
|
||||
var latestMessageDate: Date?
|
||||
var totalSynced = 0
|
||||
|
|
@ -186,7 +224,7 @@ class SyncManager: ObservableObject {
|
|||
activityLog.info("Found \(conversations.count) conversations")
|
||||
|
||||
// Stream in windows of maxConversationsPerBatch so we never hold
|
||||
// the full 48k-message history in memory at once.
|
||||
// the full history in memory at once.
|
||||
var window: [SyncMessagesPayload] = []
|
||||
var windowMessages = 0
|
||||
var convIndex = 0
|
||||
|
|
@ -293,14 +331,9 @@ class SyncManager: ObservableObject {
|
|||
log.info("Initial sync complete - \(totalSynced) messages synced, \(totalFailed) conversations failed")
|
||||
|
||||
if let newDate = latestMessageDate {
|
||||
lastSync = newDate
|
||||
UserDefaults.standard.set(newDate, forKey: "lastSync")
|
||||
setLastSync(newDate)
|
||||
}
|
||||
|
||||
let now = Date()
|
||||
lastSyncCompletedAt = now
|
||||
UserDefaults.standard.set(now, forKey: "lastSyncCompletedAt")
|
||||
|
||||
activityLog.success("Initial sync complete: \(totalSynced) messages synced")
|
||||
await fetchStats()
|
||||
|
||||
|
|
@ -309,19 +342,16 @@ class SyncManager: ObservableObject {
|
|||
activityLog.error("Initial sync failed: \(error.localizedDescription)")
|
||||
}
|
||||
|
||||
isSyncing = false
|
||||
isResyncInProgress = false
|
||||
}
|
||||
|
||||
private func performSync() async {
|
||||
private func runReadCycle() async {
|
||||
log.info("performSync starting")
|
||||
activityLog.info("Starting sync...")
|
||||
isSyncing = true
|
||||
|
||||
var runningMessageCount = stats.messageCount
|
||||
var runningConversationCount = stats.conversationCount
|
||||
var conversationsWithNewMessages = Set<String>()
|
||||
var latestMessageDate: Date?
|
||||
|
||||
do {
|
||||
let conversations = try imessageReader.getConversations()
|
||||
|
|
@ -339,12 +369,6 @@ class SyncManager: ObservableObject {
|
|||
|
||||
if messages.isEmpty { continue }
|
||||
|
||||
for msg in messages {
|
||||
if latestMessageDate == nil || msg.date > latestMessageDate! {
|
||||
latestMessageDate = msg.date
|
||||
}
|
||||
}
|
||||
|
||||
let chunks = ChunkingStrategy.createAdaptiveChunks(messages)
|
||||
var conversationSynced = 0
|
||||
|
||||
|
|
@ -381,16 +405,6 @@ class SyncManager: ObservableObject {
|
|||
}
|
||||
}
|
||||
|
||||
if totalSynced > 0 || failedConversations == 0 {
|
||||
let newSyncTime = latestMessageDate ?? Date()
|
||||
lastSync = newSyncTime
|
||||
UserDefaults.standard.set(newSyncTime, forKey: "lastSync")
|
||||
}
|
||||
|
||||
let now = Date()
|
||||
lastSyncCompletedAt = now
|
||||
UserDefaults.standard.set(now, forKey: "lastSyncCompletedAt")
|
||||
|
||||
if failedConversations > 0 {
|
||||
if totalSynced > 0 {
|
||||
activityLog.warning("Sync partial: \(totalSynced) messages, \(failedConversations) conversations failed")
|
||||
|
|
@ -410,10 +424,23 @@ class SyncManager: ObservableObject {
|
|||
activityLog.error("Sync failed: \(error.localizedDescription)")
|
||||
}
|
||||
|
||||
isSyncing = false
|
||||
isResyncInProgress = false
|
||||
}
|
||||
|
||||
private func checkSchemaVersionAndReset() async -> Bool {
|
||||
let storedVersion = UserDefaults.standard.integer(forKey: "syncSchemaVersion")
|
||||
let currentVersion = Self.syncSchemaVersion
|
||||
|
||||
log.info("Schema version check - stored: \(storedVersion), current: \(currentVersion)")
|
||||
|
||||
if storedVersion != currentVersion {
|
||||
if storedVersion > 0 { await performReset() }
|
||||
UserDefaults.standard.set(currentVersion, forKey: "syncSchemaVersion")
|
||||
return storedVersion > 0
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
private func syncContacts() async {
|
||||
activityLog.info("Syncing contacts...")
|
||||
do {
|
||||
|
|
@ -464,47 +491,13 @@ class SyncManager: ObservableObject {
|
|||
contactCount: response.totalContacts
|
||||
)
|
||||
if let serverLastSync = response.lastSyncAt, lastSync == nil {
|
||||
lastSync = serverLastSync
|
||||
UserDefaults.standard.set(serverLastSync, forKey: "lastSync")
|
||||
setLastSync(serverLastSync)
|
||||
}
|
||||
} catch {
|
||||
log.warning("fetchStats failed: \(error.localizedDescription)")
|
||||
}
|
||||
}
|
||||
|
||||
private func processSendQueue() async {
|
||||
guard apiClient.isAuthenticated else { return }
|
||||
|
||||
do {
|
||||
let pendingMessages = try await apiClient.getPendingSends()
|
||||
if pendingMessages.isEmpty { return }
|
||||
|
||||
activityLog.info("Processing \(pendingMessages.count) queued message(s)...")
|
||||
|
||||
for message in pendingMessages {
|
||||
let result = sendService.send(recipient: message.phoneNumber, body: message.body)
|
||||
|
||||
do {
|
||||
try await apiClient.reportSendResult(
|
||||
messageId: message.id,
|
||||
status: result.success ? "sent" : "failed",
|
||||
error: result.error
|
||||
)
|
||||
if result.success {
|
||||
activityLog.success("Sent message to \(message.phoneNumber.prefix(4))...")
|
||||
} else {
|
||||
activityLog.error("Failed to send to \(message.phoneNumber.prefix(4))...: \(result.error ?? "Unknown error")")
|
||||
}
|
||||
} catch {
|
||||
activityLog.warning("Failed to report send result: \(error.localizedDescription)")
|
||||
}
|
||||
}
|
||||
} catch {
|
||||
if case APIError.missingAuthToken = error { return }
|
||||
log.warning("processSendQueue failed: \(error.localizedDescription)")
|
||||
}
|
||||
}
|
||||
|
||||
private func syncChunkWithRetry(
|
||||
_ chunk: [iMessage],
|
||||
conversation: iMessageConversation
|
||||
|
|
@ -600,7 +593,8 @@ class SyncManager: ObservableObject {
|
|||
let result = try await apiClient.resetSync()
|
||||
activityLog.info("Cleared \(result.deletedMessages) messages, \(result.deletedConversations) conversations")
|
||||
|
||||
UserDefaults.standard.removeObject(forKey: "lastSync")
|
||||
setLastSync(nil)
|
||||
isInitialLoad = true
|
||||
isResetting = false
|
||||
await performSyncForResync()
|
||||
} catch {
|
||||
|
|
@ -608,7 +602,7 @@ class SyncManager: ObservableObject {
|
|||
activityLog.error("Reset failed: \(error.localizedDescription)")
|
||||
|
||||
if let previous = previousLastSync {
|
||||
UserDefaults.standard.set(previous, forKey: "lastSync")
|
||||
setLastSync(previous)
|
||||
}
|
||||
isResetting = false
|
||||
isResyncInProgress = false
|
||||
|
|
@ -672,10 +666,9 @@ class SyncManager: ObservableObject {
|
|||
}
|
||||
|
||||
let newSyncTime = Date()
|
||||
lastSync = newSyncTime
|
||||
setLastSync(newSyncTime)
|
||||
lastSyncCompletedAt = newSyncTime
|
||||
UserDefaults.standard.set(newSyncTime, forKey: "lastSync")
|
||||
UserDefaults.standard.set(newSyncTime, forKey: "lastSyncCompletedAt")
|
||||
UserDefaults.standard.set(newSyncTime, forKey: "imessage.lastSyncCompletedAt")
|
||||
|
||||
if failedConversations > 0 {
|
||||
activityLog.warning("Resync partial: \(totalSynced) messages, \(failedConversations) failed")
|
||||
|
|
|
|||
|
|
@ -80,21 +80,11 @@ public enum IPhotoSyncError: Equatable, Sendable {
|
|||
/// Upload concurrency is capped at `maxConcurrentUploads` (4) using `withTaskGroup`
|
||||
/// with explicit batch windows so we never open more than 4 concurrent HTTP streams.
|
||||
@MainActor
|
||||
public final class SyncManager: ObservableObject {
|
||||
public final class SyncManager: BaseSyncManager<IPhotoSyncStats, IPhotoSyncError> {
|
||||
public static let shared = SyncManager()
|
||||
|
||||
// MARK: Public state (observed by MenuBar + local web server)
|
||||
@Published public var isSyncing = false
|
||||
@Published public var lastSync: Date?
|
||||
@Published public var lastSyncCompletedAt: Date?
|
||||
@Published public var stats = IPhotoSyncStats()
|
||||
@Published public var syncError: IPhotoSyncError = .none
|
||||
@Published public var currentOperation: String = ""
|
||||
|
||||
// MARK: Private
|
||||
private let reader = PhotosLibraryReader.shared
|
||||
private let apiClient = APIClient.shared
|
||||
private var syncTimer: Timer?
|
||||
|
||||
/// Batch size for metadata sync — 100 photos per request.
|
||||
private let metadataBatchSize = 100
|
||||
|
|
@ -102,57 +92,43 @@ public final class SyncManager: ObservableObject {
|
|||
private let maxConcurrentUploads = 4
|
||||
|
||||
private init() {
|
||||
lastSync = UserDefaults.standard.object(forKey: "iphotoLastSync") as? Date
|
||||
lastSyncCompletedAt = UserDefaults.standard.object(forKey: "iphotoLastSyncCompletedAt") as? Date
|
||||
log.info("init lastSync=\(String(describing: self.lastSync))")
|
||||
}
|
||||
super.init(
|
||||
initialStats: IPhotoSyncStats(),
|
||||
noError: .none,
|
||||
persistenceKey: "iphoto",
|
||||
timerInterval: 300
|
||||
)
|
||||
|
||||
// MARK: - Lifecycle
|
||||
|
||||
public func startSync() {
|
||||
log.info("startSync called")
|
||||
syncError = .none
|
||||
|
||||
Task {
|
||||
let authorized = await reader.requestAuthorization()
|
||||
guard authorized else {
|
||||
log.warning("Photos access denied")
|
||||
syncError = .photosAccessRequired
|
||||
return
|
||||
}
|
||||
await performSync()
|
||||
// Migrate legacy UserDefaults watermark keys (one-time).
|
||||
if lastSync == nil, let legacy = UserDefaults.standard.object(forKey: "iphotoLastSync") as? Date {
|
||||
setLastSync(legacy)
|
||||
UserDefaults.standard.removeObject(forKey: "iphotoLastSync")
|
||||
}
|
||||
|
||||
syncTimer = Timer.scheduledTimer(withTimeInterval: 300, repeats: true) { [weak self] _ in
|
||||
Task { @MainActor in self?.syncNow() }
|
||||
}
|
||||
log.info("Scheduled automatic sync every 5 minutes")
|
||||
}
|
||||
|
||||
public func stopSync() {
|
||||
syncTimer?.invalidate()
|
||||
syncTimer = nil
|
||||
}
|
||||
|
||||
public func syncNow() {
|
||||
guard !isSyncing else { return }
|
||||
Task {
|
||||
if !reader.isAccessible {
|
||||
let ok = await reader.requestAuthorization()
|
||||
guard ok else {
|
||||
syncError = .photosAccessRequired
|
||||
return
|
||||
}
|
||||
}
|
||||
await performSync()
|
||||
if lastSyncCompletedAt == nil, let legacy = UserDefaults.standard.object(forKey: "iphotoLastSyncCompletedAt") as? Date {
|
||||
lastSyncCompletedAt = legacy
|
||||
UserDefaults.standard.set(legacy, forKey: "iphoto.lastSyncCompletedAt")
|
||||
UserDefaults.standard.removeObject(forKey: "iphotoLastSyncCompletedAt")
|
||||
}
|
||||
}
|
||||
|
||||
// MARK: - Authorization hooks
|
||||
|
||||
public override func isAuthorized() async -> Bool { reader.isAccessible }
|
||||
|
||||
public override func requestAuthorization() async -> Bool {
|
||||
await reader.requestAuthorization()
|
||||
}
|
||||
|
||||
public override func onAuthorizationDenied() {
|
||||
syncError = .photosAccessRequired
|
||||
}
|
||||
|
||||
// MARK: - Module-specific entry points
|
||||
|
||||
public func forceFullResync() {
|
||||
guard !isSyncing else { return }
|
||||
log.info("Forcing full resync")
|
||||
lastSync = nil
|
||||
UserDefaults.standard.removeObject(forKey: "iphotoLastSync")
|
||||
setLastSync(nil)
|
||||
stats.failedBatches = 0
|
||||
syncNow()
|
||||
}
|
||||
|
|
@ -160,15 +136,16 @@ public final class SyncManager: ObservableObject {
|
|||
/// Upload only photos that have metadata but no binary on the server.
|
||||
public func uploadPending() {
|
||||
guard !isSyncing else { return }
|
||||
Task {
|
||||
if !reader.isAccessible {
|
||||
let ok = await reader.requestAuthorization()
|
||||
Task { [weak self] in
|
||||
guard let self else { return }
|
||||
if !self.reader.isAccessible {
|
||||
let ok = await self.reader.requestAuthorization()
|
||||
guard ok else {
|
||||
syncError = .photosAccessRequired
|
||||
self.syncError = .photosAccessRequired
|
||||
return
|
||||
}
|
||||
}
|
||||
await performPendingUpload()
|
||||
await self.performPendingUpload()
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -180,9 +157,8 @@ public final class SyncManager: ObservableObject {
|
|||
|
||||
// MARK: - Sync cycle
|
||||
|
||||
private func performSync() async {
|
||||
public override func performSync() async {
|
||||
log.info("performSync starting")
|
||||
isSyncing = true
|
||||
currentOperation = "Fetching photos…"
|
||||
|
||||
// Background diagnostic — never blocks sync loop
|
||||
|
|
@ -195,7 +171,6 @@ public final class SyncManager: ObservableObject {
|
|||
|
||||
if photos.isEmpty {
|
||||
await updateStats()
|
||||
isSyncing = false
|
||||
currentOperation = "No new photos"
|
||||
await performPendingUpload()
|
||||
return
|
||||
|
|
@ -242,13 +217,8 @@ public final class SyncManager: ObservableObject {
|
|||
log.info("Batch \(idx + 1)/\(batches.count) synced=\(response.synced) needsUpload=\(response.needsUpload.count)")
|
||||
} catch {
|
||||
stats.failedBatches += 1
|
||||
let msg = error.localizedDescription.lowercased()
|
||||
if msg.contains("network") || msg.contains("connection") ||
|
||||
msg.contains("timeout") || msg.contains("unreachable") {
|
||||
syncError = .backendUnreachable
|
||||
} else {
|
||||
log.warning("Batch \(idx + 1) failed: \(error.localizedDescription)")
|
||||
}
|
||||
setConnectionError(error)
|
||||
log.warning("Batch \(idx + 1) failed: \(error.localizedDescription)")
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -261,23 +231,15 @@ public final class SyncManager: ObservableObject {
|
|||
currentOperation = "Syncing albums…"
|
||||
await syncAlbums()
|
||||
|
||||
let now = Date()
|
||||
lastSync = now
|
||||
lastSyncCompletedAt = now
|
||||
UserDefaults.standard.set(now, forKey: "iphotoLastSync")
|
||||
UserDefaults.standard.set(now, forKey: "iphotoLastSyncCompletedAt")
|
||||
|
||||
await updateStats()
|
||||
currentOperation = "Sync complete"
|
||||
log.info("performSync complete")
|
||||
|
||||
isSyncing = false
|
||||
await performPendingUpload()
|
||||
}
|
||||
|
||||
private func performPendingUpload() async {
|
||||
log.info("Checking pending uploads")
|
||||
isSyncing = true
|
||||
currentOperation = "Checking pending uploads…"
|
||||
|
||||
Task.detached { self.reader.checkAccessibility() }
|
||||
|
|
@ -288,19 +250,16 @@ public final class SyncManager: ObservableObject {
|
|||
|
||||
guard !pending.isEmpty else {
|
||||
currentOperation = "No pending uploads"
|
||||
isSyncing = false
|
||||
return
|
||||
}
|
||||
|
||||
currentOperation = "Uploading pending…"
|
||||
await uploadBatch(pending, label: "Pending upload")
|
||||
await updateStats()
|
||||
|
||||
} catch {
|
||||
log.warning("getPendingUploads failed: \(error.localizedDescription)")
|
||||
}
|
||||
|
||||
isSyncing = false
|
||||
currentOperation = ""
|
||||
}
|
||||
|
||||
|
|
@ -439,4 +398,10 @@ public final class SyncManager: ObservableObject {
|
|||
log.warning("updateStats failed: \(error.localizedDescription)")
|
||||
}
|
||||
}
|
||||
|
||||
private func setConnectionError(_ error: Error) {
|
||||
if SyncConnectionErrorHeuristic.isConnectionError(error) {
|
||||
syncError = .backendUnreachable
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue