diff --git a/Cargo.lock b/Cargo.lock index 81df601..2c44d93 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -617,6 +617,38 @@ dependencies = [ "libc", ] +[[package]] +name = "serde" +version = "1.0.219" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5f0e2c6ed6606019b4e29e69dbaba95b11854410e5347d525002456dbbb786b6" +dependencies = [ + "serde_derive", +] + +[[package]] +name = "serde_derive" +version = "1.0.219" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5b0276cf7f2c73365f7157c8123c21cd9a50fbbd844757af28ca1f5925fc2a00" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "serde_json" +version = "1.0.140" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "20068b6e96dc6c9bd23e01df8827e6c7e1f2fddd43c21810382803c136b99373" +dependencies = [ + "itoa", + "memchr", + "ryu", + "serde", +] + [[package]] name = "shlex" version = "1.3.0" @@ -820,6 +852,8 @@ dependencies = [ "rand", "ratatui", "rumqttc", + "serde", + "serde_json", "tokio", "tokio-stream", ] diff --git a/Cargo.toml b/Cargo.toml index b88347a..791ebf9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,3 +10,5 @@ rumqttc = "0.24.0" tokio = { version = "1.38.0", features = ["full"] } tokio-stream = "0.1.15" rand = "0.8.5" +serde = { version = "1.0", features = ["derive"] } +serde_json = "1.0" diff --git a/src/main.rs b/src/main.rs index 7312bc0..e09f833 100644 --- a/src/main.rs +++ b/src/main.rs @@ -9,17 +9,26 @@ use std::{io, time::Duration}; use tokio::sync::mpsc; use tokio_stream::StreamExt; use rand::Rng; +use serde::{Deserialize, Serialize}; + +#[derive(Debug, Serialize, Deserialize)] +struct ChatMessage { + username: String, + message: String, +} struct App { - messages: Vec, + messages: Vec<(String, String)>, input: String, username: String, } impl App { fn new(username: String) -> App { + let mut messages = Vec::new(); + messages.push(("System".to_string(), format!("Welcome to the chat, {}!", username))); App { - messages: Vec::new(), + messages, input: String::new(), username, } @@ -61,7 +70,7 @@ async fn main() -> Result<(), Box> { } async fn run_app(terminal: &mut Terminal, mut app: App) -> io::Result<()> { - let (tx, mut rx) = mpsc::channel::(100); + let (tx, mut rx) = mpsc::channel::<(String, String)>(100); let mut mqttoptions = MqttOptions::new(app.username.clone(), "172.16.0.3", 1883); mqttoptions.set_keep_alive(Duration::from_secs(5)); @@ -69,14 +78,32 @@ async fn run_app(terminal: &mut Terminal, mut app: App) -> io::Re let (client, mut eventloop) = AsyncClient::new(mqttoptions, 10); client.subscribe("chat/msg", QoS::AtMostOnce).await.unwrap(); + // Publish connect message + let connect_message = ChatMessage { + username: app.username.clone(), + message: "has connected".to_string(), + }; + let connect_payload = serde_json::to_string(&connect_message).unwrap(); + client.publish("chat/msg", QoS::AtMostOnce, false, connect_payload.as_bytes()).await.unwrap(); + let client_clone = client.clone(); tokio::spawn(async move { while let Ok(notification) = eventloop.poll().await { if let rumqttc::Event::Incoming(rumqttc::Packet::Publish(p)) = notification { - let message = String::from_utf8_lossy(&p.payload).to_string(); - if tx.send(message).await.is_err() { - break; + let message_str = String::from_utf8_lossy(&p.payload).to_string(); + match serde_json::from_str::(&message_str) { + Ok(chat_msg) => { + if tx.send((chat_msg.username, chat_msg.message)).await.is_err() { + break; + } + } + Err(_) => { + // Handle malformed messages + if tx.send(("System".to_string(), message_str)).await.is_err() { + break; + } + } } } } @@ -92,15 +119,19 @@ async fn run_app(terminal: &mut Terminal, mut app: App) -> io::Re if let Event::Key(key) = event { match key.code { KeyCode::Enter => { - let message = app.input.drain(..).collect::(); - if message.starts_with("/nick ") { - let new_username = message.split_whitespace().nth(1).unwrap_or(&app.username).to_string(); + let message_text = app.input.drain(..).collect::(); + if message_text.starts_with("/nick ") { + let new_username = message_text.split_whitespace().nth(1).unwrap_or(&app.username).to_string(); app.username = new_username; - app.messages.push(format!("Username changed to: {}", app.username)); + app.messages.push(("System".to_string(), format!("Username changed to: {}", app.username))); } else { - let formatted_message = format!("{}: {}", app.username, message); + let chat_message = ChatMessage { + username: app.username.clone(), + message: message_text, + }; + let payload = serde_json::to_string(&chat_message).unwrap(); client_clone - .publish("chat/msg", QoS::AtMostOnce, false, formatted_message.as_bytes()) + .publish("chat/msg", QoS::AtMostOnce, false, payload.as_bytes()) .await .unwrap(); } @@ -118,8 +149,8 @@ async fn run_app(terminal: &mut Terminal, mut app: App) -> io::Re } } } - Some(message) = rx.recv() => { - app.messages.push(message); + Some((username, message)) = rx.recv() => { + app.messages.push((username, message)); } else => { break; @@ -140,7 +171,7 @@ fn ui(f: &mut Frame, app: &App) { let messages: Vec = app .messages .iter() - .map(|m| ListItem::new(m.as_str())) + .map(|(username, message)| ListItem::new(format!("{}: {}", username, message))) .collect(); let messages = List::new(messages) .block(Block::default().borders(Borders::ALL).title("Messages"));