macsync/@packages/imessage/Sources/IMessageSync/OutboxSendManager.swift
Natalie 576496ca3e feat(deploy): video-projects FUSE mount over DO Spaces
Generalize the photos-originals rclone-mount pattern to a video-projects
prefix so the video studio (and imajin ETL, per storage-portability-plan
§2.3) can read/write multi-GB project sources/renders as local files while
only hot data stays resident on plum (bounded VFS LRU cache). Lets a
small-disk laptop work with large footage without filling APFS.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-28 21:10:13 -04:00

125 lines
5 KiB
Swift
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import Foundation
import LilithLogging
import MacSyncShared
import os
// File-scoped logger for the outbox send loop, obtained from `AppLogger`
// under the "Outbox.Send" name.
private let log = AppLogger.logger(for: "Outbox.Send")
// Unified-logging (`os.Logger`) handle with the same category; in this file it
// records the start / sent / failed milestones with `.public` privacy.
private let osLog = os.Logger(subsystem: "com.lilith.mac-sync", category: "Outbox.Send")
/// Drains the server-side outbox (`/client/outbox/*`) one message at a time,
/// **paced** so a backlog can never burst the Messages bridge (the "stream
/// closed after ~8 at once" failure this exists to prevent). Reuses
/// `SendService` for the channel-smart send (iMessage with SMS fallback) and
/// its chat.db delivery verification, then writes the *true* status
/// (including the channel actually used) back to the server.
///
/// Self-rescheduling Timer rather than a fixed-interval poll: after each send it
/// waits a jittered `pacedDelayRange` before the next, which both spaces sends
/// and caps the per-minute rate. When nothing is due it idles at `idleInterval`.
///
/// Lifecycle: `start()` arms the loop and `stop()` disarms it. A tick that is
/// already suspended at an `await` when `stop()` is called is allowed to finish
/// its current item, but it will not schedule another tick.
@MainActor
final class OutboxSendManager {
    private let api: OutboxAPI
    private let sendService: SendService
    private let activityLog: ActivityLog

    /// Seconds to wait between consecutive sends (jittered): the anti-burst pace.
    private let pacedDelayRange: ClosedRange<Double> = 20...90
    /// Poll cadence when the queue is empty or the fetch failed.
    private let idleInterval: TimeInterval = 30
    /// How many due items to fetch per tick (only the first one is sent each tick).
    private let fetchLimit = 3

    /// The single pending one-shot timer, if any.
    private var timer: Timer?
    /// Re-entrancy guard: true while a `tick()` is in progress.
    private var isDraining = false
    /// True between `start()` and `stop()`. Checked wherever the loop would
    /// otherwise re-arm itself, so an in-flight tick cannot resurrect the loop
    /// after `stop()`.
    private var isRunning = false

    init(api: OutboxAPI, sendService: SendService, activityLog: ActivityLog) {
        self.api = api
        self.sendService = sendService
        self.activityLog = activityLog
    }

    deinit { timer?.invalidate() }

    /// Starts (or restarts) the drain loop; the first tick fires after 2 seconds.
    func start() {
        isRunning = true
        log.info("OutboxSendManager.start (pace \(self.pacedDelayRange.lowerBound)-\(self.pacedDelayRange.upperBound)s)")
        osLog.notice("outbox: start")
        scheduleNext(after: 2)
    }

    /// Stops the drain loop. Cancels the pending timer and prevents any tick
    /// that is currently in flight from scheduling a follow-up.
    func stop() {
        isRunning = false
        timer?.invalidate()
        timer = nil
    }

    /// Jittered delay between sends. Pure + injectable so the pacing is testable.
    ///
    /// - Parameters:
    ///   - range: Lower and upper bound of the delay, in seconds.
    ///   - rand: Source of randomness; its result is used as the interpolation
    ///     factor between the bounds (0 maps to the lower bound, 1 to the upper).
    /// - Returns: `lowerBound + rand() * (upperBound - lowerBound)`.
    nonisolated static func pacedDelaySeconds(_ range: ClosedRange<Double>, rand: () -> Double) -> TimeInterval {
        range.lowerBound + rand() * (range.upperBound - range.lowerBound)
    }

