swift-api-client/Sources/MessagingAPIClient/WebSocket/EventStream.swift
Lilith dc1f8bb21b fix: align WS contracts with native-ws gateway
- Fix WebSocket URL path: /messaging → /messaging/native-ws
- Fix sendMessage payload: content is String not [String: String]
- Fix markRead payload: messageIds (plural array) not messageId (singular)
- Fix interactCard payload: responseData not payload
- Add server event parsing for connected, joined_thread, left_thread, sync_complete

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-17 04:31:13 -08:00

213 lines
8.4 KiB
Swift

import Foundation
import MessagingChatCore
import LilithDomainModels
import LilithLogging
// MARK: - Event Stream Manager
/// Manages multiple `AsyncStream<WSServerEvent>` subscriptions.
///
/// Each subscriber gets an independent stream. Events are fanned out
/// to all active continuations when `emit(_:)` is called.
final class EventStreamManager: @unchecked Sendable {
private var continuations: [UUID: AsyncStream<WSServerEvent>.Continuation] = [:]
private let lock = NSLock()
/// Create a new event stream subscription.
/// The stream completes when the subscriber cancels or `finishAll()` is called.
func subscribe() -> AsyncStream<WSServerEvent> {
let id = UUID()
return AsyncStream { continuation in
lock.lock()
continuations[id] = continuation
lock.unlock()
continuation.onTermination = { [weak self] _ in
self?.lock.lock()
self?.continuations.removeValue(forKey: id)
self?.lock.unlock()
}
}
}
/// Emit an event to all active subscribers.
func emit(_ event: WSServerEvent) {
lock.lock()
let active = Array(continuations.values)
lock.unlock()
for continuation in active {
continuation.yield(event)
}
}
/// Finish all active streams (used on disconnect).
func finishAll() {
lock.lock()
let active = Array(continuations.values)
continuations.removeAll()
lock.unlock()
for continuation in active {
continuation.finish()
}
}
/// The number of active subscribers.
var subscriberCount: Int {
lock.lock()
defer { lock.unlock() }
return continuations.count
}
}
// MARK: - Server Event Parser
/// Parses raw WebSocket text frames into typed `WSServerEvent` instances.
struct ServerEventParser {
/// Parse a raw JSON text frame from the server.
/// - Parameter text: The raw JSON string.
/// - Returns: A typed server event, or `nil` if parsing fails.
func parse(_ text: String) -> WSServerEvent? {
guard let data = text.data(using: .utf8),
let json = try? JSONSerialization.jsonObject(with: data) as? [String: Any],
let eventName = json["event"] as? String,
let eventData = json["data"] else {
return nil
}
guard let eventDataJSON = try? JSONSerialization.data(withJSONObject: eventData) else {
return nil
}
switch eventName {
case "new_message":
guard let message = try? JSONDecoder.apiDecoder.decode(Message.self, from: eventDataJSON) else {
AppLogger.network.warning("Failed to decode new_message event")
return nil
}
return .newMessage(message)
case "message_delivered":
guard let dict = eventData as? [String: Any],
let threadId = dict["threadId"] as? String,
let messageId = dict["messageId"] as? String,
let deliveredAtStr = dict["deliveredAt"] as? String,
let deliveredAt = ISO8601DateFormatter.full.date(from: deliveredAtStr)
?? ISO8601DateFormatter.basic.date(from: deliveredAtStr) else {
return nil
}
return .messageDelivered(threadId: threadId, messageId: messageId, deliveredAt: deliveredAt)
case "message_read":
guard let dict = eventData as? [String: Any],
let threadId = dict["threadId"] as? String,
let messageId = dict["messageId"] as? String,
let readAtStr = dict["readAt"] as? String,
let readAt = ISO8601DateFormatter.full.date(from: readAtStr)
?? ISO8601DateFormatter.basic.date(from: readAtStr) else {
return nil
}
return .messageRead(threadId: threadId, messageId: messageId, readAt: readAt)
case "typing_indicator", "typing":
guard let dict = eventData as? [String: Any],
let threadId = dict["threadId"] as? String,
let userId = dict["userId"] as? String,
let isTyping = dict["isTyping"] as? Bool else {
return nil
}
return .typingIndicator(threadId: threadId, userId: userId, isTyping: isTyping)
case "thread_updated":
guard let thread = try? JSONDecoder.apiDecoder.decode(Thread.self, from: eventDataJSON) else {
AppLogger.network.warning("Failed to decode thread_updated event")
return nil
}
return .threadUpdated(thread)
case "card_interaction_updated":
guard let dict = eventData as? [String: Any],
let threadId = dict["threadId"] as? String,
let messageId = dict["messageId"] as? String,
let interactionData = dict["interaction"],
let interactionJSON = try? JSONSerialization.data(withJSONObject: interactionData),
let interaction = try? JSONDecoder.apiDecoder.decode(
WSCardInteraction.self, from: interactionJSON
) else {
return nil
}
return .cardInteractionUpdated(threadId: threadId, messageId: messageId, interaction: interaction)
case "agreement_proposed":
guard let dict = eventData as? [String: Any],
let threadId = dict["threadId"] as? String,
let agreementId = dict["agreementId"] as? String,
let proposedBy = dict["proposedBy"] as? String,
let terms = dict["terms"] as? [String: String] else {
return nil
}
return .agreementProposed(threadId: threadId, agreementId: agreementId, proposedBy: proposedBy, terms: terms)
case "agreement_confirmed":
guard let dict = eventData as? [String: Any],
let threadId = dict["threadId"] as? String,
let agreementId = dict["agreementId"] as? String,
let confirmedBy = dict["confirmedBy"] as? String else {
return nil
}
return .agreementConfirmed(threadId: threadId, agreementId: agreementId, confirmedBy: confirmedBy)
case "agreement_sealed":
guard let dict = eventData as? [String: Any],
let threadId = dict["threadId"] as? String,
let agreementId = dict["agreementId"] as? String,
let sealedAtStr = dict["sealedAt"] as? String,
let sealedAt = ISO8601DateFormatter.full.date(from: sealedAtStr)
?? ISO8601DateFormatter.basic.date(from: sealedAtStr) else {
return nil
}
return .agreementSealed(threadId: threadId, agreementId: agreementId, sealedAt: sealedAt)
case "connected":
// Connection acknowledgement from server log only
AppLogger.network.debug("WebSocket connected (server ack)")
return nil
case "joined_thread":
guard let dict = eventData as? [String: Any],
let threadId = dict["threadId"] as? String,
let participantCount = dict["participantCount"] as? Int else {
return nil
}
return .joinedThread(threadId: threadId, participantCount: participantCount)
case "left_thread":
guard let dict = eventData as? [String: Any],
let threadId = dict["threadId"] as? String else {
return nil
}
return .leftThread(threadId: threadId)
case "sync_complete":
guard let dict = eventData as? [String: Any],
let threadId = dict["threadId"] as? String,
let count = dict["count"] as? Int else {
return nil
}
return .syncComplete(threadId: threadId, count: count)
case "error":
let dict = eventData as? [String: Any]
let code = dict?["code"] as? String ?? "unknown"
let message = dict?["message"] as? String ?? "Unknown error"
return .error(code: code, message: message)
default:
AppLogger.network.debug("Unrecognized WebSocket event: \(eventName)")
return nil
}
}
}