macsync/@packages/imessage/Sources/IMessageSync/Sender.swift
Natalie 576496ca3e feat(deploy): video-projects FUSE mount over DO Spaces
Generalize the photos-originals rclone-mount pattern to a video-projects
prefix so the video studio (and imajin ETL, per storage-portability-plan
§2.3) can read/write multi-GB project sources/renders as local files while
only hot data stays resident on plum (bounded VFS LRU cache). Lets a
small-disk laptop work with large footage without filling APFS.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-28 21:10:13 -04:00

403 lines
17 KiB
Swift

import AppKit
import Foundation
import GRDB
import LilithLogging
import MacSyncShared
import os
private let log = AppLogger.logger(for: "IMessage.Send")
/// Unified-logging mirror so the AppleScript send path is observable via
/// `log show --predicate 'subsystem == "com.lilith.mac-sync"'` (host
/// journaling). AppLogger's sink is not reachable through `log show`.
private let osLog = os.Logger(subsystem: "com.lilith.mac-sync", category: "IMessage.Send")
/// One-shot guard so a continuation is resumed exactly once when two paths
/// race to finish it (a completion callback vs. a hard timeout).
private final class ResumeOnce: @unchecked Sendable {
private let lock = NSLock()
private var done = false
/// Returns `true` only for the first caller.
func claim() -> Bool {
lock.lock()
defer { lock.unlock() }
if done { return false }
done = true
return true
}
}
/// Sends iMessages via an AppleScript bridge to Messages.app and verifies the
/// result against chat.db.
///
/// This is an `actor` (deliberately NOT `@MainActor`): every operation here
/// spawning `osascript`, reading chat.db is blocking work that must run off
/// the main thread, so a slow or wedged send can never freeze the `@MainActor`
/// send-queue poller. Every blocking operation is additionally bounded by a
/// hard timeout, so no single send can stall the poller indefinitely.
actor SendService {
static let shared = SendService()
private var dailySendCount: Int = 0
private var hourlySendCount: Int = 0
private var lastHourReset: Date = Date()
private var lastDayReset: Date = Date()
private let maxDailyMessages: Int = 150
private let maxHourlyMessages: Int = 30
private init() {}
/// Messages service a send is routed through.
enum MessageServiceKind: String, Sendable {
case iMessage = "iMessage"
case sms = "SMS"
}
// MARK: - Validation (nonisolated for testability)
nonisolated static func validate(recipient: String) -> (buddyId: String, isEmail: Bool, error: String?) {
let isEmail = recipient.contains("@")
if isEmail {
let parts = recipient.split(separator: "@")
guard parts.count == 2,
let domain = parts.last,
domain.contains(".") else {
return ("", true, "Invalid email address format")
}
return (recipient, true, nil)
} else {
let cleaned = recipient.filter { $0.isNumber || $0 == "+" }
guard cleaned.count >= 10 else {
return ("", false, "Invalid phone number format")
}
return (cleaned, false, nil)
}
}
nonisolated static func generateMessageId(buddyId: String, isEmail: Bool) -> String {
let idSuffix: String
if isEmail {
let localPart = String(buddyId.split(separator: "@").first ?? "")
idSuffix = String(localPart.suffix(4))
} else {
idSuffix = String(buddyId.suffix(4))
}
return "send_\(idSuffix)_\(Int(Date().timeIntervalSince1970))"
}
/// Pure routing decision (nonisolated for testability): SMS-only history
/// SMS; iMessage history, or a brand-new number iMessage.
nonisolated static func decideService(imessageCount: Int, smsCount: Int) -> MessageServiceKind {
(smsCount > 0 && imessageCount == 0) ? .sms : .iMessage
}
// MARK: - Send
func send(recipient: String, body: String) async -> (success: Bool, messageId: String?, error: String?, channel: String?) {
resetCountersIfNeeded()
if dailySendCount >= maxDailyMessages {
return (false, nil, "Daily send limit reached (\(maxDailyMessages)/day)", nil)
}
if hourlySendCount >= maxHourlyMessages {
return (false, nil, "Hourly send limit reached (\(maxHourlyMessages)/hour)", nil)
}
let validation = Self.validate(recipient: recipient)
if let error = validation.error { return (false, nil, error, nil) }
let buddyId = validation.buddyId
let isEmail = validation.isEmail
let logPrefix = String(buddyId.prefix(4))
// Route by chat.db history bounded, so a chat.db lock cannot stall it.
let preferred = await Self.boundedPreferredService(buddyId: buddyId)
var attempt = await sendOnce(serviceType: preferred, buddyId: buddyId, body: body, logPrefix: logPrefix)
// Fallback: an iMessage-routed number that didn't actually deliver
// (not iMessage-reachable, service issue, ) retry once over SMS.
if !attempt.sent, preferred == .iMessage {
osLog.notice("send: iMessage to \(logPrefix, privacy: .public) failed (\(attempt.error ?? "?", privacy: .public)) — retrying via SMS")
attempt = await sendOnce(serviceType: .sms, buddyId: buddyId, body: body, logPrefix: logPrefix)
}
if attempt.sent {
dailySendCount += 1
hourlySendCount += 1
let messageId = Self.generateMessageId(buddyId: buddyId, isEmail: isEmail)
await Self.logActivity(success: true, "Message sent to \(logPrefix)... via \(attempt.usedService)")
log.info("Message sent to \(logPrefix)..., id: \(messageId), via \(attempt.usedService)")
osLog.notice("send: delivered to \(logPrefix, privacy: .public) via \(attempt.usedService, privacy: .public)")
return (true, messageId, nil, Self.normalizeChannel(attempt.usedService))
}
await Self.logActivity(success: false, "Message send failed to \(logPrefix)...: \(attempt.error ?? "unknown error")")
osLog.error("send: FAILED for \(logPrefix, privacy: .public): \(attempt.error ?? "unknown", privacy: .public)")
return (false, nil, attempt.error, Self.normalizeChannel(attempt.usedService))
}
/// Map a Messages service label ("iMessage"/"SMS") to the server's
/// `channel_used` enum ("imessage"/"sms"). Unknown nil.
nonisolated static func normalizeChannel(_ usedService: String) -> String? {
switch usedService.lowercased() {
case "imessage": return "imessage"
case "sms": return "sms"
default: return nil
}
}
/// Send `body` to `buddyId` over one specific Messages service, then verify
/// the result against chat.db `osascript` exiting 0 only means the
/// command was accepted, not that the message actually transmitted.
///
/// Every wait here is bounded: the osascript exit (45s), the chat.db
/// verification reads (10s each). Nothing here can stall the caller.
private func sendOnce(
serviceType: MessageServiceKind,
buddyId: String,
body: String,
logPrefix: String
) async -> (sent: Bool, usedService: String, error: String?) {
let sanitizedBody = body
.replacingOccurrences(of: "\\", with: "\\\\")
.replacingOccurrences(of: "\"", with: "\\\"")
let sanitizedBuddy = buddyId
.replacingOccurrences(of: "\\", with: "\\\\")
.replacingOccurrences(of: "\"", with: "\\\"")
let svc = serviceType.rawValue
// Resolve the service by iterating with a per-service `try`: a plain
// `1st service whose service type = ` throws (-1728) when any stale
// service errors on `service type`. Require an *enabled* service.
let script = """
tell application "Messages"
set targetService to missing value
repeat with s in services
try
if (service type of s) is \(svc) and (enabled of s) then
set targetService to s
exit repeat
end if
end try
end repeat
if targetService is missing value then error "no enabled \(svc) service"
set targetBuddy to buddy "\(sanitizedBuddy)" of targetService
send "\(sanitizedBody)" to targetBuddy
end tell
"""
let process = Process()
process.executableURL = URL(fileURLWithPath: "/usr/bin/osascript")
process.arguments = ["-e", script]
let errorPipe = Pipe()
process.standardError = errorPipe
process.standardOutput = Pipe()
osLog.notice("send: launching osascript (\(svc, privacy: .public)) for \(logPrefix, privacy: .public)...")
do {
try process.run()
} catch {
return (false, svc, error.localizedDescription)
}
// Bounded async wait for osascript to exit (45s cap). A wedged
// Messages.app makes osascript hang on its AppleEvent timeout; the
// cap guarantees the caller is released even then.
let sendTimeout: TimeInterval = 45
let timedOut: Bool = await withCheckedContinuation { (cont: CheckedContinuation<Bool, Never>) in
let once = ResumeOnce()
process.terminationHandler = { _ in
if once.claim() { cont.resume(returning: false) }
}
// Edge case: the process exited before the handler was attached.
if !process.isRunning, once.claim() {
cont.resume(returning: false)
}
DispatchQueue.global().asyncAfter(deadline: .now() + sendTimeout) {
if once.claim() { cont.resume(returning: true) }
}
}
if timedOut {
if process.isRunning { process.terminate() }
return (false, svc, "osascript send timed out after \(Int(sendTimeout))s")
}
if process.terminationStatus != 0 {
let errorData = errorPipe.fileHandleForReading.readDataToEndOfFile()
let msg = String(data: errorData, encoding: .utf8)?
.trimmingCharacters(in: .whitespacesAndNewlines)
?? "osascript exited with code \(process.terminationStatus)"
return (false, svc, msg)
}
// Verify against chat.db. Poll a delivery error (e.g. error 22) can
// take several seconds to register after osascript returns. Async
// sleeps (do not block the actor); each chat.db read is itself bounded.
for _ in 0..<8 {
try? await Task.sleep(nanoseconds: 2_000_000_000)
guard let status = await Self.boundedChatDBSendStatus(buddyId: buddyId) else { continue }
if status.error != 0 {
return (false, svc, "Messages error \(status.error)")
}
if status.isSent {
return (true, svc, nil)
}
}
// No error observed within the poll window accept as sent.
return (true, svc, nil)
}
func getRateLimitStatus() -> (dailyRemaining: Int, hourlyRemaining: Int) {
resetCountersIfNeeded()
return (max(0, maxDailyMessages - dailySendCount), max(0, maxHourlyMessages - hourlySendCount))
}
private func resetCountersIfNeeded() {
let now = Date()
if now.timeIntervalSince(lastHourReset) >= 3600 {
hourlySendCount = 0
lastHourReset = now
}
if !Calendar.current.isDate(now, inSameDayAs: lastDayReset) {
dailySendCount = 0
lastDayReset = now
}
}
// MARK: - ActivityLog bridge
/// `ActivityLog` is `@MainActor`; hop to it from this off-main actor.
nonisolated static func logActivity(success: Bool, _ message: String) async {
await MainActor.run {
if success {
ActivityLog.shared.success(message)
} else {
ActivityLog.shared.error(message)
}
}
}
// MARK: - chat.db reads (nonisolated; bounded wrappers below)
/// Delivery status for a previously-sent message, looked up by suffix.
/// `nonisolated` only reads chat.db, touches no actor state.
nonisolated func checkDelivery(messageId: String) -> String {
let parts = messageId.split(separator: "_")
guard parts.count >= 3,
let timestamp = Double(parts.last ?? "") else { return "pending" }
let phoneSuffix = String(parts[1])
let sentDate = Date(timeIntervalSince1970: timestamp)
guard let dbQueue = iMessageReader.shared.getDatabaseQueue() else { return "pending" }
do {
return try dbQueue.read { db in
let row = try Row.fetchOne(db, sql: """
SELECT m.date_delivered, m.date_read
FROM message m
JOIN chat_message_join cmj ON cmj.message_id = m.ROWID
JOIN chat_handle_join chj ON chj.chat_id = cmj.chat_id
JOIN handle h ON h.ROWID = chj.handle_id
WHERE m.is_from_me = 1
AND h.id LIKE ?
AND m.date > ?
ORDER BY m.date DESC
LIMIT 1
""", arguments: ["%\(phoneSuffix)", sentDate.timeIntervalSinceReferenceDate * 1_000_000_000])
guard let row = row else { return "pending" }
if row["date_read"] != nil && (row["date_read"] as? Int64 ?? 0) > 0 { return "read" }
if row["date_delivered"] != nil && (row["date_delivered"] as? Int64 ?? 0) > 0 { return "delivered" }
return "sent"
}
} catch {
log.warning("Failed to check delivery: \(error)")
return "pending"
}
}
/// Lifecycle of the most-recent outbound message to `buddyId` in chat.db.
/// `error != 0` means Messages failed the send even when `osascript`
/// exited 0; `isSent` is the ground-truth sent flag. Synchronous
/// callers in the send path use `boundedChatDBSendStatus`.
nonisolated static func chatDBSendStatus(buddyId: String) -> (error: Int, isSent: Bool)? {
guard let dbQueue = iMessageReader.shared.getDatabaseQueue() else { return nil }
let suffix = String(buddyId.suffix(7))
return (try? dbQueue.read { db -> (Int, Bool)? in
guard let row = try Row.fetchOne(db, sql: """
SELECT m.error AS err, m.is_sent AS sent
FROM message m
JOIN handle h ON h.ROWID = m.handle_id
WHERE m.is_from_me = 1 AND h.id LIKE ?
ORDER BY m.date DESC
LIMIT 1
""", arguments: ["%\(suffix)"])
else { return nil }
let err = (row["err"] as? Int64).map(Int.init) ?? 0
let sent = ((row["sent"] as? Int64) ?? 0) != 0
return (err, sent)
}) ?? nil
}
/// Per-service message-count history for `buddyId` in chat.db. Synchronous
/// callers in the send path use `boundedPreferredService`.
nonisolated static func preferredService(buddyId: String) -> MessageServiceKind {
guard let dbQueue = iMessageReader.shared.getDatabaseQueue() else { return .iMessage }
let suffix = String(buddyId.suffix(7))
let kind: MessageServiceKind? = try? dbQueue.read { db -> MessageServiceKind in
var imessage = 0
var sms = 0
let rows = try Row.fetchAll(db, sql: """
SELECT m.service AS svc, COUNT(*) AS c
FROM message m
JOIN handle h ON h.ROWID = m.handle_id
WHERE h.id LIKE ?
GROUP BY m.service
""", arguments: ["%\(suffix)"])
for row in rows {
let svc = (row["svc"] as? String) ?? ""
let count = (row["c"] as? Int64).map(Int.init) ?? 0
if svc == "iMessage" { imessage += count }
if svc == "SMS" { sms += count }
}
return decideService(imessageCount: imessage, smsCount: sms)
}
return kind ?? .iMessage
}
// MARK: - Bounded wrappers
/// Runs a blocking value-producing closure on a background queue, bounded
/// by `seconds`. On timeout the `fallback` is returned (the work keeps
/// running in the background it cannot be cancelled mid-syscall but
/// the caller, and the poller awaiting it, is released).
nonisolated static func bounded<T: Sendable>(
_ seconds: TimeInterval,
fallback: T,
_ work: @escaping @Sendable () -> T
) async -> T {
await withCheckedContinuation { (cont: CheckedContinuation<T, Never>) in
let once = ResumeOnce()
DispatchQueue.global().async {
let value = work()
if once.claim() { cont.resume(returning: value) }
}
DispatchQueue.global().asyncAfter(deadline: .now() + seconds) {
if once.claim() { cont.resume(returning: fallback) }
}
}
}
/// `chatDBSendStatus`, bounded a chat.db lock cannot stall the send.
nonisolated static func boundedChatDBSendStatus(buddyId: String) async -> (error: Int, isSent: Bool)? {
await bounded(10, fallback: nil) { chatDBSendStatus(buddyId: buddyId) }
}
/// `preferredService`, bounded a chat.db lock cannot stall the send.
nonisolated static func boundedPreferredService(buddyId: String) async -> MessageServiceKind {
await bounded(10, fallback: .iMessage) { preferredService(buddyId: buddyId) }
}
}