diff options
| author | Sophie Forrest <git@sophieforrest.com> | 2024-08-30 23:13:20 +1200 |
|---|---|---|
| committer | Sophie Forrest <git@sophieforrest.com> | 2024-08-30 23:13:44 +1200 |
| commit | e3cb82a3b33bd2a2e49c58ce18d1258fb505869e (patch) | |
| tree | 2375279182fb4f90f5c28560a08cda90591f608b /crates/messenger_server/src/session.rs | |
Diffstat (limited to '')
| -rw-r--r-- | crates/messenger_server/src/session.rs | 193 |
1 files changed, 193 insertions, 0 deletions
diff --git a/crates/messenger_server/src/session.rs b/crates/messenger_server/src/session.rs new file mode 100644 index 0000000..1b15d5a --- /dev/null +++ b/crates/messenger_server/src/session.rs @@ -0,0 +1,193 @@ +//! Code for running websocket related functions on the web server. +//! +//! This includes the messaging system as a whole. + +use std::sync::Arc; + +use axum::extract::ws::{Message, WebSocket}; +use futures::{ + stream::{SplitSink, SplitStream}, + SinkExt, StreamExt, +}; +use messenger_common::{ + client::MessageType as ClientMessageType, + server::{MessageType, UserMessage}, +}; +use thiserror::Error; +use tokio::sync::{broadcast::Receiver, Mutex}; + +use crate::{ + app::State as AppState, + message::{self, deserialize, Server}, +}; + +/// Represents an error that can occur during a session. +#[derive(Debug, Error)] +pub enum Error { + /// An error with a Axum + #[error("an error occurred while attempting to interact with Axum")] + Axum(#[from] axum::Error), + + /// An error with serde_json + #[error("an error occurred while ser/deserializing data with serde_json")] + SerdeJson(#[from] serde_json::Error), +} + +/// Represents a singular session for a user. Handles sending and receiving +/// messages for this session. +#[derive(Debug)] +pub struct Session { + /// Receiving component of the WebSocket split. + receiver: Mutex<SplitStream<WebSocket>>, + + /// Sending component of the WebSocket split. + sender: Mutex<SplitSink<WebSocket, Message>>, + + /// Receiver from the apps state. + state_rx: Mutex<Receiver<String>>, + + /// The username associated with this session. + username: String, +} + +impl Session { + /// Constructs a new instance of [`Session`]. + #[must_use] + pub fn new( + receiver: SplitStream<WebSocket>, + sender: SplitSink<WebSocket, Message>, + state_rx: Receiver<String>, + username: String, + ) -> Self { + Self { + receiver: Mutex::new(receiver), + sender: Mutex::new(sender), + state_rx: Mutex::new(state_rx), + username, + } + } + + /// Feeds the last 100 messages to a singular sender, differentiated by the + /// `WebSocket` sender. + pub async fn feed_message_history(&self, state: &Arc<crate::app::State>) { + // Iterate through the message history and feed it to the sender + for message in &*state.message_history.lock().await { + if self + .sender + .lock() + .await + .feed(Message::Text(message.clone())) + .await + .is_err() + { + break; + } + } + } + + /// Feeds a list of online users to a singular sender, differentiated by the + /// `WebSocket` sender. + pub async fn feed_online_users(&self, state: &Arc<crate::app::State>) -> Result<(), Error> { + self.sender + .lock() + .await + .feed(Message::Text(serde_json::to_string( + &MessageType::OnlineUsers(state.user_set.lock().await.clone()), + )?)) + .await?; + + Ok(()) + } + + /// Receives messages for this session. For use inside a tokio select + /// receive task. + pub async fn receive(&self, state: &Arc<AppState>) { + while let Some(Ok(Message::Text(text))) = self.receiver.lock().await.next().await { + let result = match deserialize(&text) { + Ok(ClientMessageType::UserMessage(content)) => { + // Handle the possibility of a timestamp being unable to be created when a + // message is bring processed + let Ok(timestamp) = time::OffsetDateTime::now_local() else { + tracing::error!("could not create an OffsetDateTime for received message"); + + let mut lock = self.sender.lock().await; + + // Report to the client that their message could not be processed + message::send_error( + &mut lock, + messenger_common::server::Error::CannotProcess, + ) + .await; + + // Drop the lock early + drop(lock); + + // Skip to the next message. + continue; + }; + + let message: MessageType = + UserMessage::new(content, self.username.clone(), timestamp).into(); + + Some(message) + } + Ok(_) | Err(_) => { + let mut lock = self.sender.lock().await; + + // Messages outside of type UserMessage are unexpected, and should be + // reported + message::send_error(&mut lock, messenger_common::server::Error::InvalidMessage) + .await; + + drop(lock); + + None + } + }; + + if let Some(message) = result { + message.send(&state.tx); + if let Err(error) = message.append_to_history(state).await { + tracing::error!( + "error encountered when appending a message to history: {error}" + ); + } + } + } + } + + /// Processes a send operation for this session. + pub async fn send(&self) { + let mut lock = self.state_rx.lock().await; + + while let Ok(msg) = lock.recv().await { + // In any websocket error, break loop. + if self + .sender + .lock() + .await + .send(Message::Text(msg)) + .await + .is_err() + { + break; + } + } + } + + /// Transmits a copy of the last 100 messages and a list of online users to + /// the user. + pub async fn transmit_initial_data(&self, state: &Arc<crate::app::State>) -> Result<(), Error> { + self.feed_message_history(state).await; + self.feed_online_users(state).await?; + + self.sender.lock().await.flush().await?; + + Ok(()) + } + + /// Retrieves a copy to the username associated with this session. + pub const fn username(&self) -> &String { + &self.username + } +} |