update to support user expire, timeout, and disconnect

This commit is contained in:
Evan Carroll 2026-01-17 23:47:02 -06:00
parent fe65835f4a
commit 5fcd49e847
16 changed files with 744 additions and 238 deletions

View file

@ -4,7 +4,7 @@
use axum::{
extract::{
ws::{Message, WebSocket, WebSocketUpgrade},
ws::{CloseFrame, Message, WebSocket, WebSocketUpgrade},
FromRef, Path, State,
},
response::IntoResponse,
@ -13,15 +13,20 @@ use dashmap::DashMap;
use futures::{SinkExt, StreamExt};
use sqlx::PgPool;
use std::sync::Arc;
use tokio::sync::broadcast;
use std::time::Duration;
use tokio::sync::{broadcast, mpsc};
use uuid::Uuid;
use chattyness_db::{
models::{AvatarRenderData, ChannelMemberWithAvatar, EmotionState, User},
queries::{avatars, channel_members, loose_props, realms, scenes},
ws_messages::{ClientMessage, ServerMessage},
ws_messages::{ClientMessage, DisconnectReason, ServerMessage, WsConfig},
};
use chattyness_error::AppError;
use chattyness_shared::WebSocketConfig;
/// Close code for scene change (custom code).
const SCENE_CHANGE_CLOSE_CODE: u16 = 4000;
use crate::auth::AuthUser;
@ -71,12 +76,14 @@ pub async fn ws_handler<S>(
auth_result: Result<AuthUser, crate::auth::AuthError>,
State(pool): State<PgPool>,
State(ws_state): State<Arc<WebSocketState>>,
State(ws_config): State<WebSocketConfig>,
ws: WebSocketUpgrade,
) -> Result<impl IntoResponse, AppError>
where
S: Send + Sync,
PgPool: FromRef<S>,
Arc<WebSocketState>: FromRef<S>,
WebSocketConfig: FromRef<S>,
{
// Log auth result before checking
#[cfg(debug_assertions)]
@ -117,7 +124,7 @@ where
);
Ok(ws.on_upgrade(move |socket| {
handle_socket(socket, user, channel_id, realm.id, pool, ws_state)
handle_socket(socket, user, channel_id, realm.id, pool, ws_state, ws_config)
}))
}
@ -141,6 +148,7 @@ async fn handle_socket(
realm_id: Uuid,
pool: PgPool,
ws_state: Arc<WebSocketState>,
ws_config: WebSocketConfig,
) {
tracing::info!(
"[WS] handle_socket started for user {} channel {} realm {}",
@ -217,10 +225,13 @@ async fn handle_socket(
}
};
// Send welcome message
// Send welcome message with config
let welcome = ServerMessage::Welcome {
member: member.clone(),
members,
config: WsConfig {
ping_interval_secs: ws_config.client_ping_interval_secs,
},
};
if let Ok(json) = serde_json::to_string(&welcome) {
#[cfg(debug_assertions)]
@ -284,269 +295,339 @@ async fn handle_socket(
// and pool for cleanup (leave_channel needs user_id match anyway)
drop(conn);
// Channel for sending direct messages (Pong) to client
let (direct_tx, mut direct_rx) = mpsc::channel::<ServerMessage>(16);
// Create recv timeout from config
let recv_timeout = Duration::from_secs(ws_config.recv_timeout_secs);
// Spawn task to handle incoming messages from client
let recv_task = tokio::spawn(async move {
while let Some(Ok(msg)) = receiver.next().await {
if let Message::Text(text) = msg {
#[cfg(debug_assertions)]
tracing::debug!("[WS<-Client] {}", text);
let mut disconnect_reason = DisconnectReason::Graceful;
let Ok(client_msg) = serde_json::from_str::<ClientMessage>(&text) else {
continue;
};
loop {
// Use timeout to detect connection loss
let msg_result = tokio::time::timeout(recv_timeout, receiver.next()).await;
match client_msg {
ClientMessage::UpdatePosition { x, y } => {
if let Err(e) =
channel_members::update_position(&mut *recv_conn, channel_id, user_id, x, y)
.await
{
match msg_result {
Ok(Some(Ok(msg))) => {
match msg {
Message::Text(text) => {
#[cfg(debug_assertions)]
tracing::error!("[WS] Position update failed: {:?}", e);
continue;
}
let _ = tx.send(ServerMessage::PositionUpdated {
user_id: Some(user_id),
guest_session_id: None,
x,
y,
});
}
ClientMessage::UpdateEmotion { emotion } => {
// Parse emotion name to EmotionState
let emotion_state = match emotion.parse::<EmotionState>() {
Ok(e) => e,
Err(_) => {
#[cfg(debug_assertions)]
tracing::warn!("[WS] Invalid emotion name: {}", emotion);
continue;
}
};
let emotion_layer = match avatars::set_emotion(
&mut *recv_conn,
user_id,
realm_id,
emotion_state,
)
.await
{
Ok(layer) => layer,
Err(e) => {
#[cfg(debug_assertions)]
tracing::error!("[WS] Emotion update failed: {:?}", e);
continue;
}
};
let _ = tx.send(ServerMessage::EmotionUpdated {
user_id: Some(user_id),
guest_session_id: None,
emotion,
emotion_layer,
});
}
ClientMessage::Ping => {
// Respond with pong directly (not broadcast)
// This is handled in the send task via individual message
}
ClientMessage::SendChatMessage { content } => {
// Validate message
if content.is_empty() || content.len() > 500 {
continue;
}
tracing::debug!("[WS<-Client] {}", text);
// Get member's current position and emotion
let member_info = channel_members::get_channel_member(
&mut *recv_conn,
channel_id,
user_id,
realm_id,
)
.await;
if let Ok(Some(member)) = member_info {
// Convert emotion index to name
let emotion_name = EmotionState::from_index(member.current_emotion as u8)
.map(|e| e.to_string())
.unwrap_or_else(|| "neutral".to_string());
let msg = ServerMessage::ChatMessageReceived {
message_id: Uuid::new_v4(),
user_id: Some(user_id),
guest_session_id: None,
display_name: member.display_name.clone(),
content,
emotion: emotion_name,
x: member.position_x,
y: member.position_y,
timestamp: chrono::Utc::now().timestamp_millis(),
let Ok(client_msg) = serde_json::from_str::<ClientMessage>(&text) else {
continue;
};
let _ = tx.send(msg);
}
}
ClientMessage::DropProp { inventory_item_id } => {
// Get user's current position for random offset
let member_info = channel_members::get_channel_member(
&mut *recv_conn,
channel_id,
user_id,
realm_id,
)
.await;
if let Ok(Some(member)) = member_info {
// Generate random offset (within ~50 pixels)
let offset_x = (rand::random::<f64>() - 0.5) * 100.0;
let offset_y = (rand::random::<f64>() - 0.5) * 100.0;
let pos_x = member.position_x + offset_x;
let pos_y = member.position_y + offset_y;
match loose_props::drop_prop_to_canvas(
&mut *recv_conn,
inventory_item_id,
user_id,
channel_id,
pos_x,
pos_y,
)
.await
{
Ok(prop) => {
#[cfg(debug_assertions)]
tracing::debug!(
"[WS] User {} dropped prop {} at ({}, {})",
user_id,
prop.id,
pos_x,
pos_y
);
let _ = tx.send(ServerMessage::PropDropped { prop });
match client_msg {
ClientMessage::UpdatePosition { x, y } => {
if let Err(e) =
channel_members::update_position(&mut *recv_conn, channel_id, user_id, x, y)
.await
{
#[cfg(debug_assertions)]
tracing::error!("[WS] Position update failed: {:?}", e);
continue;
}
let _ = tx.send(ServerMessage::PositionUpdated {
user_id: Some(user_id),
guest_session_id: None,
x,
y,
});
}
Err(e) => {
tracing::error!("[WS] Drop prop failed: {:?}", e);
let (code, message) = match &e {
chattyness_error::AppError::Forbidden(msg) => {
("PROP_NOT_DROPPABLE".to_string(), msg.clone())
ClientMessage::UpdateEmotion { emotion } => {
// Parse emotion name to EmotionState
let emotion_state = match emotion.parse::<EmotionState>() {
Ok(e) => e,
Err(_) => {
#[cfg(debug_assertions)]
tracing::warn!("[WS] Invalid emotion name: {}", emotion);
continue;
}
chattyness_error::AppError::NotFound(msg) => {
("PROP_NOT_FOUND".to_string(), msg.clone())
}
_ => ("DROP_FAILED".to_string(), format!("{:?}", e)),
};
let _ = tx.send(ServerMessage::Error { code, message });
let emotion_layer = match avatars::set_emotion(
&mut *recv_conn,
user_id,
realm_id,
emotion_state,
)
.await
{
Ok(layer) => layer,
Err(e) => {
#[cfg(debug_assertions)]
tracing::error!("[WS] Emotion update failed: {:?}", e);
continue;
}
};
let _ = tx.send(ServerMessage::EmotionUpdated {
user_id: Some(user_id),
guest_session_id: None,
emotion,
emotion_layer,
});
}
ClientMessage::Ping => {
// Update last_moved_at to keep member alive for cleanup
let _ = channel_members::touch_member(&mut *recv_conn, channel_id, user_id).await;
// Respond with pong directly (not broadcast)
let _ = direct_tx.send(ServerMessage::Pong).await;
}
ClientMessage::SendChatMessage { content } => {
// Validate message
if content.is_empty() || content.len() > 500 {
continue;
}
// Get member's current position and emotion
let member_info = channel_members::get_channel_member(
&mut *recv_conn,
channel_id,
user_id,
realm_id,
)
.await;
if let Ok(Some(member)) = member_info {
// Convert emotion index to name
let emotion_name = EmotionState::from_index(member.current_emotion as u8)
.map(|e| e.to_string())
.unwrap_or_else(|| "neutral".to_string());
let msg = ServerMessage::ChatMessageReceived {
message_id: Uuid::new_v4(),
user_id: Some(user_id),
guest_session_id: None,
display_name: member.display_name.clone(),
content,
emotion: emotion_name,
x: member.position_x,
y: member.position_y,
timestamp: chrono::Utc::now().timestamp_millis(),
};
let _ = tx.send(msg);
}
}
ClientMessage::DropProp { inventory_item_id } => {
// Get user's current position for random offset
let member_info = channel_members::get_channel_member(
&mut *recv_conn,
channel_id,
user_id,
realm_id,
)
.await;
if let Ok(Some(member)) = member_info {
// Generate random offset (within ~50 pixels)
let offset_x = (rand::random::<f64>() - 0.5) * 100.0;
let offset_y = (rand::random::<f64>() - 0.5) * 100.0;
let pos_x = member.position_x + offset_x;
let pos_y = member.position_y + offset_y;
match loose_props::drop_prop_to_canvas(
&mut *recv_conn,
inventory_item_id,
user_id,
channel_id,
pos_x,
pos_y,
)
.await
{
Ok(prop) => {
#[cfg(debug_assertions)]
tracing::debug!(
"[WS] User {} dropped prop {} at ({}, {})",
user_id,
prop.id,
pos_x,
pos_y
);
let _ = tx.send(ServerMessage::PropDropped { prop });
}
Err(e) => {
tracing::error!("[WS] Drop prop failed: {:?}", e);
let (code, message) = match &e {
chattyness_error::AppError::Forbidden(msg) => {
("PROP_NOT_DROPPABLE".to_string(), msg.clone())
}
chattyness_error::AppError::NotFound(msg) => {
("PROP_NOT_FOUND".to_string(), msg.clone())
}
_ => ("DROP_FAILED".to_string(), format!("{:?}", e)),
};
let _ = tx.send(ServerMessage::Error { code, message });
}
}
}
}
ClientMessage::PickUpProp { loose_prop_id } => {
match loose_props::pick_up_loose_prop(
&mut *recv_conn,
loose_prop_id,
user_id,
)
.await
{
Ok(_inventory_item) => {
#[cfg(debug_assertions)]
tracing::debug!(
"[WS] User {} picked up prop {}",
user_id,
loose_prop_id
);
let _ = tx.send(ServerMessage::PropPickedUp {
prop_id: loose_prop_id,
picked_up_by_user_id: Some(user_id),
picked_up_by_guest_id: None,
});
}
Err(e) => {
tracing::error!("[WS] Pick up prop failed: {:?}", e);
let _ = tx.send(ServerMessage::Error {
code: "PICKUP_FAILED".to_string(),
message: format!("{:?}", e),
});
}
}
}
ClientMessage::SyncAvatar => {
// Fetch current avatar from database and broadcast to channel
match avatars::get_avatar_with_paths_conn(
&mut *recv_conn,
user_id,
realm_id,
)
.await
{
Ok(Some(avatar_with_paths)) => {
let render_data = avatar_with_paths.to_render_data();
#[cfg(debug_assertions)]
tracing::debug!(
"[WS] User {} syncing avatar to channel",
user_id
);
let _ = tx.send(ServerMessage::AvatarUpdated {
user_id: Some(user_id),
guest_session_id: None,
avatar: render_data,
});
}
Ok(None) => {
#[cfg(debug_assertions)]
tracing::warn!("[WS] No avatar found for user {} to sync", user_id);
}
Err(e) => {
tracing::error!("[WS] Avatar sync failed: {:?}", e);
}
}
}
}
}
}
ClientMessage::PickUpProp { loose_prop_id } => {
match loose_props::pick_up_loose_prop(
&mut *recv_conn,
loose_prop_id,
user_id,
)
.await
{
Ok(_inventory_item) => {
#[cfg(debug_assertions)]
tracing::debug!(
"[WS] User {} picked up prop {}",
user_id,
loose_prop_id
);
let _ = tx.send(ServerMessage::PropPickedUp {
prop_id: loose_prop_id,
picked_up_by_user_id: Some(user_id),
picked_up_by_guest_id: None,
});
}
Err(e) => {
tracing::error!("[WS] Pick up prop failed: {:?}", e);
let _ = tx.send(ServerMessage::Error {
code: "PICKUP_FAILED".to_string(),
message: format!("{:?}", e),
});
Message::Close(close_frame) => {
// Check close code for scene change
if let Some(CloseFrame { code, .. }) = close_frame {
if code == SCENE_CHANGE_CLOSE_CODE {
disconnect_reason = DisconnectReason::SceneChange;
} else {
disconnect_reason = DisconnectReason::Graceful;
}
}
break;
}
}
ClientMessage::SyncAvatar => {
// Fetch current avatar from database and broadcast to channel
match avatars::get_avatar_with_paths_conn(
&mut *recv_conn,
user_id,
realm_id,
)
.await
{
Ok(Some(avatar_with_paths)) => {
let render_data = avatar_with_paths.to_render_data();
#[cfg(debug_assertions)]
tracing::debug!(
"[WS] User {} syncing avatar to channel",
user_id
);
let _ = tx.send(ServerMessage::AvatarUpdated {
user_id: Some(user_id),
guest_session_id: None,
avatar: render_data,
});
}
Ok(None) => {
#[cfg(debug_assertions)]
tracing::warn!("[WS] No avatar found for user {} to sync", user_id);
}
Err(e) => {
tracing::error!("[WS] Avatar sync failed: {:?}", e);
}
_ => {
// Ignore binary, ping, pong messages
}
}
}
}
}
// Return the connection so we can use it for cleanup
recv_conn
});
// Spawn task to forward broadcasts to this client
let send_task = tokio::spawn(async move {
while let Ok(msg) = rx.recv().await {
if let Ok(json) = serde_json::to_string(&msg) {
#[cfg(debug_assertions)]
tracing::debug!("[WS->Client] {}", json);
if sender.send(Message::Text(json.into())).await.is_err() {
Ok(Some(Err(e))) => {
// WebSocket error
tracing::warn!("[WS] Connection error: {:?}", e);
disconnect_reason = DisconnectReason::Timeout;
break;
}
Ok(None) => {
// Stream ended gracefully
break;
}
Err(_) => {
// Timeout elapsed - connection likely lost
tracing::info!("[WS] Connection timeout for user {}", user_id);
disconnect_reason = DisconnectReason::Timeout;
break;
}
}
}
// Return the connection and disconnect reason for cleanup
(recv_conn, disconnect_reason)
});
// Wait for either task to complete
tokio::select! {
// Spawn task to forward broadcasts and direct messages to this client
let send_task = tokio::spawn(async move {
loop {
tokio::select! {
// Handle broadcast messages
Ok(msg) = rx.recv() => {
if let Ok(json) = serde_json::to_string(&msg) {
#[cfg(debug_assertions)]
tracing::debug!("[WS->Client] {}", json);
if sender.send(Message::Text(json.into())).await.is_err() {
break;
}
}
}
// Handle direct messages (Pong)
Some(msg) = direct_rx.recv() => {
if let Ok(json) = serde_json::to_string(&msg) {
#[cfg(debug_assertions)]
tracing::debug!("[WS->Client] {}", json);
if sender.send(Message::Text(json.into())).await.is_err() {
break;
}
}
}
else => break
}
}
});
// Wait for either task to complete, track disconnect reason
let disconnect_reason = tokio::select! {
recv_result = recv_task => {
// recv_task finished, get connection back for cleanup
if let Ok(mut cleanup_conn) = recv_result {
// recv_task finished, get connection and reason back for cleanup
if let Ok((mut cleanup_conn, reason)) = recv_result {
let _ = channel_members::leave_channel(&mut *cleanup_conn, channel_id, user_id).await;
reason
} else {
// Task panicked, use pool (RLS may fail but try anyway)
let _ = channel_members::leave_channel(&pool, channel_id, user_id).await;
DisconnectReason::Timeout
}
}
_ = send_task => {
// send_task finished first, need to acquire a new connection for cleanup
// send_task finished first (likely client disconnect), need to acquire a new connection for cleanup
if let Ok(mut cleanup_conn) = pool.acquire().await {
let _ = set_rls_user_id(&mut cleanup_conn, user_id).await;
let _ = channel_members::leave_channel(&mut *cleanup_conn, channel_id, user_id).await;
}
DisconnectReason::Graceful
}
}
};
tracing::info!(
"[WS] User {} disconnected from channel {}",
"[WS] User {} disconnected from channel {} (reason: {:?})",
user_id,
channel_id
channel_id,
disconnect_reason
);
// Broadcast departure
// Broadcast departure with reason
let _ = channel_state.tx.send(ServerMessage::MemberLeft {
user_id: Some(user_id),
guest_session_id: None,
reason: disconnect_reason,
});
}