feat: private messages.

This commit is contained in:
Evan Carroll 2026-01-18 15:28:13 -06:00
parent 0492043625
commit 22cc0fdc38
11 changed files with 1135 additions and 44 deletions

View file

@ -36,10 +36,25 @@ pub struct ChannelState {
tx: broadcast::Sender<ServerMessage>,
}
/// Connection info for a connected user.
#[derive(Clone)]
pub struct UserConnection {
/// Direct message sender for this user.
pub direct_tx: mpsc::Sender<ServerMessage>,
/// Realm the user is in.
pub realm_id: Uuid,
/// Channel (scene) the user is in.
pub channel_id: Uuid,
/// User's display name.
pub display_name: String,
}
/// Global state for all WebSocket connections.
pub struct WebSocketState {
/// Map of channel_id -> ChannelState.
channels: DashMap<Uuid, Arc<ChannelState>>,
/// Map of user_id -> UserConnection for direct message routing.
users: DashMap<Uuid, UserConnection>,
}
impl Default for WebSocketState {
@ -53,6 +68,7 @@ impl WebSocketState {
pub fn new() -> Self {
Self {
channels: DashMap::new(),
users: DashMap::new(),
}
}
@ -66,6 +82,47 @@ impl WebSocketState {
})
.clone()
}
/// Register a user connection for direct messaging.
pub fn register_user(
&self,
user_id: Uuid,
direct_tx: mpsc::Sender<ServerMessage>,
realm_id: Uuid,
channel_id: Uuid,
display_name: String,
) {
self.users.insert(
user_id,
UserConnection {
direct_tx,
realm_id,
channel_id,
display_name,
},
);
}
/// Unregister a user connection.
pub fn unregister_user(&self, user_id: Uuid) {
self.users.remove(&user_id);
}
/// Find a user by display name within a realm.
pub fn find_user_by_display_name(&self, realm_id: Uuid, display_name: &str) -> Option<(Uuid, UserConnection)> {
for entry in self.users.iter() {
let (user_id, conn) = entry.pair();
if conn.realm_id == realm_id && conn.display_name.eq_ignore_ascii_case(display_name) {
return Some((*user_id, conn.clone()));
}
}
None
}
/// Get a user's connection info.
pub fn get_user(&self, user_id: Uuid) -> Option<UserConnection> {
self.users.get(&user_id).map(|r| r.clone())
}
}
/// WebSocket upgrade handler.
@ -260,6 +317,9 @@ async fn handle_socket(
}
}
// Save member display_name for user registration (before member is moved)
let member_display_name = member.display_name.clone();
// Broadcast join to others
let avatar = avatars::get_avatar_with_paths_conn(&mut *conn, user.id, realm_id)
.await
@ -295,14 +355,27 @@ async fn handle_socket(
// and pool for cleanup (leave_channel needs user_id match anyway)
drop(conn);
// Channel for sending direct messages (Pong) to client
// Channel for sending direct messages (Pong, whispers) to client
let (direct_tx, mut direct_rx) = mpsc::channel::<ServerMessage>(16);
// Register user for direct message routing
ws_state.register_user(
user_id,
direct_tx.clone(),
realm_id,
channel_id,
member_display_name,
);
// Clone ws_state for use in recv_task
let ws_state_for_recv = ws_state.clone();
// Create recv timeout from config
let recv_timeout = Duration::from_secs(ws_config.recv_timeout_secs);
// Spawn task to handle incoming messages from client
let recv_task = tokio::spawn(async move {
let ws_state = ws_state_for_recv;
let mut disconnect_reason = DisconnectReason::Graceful;
loop {
@ -375,7 +448,7 @@ async fn handle_socket(
// Respond with pong directly (not broadcast)
let _ = direct_tx.send(ServerMessage::Pong).await;
}
ClientMessage::SendChatMessage { content } => {
ClientMessage::SendChatMessage { content, target_display_name } => {
// Validate message
if content.is_empty() || content.len() > 500 {
continue;
@ -395,18 +468,81 @@ async fn handle_socket(
let emotion_name = EmotionState::from_index(member.current_emotion as u8)
.map(|e| e.to_string())
.unwrap_or_else(|| "neutral".to_string());
let msg = ServerMessage::ChatMessageReceived {
message_id: Uuid::new_v4(),
user_id: Some(user_id),
guest_session_id: None,
display_name: member.display_name.clone(),
content,
emotion: emotion_name,
x: member.position_x,
y: member.position_y,
timestamp: chrono::Utc::now().timestamp_millis(),
};
let _ = tx.send(msg);
// Handle whisper (direct message) vs broadcast
if let Some(target_name) = target_display_name {
// Whisper: send directly to target user
if let Some((_target_user_id, target_conn)) =
ws_state.find_user_by_display_name(realm_id, &target_name)
{
// Determine if same scene
let is_same_scene = target_conn.channel_id == channel_id;
let msg = ServerMessage::ChatMessageReceived {
message_id: Uuid::new_v4(),
user_id: Some(user_id),
guest_session_id: None,
display_name: member.display_name.clone(),
content: content.clone(),
emotion: emotion_name.clone(),
x: member.position_x,
y: member.position_y,
timestamp: chrono::Utc::now().timestamp_millis(),
is_whisper: true,
is_same_scene,
};
// Send to target user
let _ = target_conn.direct_tx.send(msg.clone()).await;
// Also send back to sender (so they see their own whisper)
// For sender, is_same_scene is always true (they see it as a bubble)
let sender_msg = ServerMessage::ChatMessageReceived {
message_id: Uuid::new_v4(),
user_id: Some(user_id),
guest_session_id: None,
display_name: member.display_name.clone(),
content,
emotion: emotion_name,
x: member.position_x,
y: member.position_y,
timestamp: chrono::Utc::now().timestamp_millis(),
is_whisper: true,
is_same_scene: true, // Sender always sees as bubble
};
let _ = direct_tx.send(sender_msg).await;
#[cfg(debug_assertions)]
tracing::debug!(
"[WS] Whisper from {} to {} (same_scene={})",
member.display_name,
target_name,
is_same_scene
);
} else {
// Target user not found - send error
let _ = direct_tx.send(ServerMessage::Error {
code: "WHISPER_TARGET_NOT_FOUND".to_string(),
message: format!("User '{}' is not online or not in this realm", target_name),
}).await;
}
} else {
// Broadcast: send to all users in the channel
let msg = ServerMessage::ChatMessageReceived {
message_id: Uuid::new_v4(),
user_id: Some(user_id),
guest_session_id: None,
display_name: member.display_name.clone(),
content,
emotion: emotion_name,
x: member.position_x,
y: member.position_y,
timestamp: chrono::Utc::now().timestamp_millis(),
is_whisper: false,
is_same_scene: true,
};
let _ = tx.send(msg);
}
}
}
ClientMessage::DropProp { inventory_item_id } => {
@ -616,6 +752,9 @@ async fn handle_socket(
}
};
// Unregister user from direct message routing
ws_state.unregister_user(user_id);
tracing::info!(
"[WS] User {} disconnected from channel {} (reason: {:?})",
user_id,