swift-api-client/Sources/MessagingAPIClient/WebSocket/MessagingWebSocket.swift
Lilith dc1f8bb21b fix: align WS contracts with native-ws gateway
- Fix WebSocket URL path: /messaging → /messaging/native-ws
- Fix sendMessage payload: content is String not [String: String]
- Fix markRead payload: messageIds (plural array) not messageId (singular)
- Fix interactCard payload: responseData not payload
- Add server event parsing for connected, joined_thread, left_thread, sync_complete

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-17 04:31:13 -08:00

254 lines
7.8 KiB
Swift

import Foundation
import Observation
import LilithLogging
import MessagingChatCore
import LilithDomainModels
// MARK: - Messaging WebSocket
/// WebSocket client for real-time messaging events.
///
/// Wraps `URLSessionWebSocketTask` with automatic reconnection (exponential backoff),
/// heartbeat keepalive, and fan-out event streaming via `AsyncStream<WSServerEvent>`.
///
/// Usage:
/// ```swift
/// let ws = MessagingWebSocket(authProvider: authProvider)
/// ws.connect()
///
/// for await event in ws.events {
/// switch event {
/// case .newMessage(let message):
/// // handle
/// }
/// }
/// ```
@Observable
public final class MessagingWebSocket: @unchecked Sendable {
// MARK: - Public State
/// The current connection state.
public private(set) var connectionState: WSConnectionState = .disconnected
// MARK: - Dependencies
private let authProvider: AuthProvider
private let baseWSURL: String
// MARK: - Internal State
private var webSocketTask: URLSessionWebSocketTask?
private var session: URLSession?
private let heartbeat = Heartbeat()
private var receiveTask: Task<Void, Never>?
private var reconnectTask: Task<Void, Never>?
private var reconnectAttempt: Int = 0
private let maxReconnectDelay: TimeInterval = 30
private let eventStream = EventStreamManager()
private let eventParser = ServerEventParser()
// MARK: - Initialization
/// Create a WebSocket client.
/// - Parameters:
/// - authProvider: The authentication provider for connection tokens.
/// - baseWSURL: Override the default WebSocket URL (defaults to `APIEnvironment.wsURL`).
public init(authProvider: AuthProvider, baseWSURL: String = APIEnvironment.wsURL) {
self.authProvider = authProvider
self.baseWSURL = baseWSURL
}
deinit {
disconnect()
}
// MARK: - Event Stream
/// Subscribe to server events. Each call returns an independent stream.
/// The stream completes when `disconnect()` is called or the subscription is cancelled.
public var events: AsyncStream<WSServerEvent> {
eventStream.subscribe()
}
// MARK: - Connect / Disconnect
/// Establish the WebSocket connection.
/// No-op if already connected or connecting.
public func connect() {
guard connectionState == .disconnected || connectionState == .reconnecting else { return }
connectionState = .connecting
reconnectAttempt = 0
establishConnection()
}
/// Gracefully disconnect and clean up all resources.
/// Finishes all active event streams.
public func disconnect() {
reconnectTask?.cancel()
reconnectTask = nil
heartbeat.stop()
receiveTask?.cancel()
receiveTask = nil
webSocketTask?.cancel(with: .goingAway, reason: nil)
webSocketTask = nil
connectionState = .disconnected
eventStream.finishAll()
AppLogger.realtime.info("WebSocket disconnected")
}
// MARK: - Send
/// Send a client event to the server.
/// - Parameter event: The event to send.
/// Silently drops the event if not currently connected.
public func send(event: WSClientEvent) {
guard connectionState == .connected, let ws = webSocketTask else {
AppLogger.realtime.warning("Cannot send event '\(event.eventName)': not connected")
return
}
let envelope: [String: Any] = [
"event": event.eventName,
"data": event.payload,
]
guard let jsonData = try? JSONSerialization.data(withJSONObject: envelope),
let jsonString = String(data: jsonData, encoding: .utf8) else {
AppLogger.realtime.error("Failed to encode event '\(event.eventName)'")
return
}
ws.send(.string(jsonString)) { error in
if let error {
AppLogger.realtime.error("Send error for '\(event.eventName)': \(error.localizedDescription)")
}
}
}
// MARK: - Connection Management
private func establishConnection() {
guard let token = authProvider.accessToken else {
AppLogger.realtime.error("No access token available for WebSocket connection")
connectionState = .disconnected
return
}
let wsURLString = "\(baseWSURL)/messaging/native-ws"
guard var components = URLComponents(string: wsURLString) else {
AppLogger.realtime.error("Invalid WebSocket URL: \(wsURLString)")
connectionState = .disconnected
return
}
components.queryItems = [URLQueryItem(name: "token", value: token)]
guard let url = components.url else {
connectionState = .disconnected
return
}
var request = URLRequest(url: url)
request.setValue("Bearer \(token)", forHTTPHeaderField: "Authorization")
request.timeoutInterval = 10
let config = URLSessionConfiguration.default
session = URLSession(
configuration: config,
delegate: CertificatePinner(),
delegateQueue: nil
)
webSocketTask = session?.webSocketTask(with: request)
webSocketTask?.resume()
startReceiving()
if let ws = webSocketTask {
heartbeat.start(webSocketTask: ws)
}
connectionState = .connected
reconnectAttempt = 0
AppLogger.realtime.info("WebSocket connected to \(wsURLString)")
}
// MARK: - Receive Loop
private func startReceiving() {
receiveTask?.cancel()
receiveTask = Task { [weak self] in
guard let self else { return }
while !Task.isCancelled {
guard let ws = self.webSocketTask else { break }
do {
let message = try await ws.receive()
switch message {
case .string(let text):
self.handleMessage(text)
case .data(let data):
if let text = String(data: data, encoding: .utf8) {
self.handleMessage(text)
}
@unknown default:
break
}
} catch {
if !Task.isCancelled {
AppLogger.realtime.warning("WebSocket receive error: \(error.localizedDescription)")
self.handleDisconnect()
}
break
}
}
}
}
// MARK: - Message Handling
private func handleMessage(_ text: String) {
if let event = eventParser.parse(text) {
eventStream.emit(event)
}
}
// MARK: - Reconnection
private func handleDisconnect() {
guard connectionState != .disconnected else { return }
connectionState = .reconnecting
heartbeat.stop()
receiveTask?.cancel()
webSocketTask?.cancel(with: .abnormalClosure, reason: nil)
webSocketTask = nil
AppLogger.realtime.info("WebSocket connection lost, scheduling reconnect (attempt \(self.reconnectAttempt + 1))")
scheduleReconnect()
}
private func scheduleReconnect() {
reconnectTask?.cancel()
reconnectTask = Task { [weak self] in
guard let self else { return }
let delay = min(pow(2.0, Double(self.reconnectAttempt)), self.maxReconnectDelay)
self.reconnectAttempt += 1
AppLogger.realtime.debug("Reconnecting in \(delay)s")
try? await Task.sleep(nanoseconds: UInt64(delay * 1_000_000_000))
guard !Task.isCancelled, self.connectionState == .reconnecting else { return }
self.establishConnection()
}
}
}