macsync/@packages/shared/Sources/MacSyncShared/Sync/SendQueueClient.swift

126 lines
4.8 KiB
Swift

import Foundation
import LilithLogging
/// Logger shared by everything in this file, obtained from `AppLogger` under
/// the name "Sync.SendQueue".
private let log = AppLogger.logger(for: "Sync.SendQueue")
/// Outcome of applying one pending send-queue item on the local machine.
///
/// `SendQueueClient` translates this value into the status it reports back
/// through its transport.
public enum SendQueueApplyResult: Sendable {
    /// The item was applied successfully; reported with status `"sent"`.
    case sent

    /// The item could not be applied; reported with status `"failed"` and
    /// `reason` forwarded as the error text.
    case failed(reason: String)
}
/// Adapter between `SendQueueClient` and a module's own typed APIClient
/// methods (e.g. `getPendingCalendarSends()`,
/// `reportCalendarSendResult(_:status:error:)`). Conforming types expose those
/// calls in this generic shape, which lets the polling loop itself be written
/// exactly once.
public protocol SendQueueTransport: Sendable {
    /// Element type produced by `fetchPending()`.
    associatedtype PendingItem: Sendable

    /// Returns the stable identifier used to ack `item` back to the server.
    func id(of item: PendingItem) -> String

    /// Fetches all currently-queued items for this device.
    func fetchPending() async throws -> [PendingItem]

    /// Reports success or failure of applying a single item.
    ///
    /// - Parameters:
    ///   - id: Identifier of the item, as returned by `id(of:)`.
    ///   - status: Outcome label; `SendQueueClient` passes `"sent"` or
    ///     `"failed"`.
    ///   - error: Failure reason for a failed item, `nil` for a sent one.
    func reportResult(id: String, status: String, error: String?) async throws
}
/// Closure that applies one pending item to the local OS (EventKit save,
/// AppleScript send, etc.) and reports the outcome as a value. A `.failed`
/// return makes the client ack the item as failed; the closure itself never
/// throws.
///
/// The closure is pinned to `@MainActor` because every module's applier
/// (EKEventStore-based Senders, AppleScript-based Senders, the iMessage
/// SendService) is a `@MainActor` type. Without the annotation each module
/// would need a trivial `await MainActor.run { }` wrapper inside its closure.
public typealias SendQueueApply<Item> =
    @MainActor @Sendable (Item) async -> SendQueueApplyResult
/// Generic poll/apply/ack loop used by every bidirectional module. Owns its
/// own `Timer` so it can run on a different cadence than the read-sync loop
/// (iMessage polls every 30s, calendar/reminders every 60s, notes every
/// 600s, etc.).
///
/// This is `@MainActor` because Timer scheduling and the surrounding
/// SyncManagers all live on the main actor. The actual `apply` closure may
/// hop to other actors internally; the closure is `Sendable`.
///
/// Delivery behaviour: an item whose `apply` returned `.sent` but whose ack
/// could not be delivered is remembered by id. When a later fetch returns the
/// same id, only the ack is retried; `apply` is not run a second time, so the
/// local side effect is not duplicated. Items that `.failed` are applied again
/// if the server still lists them.
@MainActor
public final class SendQueueClient<Transport: SendQueueTransport> {
    public typealias Item = Transport.PendingItem

    private let transport: Transport
    private let apply: SendQueueApply<Item>
    private let interval: TimeInterval
    private let label: String
    private var timer: Timer?

    /// True while a drain is in flight; overlapping drains return early.
    private var isProcessing: Bool = false

    /// Ids of items already applied successfully whose "sent" ack has not yet
    /// reached the server. Pruned against every successful fetch so it never
    /// outgrows the server's pending list.
    private var unackedSentIDs: Set<String> = []

    /// Creates a client; polling does not begin until `start()` is called.
    ///
    /// - Parameters:
    ///   - label: Non-empty tag prefixed to every log line of this client.
    ///   - transport: Source of pending items and sink for their results.
    ///   - interval: Seconds between timer-driven drains; must be positive.
    ///   - apply: Closure that applies one pending item locally.
    public init(
        label: String,
        transport: Transport,
        interval: TimeInterval,
        apply: @escaping SendQueueApply<Item>
    ) {
        precondition(!label.isEmpty, "label must be non-empty")
        precondition(interval > 0, "interval must be positive")
        self.label = label
        self.transport = transport
        self.interval = interval
        self.apply = apply
    }

    deinit {
        // The timer block only holds `self` weakly, so the timer must be
        // invalidated explicitly or it would keep firing after release.
        timer?.invalidate()
    }

    /// Begin periodic polling. Kicks one drain immediately, then schedules a
    /// repeating `Timer`. A timer left over from an earlier `start()` is
    /// invalidated first.
    public func start() {
        log.info("[\(self.label)] SendQueueClient.start interval=\(self.interval)s")
        Task { [weak self] in await self?.drainOnce() }
        timer?.invalidate()
        timer = Timer.scheduledTimer(withTimeInterval: interval, repeats: true) { [weak self] _ in
            // Re-capture weakly so the task closure holds its own immutable
            // reference instead of sharing the outer captured variable.
            Task { @MainActor [weak self] in await self?.drainOnce() }
        }
    }

    /// Stop periodic polling. Any in-flight drain finishes naturally.
    public func stop() {
        timer?.invalidate()
        timer = nil
    }

    /// Drain queued items once. Public so SyncManagers can trigger a manual
    /// flush (e.g. right after a server-side enqueue is known to be pending).
    ///
    /// Returns immediately if another drain is already running. A failed
    /// fetch or a failed ack is logged and never thrown to the caller.
    public func drainOnce() async {
        guard !isProcessing else { return }
        isProcessing = true
        defer { isProcessing = false }

        let pending: [Item]
        do {
            pending = try await transport.fetchPending()
        } catch {
            log.warning("[\(self.label)] fetchPending failed: \(error.localizedDescription)")
            return
        }

        // Drop remembered ids the server no longer lists; their ack is moot.
        let pendingIDs = pending.map { transport.id(of: $0) }
        unackedSentIDs.formIntersection(pendingIDs)

        guard !pending.isEmpty else { return }
        log.info("[\(self.label)] draining \(pending.count) pending item(s)")

        for (item, id) in zip(pending, pendingIDs) {
            let result: SendQueueApplyResult
            if unackedSentIDs.contains(id) {
                // Applied on an earlier drain; only the ack is outstanding.
                // Running `apply` again would duplicate the side effect.
                log.info("[\(self.label)] re-reporting already-sent id=\(id) without re-applying")
                result = .sent
            } else {
                result = await apply(item)
            }

            do {
                switch result {
                case .sent:
                    try await transport.reportResult(id: id, status: "sent", error: nil)
                case .failed(let reason):
                    try await transport.reportResult(id: id, status: "failed", error: reason)
                }
                unackedSentIDs.remove(id)
            } catch {
                // Server unreachable while reporting; the ack will be
                // retried on the next drain cycle. Do not surface to the UI.
                if case .sent = result {
                    unackedSentIDs.insert(id)
                }
                log.warning("[\(self.label)] reportResult failed for id=\(id): \(error.localizedDescription)")
            }
        }
    }
}