macsync/@packages/imessage/Sources/IMessageSync/OutboxSendManager.swift

126 lines
5 KiB
Swift
Raw Normal View History

import Foundation
import LilithLogging
import MacSyncShared
import os
private let log = AppLogger.logger(for: "Outbox.Send")
private let osLog = os.Logger(subsystem: "com.lilith.mac-sync", category: "Outbox.Send")
/// Drains the server-side outbox (`/client/outbox/*`) one message at a time,
/// **paced** so a backlog can never burst the Messages bridge (the "stream
/// closed after ~8 at once" failure this exists to prevent). Reuses
/// `SendService` for the channel-smart send (iMessage SMS fallback) and its
/// chat.db delivery verification, then writes the *true* status including the
/// channel actually used back to the server.
///
/// Self-rescheduling Timer rather than a fixed-interval poll: after each send it
/// waits a jittered `pacedDelayRange` before the next, which both spaces sends
/// and caps the per-minute rate. When nothing is due it idles at `idleInterval`.
@MainActor
final class OutboxSendManager {
private let api: OutboxAPI
private let sendService: SendService
private let activityLog: ActivityLog
/// Seconds to wait between consecutive sends (jittered) the anti-burst pace.
private let pacedDelayRange: ClosedRange<Double> = 20...90
/// Poll cadence when the queue is empty.
private let idleInterval: TimeInterval = 30
/// How many due items to fetch per tick (only the hottest is sent each tick).
private let fetchLimit = 3
private var timer: Timer?
private var isDraining = false
init(api: OutboxAPI, sendService: SendService, activityLog: ActivityLog) {
self.api = api
self.sendService = sendService
self.activityLog = activityLog
}
deinit { timer?.invalidate() }
func start() {
log.info("OutboxSendManager.start (pace \(self.pacedDelayRange.lowerBound)-\(self.pacedDelayRange.upperBound)s)")
osLog.notice("outbox: start")
scheduleNext(after: 2)
}
func stop() {
timer?.invalidate()
timer = nil
}
/// Jittered delay between sends. Pure + injectable so the pacing is testable.
nonisolated static func pacedDelaySeconds(_ range: ClosedRange<Double>, rand: () -> Double) -> TimeInterval {
range.lowerBound + rand() * (range.upperBound - range.lowerBound)
}
private func pacedDelay() -> TimeInterval {
Self.pacedDelaySeconds(pacedDelayRange) { Double.random(in: 0...1) }
}
private func scheduleNext(after delay: TimeInterval) {
timer?.invalidate()
timer = Timer.scheduledTimer(withTimeInterval: delay, repeats: false) { [weak self] _ in
Task { @MainActor in await self?.tick() }
}
}
private func tick() async {
guard !isDraining else { return }
isDraining = true
defer { isDraining = false }
let due: [OutboxDueItem]
do {
due = try await api.fetchOutboxDue(limit: fetchLimit)
} catch {
log.warning("fetchOutboxDue failed: \(error.localizedDescription)")
scheduleNext(after: idleInterval)
return
}
guard let item = due.first else {
scheduleNext(after: idleInterval)
return
}
// Claim (queued sending) before sending so a concurrent/next tick
// can't re-fetch and double-send the same row. A failed claim (lost
// race / 404) just means someone else has it move on, paced.
do {
try await api.claimOutbox(id: item.id)
} catch {
log.warning("claimOutbox \(item.id) failed: \(error.localizedDescription)")
scheduleNext(after: pacedDelay())
return
}
let suffix = String(item.recipient.suffix(4))
let result = await sendService.send(recipient: item.recipient, body: item.body)
do {
if result.success {
try await api.reportOutboxResult(id: item.id, status: "sent", channelUsed: result.channel, error: nil)
activityLog.success("Outbox → …\(suffix) sent via \(result.channel ?? "?")")
osLog.notice("outbox: \(item.id, privacy: .public) sent via \(result.channel ?? "?", privacy: .public)")
} else {
let reason = result.error ?? "unknown error"
try await api.reportOutboxResult(id: item.id, status: "failed", channelUsed: result.channel, error: reason)
activityLog.error("Outbox → …\(suffix) failed: \(reason)")
osLog.error("outbox: \(item.id, privacy: .public) failed: \(reason, privacy: .public)")
}
} catch {
// Status not recorded the row stays 'sending' and won't be
// re-fetched (listDue is queued-only). A server-side sweep that
// re-queues stale 'sending' rows after a timeout is the intended
// backstop (noted in the handoff); not this loop's job.
log.warning("reportOutboxResult \(item.id) failed: \(error.localizedDescription)")
}
// One send per jittered 2090s never a burst, even with a backlog.
scheduleNext(after: pacedDelay())
}
}