import AppKit
import Foundation
import GRDB
import LilithLogging
import MacSyncShared
import os

private let log = AppLogger.logger(for: "IMessage.Send")

/// Unified-logging mirror so the AppleScript send path is observable via
/// `log show --predicate 'subsystem == "com.lilith.mac-sync"'` (host
/// journaling). AppLogger's sink is not reachable through `log show`.
private let osLog = os.Logger(subsystem: "com.lilith.mac-sync", category: "IMessage.Send")

/// One-shot guard so a continuation is resumed exactly once when two paths
/// race to finish it (a completion callback vs. a hard timeout).
private final class ResumeOnce: @unchecked Sendable {
    private let lock = NSLock()
    private var done = false

    /// Returns `true` only for the first caller.
    func claim() -> Bool {
        lock.lock()
        defer { lock.unlock() }
        if done { return false }
        done = true
        return true
    }
}

/// Sends iMessages via an AppleScript bridge to Messages.app and verifies the
/// result against chat.db.
///
/// This is an `actor` (deliberately NOT `@MainActor`): every operation here —
/// spawning `osascript`, reading chat.db — is blocking work that must run off
/// the main thread, so a slow or wedged send can never freeze the `@MainActor`
/// send-queue poller. Every blocking operation is additionally bounded by a
/// hard timeout, so no single send can stall the poller indefinitely.
actor SendService {
    static let shared = SendService()

    private var dailySendCount: Int = 0
    private var hourlySendCount: Int = 0
    private var lastHourReset: Date = Date()
    private var lastDayReset: Date = Date()
    private let maxDailyMessages: Int = 150
    private let maxHourlyMessages: Int = 30

    private init() {}

    /// Messages service a send is routed through.
    enum MessageServiceKind: String, Sendable {
        case iMessage = "iMessage"
        case sms = "SMS"
    }

    // MARK: - Validation (nonisolated for testability)

    /// Classifies and normalizes a recipient string.
    ///
    /// - Parameter recipient: An email address (anything containing `@`) or a
    ///   phone number in any common formatting.
    /// - Returns: The buddy id to hand to Messages, whether it is an email, and
    ///   a human-readable error (`nil` when the recipient is acceptable).
    ///   Emails are returned verbatim; phone numbers are reduced to ASCII
    ///   digits and `+`.
    nonisolated static func validate(recipient: String) -> (buddyId: String, isEmail: Bool, error: String?) {
        let isEmail = recipient.contains("@")
        if isEmail {
            // Keep empty fields so "a@@b.com" is seen as three parts and
            // rejected instead of silently collapsing to two.
            let parts = recipient.split(separator: "@", omittingEmptySubsequences: false)
            guard parts.count == 2, !parts[0].isEmpty, parts[1].contains(".") else {
                return ("", true, "Invalid email address format")
            }
            return (recipient, true, nil)
        } else {
            // `Character.isNumber` alone also accepts non-ASCII numerics
            // (e.g. "٣", "½"), which Messages cannot dial — ASCII digits only.
            let cleaned = recipient.filter { ($0.isASCII && $0.isNumber) || $0 == "+" }
            guard cleaned.count >= 10 else {
                return ("", false, "Invalid phone number format")
            }
            return (cleaned, false, nil)
        }
    }

    /// Builds the opaque id `send_<suffix>_<unix seconds>` for a send.
    ///
    /// - Parameters:
    ///   - buddyId: Validated buddy id (phone number or email address).
    ///   - isEmail: Whether `buddyId` is an email; the suffix is then taken
    ///     from the local part instead of the whole id.
    ///   - date: Instant stamped into the id. `checkDelivery` only considers
    ///     chat.db rows newer than this, so pass the time the send *started*;
    ///     defaults to now.
    nonisolated static func generateMessageId(buddyId: String, isEmail: Bool, at date: Date = Date()) -> String {
        let idSuffix: String
        if isEmail {
            let localPart = String(buddyId.split(separator: "@").first ?? "")
            idSuffix = String(localPart.suffix(4))
        } else {
            idSuffix = String(buddyId.suffix(4))
        }
        return "send_\(idSuffix)_\(Int(date.timeIntervalSince1970))"
    }

    /// Pure routing decision (nonisolated for testability): SMS-only history
    /// → SMS; iMessage history, or a brand-new number → iMessage.
    nonisolated static func decideService(imessageCount: Int, smsCount: Int) -> MessageServiceKind {
        (smsCount > 0 && imessageCount == 0) ? .sms : .iMessage
    }

    // MARK: - Send

    /// Sends `body` to `recipient`, enforcing the hourly/daily rate limits.
    ///
    /// The service is chosen from chat.db history; an iMessage attempt that
    /// does not go through is retried once over SMS.
    ///
    /// - Returns: Success flag, a message id usable with `checkDelivery` (only
    ///   on success), an error description (only on failure), and the
    ///   normalized channel of the last attempt (`nil` when no attempt ran).
    func send(recipient: String, body: String) async -> (success: Bool, messageId: String?, error: String?, channel: String?) {
        resetCountersIfNeeded()
        if dailySendCount >= maxDailyMessages {
            return (false, nil, "Daily send limit reached (\(maxDailyMessages)/day)", nil)
        }
        if hourlySendCount >= maxHourlyMessages {
            return (false, nil, "Hourly send limit reached (\(maxHourlyMessages)/hour)", nil)
        }

        let validation = Self.validate(recipient: recipient)
        if let error = validation.error {
            return (false, nil, error, nil)
        }
        let buddyId = validation.buddyId
        let isEmail = validation.isEmail
        let logPrefix = String(buddyId.prefix(4))

        // Stamp the message id with the time the send STARTED. The id is only
        // built after verification (seconds later); stamping it then would
        // make `checkDelivery`'s `m.date > timestamp` filter skip the very
        // message this id refers to.
        let startedAt = Date()

        // Route by chat.db history — bounded, so a chat.db lock cannot stall it.
        let preferred = await Self.boundedPreferredService(buddyId: buddyId)
        var attempt = await sendOnce(serviceType: preferred, buddyId: buddyId, body: body, logPrefix: logPrefix)

        // Fallback: an iMessage-routed number that didn't actually deliver
        // (not iMessage-reachable, service issue, …) — retry once over SMS.
        if !attempt.sent, preferred == .iMessage {
            osLog.notice("send: iMessage to \(logPrefix, privacy: .public) failed (\(attempt.error ?? "?", privacy: .public)) — retrying via SMS")
            attempt = await sendOnce(serviceType: .sms, buddyId: buddyId, body: body, logPrefix: logPrefix)
        }

        if attempt.sent {
            dailySendCount += 1
            hourlySendCount += 1
            let messageId = Self.generateMessageId(buddyId: buddyId, isEmail: isEmail, at: startedAt)
            await Self.logActivity(success: true, "Message sent to \(logPrefix)... via \(attempt.usedService)")
            log.info("Message sent to \(logPrefix)..., id: \(messageId), via \(attempt.usedService)")
            osLog.notice("send: delivered to \(logPrefix, privacy: .public) via \(attempt.usedService, privacy: .public)")
            return (true, messageId, nil, Self.normalizeChannel(attempt.usedService))
        }

        await Self.logActivity(success: false, "Message send failed to \(logPrefix)...: \(attempt.error ?? "unknown error")")
        osLog.error("send: FAILED for \(logPrefix, privacy: .public): \(attempt.error ?? "unknown", privacy: .public)")
        return (false, nil, attempt.error, Self.normalizeChannel(attempt.usedService))
    }

    /// Map a Messages service label ("iMessage"/"SMS") to the server's
    /// `channel_used` enum ("imessage"/"sms"). Unknown → nil.
    nonisolated static func normalizeChannel(_ usedService: String) -> String? {
        switch usedService.lowercased() {
        case "imessage": return "imessage"
        case "sms": return "sms"
        default: return nil
        }
    }

    /// Send `body` to `buddyId` over one specific Messages service, then verify
    /// the result against chat.db — `osascript` exiting 0 only means the
    /// command was accepted, not that the message actually transmitted.
    ///
    /// Every wait here is bounded: the osascript exit (45s), the chat.db
    /// verification reads (10s each). Nothing here can stall the caller.
    ///
    /// - Returns: Whether the send is considered successful, the service label
    ///   that was attempted, and an error description on failure.
    private func sendOnce(
        serviceType: MessageServiceKind,
        buddyId: String,
        body: String,
        logPrefix: String
    ) async -> (sent: Bool, usedService: String, error: String?) {
        // Backslash and double quote are the only characters that need
        // escaping inside an AppleScript string literal.
        let sanitizedBody = body
            .replacingOccurrences(of: "\\", with: "\\\\")
            .replacingOccurrences(of: "\"", with: "\\\"")
        let sanitizedBuddy = buddyId
            .replacingOccurrences(of: "\\", with: "\\\\")
            .replacingOccurrences(of: "\"", with: "\\\"")
        let svc = serviceType.rawValue

        // Resolve the service by iterating with a per-service `try`: a plain
        // `1st service whose service type = …` throws (-1728) when any stale
        // service errors on `service type`. Require an *enabled* service.
        let script = """
        tell application "Messages"
            set targetService to missing value
            repeat with s in services
                try
                    if (service type of s) is \(svc) and (enabled of s) then
                        set targetService to s
                        exit repeat
                    end if
                end try
            end repeat
            if targetService is missing value then error "no enabled \(svc) service"
            set targetBuddy to buddy "\(sanitizedBuddy)" of targetService
            send "\(sanitizedBody)" to targetBuddy
        end tell
        """

        let process = Process()
        process.executableURL = URL(fileURLWithPath: "/usr/bin/osascript")
        process.arguments = ["-e", script]
        let errorPipe = Pipe()
        process.standardError = errorPipe
        // stdout is never consumed; discard it rather than leaving an
        // undrained pipe the child could block on.
        process.standardOutput = FileHandle.nullDevice

        osLog.notice("send: launching osascript (\(svc, privacy: .public)) for \(logPrefix, privacy: .public)...")
        let launchedAt = Date()
        do {
            try process.run()
        } catch {
            return (false, svc, error.localizedDescription)
        }

        // Bounded async wait for osascript to exit (45s cap). A wedged
        // Messages.app makes osascript hang on its AppleEvent timeout; the
        // cap guarantees the caller is released even then.
        let sendTimeout: TimeInterval = 45
        let timedOut: Bool = await withCheckedContinuation { (cont: CheckedContinuation<Bool, Never>) in
            let once = ResumeOnce()
            process.terminationHandler = { _ in
                if once.claim() { cont.resume(returning: false) }
            }
            // Edge case: the process exited before the handler was attached.
            if !process.isRunning, once.claim() {
                cont.resume(returning: false)
            }
            DispatchQueue.global().asyncAfter(deadline: .now() + sendTimeout) {
                if once.claim() { cont.resume(returning: true) }
            }
        }

        if timedOut {
            if process.isRunning { process.terminate() }
            return (false, svc, "osascript send timed out after \(Int(sendTimeout))s")
        }

        if process.terminationStatus != 0 {
            let errorData = errorPipe.fileHandleForReading.readDataToEndOfFile()
            let stderrText = String(data: errorData, encoding: .utf8)?
                .trimmingCharacters(in: .whitespacesAndNewlines) ?? ""
            // An empty stderr would otherwise surface as a blank error.
            let msg = stderrText.isEmpty
                ? "osascript exited with code \(process.terminationStatus)"
                : stderrText
            return (false, svc, msg)
        }

        // Verify against chat.db. Poll — a delivery error (e.g. error 22) can
        // take several seconds to register after osascript returns. Async
        // sleeps (do not block the actor); each chat.db read is itself bounded.
        //
        // Only rows written since this attempt launched count (1s slack for
        // timestamp rounding); otherwise an older message to the same buddy —
        // including the failed iMessage attempt before an SMS retry — could be
        // mistaken for this send's result.
        let verifyAfter = launchedAt.addingTimeInterval(-1)
        for _ in 0..<8 {
            try? await Task.sleep(nanoseconds: 2_000_000_000)
            guard let status = await Self.boundedChatDBSendStatus(buddyId: buddyId, since: verifyAfter) else {
                continue
            }
            if status.error != 0 {
                return (false, svc, "Messages error \(status.error)")
            }
            if status.isSent {
                return (true, svc, nil)
            }
        }

        // No error observed within the poll window — accept as sent.
        return (true, svc, nil)
    }

    /// Remaining sends allowed in the current day and hour windows.
    func getRateLimitStatus() -> (dailyRemaining: Int, hourlyRemaining: Int) {
        resetCountersIfNeeded()
        return (max(0, maxDailyMessages - dailySendCount), max(0, maxHourlyMessages - hourlySendCount))
    }

    /// Zeroes the hourly counter once 3600s have elapsed since its last reset,
    /// and the daily counter once the calendar day has changed.
    private func resetCountersIfNeeded() {
        let now = Date()
        if now.timeIntervalSince(lastHourReset) >= 3600 {
            hourlySendCount = 0
            lastHourReset = now
        }
        if !Calendar.current.isDate(now, inSameDayAs: lastDayReset) {
            dailySendCount = 0
            lastDayReset = now
        }
    }

    // MARK: - ActivityLog bridge

    /// `ActivityLog` is `@MainActor`; hop to it from this off-main actor.
    nonisolated static func logActivity(success: Bool, _ message: String) async {
        await MainActor.run {
            if success {
                ActivityLog.shared.success(message)
            } else {
                ActivityLog.shared.error(message)
            }
        }
    }

    // MARK: - chat.db reads (nonisolated; bounded wrappers below)

    /// Converts a `Date` to the chat.db `message.date` representation this
    /// file uses (nanoseconds since the 2001 reference date), saturating
    /// instead of trapping on out-of-range values.
    private nonisolated static func chatDBTimestamp(_ date: Date) -> Int64 {
        let nanos = (date.timeIntervalSinceReferenceDate * 1_000_000_000).rounded()
        return Int64(exactly: nanos) ?? (nanos < 0 ? Int64.min : Int64.max)
    }

    /// Delivery status for a previously-sent message, looked up by suffix.
    /// `nonisolated` — only reads chat.db, touches no actor state.
    ///
    /// - Parameter messageId: An id produced by `generateMessageId`
    ///   (`send_<suffix>_<unix seconds>`).
    /// - Returns: `"read"`, `"delivered"`, `"sent"`, or `"pending"` when the id
    ///   is malformed, chat.db is unavailable, or no matching row exists.
    nonisolated func checkDelivery(messageId: String) -> String {
        // Keep empty fields and re-join the middle: the suffix itself may
        // contain "_" (common in email local parts).
        let parts = messageId.split(separator: "_", omittingEmptySubsequences: false)
        guard parts.count >= 3, let timestamp = Double(parts[parts.count - 1]) else {
            return "pending"
        }
        let handleSuffix = parts[1..<(parts.count - 1)].joined(separator: "_")
        guard !handleSuffix.isEmpty else { return "pending" }
        let sentDate = Date(timeIntervalSince1970: timestamp)

        guard let dbQueue = iMessageReader.shared.getDatabaseQueue() else {
            return "pending"
        }

        do {
            return try dbQueue.read { db in
                // Phone handles end with the suffix; email handles carry it at
                // the end of the local part, i.e. immediately before "@".
                let row = try Row.fetchOne(db, sql: """
                    SELECT m.date_delivered, m.date_read
                    FROM message m
                    JOIN chat_message_join cmj ON cmj.message_id = m.ROWID
                    JOIN chat_handle_join chj ON chj.chat_id = cmj.chat_id
                    JOIN handle h ON h.ROWID = chj.handle_id
                    WHERE m.is_from_me = 1 AND (h.id LIKE ? OR h.id LIKE ?) AND m.date > ?
                    ORDER BY m.date DESC LIMIT 1
                    """, arguments: [
                        "%\(handleSuffix)",
                        "%\(handleSuffix)@%",
                        sentDate.timeIntervalSinceReferenceDate * 1_000_000_000
                    ])
                guard let row = row else { return "pending" }
                if (row["date_read"] as? Int64 ?? 0) > 0 {
                    return "read"
                }
                if (row["date_delivered"] as? Int64 ?? 0) > 0 {
                    return "delivered"
                }
                return "sent"
            }
        } catch {
            log.warning("Failed to check delivery: \(error)")
            return "pending"
        }
    }

    /// Lifecycle of the most-recent outbound message to `buddyId` in chat.db.
    /// `error != 0` means Messages failed the send even when `osascript`
    /// exited 0; `isSent` is the ground-truth sent flag. Synchronous —
    /// callers in the send path use `boundedChatDBSendStatus`.
    ///
    /// - Parameters:
    ///   - buddyId: Buddy id; matched against handles by its last 7 characters.
    ///   - since: When non-nil, only messages dated after this instant are
    ///     considered, so an older message is never reported as the current
    ///     send. `nil` (the default) considers all messages.
    /// - Returns: `nil` when chat.db is unavailable, the read fails, or no
    ///   matching row exists.
    nonisolated static func chatDBSendStatus(buddyId: String, since: Date? = nil) -> (error: Int, isSent: Bool)? {
        guard let dbQueue = iMessageReader.shared.getDatabaseQueue() else { return nil }
        let suffix = String(buddyId.suffix(7))
        let threshold: Int64? = since.map { chatDBTimestamp($0) }
        return (try? dbQueue.read { db -> (Int, Bool)? in
            guard let row = try Row.fetchOne(db, sql: """
                SELECT m.error AS err, m.is_sent AS sent
                FROM message m
                JOIN handle h ON h.ROWID = m.handle_id
                WHERE m.is_from_me = 1 AND h.id LIKE ? AND (? IS NULL OR m.date > ?)
                ORDER BY m.date DESC LIMIT 1
                """, arguments: ["%\(suffix)", threshold, threshold]) else {
                return nil
            }
            let err = (row["err"] as? Int64).map(Int.init) ?? 0
            let sent = ((row["sent"] as? Int64) ?? 0) != 0
            return (err, sent)
        }) ?? nil
    }

    /// Per-service message-count history for `buddyId` in chat.db. Synchronous
    /// — callers in the send path use `boundedPreferredService`.
    ///
    /// Falls back to `.iMessage` when chat.db is unavailable or the read fails.
    nonisolated static func preferredService(buddyId: String) -> MessageServiceKind {
        guard let dbQueue = iMessageReader.shared.getDatabaseQueue() else { return .iMessage }
        let suffix = String(buddyId.suffix(7))
        let kind: MessageServiceKind? = try? dbQueue.read { db -> MessageServiceKind in
            var imessage = 0
            var sms = 0
            let rows = try Row.fetchAll(db, sql: """
                SELECT m.service AS svc, COUNT(*) AS c
                FROM message m
                JOIN handle h ON h.ROWID = m.handle_id
                WHERE h.id LIKE ?
                GROUP BY m.service
                """, arguments: ["%\(suffix)"])
            for row in rows {
                let svc = (row["svc"] as? String) ?? ""
                let count = (row["c"] as? Int64).map(Int.init) ?? 0
                if svc == "iMessage" { imessage += count }
                if svc == "SMS" { sms += count }
            }
            return decideService(imessageCount: imessage, smsCount: sms)
        }
        return kind ?? .iMessage
    }

    // MARK: - Bounded wrappers

    /// Runs a blocking value-producing closure on a background queue, bounded
    /// by `seconds`. On timeout the `fallback` is returned (the work keeps
    /// running in the background — it cannot be cancelled mid-syscall — but
    /// the caller, and the poller awaiting it, is released).
    ///
    /// - Parameters:
    ///   - seconds: Maximum time to wait for `work`.
    ///   - fallback: Value returned if `work` has not finished in time.
    ///   - work: Blocking closure executed on a global dispatch queue.
    nonisolated static func bounded<T: Sendable>(
        _ seconds: TimeInterval,
        fallback: T,
        _ work: @escaping @Sendable () -> T
    ) async -> T {
        await withCheckedContinuation { (cont: CheckedContinuation<T, Never>) in
            // Whichever of the two paths claims first resumes the continuation.
            let once = ResumeOnce()
            DispatchQueue.global().async {
                let value = work()
                if once.claim() { cont.resume(returning: value) }
            }
            DispatchQueue.global().asyncAfter(deadline: .now() + seconds) {
                if once.claim() { cont.resume(returning: fallback) }
            }
        }
    }

    /// `chatDBSendStatus`, bounded — a chat.db lock cannot stall the send.
    /// Returns `nil` on timeout. `since` is forwarded unchanged.
    nonisolated static func boundedChatDBSendStatus(buddyId: String, since: Date? = nil) async -> (error: Int, isSent: Bool)? {
        await bounded(10, fallback: nil) { chatDBSendStatus(buddyId: buddyId, since: since) }
    }

    /// `preferredService`, bounded — a chat.db lock cannot stall the send.
    /// Returns `.iMessage` on timeout.
    nonisolated static func boundedPreferredService(buddyId: String) async -> MessageServiceKind {
        await bounded(10, fallback: .iMessage) { preferredService(buddyId: buddyId) }
    }
}