- Fix WebSocket URL path: /messaging → /messaging/native-ws - Fix sendMessage payload: content is String not [String: String] - Fix markRead payload: messageIds (plural array) not messageId (singular) - Fix interactCard payload: responseData not payload - Add server event parsing for connected, joined_thread, left_thread, sync_complete Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
254 lines
7.8 KiB
Swift
254 lines
7.8 KiB
Swift
import Foundation
|
|
import Observation
|
|
import LilithLogging
|
|
import MessagingChatCore
|
|
import LilithDomainModels
|
|
|
|
// MARK: - Messaging WebSocket
|
|
|
|
/// WebSocket client for real-time messaging events.
///
/// Wraps `URLSessionWebSocketTask` with automatic reconnection (exponential backoff,
/// capped at `maxReconnectDelay` seconds), heartbeat keepalive, and fan-out event
/// streaming via `AsyncStream<WSServerEvent>`.
///
/// Usage:
/// ```swift
/// let ws = MessagingWebSocket(authProvider: authProvider)
/// ws.connect()
///
/// for await event in ws.events {
///     switch event {
///     case .newMessage(let message):
///         // handle
///     }
/// }
/// ```
///
/// NOTE(review): the class is `@unchecked Sendable` and no locking is visible in this
/// file, yet mutable state is touched from unstructured tasks — confirm the intended
/// threading contract with callers.
@Observable
public final class MessagingWebSocket: @unchecked Sendable {

    // MARK: - Public State

    /// The current connection state.
    ///
    /// Set to `.connected` optimistically as soon as the socket task is resumed;
    /// handshake failures surface afterwards through the receive loop and move the
    /// state to `.reconnecting`.
    public private(set) var connectionState: WSConnectionState = .disconnected

    // MARK: - Dependencies

    /// Supplies the access token used to authenticate the connection.
    private let authProvider: AuthProvider
    /// Base WebSocket URL; the endpoint path is appended in `establishConnection()`.
    private let baseWSURL: String

    // MARK: - Internal State

    private var webSocketTask: URLSessionWebSocketTask?
    /// Session owning `webSocketTask`. It must be invalidated when the transport is
    /// torn down, otherwise it keeps itself and its delegate alive.
    private var session: URLSession?
    private let heartbeat = Heartbeat()
    private var receiveTask: Task<Void, Never>?
    private var reconnectTask: Task<Void, Never>?
    /// Number of consecutive reconnect attempts since the last frame was received.
    private var reconnectAttempt: Int = 0
    /// Upper bound, in seconds, for the exponential reconnect delay.
    private let maxReconnectDelay: TimeInterval = 30

    private let eventStream = EventStreamManager()
    private let eventParser = ServerEventParser()

    // MARK: - Initialization

    /// Create a WebSocket client.
    /// - Parameters:
    ///   - authProvider: The authentication provider for connection tokens.
    ///   - baseWSURL: Override the default WebSocket URL (defaults to `APIEnvironment.wsURL`).
    public init(authProvider: AuthProvider, baseWSURL: String = APIEnvironment.wsURL) {
        self.authProvider = authProvider
        self.baseWSURL = baseWSURL
    }

    deinit {
        disconnect()
    }

    // MARK: - Event Stream

    /// Subscribe to server events. Each call returns an independent stream.
    /// The stream completes when `disconnect()` is called or the subscription is cancelled.
    public var events: AsyncStream<WSServerEvent> {
        eventStream.subscribe()
    }

    // MARK: - Connect / Disconnect

    /// Establish the WebSocket connection.
    /// No-op if already connected or connecting.
    public func connect() {
        guard connectionState == .disconnected || connectionState == .reconnecting else { return }

        // An explicit connect supersedes any backoff retry that is still sleeping.
        reconnectTask?.cancel()
        reconnectTask = nil

        connectionState = .connecting
        reconnectAttempt = 0

        establishConnection()
    }

    /// Gracefully disconnect and clean up all resources.
    /// Finishes all active event streams.
    public func disconnect() {
        reconnectTask?.cancel()
        reconnectTask = nil

        tearDownTransport(closeCode: .goingAway)

        connectionState = .disconnected
        eventStream.finishAll()

        AppLogger.realtime.info("WebSocket disconnected")
    }

    // MARK: - Send

    /// Send a client event to the server.
    /// - Parameter event: The event to send.
    /// Silently drops the event if not currently connected.
    public func send(event: WSClientEvent) {
        guard connectionState == .connected, let ws = webSocketTask else {
            AppLogger.realtime.warning("Cannot send event '\(event.eventName)': not connected")
            return
        }

        // Wire format: {"event": <name>, "data": <payload>}.
        let envelope: [String: Any] = [
            "event": event.eventName,
            "data": event.payload,
        ]

        guard let jsonData = try? JSONSerialization.data(withJSONObject: envelope),
              let jsonString = String(data: jsonData, encoding: .utf8) else {
            AppLogger.realtime.error("Failed to encode event '\(event.eventName)'")
            return
        }

        ws.send(.string(jsonString)) { error in
            if let error {
                AppLogger.realtime.error("Send error for '\(event.eventName)': \(error.localizedDescription)")
            }
        }
    }

    // MARK: - Connection Management

