153 lines
6.5 KiB
Swift
153 lines
6.5 KiB
Swift
import Foundation
|
|
import LilithLogging
|
|
import os
|
|
|
|
/// Application logger for the send-queue poller.
private let log = AppLogger.logger(for: "Sync.SendQueue")

/// Unified-logging mirror. AppLogger's sink is not reachable via `log show`,
/// which makes the send-queue poller invisible to remote diagnosis. This
/// `os.Logger` writes to the system's unified log — query with:
///
///     log show --predicate 'subsystem == "com.lilith.mac-sync"' --info
private let osLog = os.Logger(subsystem: "com.lilith.mac-sync", category: "Sync.SendQueue")
|
|
|
|
/// Result of applying a single pending send-queue item locally.
public enum SendQueueApplyResult: Sendable {
    /// The item was applied successfully.
    case sent

    /// The item could not be applied; `reason` is forwarded to the server as
    /// the error text when the failure is reported.
    case failed(reason: String)
}
|
|
|
|
/// Transport abstraction the `SendQueueClient` polls. Implementations adapt a
/// module's own typed APIClient methods (e.g. `getPendingCalendarSends()`,
/// `reportCalendarSendResult(_:status:error:)`) into this generic shape so
/// the polling loop itself is written exactly once.
public protocol SendQueueTransport: Sendable {
    /// The module-specific payload describing one queued send.
    associatedtype PendingItem: Sendable

    /// Stable identifier used to ack the item back to the server.
    ///
    /// - Parameter item: A pending item previously returned by `fetchPending()`.
    /// - Returns: The identifier passed to `reportResult(id:status:error:)`.
    func id(of item: PendingItem) -> String

    /// Fetch all currently-queued items for this device.
    ///
    /// - Returns: The pending items; empty when nothing is queued.
    /// - Throws: Any transport error raised by the implementation.
    func fetchPending() async throws -> [PendingItem]

    /// Report success/failure of applying a single item.
    ///
    /// - Parameters:
    ///   - id: Identifier obtained from `id(of:)`.
    ///   - status: Outcome label; `SendQueueClient` passes `"sent"` or `"failed"`.
    ///   - error: Failure reason, or `nil` when the item was sent.
    /// - Throws: Any transport error raised by the implementation.
    func reportResult(id: String, status: String, error: String?) async throws
}
|
|
|
|
/// Action that applies a pending item to the local OS (EventKit save,
/// AppleScript send, etc.). Returning `.failed` causes the client to ack the
/// item as failed without throwing.
///
/// Pinned to `@MainActor` because every module's applier (EKEventStore-based
/// Senders, AppleScript-based Senders, the iMessage SendService) is a
/// `@MainActor` type. Without this annotation each module would need a
/// trivial `await MainActor.run { … }` wrapper inside the closure.
public typealias SendQueueApply<Item> = @MainActor @Sendable (Item) async -> SendQueueApplyResult
|
|
|
|
/// Generic poll/apply/ack loop used by every bidirectional module. Owns its
/// own `Timer` so it can run on a different cadence than the read-sync loop
/// (iMessage polls every 30s, calendar/reminders every 60s, notes every
/// 600s, etc.).
///
/// This is `@MainActor` because Timer scheduling and the surrounding
/// SyncManagers all live on the main actor. The actual `apply` closure may
/// hop to other actors internally — the closure is `Sendable`.
@MainActor
public final class SendQueueClient<Transport: SendQueueTransport> {
    public typealias Item = Transport.PendingItem

    private let transport: Transport
    private let apply: SendQueueApply<Item>
    private let interval: TimeInterval
    private let label: String

    private var timer: Timer?

    /// Re-entrancy guard: true while a drain is between its first and last step.
    private var isProcessing: Bool = false

    /// Ids of items that were applied successfully but whose "sent" ack could
    /// not be delivered. While an id is in this set the item is never applied
    /// again; only the ack is retried. Held in memory only.
    private var unackedSentIDs: Set<String> = []

    /// Creates a client; polling does not begin until `start()` is called.
    ///
    /// - Parameters:
    ///   - label: Non-empty tag prefixed to every log line.
    ///   - transport: Source of pending items and sink for results.
    ///   - interval: Seconds between scheduled drains; must be positive.
    ///   - apply: Closure that applies one item locally.
    public init(
        label: String,
        transport: Transport,
        interval: TimeInterval,
        apply: @escaping SendQueueApply<Item>
    ) {
        precondition(!label.isEmpty, "label must be non-empty")
        precondition(interval > 0, "interval must be positive")
        self.label = label
        self.transport = transport
        self.interval = interval
        self.apply = apply
    }

