import Foundation import LilithLogging import os private let log = AppLogger.logger(for: "Sync.SendQueue") /// Unified-logging mirror. AppLogger's sink is not reachable via `log show`, /// which makes the send-queue poller invisible to remote diagnosis. This /// `os.Logger` writes to host journaling — query with: /// log show --predicate 'subsystem == "com.lilith.mac-sync"' --info private let osLog = os.Logger(subsystem: "com.lilith.mac-sync", category: "Sync.SendQueue") /// Result of applying a single pending send-queue item locally. public enum SendQueueApplyResult: Sendable { case sent case failed(reason: String) } /// Transport abstraction the `SendQueueClient` polls. Implementations adapt a /// module's own typed APIClient methods (e.g. `getPendingCalendarSends()`, /// `reportCalendarSendResult(_:status:error:)`) into this generic shape so /// the polling loop itself is written exactly once. public protocol SendQueueTransport: Sendable { associatedtype PendingItem: Sendable /// Stable identifier used to ack the item back to the server. func id(of item: PendingItem) -> String /// Fetch all currently-queued items for this device. func fetchPending() async throws -> [PendingItem] /// Report success/failure of applying a single item. func reportResult(id: String, status: String, error: String?) async throws } /// Action that applies a pending item to the local OS (EventKit save, /// AppleScript send, etc.). Returning `.failed` causes the client to ack the /// item as failed without throwing. /// /// Pinned to `@MainActor` because every module's applier (EKEventStore-based /// Senders, AppleScript-based Senders, the iMessage SendService) is a /// `@MainActor` type. Without this annotation each module would need a /// trivial `await MainActor.run { … }` wrapper inside the closure. public typealias SendQueueApply = @MainActor @Sendable (Item) async -> SendQueueApplyResult /// Generic poll/apply/ack loop used by every bidirectional module. Owns its /// own `Timer` so it can run on a different cadence than the read-sync loop /// (iMessage polls every 30s, calendar/reminders every 60s, notes every /// 600s, etc.). /// /// This is `@MainActor` because Timer scheduling and the surrounding /// SyncManagers all live on the main actor. The actual `apply` closure may /// hop to other actors internally — the closure is `Sendable`. @MainActor public final class SendQueueClient { public typealias Item = Transport.PendingItem private let transport: Transport private let apply: SendQueueApply private let interval: TimeInterval private let label: String private var timer: Timer? private var isProcessing: Bool = false /// Timestamp of the last forward progress — drain start, or an item /// completing. Drives the stall watchdog in `drainOnce`. private var drainProgressAt: Date? /// Bumped when the watchdog orphans a stuck drain, so that drain's /// `defer` cannot clobber a newer drain's state. private var drainGeneration = 0 public init( label: String, transport: Transport, interval: TimeInterval, apply: @escaping SendQueueApply ) { precondition(!label.isEmpty, "label must be non-empty") precondition(interval > 0, "interval must be positive") self.label = label self.transport = transport self.interval = interval self.apply = apply } deinit { timer?.invalidate() } /// Begin periodic polling. Kicks one drain immediately, then schedules a /// repeating `Timer`. public func start() { log.info("[\(self.label)] SendQueueClient.start interval=\(self.interval)s") osLog.notice("[\(self.label, privacy: .public)] start interval=\(self.interval, privacy: .public)s") TraceLog.write("[\(self.label)] SendQueueClient.start interval=\(self.interval)s") Task { [weak self] in await self?.drainOnce() } timer?.invalidate() timer = Timer.scheduledTimer(withTimeInterval: interval, repeats: true) { [weak self] _ in Task { @MainActor in await self?.drainOnce() } } } /// Stop periodic polling. Any in-flight drain finishes naturally. public func stop() { timer?.invalidate() timer = nil } /// Drain queued items once. Public so SyncManagers can trigger a manual /// flush (e.g. right after a server-side enqueue is known to be pending). public func drainOnce() async { TraceLog.write("[\(self.label)] drainOnce entered (isProcessing=\(self.isProcessing))") if isProcessing { // Watchdog: a drain that has made no progress for >360s is // genuinely stuck — every per-item operation is bounded to well // under that. Force-reset so the poller self-heals; bump the // generation so the stuck drain's `defer` cannot clobber the // newer drain's state if it ever resumes. if let progress = drainProgressAt, Date().timeIntervalSince(progress) > 360 { osLog.error("[\(self.label, privacy: .public)] watchdog: drain stalled >360s — force-resetting") TraceLog.write("[\(self.label)] watchdog: force-reset stalled drain") drainGeneration &+= 1 } else { osLog.notice("[\(self.label, privacy: .public)] drainOnce skipped — previous drain still in flight") TraceLog.write("[\(self.label)] drainOnce skipped — previous drain in flight") return } } isProcessing = true drainProgressAt = Date() let generation = drainGeneration defer { // Only clear shared state if this drain still owns it — the // watchdog may have orphaned it in favour of a newer drain. if generation == self.drainGeneration { isProcessing = false drainProgressAt = nil } } let pending: [Item] do { pending = try await transport.fetchPending() } catch { log.warning("[\(self.label)] fetchPending failed: \(error.localizedDescription)") osLog.error("[\(self.label, privacy: .public)] fetchPending failed: \(error.localizedDescription, privacy: .public)") TraceLog.write("[\(self.label)] fetchPending FAILED: \(error.localizedDescription)") return } TraceLog.write("[\(self.label)] fetchPending → \(pending.count) item(s)") guard !pending.isEmpty else { osLog.debug("[\(self.label, privacy: .public)] drainOnce — 0 pending") return } log.info("[\(self.label)] draining \(pending.count) pending item(s)") osLog.notice("[\(self.label, privacy: .public)] draining \(pending.count, privacy: .public) pending item(s)") for item in pending { drainProgressAt = Date() let id = transport.id(of: item) let result = await apply(item) switch result { case .sent: osLog.notice("[\(self.label, privacy: .public)] item \(id, privacy: .public) → sent") case .failed(let reason): osLog.error("[\(self.label, privacy: .public)] item \(id, privacy: .public) → failed: \(reason, privacy: .public)") } do { switch result { case .sent: try await transport.reportResult(id: id, status: "sent", error: nil) case .failed(let reason): try await transport.reportResult(id: id, status: "failed", error: reason) } } catch { // Server unreachable while reporting; the item will be // retried on the next drain cycle. Do not surface to the UI. log.warning("[\(self.label)] reportResult failed for id=\(id): \(error.localizedDescription)") } } } }