    /// A fresh random delay drawn from `pacedDelayRange`.
    private func pacedDelay() -> TimeInterval {
        Self.pacedDelaySeconds(pacedDelayRange) { Double.random(in: 0...1) }
    }

    /// Replaces any pending timer with a one-shot timer that runs `tick()`
    /// after `delay` seconds. Does nothing (beyond clearing the old timer)
    /// once the manager has been stopped.
    private func scheduleNext(after delay: TimeInterval) {
        timer?.invalidate()
        timer = nil
        // A tick that was suspended across `stop()` lands here when it
        // finishes; refusing to re-arm is what makes `stop()` final.
        guard isRunning else { return }
        timer = Timer.scheduledTimer(withTimeInterval: delay, repeats: false) { [weak self] _ in
            Task { @MainActor in await self?.tick() }
        }
    }

    /// One drain step: fetch due items, claim and send the first one, report
    /// the outcome, then schedule the next step.
    private func tick() async {
        // `isRunning` covers a Task that was enqueued by the timer just before
        // `stop()`; `isDraining` covers overlapping ticks.
        guard isRunning, !isDraining else { return }
        isDraining = true
        defer { isDraining = false }

        let due: [OutboxDueItem]
        do {
            due = try await api.fetchOutboxDue(limit: fetchLimit)
        } catch {
            log.warning("fetchOutboxDue failed: \(error.localizedDescription)")
            scheduleNext(after: idleInterval)
            return
        }

        // `stop()` may have been called while the fetch was suspended. Nothing
        // has been claimed yet, so bail out before taking ownership of a row.
        guard isRunning else { return }

        guard let item = due.first else {
            scheduleNext(after: idleInterval)
            return
        }

        // Claim (queued -> sending) before sending so a concurrent/next tick
        // can't re-fetch and double-send the same row. A failed claim (lost
        // race / 404) just means someone else has it; move on, paced.
        do {
            try await api.claimOutbox(id: item.id)
        } catch {
            log.warning("claimOutbox \(item.id) failed: \(error.localizedDescription)")
            scheduleNext(after: pacedDelay())
            return
        }

        // From here on the row is claimed, so the send and the status report
        // run to completion even if `stop()` arrives meanwhile.
        // Only the last four characters of the recipient go to the activity log.
        let suffix = String(item.recipient.suffix(4))
        let result = await sendService.send(recipient: item.recipient, body: item.body)
        do {
            if result.success {
                try await api.reportOutboxResult(id: item.id, status: "sent", channelUsed: result.channel, error: nil)
                activityLog.success("Outbox → …\(suffix) sent via \(result.channel ?? "?")")
                osLog.notice("outbox: \(item.id, privacy: .public) sent via \(result.channel ?? "?", privacy: .public)")
            } else {
                let reason = result.error ?? "unknown error"
                try await api.reportOutboxResult(id: item.id, status: "failed", channelUsed: result.channel, error: reason)
                activityLog.error("Outbox → …\(suffix) failed: \(reason)")
                osLog.error("outbox: \(item.id, privacy: .public) failed: \(reason, privacy: .public)")
            }
        } catch {
            // Status not recorded: the row stays 'sending' and won't be
            // re-fetched (listDue is queued-only). A server-side sweep that
            // re-queues stale 'sending' rows after a timeout is the intended
            // backstop (noted in the handoff); not this loop's job.
            log.warning("reportOutboxResult \(item.id) failed: \(error.localizedDescription)")
        }

        // One send per jittered 20-90s; never a burst, even with a backlog.
        scheduleNext(after: pacedDelay())
    }
}