    deinit {
        timer?.invalidate()
    }

    /// Begin periodic polling. Kicks one drain immediately, then schedules a
    /// repeating `Timer`. Calling this again replaces the existing timer.
    public func start() {
        log.info("[\(self.label)] SendQueueClient.start interval=\(self.interval)s")
        osLog.notice("[\(self.label, privacy: .public)] start interval=\(self.interval, privacy: .public)s")
        TraceLog.write("[\(self.label)] SendQueueClient.start interval=\(self.interval)s")
        Task { [weak self] in await self?.drainOnce() }
        timer?.invalidate()
        timer = Timer.scheduledTimer(withTimeInterval: interval, repeats: true) { [weak self] _ in
            // The timer block is not actor-isolated; hop to the main actor
            // and keep the client weakly referenced so the timer never
            // extends its lifetime.
            Task { @MainActor [weak self] in await self?.drainOnce() }
        }
    }

    /// Stop periodic polling. Any in-flight drain finishes naturally.
    public func stop() {
        timer?.invalidate()
        timer = nil
    }

    /// Drain queued items once. Public so SyncManagers can trigger a manual
    /// flush (e.g. right after a server-side enqueue is known to be pending).
    ///
    /// Fetches the pending items, applies each one in order, and reports each
    /// outcome to the transport. A call made while another drain is running
    /// returns immediately. Fetch and report failures are logged, never thrown.
    public func drainOnce() async {
        TraceLog.write("[\(self.label)] drainOnce entered (isProcessing=\(self.isProcessing))")
        guard !isProcessing else {
            osLog.notice("[\(self.label, privacy: .public)] drainOnce skipped — previous drain still in flight")
            TraceLog.write("[\(self.label)] drainOnce skipped — previous drain in flight")
            return
        }
        isProcessing = true
        defer { isProcessing = false }

        let pending: [Item]
        do {
            pending = try await transport.fetchPending()
        } catch {
            log.warning("[\(self.label)] fetchPending failed: \(error.localizedDescription)")
            osLog.error("[\(self.label, privacy: .public)] fetchPending failed: \(error.localizedDescription, privacy: .public)")
            TraceLog.write("[\(self.label)] fetchPending FAILED: \(error.localizedDescription)")
            return
        }

        TraceLog.write("[\(self.label)] fetchPending → \(pending.count) item(s)")

        // Resolve each item's ack identifier once, up front.
        let queued = pending.map { (id: transport.id(of: $0), item: $0) }

        // Forget unacked ids the server no longer lists as pending, so the
        // set cannot grow without bound.
        unackedSentIDs.formIntersection(queued.map { $0.id })

        guard !pending.isEmpty else {
            osLog.debug("[\(self.label, privacy: .public)] drainOnce — 0 pending")
            return
        }
        log.info("[\(self.label)] draining \(pending.count) pending item(s)")
        osLog.notice("[\(self.label, privacy: .public)] draining \(pending.count, privacy: .public) pending item(s)")

        for entry in queued {
            let id = entry.id

            let result: SendQueueApplyResult
            if unackedSentIDs.contains(id) {
                // Applied on an earlier drain; only the ack was lost.
                // Applying again would repeat the send, so just retry the ack.
                osLog.notice("[\(self.label, privacy: .public)] item \(id, privacy: .public) → already applied, retrying ack")
                result = .sent
            } else {
                result = await apply(entry.item)
            }

            let status: String
            let failureReason: String?
            switch result {
            case .sent:
                osLog.notice("[\(self.label, privacy: .public)] item \(id, privacy: .public) → sent")
                status = "sent"
                failureReason = nil
            case .failed(let reason):
                osLog.error("[\(self.label, privacy: .public)] item \(id, privacy: .public) → failed: \(reason, privacy: .public)")
                status = "failed"
                failureReason = reason
            }

            do {
                try await transport.reportResult(id: id, status: status, error: failureReason)
                unackedSentIDs.remove(id)
            } catch {
                // Server unreachable while reporting. The ack is retried on
                // the next drain cycle; a successfully applied item is
                // remembered so it is not applied twice. Failed items are
                // simply re-applied. Do not surface to the UI.
                if case .sent = result {
                    unackedSentIDs.insert(id)
                }
                log.warning("[\(self.label)] reportResult failed for id=\(id): \(error.localizedDescription)")
                osLog.error("[\(self.label, privacy: .public)] reportResult failed for id=\(id, privacy: .public): \(error.localizedDescription, privacy: .public)")
                TraceLog.write("[\(self.label)] reportResult FAILED for id=\(id): \(error.localizedDescription)")
            }
        }
    }
}
|