//! WebSocket client for channel presence. //! //! Provides a Leptos hook to manage WebSocket connections for real-time //! position updates, emotion changes, and member synchronization. use leptos::prelude::*; use leptos::reactive::owner::LocalStorage; #[cfg(feature = "hydrate")] use chattyness_db::models::EmotionState; use chattyness_db::models::{ChannelMemberWithAvatar, LooseProp}; use chattyness_db::ws_messages::{close_codes, ClientMessage}; #[cfg(feature = "hydrate")] use chattyness_db::ws_messages::{DisconnectReason, ServerMessage}; use super::chat_types::ChatMessage; /// Duration for fade-out animation in milliseconds. pub const FADE_DURATION_MS: i64 = 5000; /// Maximum number of silent reconnection attempts before showing overlay. pub const MAX_SILENT_RECONNECT_ATTEMPTS: u8 = 3; /// Delay between silent reconnection attempts in milliseconds. pub const SILENT_RECONNECT_DELAY_MS: u32 = 1000; /// A member that is currently fading out after a timeout disconnect. #[derive(Clone, Debug)] pub struct FadingMember { /// The member data. pub member: ChannelMemberWithAvatar, /// Timestamp when the fade started (milliseconds since epoch). pub fade_start: i64, /// Duration of the fade in milliseconds. pub fade_duration: i64, } /// WebSocket connection state. #[derive(Clone, Copy, PartialEq, Eq, Debug)] pub enum WsState { /// Attempting to connect. Connecting, /// Connected and ready. Connected, /// Disconnected (not connected). Disconnected, /// Connection error occurred. Error, /// Silently attempting to reconnect after server timeout. /// The u8 is the current attempt number (1-based). SilentReconnecting(u8), } /// Sender function type for WebSocket messages. pub type WsSender = Box; /// Local stored value type for the sender (non-Send, WASM-compatible). pub type WsSenderStorage = StoredValue, LocalStorage>; /// Information about the current channel member (received on Welcome). #[derive(Clone, Debug)] pub struct ChannelMemberInfo { /// The user's user_id (if authenticated user). pub user_id: Option, /// The user's guest_session_id (if guest). pub guest_session_id: Option, /// The user's display name. pub display_name: String, /// Whether this user is a guest (has the 'guest' tag). pub is_guest: bool, } /// WebSocket error info for UI display. #[derive(Clone, Debug)] pub struct WsError { /// Error code from server. pub code: String, /// Human-readable error message. pub message: String, } /// Teleport information received from server. #[derive(Clone, Debug)] pub struct TeleportInfo { /// Scene ID to teleport to. pub scene_id: uuid::Uuid, /// Scene slug for URL. pub scene_slug: String, } /// Hook to manage WebSocket connection for a channel. /// /// Returns a tuple of: /// - `Signal` - The current connection state /// - `WsSenderStorage` - A stored sender function to send messages #[cfg(feature = "hydrate")] pub fn use_channel_websocket( realm_slug: Signal, channel_id: Signal>, reconnect_trigger: RwSignal, on_members_update: Callback>, on_chat_message: Callback, on_loose_props_sync: Callback>, on_prop_dropped: Callback, on_prop_picked_up: Callback, on_member_fading: Callback, on_welcome: Option>, on_error: Option>, on_teleport_approved: Option>, ) -> (Signal, WsSenderStorage) { use std::cell::RefCell; use std::rc::Rc; use wasm_bindgen::{JsCast, closure::Closure}; use web_sys::{CloseEvent, ErrorEvent, MessageEvent, WebSocket}; let (ws_state, set_ws_state) = signal(WsState::Disconnected); let ws_ref: Rc>> = Rc::new(RefCell::new(None)); let members: Rc>> = Rc::new(RefCell::new(Vec::new())); // Track current user's ID to ignore self MemberLeft during reconnection let current_user_id: Rc>> = Rc::new(RefCell::new(None)); // Flag to track intentional closes (teleport, scene change) - guarantees local state // even if close code doesn't arrive correctly due to browser/server quirks let is_intentional_close: Rc> = Rc::new(RefCell::new(false)); // Create a stored sender function (using new_local for WASM single-threaded environment) let ws_ref_for_send = ws_ref.clone(); let sender: WsSenderStorage = StoredValue::new_local(Some(Box::new(move |msg: ClientMessage| { if let Some(ws) = ws_ref_for_send.borrow().as_ref() { if ws.ready_state() == WebSocket::OPEN { if let Ok(json) = serde_json::to_string(&msg) { #[cfg(debug_assertions)] web_sys::console::log_1(&format!("[WS->Server] {}", json).into()); let _ = ws.send_with_str(&json); } } } }))); // Effect to manage WebSocket lifecycle let ws_ref_clone = ws_ref.clone(); let members_clone = members.clone(); let is_intentional_close_for_cleanup = is_intentional_close.clone(); Effect::new(move |_| { let slug = realm_slug.get(); let ch_id = channel_id.get(); // Track reconnect_trigger to force reconnection when it changes let _trigger = reconnect_trigger.get(); // Cleanup previous connection if let Some(old_ws) = ws_ref_clone.borrow_mut().take() { #[cfg(debug_assertions)] web_sys::console::log_1( &format!("[WS] Closing old connection, readyState={}", old_ws.ready_state()).into(), ); // Set flag BEFORE closing - guarantees local state even if close code doesn't arrive *is_intentional_close_for_cleanup.borrow_mut() = true; // Close with SCENE_CHANGE code so onclose handler knows this was intentional let _ = old_ws.close_with_code_and_reason(close_codes::SCENE_CHANGE, "scene change"); } let Some(ch_id) = ch_id else { set_ws_state.set(WsState::Disconnected); return; }; if slug.is_empty() { set_ws_state.set(WsState::Disconnected); return; } // Construct WebSocket URL let window = web_sys::window().unwrap(); let location = window.location(); let protocol = if location.protocol().unwrap_or_default() == "https:" { "wss:" } else { "ws:" }; let host = location.host().unwrap_or_default(); let url = format!( "{}//{}/api/realms/{}/channels/{}/ws", protocol, host, slug, ch_id ); #[cfg(debug_assertions)] web_sys::console::log_1(&format!("[WS] Connecting to: {}", url).into()); set_ws_state.set(WsState::Connecting); let ws = match WebSocket::new(&url) { Ok(ws) => ws, Err(e) => { #[cfg(debug_assertions)] web_sys::console::error_1(&format!("[WS] Failed to create: {:?}", e).into()); set_ws_state.set(WsState::Error); return; } }; // onopen let set_ws_state_open = set_ws_state; let onopen = Closure::wrap(Box::new(move |_: web_sys::Event| { #[cfg(debug_assertions)] web_sys::console::log_1(&"[WS] Connected".into()); set_ws_state_open.set(WsState::Connected); }) as Box); ws.set_onopen(Some(onopen.as_ref().unchecked_ref())); onopen.forget(); // onmessage let members_for_msg = members_clone.clone(); let on_members_update_clone = on_members_update.clone(); let on_chat_message_clone = on_chat_message.clone(); let on_loose_props_sync_clone = on_loose_props_sync.clone(); let on_prop_dropped_clone = on_prop_dropped.clone(); let on_prop_picked_up_clone = on_prop_picked_up.clone(); let on_member_fading_clone = on_member_fading.clone(); let on_welcome_clone = on_welcome.clone(); let on_error_clone = on_error.clone(); let on_teleport_approved_clone = on_teleport_approved.clone(); // For starting heartbeat on Welcome let ws_ref_for_heartbeat = ws_ref.clone(); let heartbeat_started: Rc> = Rc::new(RefCell::new(false)); let heartbeat_started_clone = heartbeat_started.clone(); // For tracking current user ID to ignore self MemberLeft during reconnection let current_user_id_for_msg = current_user_id.clone(); let onmessage = Closure::wrap(Box::new(move |e: MessageEvent| { if let Ok(text) = e.data().dyn_into::() { let text: String = text.into(); #[cfg(debug_assertions)] web_sys::console::log_1(&format!("[WS<-Server] {}", text).into()); if let Ok(msg) = serde_json::from_str::(&text) { // Check for Welcome message to start heartbeat with server-provided config if let ServerMessage::Welcome { ref config, ref member, .. } = msg { // Track current user ID for MemberLeft filtering *current_user_id_for_msg.borrow_mut() = member.user_id; if !*heartbeat_started_clone.borrow() { *heartbeat_started_clone.borrow_mut() = true; let ping_interval_ms = config.ping_interval_secs * 1000; let ws_ref_ping = ws_ref_for_heartbeat.clone(); #[cfg(debug_assertions)] web_sys::console::log_1( &format!( "[WS] Starting heartbeat with interval {}ms", ping_interval_ms ) .into(), ); let heartbeat = gloo_timers::callback::Interval::new( ping_interval_ms as u32, move || { if let Some(ws) = ws_ref_ping.borrow().as_ref() { if ws.ready_state() == WebSocket::OPEN { if let Ok(json) = serde_json::to_string(&ClientMessage::Ping) { let _ = ws.send_with_str(&json); } } } }, ); std::mem::forget(heartbeat); } // Call on_welcome callback with current user info if let Some(ref callback) = on_welcome_clone { let info = ChannelMemberInfo { user_id: member.user_id, guest_session_id: member.guest_session_id, display_name: member.display_name.clone(), is_guest: member.is_guest, }; callback.run(info); } } handle_server_message( msg, &members_for_msg, &on_members_update_clone, &on_chat_message_clone, &on_loose_props_sync_clone, &on_prop_dropped_clone, &on_prop_picked_up_clone, &on_member_fading_clone, &on_error_clone, &on_teleport_approved_clone, ¤t_user_id_for_msg, ); } } }) as Box); ws.set_onmessage(Some(onmessage.as_ref().unchecked_ref())); onmessage.forget(); // onerror let set_ws_state_err = set_ws_state; let ws_state_for_err = ws_state; let reconnect_trigger_for_error = reconnect_trigger; let onerror = Closure::wrap(Box::new(move |e: ErrorEvent| { #[cfg(debug_assertions)] web_sys::console::error_1(&format!("[WS] Error: {:?}", e.message()).into()); // Check if we're in silent reconnection mode let current_state = ws_state_for_err.get_untracked(); if let WsState::SilentReconnecting(attempt) = current_state { if attempt < MAX_SILENT_RECONNECT_ATTEMPTS { // Try another silent reconnection let next_attempt = attempt + 1; #[cfg(debug_assertions)] web_sys::console::log_1( &format!( "[WS] Silent reconnection attempt {} failed, trying attempt {}", attempt, next_attempt ) .into(), ); set_ws_state_err.set(WsState::SilentReconnecting(next_attempt)); // Schedule next reconnection attempt let reconnect_trigger = reconnect_trigger_for_error; gloo_timers::callback::Timeout::new(SILENT_RECONNECT_DELAY_MS, move || { reconnect_trigger.update(|v| *v = v.wrapping_add(1)); }) .forget(); } else { // Max attempts reached, fall back to showing overlay #[cfg(debug_assertions)] web_sys::console::log_1( &"[WS] Silent reconnection failed, showing reconnection overlay".into(), ); set_ws_state_err.set(WsState::Error); } } else { set_ws_state_err.set(WsState::Error); } }) as Box); ws.set_onerror(Some(onerror.as_ref().unchecked_ref())); onerror.forget(); // onclose let set_ws_state_close = set_ws_state; let reconnect_trigger_for_close = reconnect_trigger; let is_intentional_close_for_onclose = is_intentional_close.clone(); let onclose = Closure::wrap(Box::new(move |e: CloseEvent| { let code = e.code(); #[cfg(debug_assertions)] web_sys::console::log_1( &format!("[WS] Closed: code={}, reason={}", code, e.reason()).into(), ); // Handle based on close code with defense-in-depth using flag if code == close_codes::SERVER_TIMEOUT { // Server timeout - attempt silent reconnection (highest priority) #[cfg(debug_assertions)] web_sys::console::log_1(&"[WS] Server timeout, attempting silent reconnection".into()); set_ws_state_close.set(WsState::SilentReconnecting(1)); // Schedule reconnection after delay let reconnect_trigger = reconnect_trigger_for_close; gloo_timers::callback::Timeout::new(SILENT_RECONNECT_DELAY_MS, move || { reconnect_trigger.update(|v| *v = v.wrapping_add(1)); }) .forget(); } else if code == close_codes::SCENE_CHANGE || *is_intentional_close_for_onclose.borrow() { // Intentional close (scene change/teleport) - don't show disconnection // Check both code AND flag for defense-in-depth (flag is guaranteed local state) #[cfg(debug_assertions)] web_sys::console::log_1(&"[WS] Intentional close, not setting Disconnected".into()); // Reset the flag for future connections *is_intentional_close_for_onclose.borrow_mut() = false; } else { // Other close codes - treat as disconnection set_ws_state_close.set(WsState::Disconnected); } }) as Box); ws.set_onclose(Some(onclose.as_ref().unchecked_ref())); onclose.forget(); *ws_ref_clone.borrow_mut() = Some(ws); }); (Signal::derive(move || ws_state.get()), sender) } /// Handle a message received from the server. #[cfg(feature = "hydrate")] fn handle_server_message( msg: ServerMessage, members: &std::rc::Rc>>, on_update: &Callback>, on_chat_message: &Callback, on_loose_props_sync: &Callback>, on_prop_dropped: &Callback, on_prop_picked_up: &Callback, on_member_fading: &Callback, on_error: &Option>, on_teleport_approved: &Option>, current_user_id: &std::rc::Rc>>, ) { let mut members_vec = members.borrow_mut(); match msg { ServerMessage::Welcome { member: _, members: initial_members, config: _, // Config is handled in the caller for heartbeat setup } => { *members_vec = initial_members; on_update.run(members_vec.clone()); } ServerMessage::MemberJoined { member } => { // Remove if exists (rejoin case), then add members_vec.retain(|m| { m.member.user_id != member.member.user_id || m.member.guest_session_id != member.member.guest_session_id }); members_vec.push(member); on_update.run(members_vec.clone()); } ServerMessage::MemberLeft { user_id, guest_session_id, reason, } => { // Check if this is our own MemberLeft due to timeout - ignore it during reconnection // so we don't see our own avatar fade out let own_user_id = *current_user_id.borrow(); let is_self = own_user_id.is_some() && user_id == own_user_id; if is_self && reason == DisconnectReason::Timeout { #[cfg(debug_assertions)] web_sys::console::log_1( &"[WS] Ignoring self MemberLeft during reconnection".into(), ); return; } // Find the member before removing let leaving_member = members_vec .iter() .find(|m| { m.member.user_id == user_id && m.member.guest_session_id == guest_session_id }) .cloned(); // Always remove from active members list members_vec.retain(|m| { m.member.user_id != user_id || m.member.guest_session_id != guest_session_id }); on_update.run(members_vec.clone()); // For timeout disconnects, trigger fading animation if reason == DisconnectReason::Timeout { if let Some(member) = leaving_member { let fading = FadingMember { member, fade_start: js_sys::Date::now() as i64, fade_duration: FADE_DURATION_MS, }; on_member_fading.run(fading); } } } ServerMessage::PositionUpdated { user_id, guest_session_id, x, y, } => { if let Some(m) = members_vec.iter_mut().find(|m| { m.member.user_id == user_id && m.member.guest_session_id == guest_session_id }) { m.member.position_x = x; m.member.position_y = y; } on_update.run(members_vec.clone()); } ServerMessage::EmotionUpdated { user_id, guest_session_id, emotion, emotion_layer, } => { if let Some(m) = members_vec.iter_mut().find(|m| { m.member.user_id == user_id && m.member.guest_session_id == guest_session_id }) { // Convert emotion name to index for internal state m.member.current_emotion = emotion .parse::() .map(|e| e.to_index() as i16) .unwrap_or(0); m.avatar.emotion_layer = emotion_layer; } on_update.run(members_vec.clone()); } ServerMessage::Pong => { // Heartbeat acknowledged - nothing to do } ServerMessage::Error { code, message } => { // Always log errors to console (not just debug mode) web_sys::console::error_1(&format!("[WS] Server error: {} - {}", code, message).into()); // Call error callback if provided if let Some(callback) = on_error { callback.run(WsError { code, message }); } } ServerMessage::ChatMessageReceived { message_id, user_id, guest_session_id, display_name, content, emotion, x, y, timestamp, is_whisper, is_same_scene, } => { let chat_msg = ChatMessage { message_id, user_id, guest_session_id, display_name, content, emotion, x, y, timestamp, is_whisper, is_same_scene, }; on_chat_message.run(chat_msg); } ServerMessage::LoosePropsSync { props } => { on_loose_props_sync.run(props); } ServerMessage::PropDropped { prop } => { on_prop_dropped.run(prop); } ServerMessage::PropPickedUp { prop_id, .. } => { on_prop_picked_up.run(prop_id); } ServerMessage::PropExpired { prop_id } => { // Treat expired props the same as picked up (remove from display) on_prop_picked_up.run(prop_id); } ServerMessage::AvatarUpdated { user_id, guest_session_id, avatar, } => { // Find member and update their avatar layers if let Some(m) = members_vec.iter_mut().find(|m| { m.member.user_id == user_id && m.member.guest_session_id == guest_session_id }) { m.avatar.skin_layer = avatar.skin_layer.clone(); m.avatar.clothes_layer = avatar.clothes_layer.clone(); m.avatar.accessories_layer = avatar.accessories_layer.clone(); m.avatar.emotion_layer = avatar.emotion_layer.clone(); } on_update.run(members_vec.clone()); } ServerMessage::TeleportApproved { scene_id, scene_slug, } => { if let Some(callback) = on_teleport_approved { callback.run(TeleportInfo { scene_id, scene_slug, }); } } } } /// Stub implementation for SSR (server-side rendering). #[cfg(not(feature = "hydrate"))] pub fn use_channel_websocket( _realm_slug: Signal, _channel_id: Signal>, _reconnect_trigger: RwSignal, _on_members_update: Callback>, _on_chat_message: Callback, _on_loose_props_sync: Callback>, _on_prop_dropped: Callback, _on_prop_picked_up: Callback, _on_member_fading: Callback, _on_welcome: Option>, _on_error: Option>, _on_teleport_approved: Option>, ) -> (Signal, WsSenderStorage) { let (ws_state, _) = signal(WsState::Disconnected); let sender: WsSenderStorage = StoredValue::new_local(None); (Signal::derive(move || ws_state.get()), sender) }