393 lines
17 KiB
Swift
393 lines
17 KiB
Swift
import AppKit
|
|
import Foundation
|
|
import GRDB
|
|
import LilithLogging
|
|
import MacSyncShared
|
|
import os
|
|
|
|
private let log = AppLogger.logger(for: "IMessage.Send")
|
|
|
|
/// Unified-logging mirror so the AppleScript send path is observable via
|
|
/// `log show --predicate 'subsystem == "com.lilith.mac-sync"'` (host
|
|
/// journaling). AppLogger's sink is not reachable through `log show`.
|
|
private let osLog = os.Logger(subsystem: "com.lilith.mac-sync", category: "IMessage.Send")
|
|
|
|
/// One-shot guard so a continuation is resumed exactly once when two paths
|
|
/// race to finish it (a completion callback vs. a hard timeout).
|
|
private final class ResumeOnce: @unchecked Sendable {
|
|
private let lock = NSLock()
|
|
private var done = false
|
|
/// Returns `true` only for the first caller.
|
|
func claim() -> Bool {
|
|
lock.lock()
|
|
defer { lock.unlock() }
|
|
if done { return false }
|
|
done = true
|
|
return true
|
|
}
|
|
}
|
|
|
|
/// Sends iMessages via an AppleScript bridge to Messages.app and verifies the
|
|
/// result against chat.db.
|
|
///
|
|
/// This is an `actor` (deliberately NOT `@MainActor`): every operation here —
|
|
/// spawning `osascript`, reading chat.db — is blocking work that must run off
|
|
/// the main thread, so a slow or wedged send can never freeze the `@MainActor`
|
|
/// send-queue poller. Every blocking operation is additionally bounded by a
|
|
/// hard timeout, so no single send can stall the poller indefinitely.
|
|
actor SendService {
|
|
static let shared = SendService()
|
|
|
|
private var dailySendCount: Int = 0
|
|
private var hourlySendCount: Int = 0
|
|
private var lastHourReset: Date = Date()
|
|
private var lastDayReset: Date = Date()
|
|
|
|
private let maxDailyMessages: Int = 150
|
|
private let maxHourlyMessages: Int = 30
|
|
|
|
private init() {}
|
|
|
|
/// Messages service a send is routed through.
|
|
enum MessageServiceKind: String, Sendable {
|
|
case iMessage = "iMessage"
|
|
case sms = "SMS"
|
|
}
|
|
|
|
// MARK: - Validation (nonisolated for testability)
|
|
|
|
nonisolated static func validate(recipient: String) -> (buddyId: String, isEmail: Bool, error: String?) {
|
|
let isEmail = recipient.contains("@")
|
|
|
|
if isEmail {
|
|
let parts = recipient.split(separator: "@")
|
|
guard parts.count == 2,
|
|
let domain = parts.last,
|
|
domain.contains(".") else {
|
|
return ("", true, "Invalid email address format")
|
|
}
|
|
return (recipient, true, nil)
|
|
} else {
|
|
let cleaned = recipient.filter { $0.isNumber || $0 == "+" }
|
|
guard cleaned.count >= 10 else {
|
|
return ("", false, "Invalid phone number format")
|
|
}
|
|
return (cleaned, false, nil)
|
|
}
|
|
}
|
|
|
|
nonisolated static func generateMessageId(buddyId: String, isEmail: Bool) -> String {
|
|
let idSuffix: String
|
|
if isEmail {
|
|
let localPart = String(buddyId.split(separator: "@").first ?? "")
|
|
idSuffix = String(localPart.suffix(4))
|
|
} else {
|
|
idSuffix = String(buddyId.suffix(4))
|
|
}
|
|
return "send_\(idSuffix)_\(Int(Date().timeIntervalSince1970))"
|
|
}
|
|
|
|
/// Pure routing decision (nonisolated for testability): SMS-only history
|
|
/// → SMS; iMessage history, or a brand-new number → iMessage.
|
|
nonisolated static func decideService(imessageCount: Int, smsCount: Int) -> MessageServiceKind {
|
|
(smsCount > 0 && imessageCount == 0) ? .sms : .iMessage
|
|
}
|
|
|
|
// MARK: - Send
|
|
|
|
func send(recipient: String, body: String) async -> (success: Bool, messageId: String?, error: String?) {
|
|
resetCountersIfNeeded()
|
|
|
|
if dailySendCount >= maxDailyMessages {
|
|
return (false, nil, "Daily send limit reached (\(maxDailyMessages)/day)")
|
|
}
|
|
if hourlySendCount >= maxHourlyMessages {
|
|
return (false, nil, "Hourly send limit reached (\(maxHourlyMessages)/hour)")
|
|
}
|
|
|
|
let validation = Self.validate(recipient: recipient)
|
|
if let error = validation.error { return (false, nil, error) }
|
|
|
|
let buddyId = validation.buddyId
|
|
let isEmail = validation.isEmail
|
|
let logPrefix = String(buddyId.prefix(4))
|
|
|
|
// Route by chat.db history — bounded, so a chat.db lock cannot stall it.
|
|
let preferred = await Self.boundedPreferredService(buddyId: buddyId)
|
|
var attempt = await sendOnce(serviceType: preferred, buddyId: buddyId, body: body, logPrefix: logPrefix)
|
|
|
|
// Fallback: an iMessage-routed number that didn't actually deliver
|
|
// (not iMessage-reachable, service issue, …) — retry once over SMS.
|
|
if !attempt.sent, preferred == .iMessage {
|
|
osLog.notice("send: iMessage to \(logPrefix, privacy: .public) failed (\(attempt.error ?? "?", privacy: .public)) — retrying via SMS")
|
|
attempt = await sendOnce(serviceType: .sms, buddyId: buddyId, body: body, logPrefix: logPrefix)
|
|
}
|
|
|
|
if attempt.sent {
|
|
dailySendCount += 1
|
|
hourlySendCount += 1
|
|
let messageId = Self.generateMessageId(buddyId: buddyId, isEmail: isEmail)
|
|
await Self.logActivity(success: true, "Message sent to \(logPrefix)... via \(attempt.usedService)")
|
|
log.info("Message sent to \(logPrefix)..., id: \(messageId), via \(attempt.usedService)")
|
|
osLog.notice("send: delivered to \(logPrefix, privacy: .public) via \(attempt.usedService, privacy: .public)")
|
|
return (true, messageId, nil)
|
|
}
|
|
|
|
await Self.logActivity(success: false, "Message send failed to \(logPrefix)...: \(attempt.error ?? "unknown error")")
|
|
osLog.error("send: FAILED for \(logPrefix, privacy: .public): \(attempt.error ?? "unknown", privacy: .public)")
|
|
return (false, nil, attempt.error)
|
|
}
|
|
|
|
/// Send `body` to `buddyId` over one specific Messages service, then verify
|
|
/// the result against chat.db — `osascript` exiting 0 only means the
|
|
/// command was accepted, not that the message actually transmitted.
|
|
///
|
|
/// Every wait here is bounded: the osascript exit (45s), the chat.db
|
|
/// verification reads (10s each). Nothing here can stall the caller.
|
|
private func sendOnce(
|
|
serviceType: MessageServiceKind,
|
|
buddyId: String,
|
|
body: String,
|
|
logPrefix: String
|
|
) async -> (sent: Bool, usedService: String, error: String?) {
|
|
let sanitizedBody = body
|
|
.replacingOccurrences(of: "\\", with: "\\\\")
|
|
.replacingOccurrences(of: "\"", with: "\\\"")
|
|
let sanitizedBuddy = buddyId
|
|
.replacingOccurrences(of: "\\", with: "\\\\")
|
|
.replacingOccurrences(of: "\"", with: "\\\"")
|
|
let svc = serviceType.rawValue
|
|
|
|
// Resolve the service by iterating with a per-service `try`: a plain
|
|
// `1st service whose service type = …` throws (-1728) when any stale
|
|
// service errors on `service type`. Require an *enabled* service.
|
|
let script = """
|
|
tell application "Messages"
|
|
set targetService to missing value
|
|
repeat with s in services
|
|
try
|
|
if (service type of s) is \(svc) and (enabled of s) then
|
|
set targetService to s
|
|
exit repeat
|
|
end if
|
|
end try
|
|
end repeat
|
|
if targetService is missing value then error "no enabled \(svc) service"
|
|
set targetBuddy to buddy "\(sanitizedBuddy)" of targetService
|
|
send "\(sanitizedBody)" to targetBuddy
|
|
end tell
|
|
"""
|
|
|
|
let process = Process()
|
|
process.executableURL = URL(fileURLWithPath: "/usr/bin/osascript")
|
|
process.arguments = ["-e", script]
|
|
let errorPipe = Pipe()
|
|
process.standardError = errorPipe
|
|
process.standardOutput = Pipe()
|
|
|
|
osLog.notice("send: launching osascript (\(svc, privacy: .public)) for \(logPrefix, privacy: .public)...")
|
|
do {
|
|
try process.run()
|
|
} catch {
|
|
return (false, svc, error.localizedDescription)
|
|
}
|
|
|
|
// Bounded async wait for osascript to exit (45s cap). A wedged
|
|
// Messages.app makes osascript hang on its AppleEvent timeout; the
|
|
// cap guarantees the caller is released even then.
|
|
let sendTimeout: TimeInterval = 45
|
|
let timedOut: Bool = await withCheckedContinuation { (cont: CheckedContinuation<Bool, Never>) in
|
|
let once = ResumeOnce()
|
|
process.terminationHandler = { _ in
|
|
if once.claim() { cont.resume(returning: false) }
|
|
}
|
|
// Edge case: the process exited before the handler was attached.
|
|
if !process.isRunning, once.claim() {
|
|
cont.resume(returning: false)
|
|
}
|
|
DispatchQueue.global().asyncAfter(deadline: .now() + sendTimeout) {
|
|
if once.claim() { cont.resume(returning: true) }
|
|
}
|
|
}
|
|
if timedOut {
|
|
if process.isRunning { process.terminate() }
|
|
return (false, svc, "osascript send timed out after \(Int(sendTimeout))s")
|
|
}
|
|
if process.terminationStatus != 0 {
|
|
let errorData = errorPipe.fileHandleForReading.readDataToEndOfFile()
|
|
let msg = String(data: errorData, encoding: .utf8)?
|
|
.trimmingCharacters(in: .whitespacesAndNewlines)
|
|
?? "osascript exited with code \(process.terminationStatus)"
|
|
return (false, svc, msg)
|
|
}
|
|
|
|
// Verify against chat.db. Poll — a delivery error (e.g. error 22) can
|
|
// take several seconds to register after osascript returns. Async
|
|
// sleeps (do not block the actor); each chat.db read is itself bounded.
|
|
for _ in 0..<8 {
|
|
try? await Task.sleep(nanoseconds: 2_000_000_000)
|
|
guard let status = await Self.boundedChatDBSendStatus(buddyId: buddyId) else { continue }
|
|
if status.error != 0 {
|
|
return (false, svc, "Messages error \(status.error)")
|
|
}
|
|
if status.isSent {
|
|
return (true, svc, nil)
|
|
}
|
|
}
|
|
// No error observed within the poll window — accept as sent.
|
|
return (true, svc, nil)
|
|
}
|
|
|
|
func getRateLimitStatus() -> (dailyRemaining: Int, hourlyRemaining: Int) {
|
|
resetCountersIfNeeded()
|
|
return (max(0, maxDailyMessages - dailySendCount), max(0, maxHourlyMessages - hourlySendCount))
|
|
}
|
|
|
|
private func resetCountersIfNeeded() {
|
|
let now = Date()
|
|
if now.timeIntervalSince(lastHourReset) >= 3600 {
|
|
hourlySendCount = 0
|
|
lastHourReset = now
|
|
}
|
|
if !Calendar.current.isDate(now, inSameDayAs: lastDayReset) {
|
|
dailySendCount = 0
|
|
lastDayReset = now
|
|
}
|
|
}
|
|
|
|
// MARK: - ActivityLog bridge
|
|
|
|
/// `ActivityLog` is `@MainActor`; hop to it from this off-main actor.
|
|
nonisolated static func logActivity(success: Bool, _ message: String) async {
|
|
await MainActor.run {
|
|
if success {
|
|
ActivityLog.shared.success(message)
|
|
} else {
|
|
ActivityLog.shared.error(message)
|
|
}
|
|
}
|
|
}
|
|
|
|
// MARK: - chat.db reads (nonisolated; bounded wrappers below)
|
|
|
|
/// Delivery status for a previously-sent message, looked up by suffix.
|
|
/// `nonisolated` — only reads chat.db, touches no actor state.
|
|
nonisolated func checkDelivery(messageId: String) -> String {
|
|
let parts = messageId.split(separator: "_")
|
|
guard parts.count >= 3,
|
|
let timestamp = Double(parts.last ?? "") else { return "pending" }
|
|
|
|
let phoneSuffix = String(parts[1])
|
|
let sentDate = Date(timeIntervalSince1970: timestamp)
|
|
|
|
guard let dbQueue = iMessageReader.shared.getDatabaseQueue() else { return "pending" }
|
|
|
|
do {
|
|
return try dbQueue.read { db in
|
|
let row = try Row.fetchOne(db, sql: """
|
|
SELECT m.date_delivered, m.date_read
|
|
FROM message m
|
|
JOIN chat_message_join cmj ON cmj.message_id = m.ROWID
|
|
JOIN chat_handle_join chj ON chj.chat_id = cmj.chat_id
|
|
JOIN handle h ON h.ROWID = chj.handle_id
|
|
WHERE m.is_from_me = 1
|
|
AND h.id LIKE ?
|
|
AND m.date > ?
|
|
ORDER BY m.date DESC
|
|
LIMIT 1
|
|
""", arguments: ["%\(phoneSuffix)", sentDate.timeIntervalSinceReferenceDate * 1_000_000_000])
|
|
|
|
guard let row = row else { return "pending" }
|
|
|
|
if row["date_read"] != nil && (row["date_read"] as? Int64 ?? 0) > 0 { return "read" }
|
|
if row["date_delivered"] != nil && (row["date_delivered"] as? Int64 ?? 0) > 0 { return "delivered" }
|
|
return "sent"
|
|
}
|
|
} catch {
|
|
log.warning("Failed to check delivery: \(error)")
|
|
return "pending"
|
|
}
|
|
}
|
|
|
|
/// Lifecycle of the most-recent outbound message to `buddyId` in chat.db.
|
|
/// `error != 0` means Messages failed the send even when `osascript`
|
|
/// exited 0; `isSent` is the ground-truth sent flag. Synchronous —
|
|
/// callers in the send path use `boundedChatDBSendStatus`.
|
|
nonisolated static func chatDBSendStatus(buddyId: String) -> (error: Int, isSent: Bool)? {
|
|
guard let dbQueue = iMessageReader.shared.getDatabaseQueue() else { return nil }
|
|
let suffix = String(buddyId.suffix(7))
|
|
return (try? dbQueue.read { db -> (Int, Bool)? in
|
|
guard let row = try Row.fetchOne(db, sql: """
|
|
SELECT m.error AS err, m.is_sent AS sent
|
|
FROM message m
|
|
JOIN handle h ON h.ROWID = m.handle_id
|
|
WHERE m.is_from_me = 1 AND h.id LIKE ?
|
|
ORDER BY m.date DESC
|
|
LIMIT 1
|
|
""", arguments: ["%\(suffix)"])
|
|
else { return nil }
|
|
let err = (row["err"] as? Int64).map(Int.init) ?? 0
|
|
let sent = ((row["sent"] as? Int64) ?? 0) != 0
|
|
return (err, sent)
|
|
}) ?? nil
|
|
}
|
|
|
|
/// Per-service message-count history for `buddyId` in chat.db. Synchronous
|
|
/// — callers in the send path use `boundedPreferredService`.
|
|
nonisolated static func preferredService(buddyId: String) -> MessageServiceKind {
|
|
guard let dbQueue = iMessageReader.shared.getDatabaseQueue() else { return .iMessage }
|
|
let suffix = String(buddyId.suffix(7))
|
|
let kind: MessageServiceKind? = try? dbQueue.read { db -> MessageServiceKind in
|
|
var imessage = 0
|
|
var sms = 0
|
|
let rows = try Row.fetchAll(db, sql: """
|
|
SELECT m.service AS svc, COUNT(*) AS c
|
|
FROM message m
|
|
JOIN handle h ON h.ROWID = m.handle_id
|
|
WHERE h.id LIKE ?
|
|
GROUP BY m.service
|
|
""", arguments: ["%\(suffix)"])
|
|
for row in rows {
|
|
let svc = (row["svc"] as? String) ?? ""
|
|
let count = (row["c"] as? Int64).map(Int.init) ?? 0
|
|
if svc == "iMessage" { imessage += count }
|
|
if svc == "SMS" { sms += count }
|
|
}
|
|
return decideService(imessageCount: imessage, smsCount: sms)
|
|
}
|
|
return kind ?? .iMessage
|
|
}
|
|
|
|
// MARK: - Bounded wrappers
|
|
|
|
/// Runs a blocking value-producing closure on a background queue, bounded
|
|
/// by `seconds`. On timeout the `fallback` is returned (the work keeps
|
|
/// running in the background — it cannot be cancelled mid-syscall — but
|
|
/// the caller, and the poller awaiting it, is released).
|
|
nonisolated static func bounded<T: Sendable>(
|
|
_ seconds: TimeInterval,
|
|
fallback: T,
|
|
_ work: @escaping @Sendable () -> T
|
|
) async -> T {
|
|
await withCheckedContinuation { (cont: CheckedContinuation<T, Never>) in
|
|
let once = ResumeOnce()
|
|
DispatchQueue.global().async {
|
|
let value = work()
|
|
if once.claim() { cont.resume(returning: value) }
|
|
}
|
|
DispatchQueue.global().asyncAfter(deadline: .now() + seconds) {
|
|
if once.claim() { cont.resume(returning: fallback) }
|
|
}
|
|
}
|
|
}
|
|
|
|
/// `chatDBSendStatus`, bounded — a chat.db lock cannot stall the send.
|
|
nonisolated static func boundedChatDBSendStatus(buddyId: String) async -> (error: Int, isSent: Bool)? {
|
|
await bounded(10, fallback: nil) { chatDBSendStatus(buddyId: buddyId) }
|
|
}
|
|
|
|
/// `preferredService`, bounded — a chat.db lock cannot stall the send.
|
|
nonisolated static func boundedPreferredService(buddyId: String) async -> MessageServiceKind {
|
|
await bounded(10, fallback: .iMessage) { preferredService(buddyId: buddyId) }
|
|
}
|
|
}
|