//! WebSocket handler for channel presence. //! //! Handles real-time position updates, emotion changes, and member synchronization. use axum::{ extract::{ ws::{Message, WebSocket, WebSocketUpgrade}, FromRef, Path, State, }, response::IntoResponse, }; use dashmap::DashMap; use futures::{SinkExt, StreamExt}; use sqlx::PgPool; use std::sync::Arc; use tokio::sync::broadcast; use uuid::Uuid; use chattyness_db::{ models::{AvatarRenderData, ChannelMemberWithAvatar, EmotionState, User}, queries::{avatars, channel_members, loose_props, realms, scenes}, ws_messages::{ClientMessage, ServerMessage}, }; use chattyness_error::AppError; use crate::auth::AuthUser; /// Channel state for broadcasting updates. pub struct ChannelState { /// Broadcast sender for this channel. tx: broadcast::Sender, } /// Global state for all WebSocket connections. pub struct WebSocketState { /// Map of channel_id -> ChannelState. channels: DashMap>, } impl Default for WebSocketState { fn default() -> Self { Self::new() } } impl WebSocketState { /// Create a new WebSocket state. pub fn new() -> Self { Self { channels: DashMap::new(), } } /// Get or create a channel state. fn get_or_create_channel(&self, channel_id: Uuid) -> Arc { self.channels .entry(channel_id) .or_insert_with(|| { let (tx, _) = broadcast::channel(256); Arc::new(ChannelState { tx }) }) .clone() } } /// WebSocket upgrade handler. /// /// GET /api/realms/{slug}/channels/{channel_id}/ws pub async fn ws_handler( Path((slug, channel_id)): Path<(String, Uuid)>, auth_result: Result, State(pool): State, State(ws_state): State>, ws: WebSocketUpgrade, ) -> Result where S: Send + Sync, PgPool: FromRef, Arc: FromRef, { // Log auth result before checking #[cfg(debug_assertions)] tracing::debug!( "[WS] Connection attempt to {}/channels/{} - auth: {:?}", slug, channel_id, auth_result.as_ref().map(|a| a.0.id).map_err(|e| format!("{:?}", e)) ); let AuthUser(user) = auth_result.map_err(|e| { tracing::warn!("[WS] Auth failed for {}/channels/{}: {:?}", slug, channel_id, e); AppError::from(e) })?; // Verify realm exists let realm = realms::get_realm_by_slug(&pool, &slug) .await? .ok_or_else(|| AppError::NotFound(format!("Realm '{}' not found", slug)))?; // Verify scene exists and belongs to this realm // Note: Using scene_id as channel_id since channel_members uses scenes directly let scene = scenes::get_scene_by_id(&pool, channel_id) .await? .ok_or_else(|| AppError::NotFound("Scene not found".to_string()))?; if scene.realm_id != realm.id { return Err(AppError::NotFound( "Scene not found in this realm".to_string(), )); } #[cfg(debug_assertions)] tracing::debug!( "[WS] Upgrading connection for user {} to channel {}", user.id, channel_id ); Ok(ws.on_upgrade(move |socket| { handle_socket(socket, user, channel_id, realm.id, pool, ws_state) })) } /// Set RLS context on a database connection. async fn set_rls_user_id( conn: &mut sqlx::pool::PoolConnection, user_id: Uuid, ) -> Result<(), sqlx::Error> { sqlx::query("SELECT public.set_current_user_id($1)") .bind(user_id) .execute(&mut **conn) .await?; Ok(()) } /// Handle an active WebSocket connection. async fn handle_socket( socket: WebSocket, user: User, channel_id: Uuid, realm_id: Uuid, pool: PgPool, ws_state: Arc, ) { tracing::info!( "[WS] handle_socket started for user {} channel {} realm {}", user.id, channel_id, realm_id ); // Acquire a dedicated connection for setup operations let mut conn = match pool.acquire().await { Ok(conn) => conn, Err(e) => { tracing::error!("[WS] Failed to acquire DB connection: {:?}", e); return; } }; // Set RLS context on this dedicated connection if let Err(e) = set_rls_user_id(&mut conn, user.id).await { tracing::error!("[WS] Failed to set RLS context for user {}: {:?}", user.id, e); return; } tracing::info!("[WS] RLS context set on dedicated connection"); let channel_state = ws_state.get_or_create_channel(channel_id); let mut rx = channel_state.tx.subscribe(); let (mut sender, mut receiver) = socket.split(); // Ensure active avatar tracing::info!("[WS] Ensuring active avatar..."); if let Err(e) = channel_members::ensure_active_avatar(&mut *conn, user.id, realm_id).await { tracing::error!("[WS] Failed to ensure avatar for user {}: {:?}", user.id, e); return; } tracing::info!("[WS] Avatar ensured"); // Join the channel tracing::info!("[WS] Joining channel..."); if let Err(e) = channel_members::join_channel(&mut *conn, channel_id, user.id).await { tracing::error!( "[WS] Failed to join channel {} for user {}: {:?}", channel_id, user.id, e ); return; } tracing::info!("[WS] Channel joined"); // Get initial state let members = match get_members_with_avatars(&mut conn, channel_id, realm_id).await { Ok(m) => m, Err(e) => { tracing::error!("[WS] Failed to get members: {:?}", e); let _ = channel_members::leave_channel(&mut *conn, channel_id, user.id).await; return; } }; let member = match channel_members::get_channel_member(&mut *conn, channel_id, user.id, realm_id) .await { Ok(Some(m)) => m, Ok(None) => { tracing::error!("[WS] Failed to get member info for user {}", user.id); let _ = channel_members::leave_channel(&mut *conn, channel_id, user.id).await; return; } Err(e) => { tracing::error!("[WS] Error getting member info: {:?}", e); let _ = channel_members::leave_channel(&mut *conn, channel_id, user.id).await; return; } }; // Send welcome message let welcome = ServerMessage::Welcome { member: member.clone(), members, }; if let Ok(json) = serde_json::to_string(&welcome) { #[cfg(debug_assertions)] tracing::debug!("[WS->Client] {}", json); if sender.send(Message::Text(json.into())).await.is_err() { let _ = channel_members::leave_channel(&mut *conn, channel_id, user.id).await; return; } } // Send loose props sync match loose_props::list_channel_loose_props(&mut *conn, channel_id).await { Ok(props) => { let props_sync = ServerMessage::LoosePropsSync { props }; if let Ok(json) = serde_json::to_string(&props_sync) { #[cfg(debug_assertions)] tracing::debug!("[WS->Client] {}", json); if sender.send(Message::Text(json.into())).await.is_err() { let _ = channel_members::leave_channel(&mut *conn, channel_id, user.id).await; return; } } } Err(e) => { tracing::warn!("[WS] Failed to get loose props: {:?}", e); } } // Broadcast join to others let avatar = avatars::get_avatar_with_paths_conn(&mut *conn, user.id, realm_id) .await .ok() .flatten() .map(|a| a.to_render_data()) .unwrap_or_default(); let join_msg = ServerMessage::MemberJoined { member: ChannelMemberWithAvatar { member, avatar }, }; let _ = channel_state.tx.send(join_msg); let user_id = user.id; let tx = channel_state.tx.clone(); // Acquire a second dedicated connection for the receive task // This connection needs its own RLS context let mut recv_conn = match pool.acquire().await { Ok(c) => c, Err(e) => { tracing::error!("[WS] Failed to acquire recv connection: {:?}", e); let _ = channel_members::leave_channel(&mut *conn, channel_id, user_id).await; return; } }; if let Err(e) = set_rls_user_id(&mut recv_conn, user_id).await { tracing::error!("[WS] Failed to set RLS on recv connection: {:?}", e); let _ = channel_members::leave_channel(&mut *conn, channel_id, user_id).await; return; } // Drop the setup connection - we'll use recv_conn for the receive task // and pool for cleanup (leave_channel needs user_id match anyway) drop(conn); // Spawn task to handle incoming messages from client let recv_task = tokio::spawn(async move { while let Some(Ok(msg)) = receiver.next().await { if let Message::Text(text) = msg { #[cfg(debug_assertions)] tracing::debug!("[WS<-Client] {}", text); let Ok(client_msg) = serde_json::from_str::(&text) else { continue; }; match client_msg { ClientMessage::UpdatePosition { x, y } => { if let Err(e) = channel_members::update_position(&mut *recv_conn, channel_id, user_id, x, y) .await { #[cfg(debug_assertions)] tracing::error!("[WS] Position update failed: {:?}", e); continue; } let _ = tx.send(ServerMessage::PositionUpdated { user_id: Some(user_id), guest_session_id: None, x, y, }); } ClientMessage::UpdateEmotion { emotion } => { // Parse emotion name to EmotionState let emotion_state = match emotion.parse::() { Ok(e) => e, Err(_) => { #[cfg(debug_assertions)] tracing::warn!("[WS] Invalid emotion name: {}", emotion); continue; } }; let emotion_layer = match avatars::set_emotion( &mut *recv_conn, user_id, realm_id, emotion_state, ) .await { Ok(layer) => layer, Err(e) => { #[cfg(debug_assertions)] tracing::error!("[WS] Emotion update failed: {:?}", e); continue; } }; let _ = tx.send(ServerMessage::EmotionUpdated { user_id: Some(user_id), guest_session_id: None, emotion, emotion_layer, }); } ClientMessage::Ping => { // Respond with pong directly (not broadcast) // This is handled in the send task via individual message } ClientMessage::SendChatMessage { content } => { // Validate message if content.is_empty() || content.len() > 500 { continue; } // Get member's current position and emotion let member_info = channel_members::get_channel_member( &mut *recv_conn, channel_id, user_id, realm_id, ) .await; if let Ok(Some(member)) = member_info { // Convert emotion index to name let emotion_name = EmotionState::from_index(member.current_emotion as u8) .map(|e| e.to_string()) .unwrap_or_else(|| "neutral".to_string()); let msg = ServerMessage::ChatMessageReceived { message_id: Uuid::new_v4(), user_id: Some(user_id), guest_session_id: None, display_name: member.display_name.clone(), content, emotion: emotion_name, x: member.position_x, y: member.position_y, timestamp: chrono::Utc::now().timestamp_millis(), }; let _ = tx.send(msg); } } ClientMessage::DropProp { inventory_item_id } => { // Get user's current position for random offset let member_info = channel_members::get_channel_member( &mut *recv_conn, channel_id, user_id, realm_id, ) .await; if let Ok(Some(member)) = member_info { // Generate random offset (within ~50 pixels) let offset_x = (rand::random::() - 0.5) * 100.0; let offset_y = (rand::random::() - 0.5) * 100.0; let pos_x = member.position_x + offset_x; let pos_y = member.position_y + offset_y; match loose_props::drop_prop_to_canvas( &mut *recv_conn, inventory_item_id, user_id, channel_id, pos_x, pos_y, ) .await { Ok(prop) => { #[cfg(debug_assertions)] tracing::debug!( "[WS] User {} dropped prop {} at ({}, {})", user_id, prop.id, pos_x, pos_y ); let _ = tx.send(ServerMessage::PropDropped { prop }); } Err(e) => { tracing::error!("[WS] Drop prop failed: {:?}", e); let (code, message) = match &e { chattyness_error::AppError::Forbidden(msg) => { ("PROP_NOT_DROPPABLE".to_string(), msg.clone()) } chattyness_error::AppError::NotFound(msg) => { ("PROP_NOT_FOUND".to_string(), msg.clone()) } _ => ("DROP_FAILED".to_string(), format!("{:?}", e)), }; let _ = tx.send(ServerMessage::Error { code, message }); } } } } ClientMessage::PickUpProp { loose_prop_id } => { match loose_props::pick_up_loose_prop( &mut *recv_conn, loose_prop_id, user_id, ) .await { Ok(_inventory_item) => { #[cfg(debug_assertions)] tracing::debug!( "[WS] User {} picked up prop {}", user_id, loose_prop_id ); let _ = tx.send(ServerMessage::PropPickedUp { prop_id: loose_prop_id, picked_up_by_user_id: Some(user_id), picked_up_by_guest_id: None, }); } Err(e) => { tracing::error!("[WS] Pick up prop failed: {:?}", e); let _ = tx.send(ServerMessage::Error { code: "PICKUP_FAILED".to_string(), message: format!("{:?}", e), }); } } } } } } // Return the connection so we can use it for cleanup recv_conn }); // Spawn task to forward broadcasts to this client let send_task = tokio::spawn(async move { while let Ok(msg) = rx.recv().await { if let Ok(json) = serde_json::to_string(&msg) { #[cfg(debug_assertions)] tracing::debug!("[WS->Client] {}", json); if sender.send(Message::Text(json.into())).await.is_err() { break; } } } }); // Wait for either task to complete tokio::select! { recv_result = recv_task => { // recv_task finished, get connection back for cleanup if let Ok(mut cleanup_conn) = recv_result { let _ = channel_members::leave_channel(&mut *cleanup_conn, channel_id, user_id).await; } else { // Task panicked, use pool (RLS may fail but try anyway) let _ = channel_members::leave_channel(&pool, channel_id, user_id).await; } } _ = send_task => { // send_task finished first, need to acquire a new connection for cleanup if let Ok(mut cleanup_conn) = pool.acquire().await { let _ = set_rls_user_id(&mut cleanup_conn, user_id).await; let _ = channel_members::leave_channel(&mut *cleanup_conn, channel_id, user_id).await; } } } tracing::info!( "[WS] User {} disconnected from channel {}", user_id, channel_id ); // Broadcast departure let _ = channel_state.tx.send(ServerMessage::MemberLeft { user_id: Some(user_id), guest_session_id: None, }); } /// Helper: Get all channel members with their avatar render data. async fn get_members_with_avatars( conn: &mut sqlx::pool::PoolConnection, channel_id: Uuid, realm_id: Uuid, ) -> Result, AppError> { // Get members first let members = channel_members::get_channel_members(&mut **conn, channel_id, realm_id).await?; // Fetch avatar data for each member using full avatar with paths // This avoids the CASE statement approach and handles all emotions correctly let mut result = Vec::with_capacity(members.len()); for member in members { let avatar = if let Some(user_id) = member.user_id { // Get full avatar and convert to render data for current emotion avatars::get_avatar_with_paths_conn(&mut **conn, user_id, realm_id) .await .ok() .flatten() .map(|a| a.to_render_data()) .unwrap_or_default() } else { // Guest users don't have avatars AvatarRenderData::default() }; result.push(ChannelMemberWithAvatar { member, avatar }); } Ok(result) }