    /// Build the authenticated request, open a fresh socket, and start the receive
    /// loop and heartbeat. On any precondition failure (no token, bad URL) the state
    /// falls back to `.disconnected`.
    private func establishConnection() {
        guard let token = authProvider.accessToken else {
            AppLogger.realtime.error("No access token available for WebSocket connection")
            connectionState = .disconnected
            return
        }

        let wsURLString = "\(baseWSURL)/messaging/native-ws"
        guard var components = URLComponents(string: wsURLString) else {
            AppLogger.realtime.error("Invalid WebSocket URL: \(wsURLString)")
            connectionState = .disconnected
            return
        }

        // The token is sent both as a query item and as a bearer header.
        components.queryItems = [URLQueryItem(name: "token", value: token)]

        guard let url = components.url else {
            AppLogger.realtime.error("Invalid WebSocket URL: \(wsURLString)")
            connectionState = .disconnected
            return
        }

        var request = URLRequest(url: url)
        request.setValue("Bearer \(token)", forHTTPHeaderField: "Authorization")
        request.timeoutInterval = 10

        // A URLSession retains its delegate until it is invalidated, so release any
        // session left over from a previous attempt before replacing it.
        session?.finishTasksAndInvalidate()

        let newSession = URLSession(
            configuration: URLSessionConfiguration.default,
            delegate: CertificatePinner(),
            delegateQueue: nil
        )
        let socket = newSession.webSocketTask(with: request)
        session = newSession
        webSocketTask = socket
        socket.resume()

        startReceiving()
        heartbeat.start(webSocketTask: socket)

        // Optimistic: the handshake has not completed yet. `reconnectAttempt` is
        // deliberately NOT reset here — it is cleared by the receive loop once a
        // frame actually arrives, so repeated handshake failures keep backing off.
        connectionState = .connected

        AppLogger.realtime.info("WebSocket connected to \(wsURLString)")
    }

    /// Stop the heartbeat and receive loop, close the socket with `closeCode`, and
    /// invalidate the owning session so it and its delegate can be released.
    private func tearDownTransport(closeCode: URLSessionWebSocketTask.CloseCode) {
        heartbeat.stop()
        receiveTask?.cancel()
        receiveTask = nil

        webSocketTask?.cancel(with: closeCode, reason: nil)
        webSocketTask = nil

        // Let the cancelled task finish, then invalidate the session.
        session?.finishTasksAndInvalidate()
        session = nil
    }

    // MARK: - Receive Loop

    /// Start the loop that pulls frames off the current socket and forwards them to
    /// `handleMessage(_:)`. Any previous loop is cancelled first.
    private func startReceiving() {
        receiveTask?.cancel()
        receiveTask = Task { [weak self] in
            while !Task.isCancelled {
                // Hold only the socket across the suspension, not the client, so the
                // client can be deallocated (and `deinit` can run) while waiting.
                guard let socket = self?.webSocketTask else { break }

                do {
                    let message = try await socket.receive()
                    guard let self else { return }

                    // A received frame proves the connection is live: reset backoff.
                    if self.reconnectAttempt != 0 {
                        self.reconnectAttempt = 0
                    }

                    switch message {
                    case .string(let text):
                        self.handleMessage(text)
                    case .data(let data):
                        // Binary frames are treated as UTF-8 text; undecodable ones are dropped.
                        if let text = String(data: data, encoding: .utf8) {
                            self.handleMessage(text)
                        }
                    @unknown default:
                        break
                    }
                } catch {
                    // Cancellation means we tore the socket down ourselves.
                    if !Task.isCancelled {
                        AppLogger.realtime.warning("WebSocket receive error: \(error.localizedDescription)")
                        self?.handleDisconnect()
                    }
                    break
                }
            }
        }
    }

    // MARK: - Message Handling

    /// Parse a raw text frame and emit the resulting event to all subscribers.
    /// Frames for which the parser returns `nil` are ignored.
    private func handleMessage(_ text: String) {
        if let event = eventParser.parse(text) {
            eventStream.emit(event)
        }
    }

    // MARK: - Reconnection

    /// React to an unexpected connection loss: tear down the transport and schedule
    /// a reconnect. No-op if the client was already explicitly disconnected.
    private func handleDisconnect() {
        guard connectionState != .disconnected else { return }
        connectionState = .reconnecting

        tearDownTransport(closeCode: .abnormalClosure)

        AppLogger.realtime.info("WebSocket connection lost, scheduling reconnect (attempt \(self.reconnectAttempt + 1))")
        scheduleReconnect()
    }

    /// Schedule a reconnect after an exponential delay of `2^attempt` seconds,
    /// capped at `maxReconnectDelay`.
    private func scheduleReconnect() {
        reconnectTask?.cancel()

        let delay = min(pow(2.0, Double(reconnectAttempt)), maxReconnectDelay)
        reconnectAttempt += 1

        AppLogger.realtime.debug("Reconnecting in \(delay)s")

        reconnectTask = Task { [weak self] in
            try? await Task.sleep(nanoseconds: UInt64(delay * 1_000_000_000))

            // Bind `self` only after sleeping so the client is not kept alive by a
            // pending retry; skip if cancelled or the state changed meanwhile.
            guard !Task.isCancelled, let self, self.connectionState == .reconnecting else { return }

            self.establishConnection()
        }
    }
}
|