summary refs log tree commit diff
path: root/crates/messenger_server/src/session.rs
diff options
context:
space:
mode:
authorSophie Forrest <git@sophieforrest.com>2024-08-30 23:13:20 +1200
committerSophie Forrest <git@sophieforrest.com>2024-08-30 23:13:44 +1200
commite3cb82a3b33bd2a2e49c58ce18d1258fb505869e (patch)
tree2375279182fb4f90f5c28560a08cda90591f608b /crates/messenger_server/src/session.rs
chore: initial commit (codeberg upload) HEAD main
Diffstat (limited to 'crates/messenger_server/src/session.rs')
-rw-r--r--crates/messenger_server/src/session.rs193
1 files changed, 193 insertions, 0 deletions
diff --git a/crates/messenger_server/src/session.rs b/crates/messenger_server/src/session.rs
new file mode 100644
index 0000000..1b15d5a
--- /dev/null
+++ b/crates/messenger_server/src/session.rs
@@ -0,0 +1,193 @@
+//! Code for running websocket related functions on the web server.
+//!
+//! This includes the messaging system as a whole.
+
+use std::sync::Arc;
+
+use axum::extract::ws::{Message, WebSocket};
+use futures::{
+	stream::{SplitSink, SplitStream},
+	SinkExt, StreamExt,
+};
+use messenger_common::{
+	client::MessageType as ClientMessageType,
+	server::{MessageType, UserMessage},
+};
+use thiserror::Error;
+use tokio::sync::{broadcast::Receiver, Mutex};
+
+use crate::{
+	app::State as AppState,
+	message::{self, deserialize, Server},
+};
+
+/// Represents an error that can occur during a session.
+#[derive(Debug, Error)]
+pub enum Error {
+	/// An error with a Axum
+	#[error("an error occurred while attempting to interact with Axum")]
+	Axum(#[from] axum::Error),
+
+	/// An error with serde_json
+	#[error("an error occurred while ser/deserializing data with serde_json")]
+	SerdeJson(#[from] serde_json::Error),
+}
+
+/// Represents a singular session for a user. Handles sending and receiving
+/// messages for this session.
+#[derive(Debug)]
+pub struct Session {
+	/// Receiving component of the WebSocket split.
+	receiver: Mutex<SplitStream<WebSocket>>,
+
+	/// Sending component of the WebSocket split.
+	sender: Mutex<SplitSink<WebSocket, Message>>,
+
+	/// Receiver from the apps state.
+	state_rx: Mutex<Receiver<String>>,
+
+	/// The username associated with this session.
+	username: String,
+}
+
+impl Session {
+	/// Constructs a new instance of [`Session`].
+	#[must_use]
+	pub fn new(
+		receiver: SplitStream<WebSocket>,
+		sender: SplitSink<WebSocket, Message>,
+		state_rx: Receiver<String>,
+		username: String,
+	) -> Self {
+		Self {
+			receiver: Mutex::new(receiver),
+			sender: Mutex::new(sender),
+			state_rx: Mutex::new(state_rx),
+			username,
+		}
+	}
+
+	/// Feeds the last 100 messages to a singular sender, differentiated by the
+	/// `WebSocket` sender.
+	pub async fn feed_message_history(&self, state: &Arc<crate::app::State>) {
+		// Iterate through the message history and feed it to the sender
+		for message in &*state.message_history.lock().await {
+			if self
+				.sender
+				.lock()
+				.await
+				.feed(Message::Text(message.clone()))
+				.await
+				.is_err()
+			{
+				break;
+			}
+		}
+	}
+
+	/// Feeds a list of online users to a singular sender, differentiated by the
+	/// `WebSocket` sender.
+	pub async fn feed_online_users(&self, state: &Arc<crate::app::State>) -> Result<(), Error> {
+		self.sender
+			.lock()
+			.await
+			.feed(Message::Text(serde_json::to_string(
+				&MessageType::OnlineUsers(state.user_set.lock().await.clone()),
+			)?))
+			.await?;
+
+		Ok(())
+	}
+
+	/// Receives messages for this session. For use inside a tokio select
+	/// receive task.
+	pub async fn receive(&self, state: &Arc<AppState>) {
+		while let Some(Ok(Message::Text(text))) = self.receiver.lock().await.next().await {
+			let result = match deserialize(&text) {
+				Ok(ClientMessageType::UserMessage(content)) => {
+					// Handle the possibility of a timestamp being unable to be created when a
+					// message is bring processed
+					let Ok(timestamp) = time::OffsetDateTime::now_local() else {
+						tracing::error!("could not create an OffsetDateTime for received message");
+
+						let mut lock = self.sender.lock().await;
+
+						// Report to the client that their message could not be processed
+						message::send_error(
+							&mut lock,
+							messenger_common::server::Error::CannotProcess,
+						)
+						.await;
+
+						// Drop the lock early
+						drop(lock);
+
+						// Skip to the next message.
+						continue;
+					};
+
+					let message: MessageType =
+						UserMessage::new(content, self.username.clone(), timestamp).into();
+
+					Some(message)
+				}
+				Ok(_) | Err(_) => {
+					let mut lock = self.sender.lock().await;
+
+					// Messages outside of type UserMessage are unexpected, and should be
+					// reported
+					message::send_error(&mut lock, messenger_common::server::Error::InvalidMessage)
+						.await;
+
+					drop(lock);
+
+					None
+				}
+			};
+
+			if let Some(message) = result {
+				message.send(&state.tx);
+				if let Err(error) = message.append_to_history(state).await {
+					tracing::error!(
+						"error encountered when appending a message to history: {error}"
+					);
+				}
+			}
+		}
+	}
+
+	/// Processes a send operation for this session.
+	pub async fn send(&self) {
+		let mut lock = self.state_rx.lock().await;
+
+		while let Ok(msg) = lock.recv().await {
+			// In any websocket error, break loop.
+			if self
+				.sender
+				.lock()
+				.await
+				.send(Message::Text(msg))
+				.await
+				.is_err()
+			{
+				break;
+			}
+		}
+	}
+
+	/// Transmits a copy of the last 100 messages and a list of online users to
+	/// the user.
+	pub async fn transmit_initial_data(&self, state: &Arc<crate::app::State>) -> Result<(), Error> {
+		self.feed_message_history(state).await;
+		self.feed_online_users(state).await?;
+
+		self.sender.lock().await.flush().await?;
+
+		Ok(())
+	}
+
+	/// Retrieves a copy to the username associated with this session.
+	pub const fn username(&self) -> &String {
+		&self.username
+	}
+}