//! WebSocket handler for channel presence. //! //! Handles real-time position updates, emotion changes, and member synchronization. use axum::{ extract::{ FromRef, Path, State, ws::{CloseFrame, Message, WebSocket, WebSocketUpgrade}, }, response::IntoResponse, }; use dashmap::DashMap; use futures::{SinkExt, StreamExt}; use sqlx::PgPool; use std::sync::Arc; use std::time::Duration; use tokio::sync::{broadcast, mpsc}; use uuid::Uuid; use chattyness_db::{ models::{ActionType, AvatarRenderData, ChannelMemberWithAvatar, EmotionState, ForcedAvatarReason, User}, queries::{avatars, channel_members, loose_props, memberships, moderation, realm_avatars, realms, scenes, server_avatars, users}, ws_messages::{close_codes, ClientMessage, DisconnectReason, ServerMessage, WsConfig}, }; use chattyness_error::AppError; use chattyness_shared::WebSocketConfig; use crate::auth::AuthUser; use chrono::Duration as ChronoDuration; /// Parse a duration string like "30m", "2h", "7d" into a chrono::Duration. fn parse_duration(s: &str) -> Option { let s = s.trim().to_lowercase(); if s.is_empty() { return None; } let (num_str, unit) = s.split_at(s.len() - 1); let num: i64 = num_str.parse().ok()?; match unit { "m" => Some(ChronoDuration::minutes(num)), "h" => Some(ChronoDuration::hours(num)), "d" => Some(ChronoDuration::days(num)), _ => None, } } /// Channel state for broadcasting updates. pub struct ChannelState { /// Broadcast sender for this channel. tx: broadcast::Sender, } /// Connection info for a connected user. #[derive(Clone)] pub struct UserConnection { /// Direct message sender for this user. pub direct_tx: mpsc::Sender, /// Realm the user is in. pub realm_id: Uuid, /// Channel (scene) the user is in. pub channel_id: Uuid, /// User's display name. pub display_name: String, } /// Global state for all WebSocket connections. pub struct WebSocketState { /// Map of channel_id -> ChannelState. channels: DashMap>, /// Map of user_id -> UserConnection for direct message routing. 
users: DashMap, } impl Default for WebSocketState { fn default() -> Self { Self::new() } } impl WebSocketState { /// Create a new WebSocket state. pub fn new() -> Self { Self { channels: DashMap::new(), users: DashMap::new(), } } /// Get or create a channel state. fn get_or_create_channel(&self, channel_id: Uuid) -> Arc { self.channels .entry(channel_id) .or_insert_with(|| { let (tx, _) = broadcast::channel(256); Arc::new(ChannelState { tx }) }) .clone() } /// Register a user connection for direct messaging. pub fn register_user( &self, user_id: Uuid, direct_tx: mpsc::Sender, realm_id: Uuid, channel_id: Uuid, display_name: String, ) { self.users.insert( user_id, UserConnection { direct_tx, realm_id, channel_id, display_name, }, ); } /// Unregister a user connection. pub fn unregister_user(&self, user_id: Uuid) { self.users.remove(&user_id); } /// Find a user by display name within a realm. pub fn find_user_by_display_name( &self, realm_id: Uuid, display_name: &str, ) -> Option<(Uuid, UserConnection)> { for entry in self.users.iter() { let (user_id, conn) = entry.pair(); if conn.realm_id == realm_id && conn.display_name.eq_ignore_ascii_case(display_name) { return Some((*user_id, conn.clone())); } } None } /// Get a user's connection info. pub fn get_user(&self, user_id: Uuid) -> Option { self.users.get(&user_id).map(|r| r.clone()) } } /// WebSocket upgrade handler. 
///
/// GET /api/realms/{slug}/channels/{channel_id}/ws
///
/// Validates the request before upgrading: the caller must be authenticated,
/// the realm identified by `slug` must exist, and the scene identified by
/// `channel_id` must exist and belong to that realm. On success the upgraded
/// socket is handed to `handle_socket`.
///
/// # Errors
///
/// Returns the authentication failure converted into an [`AppError`],
/// [`AppError::NotFound`] when the realm or scene is missing or the scene
/// belongs to a different realm, and any error propagated from the lookups.
pub async fn ws_handler<S>(
    Path((slug, channel_id)): Path<(String, Uuid)>,
    // NOTE(review): the error type here was reconstructed; it must equal the
    // `Rejection` type of `AuthUser`'s extractor impl. Confirm against
    // `crate::auth`.
    auth_result: Result<AuthUser, AppError>,
    State(pool): State<PgPool>,
    State(ws_state): State<Arc<WebSocketState>>,
    State(ws_config): State<WebSocketConfig>,
    ws: WebSocketUpgrade,
) -> Result<impl IntoResponse, AppError>
where
    S: Send + Sync,
    PgPool: FromRef<S>,
    Arc<WebSocketState>: FromRef<S>,
    WebSocketConfig: FromRef<S>,
{
    // Log auth result before checking
    #[cfg(debug_assertions)]
    tracing::debug!(
        "[WS] Connection attempt to {}/channels/{} - auth: {:?}",
        slug,
        channel_id,
        auth_result
            .as_ref()
            .map(|a| a.0.id)
            .map_err(|e| format!("{:?}", e))
    );

    let AuthUser(user) = auth_result.map_err(|e| {
        tracing::warn!(
            "[WS] Auth failed for {}/channels/{}: {:?}",
            slug,
            channel_id,
            e
        );
        AppError::from(e)
    })?;

    // Verify realm exists
    let realm = realms::get_realm_by_slug(&pool, &slug)
        .await?
        .ok_or_else(|| AppError::NotFound(format!("Realm '{}' not found", slug)))?;

    // Verify scene exists and belongs to this realm
    // Note: Using scene_id as channel_id since channel_members uses scenes directly
    let scene = scenes::get_scene_by_id(&pool, channel_id)
        .await?
        .ok_or_else(|| AppError::NotFound("Scene not found".to_string()))?;

    if scene.realm_id != realm.id {
        return Err(AppError::NotFound(
            "Scene not found in this realm".to_string(),
        ));
    }

    #[cfg(debug_assertions)]
    tracing::debug!(
        "[WS] Upgrading connection for user {} to channel {}",
        user.id,
        channel_id
    );

    Ok(ws.on_upgrade(move |socket| {
        handle_socket(
            socket,
            user,
            channel_id,
            realm.id,
            pool,
            ws_state,
            ws_config,
        )
    }))
}

/// Set RLS context on a database connection.
///
/// Executes `SELECT public.set_current_user_id($1)` with `user_id` on the
/// given connection. The call is made on `conn` itself rather than through
/// the pool, so it applies to this specific pooled connection.
/// NOTE(review): presumably the SQL function stores the id in a session
/// setting read by row-level-security policies — confirm against the schema.
///
/// # Errors
///
/// Returns the `sqlx::Error` from executing the statement.
async fn set_rls_user_id(
    conn: &mut sqlx::pool::PoolConnection<sqlx::Postgres>,
    user_id: Uuid,
) -> Result<(), sqlx::Error> {
    sqlx::query("SELECT public.set_current_user_id($1)")
        .bind(user_id)
        .execute(&mut **conn)
        .await?;
    Ok(())
}

/// Handle an active WebSocket connection.
async fn handle_socket( socket: WebSocket, user: User, channel_id: Uuid, realm_id: Uuid, pool: PgPool, ws_state: Arc, ws_config: WebSocketConfig, ) { tracing::info!( "[WS] handle_socket started for user {} channel {} realm {}", user.id, channel_id, realm_id ); // Acquire a dedicated connection for setup operations let mut conn = match pool.acquire().await { Ok(conn) => conn, Err(e) => { tracing::error!("[WS] Failed to acquire DB connection: {:?}", e); return; } }; // Set RLS context on this dedicated connection if let Err(e) = set_rls_user_id(&mut conn, user.id).await { tracing::error!( "[WS] Failed to set RLS context for user {}: {:?}", user.id, e ); return; } tracing::info!("[WS] RLS context set on dedicated connection"); let channel_state = ws_state.get_or_create_channel(channel_id); let mut rx = channel_state.tx.subscribe(); let (mut sender, mut receiver) = socket.split(); // Ensure active avatar tracing::info!("[WS] Ensuring active avatar..."); if let Err(e) = channel_members::ensure_active_avatar(&mut *conn, user.id, realm_id).await { tracing::error!("[WS] Failed to ensure avatar for user {}: {:?}", user.id, e); return; } tracing::info!("[WS] Avatar ensured"); // Join the channel tracing::info!("[WS] Joining channel..."); if let Err(e) = channel_members::join_channel(&mut *conn, channel_id, user.id).await { tracing::error!( "[WS] Failed to join channel {} for user {}: {:?}", channel_id, user.id, e ); return; } tracing::info!("[WS] Channel joined"); // Check if scene has forced avatar if let Ok(Some(scene_forced)) = realm_avatars::get_scene_forced_avatar(&pool, channel_id).await { tracing::info!("[WS] Scene has forced avatar: {:?}", scene_forced.forced_avatar_id); // Apply scene-forced avatar to user if let Err(e) = realm_avatars::apply_scene_forced_avatar( &pool, user.id, realm_id, scene_forced.forced_avatar_id, ).await { tracing::warn!("[WS] Failed to apply scene forced avatar: {:?}", e); } } // Get initial state let members = match 
get_members_with_avatars(&mut conn, &pool, channel_id, realm_id).await { Ok(m) => m, Err(e) => { tracing::error!("[WS] Failed to get members: {:?}", e); let _ = channel_members::leave_channel(&mut *conn, channel_id, user.id).await; return; } }; let member = match channel_members::get_channel_member( &mut *conn, channel_id, user.id, realm_id, ) .await { Ok(Some(m)) => m, Ok(None) => { tracing::error!("[WS] Failed to get member info for user {}", user.id); let _ = channel_members::leave_channel(&mut *conn, channel_id, user.id).await; return; } Err(e) => { tracing::error!("[WS] Error getting member info: {:?}", e); let _ = channel_members::leave_channel(&mut *conn, channel_id, user.id).await; return; } }; // Send welcome message with config let welcome = ServerMessage::Welcome { member: member.clone(), members, config: WsConfig { ping_interval_secs: ws_config.client_ping_interval_secs, }, }; if let Ok(json) = serde_json::to_string(&welcome) { #[cfg(debug_assertions)] tracing::debug!("[WS->Client] {}", json); if sender.send(Message::Text(json.into())).await.is_err() { let _ = channel_members::leave_channel(&mut *conn, channel_id, user.id).await; return; } } // Send loose props sync match loose_props::list_channel_loose_props(&mut *conn, channel_id).await { Ok(props) => { let props_sync = ServerMessage::LoosePropsSync { props }; if let Ok(json) = serde_json::to_string(&props_sync) { #[cfg(debug_assertions)] tracing::debug!("[WS->Client] {}", json); if sender.send(Message::Text(json.into())).await.is_err() { let _ = channel_members::leave_channel(&mut *conn, channel_id, user.id).await; return; } } } Err(e) => { tracing::warn!("[WS] Failed to get loose props: {:?}", e); } } // Save member display_name for user registration (before member is moved) let member_display_name = member.display_name.clone(); // Broadcast join to others // Use effective avatar resolution to handle priority chain: // forced > custom > selected realm > selected server > realm default > server 
default let avatar = avatars::get_effective_avatar_render_data(&pool, user.id, realm_id) .await .ok() .flatten() .map(|(render_data, _source)| render_data) .unwrap_or_default(); let join_msg = ServerMessage::MemberJoined { member: ChannelMemberWithAvatar { member, avatar }, }; let _ = channel_state.tx.send(join_msg); let user_id = user.id; let mut is_guest = user.is_guest(); let tx = channel_state.tx.clone(); // Acquire a second dedicated connection for the receive task // This connection needs its own RLS context let mut recv_conn = match pool.acquire().await { Ok(c) => c, Err(e) => { tracing::error!("[WS] Failed to acquire recv connection: {:?}", e); let _ = channel_members::leave_channel(&mut *conn, channel_id, user_id).await; return; } }; if let Err(e) = set_rls_user_id(&mut recv_conn, user_id).await { tracing::error!("[WS] Failed to set RLS on recv connection: {:?}", e); let _ = channel_members::leave_channel(&mut *conn, channel_id, user_id).await; return; } // Drop the setup connection - we'll use recv_conn for the receive task // and pool for cleanup (leave_channel needs user_id match anyway) drop(conn); // Channel for sending direct messages (Pong, whispers) to client let (direct_tx, mut direct_rx) = mpsc::channel::(16); // Register user for direct message routing ws_state.register_user( user_id, direct_tx.clone(), realm_id, channel_id, member_display_name, ); // Clone ws_state for use in recv_task let ws_state_for_recv = ws_state.clone(); // Clone pool for use in recv_task (for teleport queries) let pool_for_recv = pool.clone(); // Create recv timeout from config let recv_timeout = Duration::from_secs(ws_config.recv_timeout_secs); // Channel for sending close frame requests from recv_task to send_task let (close_tx, mut close_rx) = mpsc::channel::<(u16, String)>(1); // Spawn task to handle incoming messages from client let close_tx_for_recv = close_tx.clone(); let recv_task = tokio::spawn(async move { let close_tx = close_tx_for_recv; let pool = 
pool_for_recv; let ws_state = ws_state_for_recv; let mut disconnect_reason = DisconnectReason::Graceful; loop { // Use timeout to detect connection loss let msg_result = tokio::time::timeout(recv_timeout, receiver.next()).await; match msg_result { Ok(Some(Ok(msg))) => { match msg { Message::Text(text) => { #[cfg(debug_assertions)] tracing::debug!("[WS<-Client] {}", text); let Ok(client_msg) = serde_json::from_str::(&text) else { continue; }; match client_msg { ClientMessage::UpdatePosition { x, y } => { if let Err(e) = channel_members::update_position( &mut *recv_conn, channel_id, user_id, x, y, ) .await { #[cfg(debug_assertions)] tracing::error!("[WS] Position update failed: {:?}", e); continue; } let _ = tx.send(ServerMessage::PositionUpdated { user_id, x, y, }); } ClientMessage::UpdateEmotion { emotion } => { // Parse emotion name to EmotionState let emotion_state = match emotion.parse::() { Ok(e) => e, Err(_) => { #[cfg(debug_assertions)] tracing::warn!( "[WS] Invalid emotion name: {}", emotion ); continue; } }; let emotion_layer = match avatars::set_emotion( &mut *recv_conn, &pool, user_id, realm_id, emotion_state, ) .await { Ok(layer) => layer, Err(e) => { #[cfg(debug_assertions)] tracing::error!("[WS] Emotion update failed: {:?}", e); continue; } }; let _ = tx.send(ServerMessage::EmotionUpdated { user_id, emotion, emotion_layer, }); } ClientMessage::Ping => { // Update last_moved_at to keep member alive for cleanup let _ = channel_members::touch_member( &mut *recv_conn, channel_id, user_id, ) .await; // Respond with pong directly (not broadcast) let _ = direct_tx.send(ServerMessage::Pong).await; } ClientMessage::SendChatMessage { content, target_display_name, } => { // Validate message if content.is_empty() || content.len() > 500 { continue; } // Get member's current position and emotion let member_info = channel_members::get_channel_member( &mut *recv_conn, channel_id, user_id, realm_id, ) .await; if let Ok(Some(member)) = member_info { // Convert emotion 
index to name let emotion_name = EmotionState::from_index(member.current_emotion as u8) .map(|e| e.to_string()) .unwrap_or_else(|| "neutral".to_string()); // Handle whisper (direct message) vs broadcast if let Some(target_name) = target_display_name { // Check if guest is trying to whisper if is_guest { let _ = direct_tx .send(ServerMessage::Error { code: "GUEST_FEATURE_DISABLED".to_string(), message: "Private messaging is disabled for guests, please register first.".to_string(), }) .await; continue; } // Whisper: send directly to target user if let Some((_target_user_id, target_conn)) = ws_state .find_user_by_display_name(realm_id, &target_name) { // Determine if same scene let is_same_scene = target_conn.channel_id == channel_id; let msg = ServerMessage::ChatMessageReceived { message_id: Uuid::new_v4(), user_id, display_name: member.display_name.clone(), content: content.clone(), emotion: emotion_name.clone(), x: member.position_x, y: member.position_y, timestamp: chrono::Utc::now() .timestamp_millis(), is_whisper: true, is_same_scene, }; // Send to target user let _ = target_conn.direct_tx.send(msg.clone()).await; // Also send back to sender (so they see their own whisper) // For sender, is_same_scene is always true (they see it as a bubble) let sender_msg = ServerMessage::ChatMessageReceived { message_id: Uuid::new_v4(), user_id, display_name: member.display_name.clone(), content, emotion: emotion_name, x: member.position_x, y: member.position_y, timestamp: chrono::Utc::now() .timestamp_millis(), is_whisper: true, is_same_scene: true, // Sender always sees as bubble }; let _ = direct_tx.send(sender_msg).await; #[cfg(debug_assertions)] tracing::debug!( "[WS] Whisper from {} to {} (same_scene={})", member.display_name, target_name, is_same_scene ); } else { // Target user not found - send error let _ = direct_tx.send(ServerMessage::Error { code: "WHISPER_TARGET_NOT_FOUND".to_string(), message: format!("User '{}' is not online or not in this realm", target_name), 
}).await; } } else { // Broadcast: send to all users in the channel let msg = ServerMessage::ChatMessageReceived { message_id: Uuid::new_v4(), user_id, display_name: member.display_name.clone(), content, emotion: emotion_name, x: member.position_x, y: member.position_y, timestamp: chrono::Utc::now().timestamp_millis(), is_whisper: false, is_same_scene: true, }; let _ = tx.send(msg); } } } ClientMessage::DropProp { inventory_item_id } => { // Ensure instance exists for this scene (required for loose_props FK) // In this system, channel_id = scene_id if let Err(e) = loose_props::ensure_scene_instance( &mut *recv_conn, channel_id, ) .await { tracing::error!( "[WS] Failed to ensure scene instance: {:?}", e ); } // Get user's current position for random offset let member_info = channel_members::get_channel_member( &mut *recv_conn, channel_id, user_id, realm_id, ) .await; if let Ok(Some(member)) = member_info { // Generate random offset (within ~50 pixels) let offset_x = (rand::random::() - 0.5) * 100.0; let offset_y = (rand::random::() - 0.5) * 100.0; let pos_x = member.position_x + offset_x; let pos_y = member.position_y + offset_y; match loose_props::drop_prop_to_canvas( &mut *recv_conn, inventory_item_id, user_id, channel_id, pos_x, pos_y, ) .await { Ok(prop) => { #[cfg(debug_assertions)] tracing::debug!( "[WS] User {} dropped prop {} at ({}, {})", user_id, prop.id, pos_x, pos_y ); let _ = tx.send(ServerMessage::PropDropped { prop }); } Err(e) => { tracing::error!("[WS] Drop prop failed: {:?}", e); let (code, message) = match &e { chattyness_error::AppError::Forbidden(msg) => ( "PROP_NOT_DROPPABLE".to_string(), msg.clone(), ), chattyness_error::AppError::NotFound(msg) => { ("PROP_NOT_FOUND".to_string(), msg.clone()) } _ => ( "DROP_FAILED".to_string(), format!("{:?}", e), ), }; let _ = tx.send(ServerMessage::Error { code, message }); } } } } ClientMessage::PickUpProp { loose_prop_id } => { match loose_props::pick_up_loose_prop( &mut *recv_conn, loose_prop_id, user_id, 
) .await { Ok(_inventory_item) => { #[cfg(debug_assertions)] tracing::debug!( "[WS] User {} picked up prop {}", user_id, loose_prop_id ); let _ = tx.send(ServerMessage::PropPickedUp { prop_id: loose_prop_id, picked_up_by_user_id: user_id, }); } Err(e) => { tracing::error!("[WS] Pick up prop failed: {:?}", e); let _ = tx.send(ServerMessage::Error { code: "PICKUP_FAILED".to_string(), message: format!("{:?}", e), }); } } } ClientMessage::SyncAvatar => { // Fetch current effective avatar from database and broadcast to channel // Uses the priority chain: forced > custom > selected realm > selected server > realm default > server default match avatars::get_effective_avatar_render_data( &pool, user_id, realm_id, ) .await { Ok(Some((render_data, _source))) => { #[cfg(debug_assertions)] tracing::debug!( "[WS] User {} syncing avatar to channel", user_id ); let _ = tx.send(ServerMessage::AvatarUpdated { user_id, avatar: render_data, }); } Ok(None) => { #[cfg(debug_assertions)] tracing::warn!( "[WS] No avatar found for user {} to sync", user_id ); } Err(e) => { tracing::error!("[WS] Avatar sync failed: {:?}", e); } } } ClientMessage::Teleport { scene_id } => { // Validate teleport permission and scene // 1. Check realm allows user teleport let realm = match realms::get_realm_by_id( &pool, realm_id, ) .await { Ok(Some(r)) => r, Ok(None) => { let _ = direct_tx.send(ServerMessage::Error { code: "REALM_NOT_FOUND".to_string(), message: "Realm not found".to_string(), }).await; continue; } Err(e) => { tracing::error!("[WS] Teleport realm lookup failed: {:?}", e); let _ = direct_tx.send(ServerMessage::Error { code: "TELEPORT_FAILED".to_string(), message: "Failed to verify teleport permission".to_string(), }).await; continue; } }; if !realm.allow_user_teleport { let _ = direct_tx.send(ServerMessage::Error { code: "TELEPORT_DISABLED".to_string(), message: "Teleporting is not enabled for this realm".to_string(), }).await; continue; } // 2. 
Validate scene exists, belongs to realm, and is not hidden let scene = match scenes::get_scene_by_id(&pool, scene_id).await { Ok(Some(s)) => s, Ok(None) => { let _ = direct_tx.send(ServerMessage::Error { code: "SCENE_NOT_FOUND".to_string(), message: "Scene not found".to_string(), }).await; continue; } Err(e) => { tracing::error!("[WS] Teleport scene lookup failed: {:?}", e); let _ = direct_tx.send(ServerMessage::Error { code: "TELEPORT_FAILED".to_string(), message: "Failed to verify scene".to_string(), }).await; continue; } }; if scene.realm_id != realm_id { let _ = direct_tx.send(ServerMessage::Error { code: "SCENE_NOT_IN_REALM".to_string(), message: "Scene does not belong to this realm".to_string(), }).await; continue; } if scene.is_hidden { let _ = direct_tx.send(ServerMessage::Error { code: "SCENE_HIDDEN".to_string(), message: "Cannot teleport to a hidden scene".to_string(), }).await; continue; } // 3. Send approval - client will disconnect and reconnect #[cfg(debug_assertions)] tracing::debug!( "[WS] User {} teleporting to scene {} ({})", user_id, scene.name, scene.slug ); let _ = direct_tx.send(ServerMessage::TeleportApproved { scene_id: scene.id, scene_slug: scene.slug, }).await; } ClientMessage::ModCommand { subcommand, args } => { // Check if user is a moderator let is_mod = match memberships::is_moderator(&pool, user_id, realm_id).await { Ok(result) => result, Err(e) => { tracing::error!("[WS] Failed to check moderator status: {:?}", e); false } }; if !is_mod { let _ = direct_tx.send(ServerMessage::ModCommandResult { success: false, message: "You do not have moderator permissions".to_string(), }).await; continue; } // Get moderator's current scene info and display name let mod_member = match channel_members::get_channel_member( &mut *recv_conn, channel_id, user_id, realm_id, ).await { Ok(Some(m)) => m, Ok(None) | Err(_) => { let _ = direct_tx.send(ServerMessage::ModCommandResult { success: false, message: "Failed to get moderator info".to_string(), 
}).await; continue; } }; // Get moderator's current scene details let mod_scene = match scenes::get_scene_by_id(&pool, channel_id).await { Ok(Some(s)) => s, Ok(None) | Err(_) => { let _ = direct_tx.send(ServerMessage::ModCommandResult { success: false, message: "Failed to get scene info".to_string(), }).await; continue; } }; match subcommand.as_str() { "summon" => { if args.is_empty() { let _ = direct_tx.send(ServerMessage::ModCommandResult { success: false, message: "Usage: /mod summon [nick|*]".to_string(), }).await; continue; } let target = &args[0]; if target == "*" { // Summon all users in the realm let mut summoned_count = 0; let mut target_ids = Vec::new(); // Iterate all connected users in this realm for entry in ws_state.users.iter() { let (target_user_id, target_conn) = entry.pair(); if target_conn.realm_id == realm_id && *target_user_id != user_id { // Send Summoned message to each user let summon_msg = ServerMessage::Summoned { scene_id: mod_scene.id, scene_slug: mod_scene.slug.clone(), summoned_by: mod_member.display_name.clone(), }; if target_conn.direct_tx.send(summon_msg).await.is_ok() { summoned_count += 1; target_ids.push(*target_user_id); } } } // Log the action let metadata = serde_json::json!({ "scene_id": mod_scene.id, "scene_slug": mod_scene.slug, "summoned_count": summoned_count, }); let _ = moderation::log_moderation_action( &pool, realm_id, user_id, ActionType::SummonAll, None, &format!("Summoned {} users to scene {}", summoned_count, mod_scene.name), metadata, ).await; let _ = direct_tx.send(ServerMessage::ModCommandResult { success: true, message: format!("Summoned {} users to {}", summoned_count, mod_scene.name), }).await; } else { // Summon specific user by display name if let Some((target_user_id, target_conn)) = ws_state .find_user_by_display_name(realm_id, target) { if target_user_id == user_id { let _ = direct_tx.send(ServerMessage::ModCommandResult { success: false, message: "You cannot summon yourself".to_string(), }).await; 
continue; } // Send Summoned message to target let summon_msg = ServerMessage::Summoned { scene_id: mod_scene.id, scene_slug: mod_scene.slug.clone(), summoned_by: mod_member.display_name.clone(), }; if target_conn.direct_tx.send(summon_msg).await.is_ok() { // Log the action let metadata = serde_json::json!({ "scene_id": mod_scene.id, "scene_slug": mod_scene.slug, "target_display_name": target, }); let _ = moderation::log_moderation_action( &pool, realm_id, user_id, ActionType::Summon, Some(target_user_id), &format!("Summoned {} to scene {}", target, mod_scene.name), metadata, ).await; let _ = direct_tx.send(ServerMessage::ModCommandResult { success: true, message: format!("Summoned {} to {}", target, mod_scene.name), }).await; } else { let _ = direct_tx.send(ServerMessage::ModCommandResult { success: false, message: format!("Failed to send summon to {}", target), }).await; } } else { let _ = direct_tx.send(ServerMessage::ModCommandResult { success: false, message: format!("User '{}' is not online in this realm", target), }).await; } } } "teleport" => { if args.len() < 2 { let _ = direct_tx.send(ServerMessage::ModCommandResult { success: false, message: "Usage: /mod teleport [nick] [slug]".to_string(), }).await; continue; } let target_nick = &args[0]; let target_slug = &args[1]; // Look up the target scene by slug let target_scene = match scenes::get_scene_by_slug(&pool, realm_id, target_slug).await { Ok(Some(s)) => s, Ok(None) => { let _ = direct_tx.send(ServerMessage::ModCommandResult { success: false, message: format!("Scene '{}' not found", target_slug), }).await; continue; } Err(_) => { let _ = direct_tx.send(ServerMessage::ModCommandResult { success: false, message: "Failed to look up scene".to_string(), }).await; continue; } }; // Find target user by display name if let Some((target_user_id, target_conn)) = ws_state .find_user_by_display_name(realm_id, target_nick) { if target_user_id == user_id { let _ = direct_tx.send(ServerMessage::ModCommandResult { 
success: false, message: "You cannot teleport yourself".to_string(), }).await; continue; } // Send Summoned message to target user with the specified scene let teleport_msg = ServerMessage::Summoned { scene_id: target_scene.id, scene_slug: target_scene.slug.clone(), summoned_by: mod_member.display_name.clone(), }; if target_conn.direct_tx.send(teleport_msg).await.is_ok() { // Log the action let metadata = serde_json::json!({ "scene_id": target_scene.id, "scene_slug": target_scene.slug, "target_display_name": target_nick, }); let _ = moderation::log_moderation_action( &pool, realm_id, user_id, ActionType::Teleport, Some(target_user_id), &format!("Teleported {} to scene {}", target_nick, target_scene.name), metadata, ).await; let _ = direct_tx.send(ServerMessage::ModCommandResult { success: true, message: format!("Teleported {} to {}", target_nick, target_scene.name), }).await; } else { let _ = direct_tx.send(ServerMessage::ModCommandResult { success: false, message: format!("Failed to send teleport to {}", target_nick), }).await; } } else { let _ = direct_tx.send(ServerMessage::ModCommandResult { success: false, message: format!("User '{}' is not online in this realm", target_nick), }).await; } } "dress" => { // /mod dress [nick] [avatar-slug] [duration?] // Duration format: 30m, 2h, 7d (minutes/hours/days) if args.len() < 2 { let _ = direct_tx.send(ServerMessage::ModCommandResult { success: false, message: "Usage: /mod dress [nick] [avatar-slug] [duration?]".to_string(), }).await; continue; } let target_nick = &args[0]; let avatar_slug = &args[1]; let duration_str = args.get(2).map(|s| s.as_str()); // Parse duration if provided let duration = if let Some(dur_str) = duration_str { match parse_duration(dur_str) { Some(d) => Some(d), None => { let _ = direct_tx.send(ServerMessage::ModCommandResult { success: false, message: "Invalid duration format. 
Use: 30m, 2h, 7d".to_string(), }).await; continue; } } } else { None // Permanent until undressed }; // Find target user if let Some((target_user_id, target_conn)) = ws_state .find_user_by_display_name(realm_id, target_nick) { // Try realm avatars first, then server avatars let avatar_result = realm_avatars::get_realm_avatar_by_slug(&pool, realm_id, avatar_slug).await; let (avatar_render_data, avatar_id, source) = match avatar_result { Ok(Some(realm_avatar)) => { // Resolve realm avatar to render data match realm_avatars::resolve_realm_avatar_to_render_data(&pool, &realm_avatar, EmotionState::default()).await { Ok(render_data) => (render_data, realm_avatar.id, "realm"), Err(e) => { let _ = direct_tx.send(ServerMessage::ModCommandResult { success: false, message: format!("Failed to resolve avatar: {:?}", e), }).await; continue; } } } Ok(None) => { // Try server avatars match server_avatars::get_server_avatar_by_slug(&pool, avatar_slug).await { Ok(Some(server_avatar)) => { match server_avatars::resolve_server_avatar_to_render_data(&pool, &server_avatar, EmotionState::default()).await { Ok(render_data) => (render_data, server_avatar.id, "server"), Err(e) => { let _ = direct_tx.send(ServerMessage::ModCommandResult { success: false, message: format!("Failed to resolve avatar: {:?}", e), }).await; continue; } } } Ok(None) => { let _ = direct_tx.send(ServerMessage::ModCommandResult { success: false, message: format!("Avatar '{}' not found", avatar_slug), }).await; continue; } Err(e) => { let _ = direct_tx.send(ServerMessage::ModCommandResult { success: false, message: format!("Failed to lookup avatar: {:?}", e), }).await; continue; } } } Err(e) => { let _ = direct_tx.send(ServerMessage::ModCommandResult { success: false, message: format!("Failed to lookup avatar: {:?}", e), }).await; continue; } }; // Acquire connection and set RLS context for the update let mut rls_conn = match pool.acquire().await { Ok(c) => c, Err(e) => { let _ = 
direct_tx.send(ServerMessage::ModCommandResult { success: false, message: format!("Database connection error: {:?}", e), }).await; continue; } }; if let Err(e) = set_rls_user_id(&mut rls_conn, user_id).await { let _ = direct_tx.send(ServerMessage::ModCommandResult { success: false, message: format!("Failed to set RLS context: {:?}", e), }).await; continue; } // Apply forced avatar using connection with RLS context let apply_result = if source == "server" { server_avatars::apply_forced_server_avatar( &mut *rls_conn, target_user_id, realm_id, avatar_id, Some(user_id), duration, ).await } else { realm_avatars::apply_forced_realm_avatar( &mut *rls_conn, target_user_id, realm_id, avatar_id, Some(user_id), duration, ).await }; if let Err(e) = apply_result { let _ = direct_tx.send(ServerMessage::ModCommandResult { success: false, message: format!("Failed to apply forced avatar: {:?}", e), }).await; continue; } // Log moderation action let metadata = serde_json::json!({ "avatar_slug": avatar_slug, "avatar_id": avatar_id.to_string(), "source": source, "duration": duration_str, }); let _ = moderation::log_moderation_action( &pool, realm_id, user_id, ActionType::DressUser, Some(target_user_id), &format!("Forced {} to wear avatar '{}'", target_nick, avatar_slug), metadata, ).await; // Send AvatarForced to target user let _ = target_conn.direct_tx.send(ServerMessage::AvatarForced { user_id: target_user_id, avatar: avatar_render_data.clone(), reason: ForcedAvatarReason::ModCommand, forced_by: Some(mod_member.display_name.clone()), }).await; // Broadcast avatar update to channel let _ = tx.send(ServerMessage::AvatarUpdated { user_id: target_user_id, avatar: avatar_render_data, }); let duration_msg = match duration_str { Some(d) => format!(" for {}", d), None => " permanently".to_string(), }; let _ = direct_tx.send(ServerMessage::ModCommandResult { success: true, message: format!("Forced {} to wear '{}'{}", target_nick, avatar_slug, duration_msg), }).await; } else { let _ = 
direct_tx.send(ServerMessage::ModCommandResult { success: false, message: format!("User '{}' is not online in this realm", target_nick), }).await; } } "undress" => { // /mod undress [nick] if args.is_empty() { let _ = direct_tx.send(ServerMessage::ModCommandResult { success: false, message: "Usage: /mod undress [nick]".to_string(), }).await; continue; } let target_nick = &args[0]; // Find target user if let Some((target_user_id, target_conn)) = ws_state .find_user_by_display_name(realm_id, target_nick) { // Acquire connection and set RLS context for the update let mut rls_conn = match pool.acquire().await { Ok(c) => c, Err(e) => { let _ = direct_tx.send(ServerMessage::ModCommandResult { success: false, message: format!("Database connection error: {:?}", e), }).await; continue; } }; if let Err(e) = set_rls_user_id(&mut rls_conn, user_id).await { let _ = direct_tx.send(ServerMessage::ModCommandResult { success: false, message: format!("Failed to set RLS context: {:?}", e), }).await; continue; } // Clear forced avatar using connection with RLS context if let Err(e) = server_avatars::clear_forced_avatar(&mut *rls_conn, target_user_id, realm_id).await { let _ = direct_tx.send(ServerMessage::ModCommandResult { success: false, message: format!("Failed to clear forced avatar: {:?}", e), }).await; continue; } // Get the user's original avatar let original_avatar = match avatars::get_avatar_with_paths(&pool, target_user_id, realm_id).await { Ok(Some(avatar)) => avatar.to_render_data(), Ok(None) => AvatarRenderData::default(), Err(_) => AvatarRenderData::default(), }; // Log moderation action let metadata = serde_json::json!({}); let _ = moderation::log_moderation_action( &pool, realm_id, user_id, ActionType::UndressUser, Some(target_user_id), &format!("Cleared forced avatar for {}", target_nick), metadata, ).await; // Send AvatarCleared to target user let _ = target_conn.direct_tx.send(ServerMessage::AvatarCleared { user_id: target_user_id, avatar: original_avatar.clone(), 
cleared_by: Some(mod_member.display_name.clone()), }).await; // Broadcast avatar update to channel let _ = tx.send(ServerMessage::AvatarUpdated { user_id: target_user_id, avatar: original_avatar, }); let _ = direct_tx.send(ServerMessage::ModCommandResult { success: true, message: format!("Cleared forced avatar for {}", target_nick), }).await; } else { let _ = direct_tx.send(ServerMessage::ModCommandResult { success: false, message: format!("User '{}' is not online in this realm", target_nick), }).await; } } _ => { let _ = direct_tx.send(ServerMessage::ModCommandResult { success: false, message: format!("Unknown mod command: {}", subcommand), }).await; } } } ClientMessage::RefreshIdentity => { // Fetch updated user info from database match users::get_user_by_id(&pool, user_id).await { Ok(Some(updated_user)) => { // Update the is_guest flag - critical for allowing // newly registered users to send whispers is_guest = updated_user.is_guest(); let display_name = updated_user.display_name.clone(); tracing::info!( "[WS] User {} refreshed identity: display_name={}, is_guest={}", user_id, display_name, is_guest ); // Update WebSocket state with the new display name // This is critical for whispers and mod commands to find // the user by their new name after registration if let Some(conn) = ws_state.get_user(user_id) { ws_state.register_user( user_id, conn.direct_tx.clone(), conn.realm_id, conn.channel_id, display_name.clone(), ); } // Broadcast identity update to all channel members let _ = tx.send(ServerMessage::MemberIdentityUpdated { user_id, display_name, is_guest, }); } Ok(None) => { tracing::warn!("[WS] RefreshIdentity: user {} not found", user_id); } Err(e) => { tracing::error!("[WS] RefreshIdentity failed for user {}: {:?}", user_id, e); } } } } } Message::Close(close_frame) => { // Check close code for scene change or logout if let Some(CloseFrame { code, .. 
}) = close_frame { if code == close_codes::SCENE_CHANGE { disconnect_reason = DisconnectReason::SceneChange; } else if code == close_codes::LOGOUT { // Explicit logout - treat as graceful disconnect #[cfg(debug_assertions)] tracing::debug!("[WS] User {} logged out", user_id); disconnect_reason = DisconnectReason::Graceful; } else { disconnect_reason = DisconnectReason::Graceful; } } break; } _ => { // Ignore binary, ping, pong messages } } } Ok(Some(Err(e))) => { // WebSocket error tracing::warn!("[WS] Connection error: {:?}", e); disconnect_reason = DisconnectReason::Timeout; break; } Ok(None) => { // Stream ended gracefully break; } Err(_) => { // Timeout elapsed - connection likely lost tracing::info!("[WS] Connection timeout for user {}", user_id); // Send close frame with timeout code so client can attempt silent reconnection let _ = close_tx .send((close_codes::SERVER_TIMEOUT, "timeout".to_string())) .await; // Brief delay to allow close frame to be sent tokio::time::sleep(Duration::from_millis(100)).await; disconnect_reason = DisconnectReason::Timeout; break; } } } // Return the connection and disconnect reason for cleanup (recv_conn, disconnect_reason) }); // Spawn task to forward broadcasts, direct messages, and close frames to this client let send_task = tokio::spawn(async move { loop { tokio::select! 
{ // Handle close frame requests (from timeout) Some((code, reason)) = close_rx.recv() => { #[cfg(debug_assertions)] tracing::debug!("[WS->Client] Sending close frame: code={}, reason={}", code, reason); let close_frame = CloseFrame { code, reason: reason.into(), }; let _ = sender.send(Message::Close(Some(close_frame))).await; break; } // Handle broadcast messages Ok(msg) = rx.recv() => { if let Ok(json) = serde_json::to_string(&msg) { #[cfg(debug_assertions)] tracing::debug!("[WS->Client] {}", json); if sender.send(Message::Text(json.into())).await.is_err() { break; } } } // Handle direct messages (Pong) Some(msg) = direct_rx.recv() => { if let Ok(json) = serde_json::to_string(&msg) { #[cfg(debug_assertions)] tracing::debug!("[WS->Client] {}", json); if sender.send(Message::Text(json.into())).await.is_err() { break; } } } else => break } } }); // Wait for either task to complete, track disconnect reason let disconnect_reason = tokio::select! { recv_result = recv_task => { // recv_task finished, get connection and reason back for cleanup if let Ok((mut cleanup_conn, reason)) = recv_result { let _ = channel_members::leave_channel(&mut *cleanup_conn, channel_id, user_id).await; reason } else { // Task panicked, use pool (RLS may fail but try anyway) let _ = channel_members::leave_channel(&pool, channel_id, user_id).await; DisconnectReason::Timeout } } _ = send_task => { // send_task finished first (likely client disconnect), need to acquire a new connection for cleanup if let Ok(mut cleanup_conn) = pool.acquire().await { let _ = set_rls_user_id(&mut cleanup_conn, user_id).await; let _ = channel_members::leave_channel(&mut *cleanup_conn, channel_id, user_id).await; } DisconnectReason::Graceful } }; // Unregister user from direct message routing ws_state.unregister_user(user_id); tracing::info!( "[WS] User {} disconnected from channel {} (reason: {:?})", user_id, channel_id, disconnect_reason ); // Broadcast departure with reason let _ = 
channel_state.tx.send(ServerMessage::MemberLeft { user_id, reason: disconnect_reason, }); }

/// Helper: Get all channel members with their avatar render data.
///
/// Loads the member list for `channel_id` through `conn`, then resolves each
/// member's effective avatar through `pool` and pairs the two into a
/// [`ChannelMemberWithAvatar`].
///
/// # Errors
///
/// Returns an error only when the member list itself cannot be loaded.
/// A failed avatar lookup for an individual member is logged and that member
/// falls back to the default avatar, so one bad lookup never hides the rest of
/// the channel.
async fn get_members_with_avatars(
    conn: &mut sqlx::pool::PoolConnection<sqlx::Postgres>,
    pool: &PgPool,
    channel_id: Uuid,
    realm_id: Uuid,
) -> Result<Vec<ChannelMemberWithAvatar>, AppError> {
    // Get members first
    let members = channel_members::get_channel_members(&mut **conn, channel_id, realm_id).await?;

    // Fetch avatar data for each member using the effective avatar resolution.
    // This handles the priority chain:
    // forced > custom > selected realm > selected server > realm default > server default
    let mut result = Vec::with_capacity(members.len());
    for member in members {
        // All members now have a user_id (guests are regular users with the 'guest' tag)
        let avatar = match avatars::get_effective_avatar_render_data(pool, member.user_id, realm_id)
            .await
        {
            Ok(Some((render_data, _source))) => render_data,
            // No avatar resolved at any priority level: use the default.
            Ok(None) => Default::default(),
            Err(e) => {
                // Best-effort: keep the member visible with a default avatar, but
                // leave a trace so a failing lookup is not silently masked.
                tracing::warn!(
                    "[WS] Failed to resolve avatar for user {} in realm {}: {:?}",
                    member.user_id,
                    realm_id,
                    e
                );
                Default::default()
            }
        };

        result.push(ChannelMemberWithAvatar { member, avatar });
    }

    Ok(result)
}