//! Code for running websocket related functions on the web server. //! //! This includes the messaging system as a whole. use std::sync::Arc; use axum::extract::ws::{Message, WebSocket}; use futures::{ stream::{SplitSink, SplitStream}, SinkExt, StreamExt, }; use messenger_common::{ client::MessageType as ClientMessageType, server::{MessageType, UserMessage}, }; use thiserror::Error; use tokio::sync::{broadcast::Receiver, Mutex}; use crate::{ app::State as AppState, message::{self, deserialize, Server}, }; /// Represents an error that can occur during a session. #[derive(Debug, Error)] pub enum Error { /// An error with a Axum #[error("an error occurred while attempting to interact with Axum")] Axum(#[from] axum::Error), /// An error with serde_json #[error("an error occurred while ser/deserializing data with serde_json")] SerdeJson(#[from] serde_json::Error), } /// Represents a singular session for a user. Handles sending and receiving /// messages for this session. #[derive(Debug)] pub struct Session { /// Receiving component of the WebSocket split. receiver: Mutex>, /// Sending component of the WebSocket split. sender: Mutex>, /// Receiver from the apps state. state_rx: Mutex>, /// The username associated with this session. username: String, } impl Session { /// Constructs a new instance of [`Session`]. #[must_use] pub fn new( receiver: SplitStream, sender: SplitSink, state_rx: Receiver, username: String, ) -> Self { Self { receiver: Mutex::new(receiver), sender: Mutex::new(sender), state_rx: Mutex::new(state_rx), username, } } /// Feeds the last 100 messages to a singular sender, differentiated by the /// `WebSocket` sender. pub async fn feed_message_history(&self, state: &Arc) { // Iterate through the message history and feed it to the sender for message in &*state.message_history.lock().await { if self .sender .lock() .await .feed(Message::Text(message.clone())) .await .is_err() { break; } } } /// Feeds a list of online users to a singular sender, differentiated by the /// `WebSocket` sender. pub async fn feed_online_users(&self, state: &Arc) -> Result<(), Error> { self.sender .lock() .await .feed(Message::Text(serde_json::to_string( &MessageType::OnlineUsers(state.user_set.lock().await.clone()), )?)) .await?; Ok(()) } /// Receives messages for this session. For use inside a tokio select /// receive task. pub async fn receive(&self, state: &Arc) { while let Some(Ok(Message::Text(text))) = self.receiver.lock().await.next().await { let result = match deserialize(&text) { Ok(ClientMessageType::UserMessage(content)) => { // Handle the possibility of a timestamp being unable to be created when a // message is bring processed let Ok(timestamp) = time::OffsetDateTime::now_local() else { tracing::error!("could not create an OffsetDateTime for received message"); let mut lock = self.sender.lock().await; // Report to the client that their message could not be processed message::send_error( &mut lock, messenger_common::server::Error::CannotProcess, ) .await; // Drop the lock early drop(lock); // Skip to the next message. continue; }; let message: MessageType = UserMessage::new(content, self.username.clone(), timestamp).into(); Some(message) } Ok(_) | Err(_) => { let mut lock = self.sender.lock().await; // Messages outside of type UserMessage are unexpected, and should be // reported message::send_error(&mut lock, messenger_common::server::Error::InvalidMessage) .await; drop(lock); None } }; if let Some(message) = result { message.send(&state.tx); if let Err(error) = message.append_to_history(state).await { tracing::error!( "error encountered when appending a message to history: {error}" ); } } } } /// Processes a send operation for this session. pub async fn send(&self) { let mut lock = self.state_rx.lock().await; while let Ok(msg) = lock.recv().await { // In any websocket error, break loop. if self .sender .lock() .await .send(Message::Text(msg)) .await .is_err() { break; } } } /// Transmits a copy of the last 100 messages and a list of online users to /// the user. pub async fn transmit_initial_data(&self, state: &Arc) -> Result<(), Error> { self.feed_message_history(state).await; self.feed_online_users(state).await?; self.sender.lock().await.flush().await?; Ok(()) } /// Retrieves a copy to the username associated with this session. pub const fn username(&self) -> &String { &self.username } }