add initial crates and apps

This commit is contained in:
Evan Carroll 2026-01-12 15:34:40 -06:00
parent 5c87ba3519
commit 1ca300098f
113 changed files with 28169 additions and 0 deletions

View file

@ -0,0 +1,399 @@
//! WebSocket handler for channel presence.
//!
//! Handles real-time position updates, emotion changes, and member synchronization.
use axum::{
extract::{
ws::{Message, WebSocket, WebSocketUpgrade},
FromRef, Path, State,
},
response::IntoResponse,
};
use dashmap::DashMap;
use futures::{SinkExt, StreamExt};
use sqlx::PgPool;
use std::sync::Arc;
use tokio::sync::broadcast;
use uuid::Uuid;
use chattyness_db::{
models::{AvatarRenderData, ChannelMemberWithAvatar, User},
queries::{avatars, channel_members, realms, scenes},
ws_messages::{ClientMessage, ServerMessage},
};
use chattyness_error::AppError;
use crate::auth::AuthUser;
/// Channel state for broadcasting updates.
pub struct ChannelState {
/// Broadcast sender for this channel.
tx: broadcast::Sender<ServerMessage>,
}
/// Global state for all WebSocket connections.
pub struct WebSocketState {
/// Map of channel_id -> ChannelState.
channels: DashMap<Uuid, Arc<ChannelState>>,
}
impl Default for WebSocketState {
fn default() -> Self {
Self::new()
}
}
impl WebSocketState {
/// Create a new WebSocket state.
pub fn new() -> Self {
Self {
channels: DashMap::new(),
}
}
/// Get or create a channel state.
fn get_or_create_channel(&self, channel_id: Uuid) -> Arc<ChannelState> {
self.channels
.entry(channel_id)
.or_insert_with(|| {
let (tx, _) = broadcast::channel(256);
Arc::new(ChannelState { tx })
})
.clone()
}
}
/// WebSocket upgrade handler.
///
/// GET /api/realms/{slug}/channels/{channel_id}/ws
pub async fn ws_handler<S>(
Path((slug, channel_id)): Path<(String, Uuid)>,
auth_result: Result<AuthUser, crate::auth::AuthError>,
State(pool): State<PgPool>,
State(ws_state): State<Arc<WebSocketState>>,
ws: WebSocketUpgrade,
) -> Result<impl IntoResponse, AppError>
where
S: Send + Sync,
PgPool: FromRef<S>,
Arc<WebSocketState>: FromRef<S>,
{
// Log auth result before checking
#[cfg(debug_assertions)]
tracing::debug!(
"[WS] Connection attempt to {}/channels/{} - auth: {:?}",
slug,
channel_id,
auth_result.as_ref().map(|a| a.0.id).map_err(|e| format!("{:?}", e))
);
let AuthUser(user) = auth_result.map_err(|e| {
tracing::warn!("[WS] Auth failed for {}/channels/{}: {:?}", slug, channel_id, e);
AppError::from(e)
})?;
// Verify realm exists
let realm = realms::get_realm_by_slug(&pool, &slug)
.await?
.ok_or_else(|| AppError::NotFound(format!("Realm '{}' not found", slug)))?;
// Verify channel (scene) exists and belongs to this realm
let scene = scenes::get_scene_by_id(&pool, channel_id)
.await?
.ok_or_else(|| AppError::NotFound("Channel not found".to_string()))?;
if scene.realm_id != realm.id {
return Err(AppError::NotFound(
"Channel not found in this realm".to_string(),
));
}
#[cfg(debug_assertions)]
tracing::debug!(
"[WS] Upgrading connection for user {} to channel {}",
user.id,
channel_id
);
Ok(ws.on_upgrade(move |socket| {
handle_socket(socket, user, channel_id, realm.id, pool, ws_state)
}))
}
/// Set RLS context on a database connection.
async fn set_rls_user_id(
conn: &mut sqlx::pool::PoolConnection<sqlx::Postgres>,
user_id: Uuid,
) -> Result<(), sqlx::Error> {
sqlx::query("SELECT public.set_current_user_id($1)")
.bind(user_id)
.execute(&mut **conn)
.await?;
Ok(())
}
/// Handle an active WebSocket connection.
async fn handle_socket(
socket: WebSocket,
user: User,
channel_id: Uuid,
realm_id: Uuid,
pool: PgPool,
ws_state: Arc<WebSocketState>,
) {
tracing::info!(
"[WS] handle_socket started for user {} channel {} realm {}",
user.id,
channel_id,
realm_id
);
// Acquire a dedicated connection for setup operations
let mut conn = match pool.acquire().await {
Ok(conn) => conn,
Err(e) => {
tracing::error!("[WS] Failed to acquire DB connection: {:?}", e);
return;
}
};
// Set RLS context on this dedicated connection
if let Err(e) = set_rls_user_id(&mut conn, user.id).await {
tracing::error!("[WS] Failed to set RLS context for user {}: {:?}", user.id, e);
return;
}
tracing::info!("[WS] RLS context set on dedicated connection");
let channel_state = ws_state.get_or_create_channel(channel_id);
let mut rx = channel_state.tx.subscribe();
let (mut sender, mut receiver) = socket.split();
// Ensure active avatar
tracing::info!("[WS] Ensuring active avatar...");
if let Err(e) = channel_members::ensure_active_avatar(&mut *conn, user.id, realm_id).await {
tracing::error!("[WS] Failed to ensure avatar for user {}: {:?}", user.id, e);
return;
}
tracing::info!("[WS] Avatar ensured");
// Join the channel
tracing::info!("[WS] Joining channel...");
if let Err(e) = channel_members::join_channel(&mut *conn, channel_id, user.id).await {
tracing::error!(
"[WS] Failed to join channel {} for user {}: {:?}",
channel_id,
user.id,
e
);
return;
}
tracing::info!("[WS] Channel joined");
// Get initial state
let members = match get_members_with_avatars(&mut *conn, channel_id, realm_id).await {
Ok(m) => m,
Err(e) => {
tracing::error!("[WS] Failed to get members: {:?}", e);
let _ = channel_members::leave_channel(&mut *conn, channel_id, user.id).await;
return;
}
};
let member = match channel_members::get_channel_member(&mut *conn, channel_id, user.id, realm_id)
.await
{
Ok(Some(m)) => m,
Ok(None) => {
tracing::error!("[WS] Failed to get member info for user {}", user.id);
let _ = channel_members::leave_channel(&mut *conn, channel_id, user.id).await;
return;
}
Err(e) => {
tracing::error!("[WS] Error getting member info: {:?}", e);
let _ = channel_members::leave_channel(&mut *conn, channel_id, user.id).await;
return;
}
};
// Send welcome message
let welcome = ServerMessage::Welcome {
member: member.clone(),
members,
};
if let Ok(json) = serde_json::to_string(&welcome) {
#[cfg(debug_assertions)]
tracing::debug!("[WS->Client] {}", json);
if sender.send(Message::Text(json.into())).await.is_err() {
let _ = channel_members::leave_channel(&mut *conn, channel_id, user.id).await;
return;
}
}
// Broadcast join to others
let avatar = avatars::get_avatar_render_data(&mut *conn, user.id, realm_id)
.await
.unwrap_or_default();
let join_msg = ServerMessage::MemberJoined {
member: ChannelMemberWithAvatar { member, avatar },
};
let _ = channel_state.tx.send(join_msg);
let user_id = user.id;
let tx = channel_state.tx.clone();
// Acquire a second dedicated connection for the receive task
// This connection needs its own RLS context
let mut recv_conn = match pool.acquire().await {
Ok(c) => c,
Err(e) => {
tracing::error!("[WS] Failed to acquire recv connection: {:?}", e);
let _ = channel_members::leave_channel(&mut *conn, channel_id, user_id).await;
return;
}
};
if let Err(e) = set_rls_user_id(&mut recv_conn, user_id).await {
tracing::error!("[WS] Failed to set RLS on recv connection: {:?}", e);
let _ = channel_members::leave_channel(&mut *conn, channel_id, user_id).await;
return;
}
// Drop the setup connection - we'll use recv_conn for the receive task
// and pool for cleanup (which will use the same RLS context issue, but leave_channel
// needs user_id match anyway)
drop(conn);
// Spawn task to handle incoming messages from client
let recv_task = tokio::spawn(async move {
while let Some(Ok(msg)) = receiver.next().await {
if let Message::Text(text) = msg {
#[cfg(debug_assertions)]
tracing::debug!("[WS<-Client] {}", text);
let Ok(client_msg) = serde_json::from_str::<ClientMessage>(&text) else {
continue;
};
match client_msg {
ClientMessage::UpdatePosition { x, y } => {
if let Err(e) =
channel_members::update_position(&mut *recv_conn, channel_id, user_id, x, y)
.await
{
#[cfg(debug_assertions)]
tracing::error!("[WS] Position update failed: {:?}", e);
continue;
}
let _ = tx.send(ServerMessage::PositionUpdated {
user_id: Some(user_id),
guest_session_id: None,
x,
y,
});
}
ClientMessage::UpdateEmotion { emotion } => {
if emotion > 9 {
continue;
}
let emotion_layer = match avatars::set_emotion(
&mut *recv_conn,
user_id,
realm_id,
emotion as i16,
)
.await
{
Ok(layer) => layer,
Err(e) => {
#[cfg(debug_assertions)]
tracing::error!("[WS] Emotion update failed: {:?}", e);
continue;
}
};
let _ = tx.send(ServerMessage::EmotionUpdated {
user_id: Some(user_id),
guest_session_id: None,
emotion,
emotion_layer,
});
}
ClientMessage::Ping => {
// Respond with pong directly (not broadcast)
// This is handled in the send task via individual message
}
}
}
}
// Return the connection so we can use it for cleanup
recv_conn
});
// Spawn task to forward broadcasts to this client
let send_task = tokio::spawn(async move {
while let Ok(msg) = rx.recv().await {
if let Ok(json) = serde_json::to_string(&msg) {
#[cfg(debug_assertions)]
tracing::debug!("[WS->Client] {}", json);
if sender.send(Message::Text(json.into())).await.is_err() {
break;
}
}
}
});
// Wait for either task to complete
tokio::select! {
recv_result = recv_task => {
// recv_task finished, get connection back for cleanup
if let Ok(mut cleanup_conn) = recv_result {
let _ = channel_members::leave_channel(&mut *cleanup_conn, channel_id, user_id).await;
} else {
// Task panicked, use pool (RLS may fail but try anyway)
let _ = channel_members::leave_channel(&pool, channel_id, user_id).await;
}
}
_ = send_task => {
// send_task finished first, need to acquire a new connection for cleanup
if let Ok(mut cleanup_conn) = pool.acquire().await {
let _ = set_rls_user_id(&mut cleanup_conn, user_id).await;
let _ = channel_members::leave_channel(&mut *cleanup_conn, channel_id, user_id).await;
}
}
}
tracing::info!(
"[WS] User {} disconnected from channel {}",
user_id,
channel_id
);
// Broadcast departure
let _ = channel_state.tx.send(ServerMessage::MemberLeft {
user_id: Some(user_id),
guest_session_id: None,
});
}
/// Helper: Get all channel members with their avatar render data.
async fn get_members_with_avatars<'e>(
executor: impl sqlx::PgExecutor<'e>,
channel_id: Uuid,
realm_id: Uuid,
) -> Result<Vec<ChannelMemberWithAvatar>, AppError> {
// Get members first, then we need to get avatars
// But executor is consumed by the first query, so we need the pool
// Actually, let's just inline this to avoid the complexity
let members = channel_members::get_channel_members(executor, channel_id, realm_id).await?;
// For avatar data, we'll just return default for now since the query
// would need another executor
let result: Vec<ChannelMemberWithAvatar> = members
.into_iter()
.map(|member| ChannelMemberWithAvatar {
member,
avatar: AvatarRenderData::default(),
})
.collect();
Ok(result)
}