Some checks failed
Publish Swift Package / build-test-publish (push) Failing after 3m27s
Co-Authored-By: Lilith Autocommit <noreply@atlilith.com>
185 lines
7.3 KiB
Swift
185 lines
7.3 KiB
Swift
import Foundation
|
|
import MessagingChatCore
|
|
import LilithDomainModels
|
|
import LilithLogging
|
|
|
|
// MARK: - Event Stream Manager
|
|
|
|
/// Manages multiple `AsyncStream<WSServerEvent>` subscriptions.
|
|
///
|
|
/// Each subscriber gets an independent stream. Events are fanned out
|
|
/// to all active continuations when `emit(_:)` is called.
|
|
final class EventStreamManager: @unchecked Sendable {
|
|
|
|
private var continuations: [UUID: AsyncStream<WSServerEvent>.Continuation] = [:]
|
|
private let lock = NSLock()
|
|
|
|
/// Create a new event stream subscription.
|
|
/// The stream completes when the subscriber cancels or `finishAll()` is called.
|
|
func subscribe() -> AsyncStream<WSServerEvent> {
|
|
let id = UUID()
|
|
return AsyncStream { continuation in
|
|
lock.lock()
|
|
continuations[id] = continuation
|
|
lock.unlock()
|
|
|
|
continuation.onTermination = { [weak self] _ in
|
|
self?.lock.lock()
|
|
self?.continuations.removeValue(forKey: id)
|
|
self?.lock.unlock()
|
|
}
|
|
}
|
|
}
|
|
|
|
/// Emit an event to all active subscribers.
|
|
func emit(_ event: WSServerEvent) {
|
|
lock.lock()
|
|
let active = Array(continuations.values)
|
|
lock.unlock()
|
|
|
|
for continuation in active {
|
|
continuation.yield(event)
|
|
}
|
|
}
|
|
|
|
/// Finish all active streams (used on disconnect).
|
|
func finishAll() {
|
|
lock.lock()
|
|
let active = Array(continuations.values)
|
|
continuations.removeAll()
|
|
lock.unlock()
|
|
|
|
for continuation in active {
|
|
continuation.finish()
|
|
}
|
|
}
|
|
|
|
/// The number of active subscribers.
|
|
var subscriberCount: Int {
|
|
lock.lock()
|
|
defer { lock.unlock() }
|
|
return continuations.count
|
|
}
|
|
}
|
|
|
|
// MARK: - Server Event Parser
|
|
|
|
/// Parses raw WebSocket text frames into typed `WSServerEvent` instances.
|
|
struct ServerEventParser {
|
|
|
|
/// Parse a raw JSON text frame from the server.
|
|
/// - Parameter text: The raw JSON string.
|
|
/// - Returns: A typed server event, or `nil` if parsing fails.
|
|
func parse(_ text: String) -> WSServerEvent? {
|
|
guard let data = text.data(using: .utf8),
|
|
let json = try? JSONSerialization.jsonObject(with: data) as? [String: Any],
|
|
let eventName = json["event"] as? String,
|
|
let eventData = json["data"] else {
|
|
return nil
|
|
}
|
|
|
|
guard let eventDataJSON = try? JSONSerialization.data(withJSONObject: eventData) else {
|
|
return nil
|
|
}
|
|
|
|
switch eventName {
|
|
case "new_message":
|
|
guard let message = try? JSONDecoder.apiDecoder.decode(Message.self, from: eventDataJSON) else {
|
|
AppLogger.network.warning("Failed to decode new_message event")
|
|
return nil
|
|
}
|
|
return .newMessage(message)
|
|
|
|
case "message_delivered":
|
|
guard let dict = eventData as? [String: Any],
|
|
let threadId = dict["threadId"] as? String,
|
|
let messageId = dict["messageId"] as? String,
|
|
let deliveredAtStr = dict["deliveredAt"] as? String,
|
|
let deliveredAt = ISO8601DateFormatter.full.date(from: deliveredAtStr)
|
|
?? ISO8601DateFormatter.basic.date(from: deliveredAtStr) else {
|
|
return nil
|
|
}
|
|
return .messageDelivered(threadId: threadId, messageId: messageId, deliveredAt: deliveredAt)
|
|
|
|
case "message_read":
|
|
guard let dict = eventData as? [String: Any],
|
|
let threadId = dict["threadId"] as? String,
|
|
let messageId = dict["messageId"] as? String,
|
|
let readAtStr = dict["readAt"] as? String,
|
|
let readAt = ISO8601DateFormatter.full.date(from: readAtStr)
|
|
?? ISO8601DateFormatter.basic.date(from: readAtStr) else {
|
|
return nil
|
|
}
|
|
return .messageRead(threadId: threadId, messageId: messageId, readAt: readAt)
|
|
|
|
case "typing_indicator", "typing":
|
|
guard let dict = eventData as? [String: Any],
|
|
let threadId = dict["threadId"] as? String,
|
|
let userId = dict["userId"] as? String,
|
|
let isTyping = dict["isTyping"] as? Bool else {
|
|
return nil
|
|
}
|
|
return .typingIndicator(threadId: threadId, userId: userId, isTyping: isTyping)
|
|
|
|
case "thread_updated":
|
|
guard let thread = try? JSONDecoder.apiDecoder.decode(Thread.self, from: eventDataJSON) else {
|
|
AppLogger.network.warning("Failed to decode thread_updated event")
|
|
return nil
|
|
}
|
|
return .threadUpdated(thread)
|
|
|
|
case "card_interaction_updated":
|
|
guard let dict = eventData as? [String: Any],
|
|
let threadId = dict["threadId"] as? String,
|
|
let messageId = dict["messageId"] as? String,
|
|
let interactionData = dict["interaction"],
|
|
let interactionJSON = try? JSONSerialization.data(withJSONObject: interactionData),
|
|
let interaction = try? JSONDecoder.apiDecoder.decode(
|
|
WSCardInteraction.self, from: interactionJSON
|
|
) else {
|
|
return nil
|
|
}
|
|
return .cardInteractionUpdated(threadId: threadId, messageId: messageId, interaction: interaction)
|
|
|
|
case "agreement_proposed":
|
|
guard let dict = eventData as? [String: Any],
|
|
let threadId = dict["threadId"] as? String,
|
|
let agreementId = dict["agreementId"] as? String,
|
|
let proposedBy = dict["proposedBy"] as? String,
|
|
let terms = dict["terms"] as? [String: String] else {
|
|
return nil
|
|
}
|
|
return .agreementProposed(threadId: threadId, agreementId: agreementId, proposedBy: proposedBy, terms: terms)
|
|
|
|
case "agreement_confirmed":
|
|
guard let dict = eventData as? [String: Any],
|
|
let threadId = dict["threadId"] as? String,
|
|
let agreementId = dict["agreementId"] as? String,
|
|
let confirmedBy = dict["confirmedBy"] as? String else {
|
|
return nil
|
|
}
|
|
return .agreementConfirmed(threadId: threadId, agreementId: agreementId, confirmedBy: confirmedBy)
|
|
|
|
case "agreement_sealed":
|
|
guard let dict = eventData as? [String: Any],
|
|
let threadId = dict["threadId"] as? String,
|
|
let agreementId = dict["agreementId"] as? String,
|
|
let sealedAtStr = dict["sealedAt"] as? String,
|
|
let sealedAt = ISO8601DateFormatter.full.date(from: sealedAtStr)
|
|
?? ISO8601DateFormatter.basic.date(from: sealedAtStr) else {
|
|
return nil
|
|
}
|
|
return .agreementSealed(threadId: threadId, agreementId: agreementId, sealedAt: sealedAt)
|
|
|
|
case "error":
|
|
let dict = eventData as? [String: Any]
|
|
let code = dict?["code"] as? String ?? "unknown"
|
|
let message = dict?["message"] as? String ?? "Unknown error"
|
|
return .error(code: code, message: message)
|
|
|
|
default:
|
|
AppLogger.network.debug("Unrecognized WebSocket event: \(eventName)")
|
|
return nil
|
|
}
|
|
}
|
|
}
|