Ver Fonte

[Telegram] Migrated to TgUserManager from raw User

Slava Barinov há 2 anos atrás
pai
commit
059cd578b9
3 ficheiros alterados com 154 adições e 39 exclusões
  1. 123 31
      src/telegram.rs
  2. 17 8
      src/tgusermanager.rs
  3. 14 0
      src/user.rs

+ 123 - 31
src/telegram.rs

@@ -5,13 +5,14 @@ use qif_generator::account::{Account, AccountType};
 #[cfg(feature = "monitoring")]
 use crate::monitoring;
 use crate::tgusermanager::user_manager;
-use crate::user::User;
 use std::collections::{HashMap, HashSet};
 use std::fmt;
 
+use crate::tgusermanager::TgManagerCommand;
 use derive_more::From;
 use std::fmt::Debug;
 use std::str::FromStr;
+use std::sync::Arc;
 use teloxide::types::*;
 use teloxide::{
     dispatching::dialogue::InMemStorage, net::Download, prelude::*, types::File as TgFile,
@@ -19,7 +20,7 @@ use teloxide::{
 };
 use thiserror::Error;
 use tokio::fs::File;
-use tokio::sync::mpsc;
+use tokio::sync::{mpsc, oneshot};
 
 #[cfg(feature = "telegram")]
 #[tokio::main]
@@ -54,6 +55,21 @@ enum FileConvertError {
     Io(#[source] std::io::Error),
 }
 
+/// Possible error while receiving a file
+#[cfg(feature = "telegram")]
+#[derive(Debug, Error, From)]
+enum UserManagerError {
+    /// Manager didn't respond
+    #[error("Couldn't request user: {0}")]
+    Request(String),
+}
+
+use tokio::sync::mpsc::Sender;
+
+struct ManagerHandle<T> {
+    tx: Sender<T>,
+}
+
 #[derive(BotCommands, Clone, Debug)]
 #[command(
     rename_rule = "lowercase",
@@ -88,7 +104,10 @@ async fn command_handler(
     _me: teloxide::types::Me,
     msg: Message,
     cmd: Command,
+    manager_handle: Arc<ManagerHandle<TgManagerCommand>>,
 ) -> HandlerResult {
+    let tx = &manager_handle.tx;
+
     match cmd {
         Command::Help => {
             bot.send_message(msg.chat.id, Command::descriptions().to_string())
@@ -113,34 +132,57 @@ async fn command_handler(
                 .await?
         }
         Command::NewAccount { account } => {
-            let mut user = User::new(msg.chat.id.0, &None);
-            user.new_account(account);
-            bot.send_message(msg.chat.id, "Account added".to_string())
-                .await?
-        }
-        Command::Accounts => {
-            let user = User::new(msg.chat.id.0, &None);
+            let (response_tx, response_rx) = oneshot::channel();
 
-            let list = |expense_set: &HashSet<String>| {
-                let mut sorted_expenses: Vec<String> = expense_set
-                    .iter()
-                    .filter(|s| s.starts_with("Expenses:"))
-                    .map(|s| s.trim_start_matches("Expenses:").to_owned())
-                    .collect();
+            tx.send(TgManagerCommand::Get {
+                user_id: msg.chat.id.0,
+                reply_to: response_tx,
+            })
+            .await?;
 
-                sorted_expenses.sort();
+            if let Ok(mut user) = response_rx.await {
+                user.new_account(account);
+                bot.send_message(msg.chat.id, "Account added".to_string())
+                    .await?
+            } else {
+                bot.send_message(msg.chat.id, "Can't find the requested user".to_string())
+                    .await?
+            }
+        }
+        Command::Accounts => {
+            let (response_tx, response_rx) = oneshot::channel();
 
-                sorted_expenses
-                    .into_iter()
-                    .map(|s| s + "\n")
-                    .collect::<String>()
-            };
+            tx.send(TgManagerCommand::Get {
+                user_id: msg.chat.id.0,
+                reply_to: response_tx,
+            })
+            .await?;
 
-            bot.send_message(
-                msg.chat.id,
-                format!("Expense accounts:\n\n{}", list(&user.accounts)),
-            )
-            .await?
+            if let Ok(user) = response_rx.await {
+                let list = |expense_set: &HashSet<String>| {
+                    let mut sorted_expenses: Vec<String> = expense_set
+                        .iter()
+                        .filter(|s| s.starts_with("Expenses:"))
+                        .map(|s| s.trim_start_matches("Expenses:").to_owned())
+                        .collect();
+
+                    sorted_expenses.sort();
+
+                    sorted_expenses
+                        .into_iter()
+                        .map(|s| s + "\n")
+                        .collect::<String>()
+                };
+
+                bot.send_message(
+                    msg.chat.id,
+                    format!("Expense accounts:\n\n{}", list(&user.accounts)),
+                )
+                .await?
+            } else {
+                bot.send_message(msg.chat.id, "Can't find the requested user".to_string())
+                    .await?
+            }
         }
     };
 
@@ -243,6 +285,7 @@ async fn handle_json(
     dialogue: QIFDialogue,
     msg: Message,
     filename: String, // Available from `State::Idle`.
+    manager_handle: Arc<ManagerHandle<TgManagerCommand>>,
 ) -> HandlerResult {
     log::debug!("JSON state");
     log::info!("File {}", &filename);
@@ -268,7 +311,22 @@ async fn handle_json(
 
     if let Ok(newfile) = download_file(&bot, &file_id).await {
         log::info!("Active user: {:} File received: {:} ", msg.chat.id, newfile);
-        let user = User::new(msg.chat.id.0, &None);
+        let tx = &manager_handle.tx;
+        let (response_tx, response_rx) = oneshot::channel();
+
+        tx.send(TgManagerCommand::Get {
+            user_id: msg.chat.id.0,
+            reply_to: response_tx,
+        })
+        .await?;
+
+        let user = response_rx.await.map_err(|_| {
+            log::warn!("No response for TgUserManager");
+            Box::new(UserManagerError::Request(
+                "No response for TgUserManager".to_string(),
+            ))
+        })?;
+
         let (cat, mut uncat) = auto_cat_items(&newfile, &user);
 
         log::debug!("Categorized item list: {:?}", cat);
@@ -323,6 +381,7 @@ async fn handle_category(
     bot: Bot,
     dialogue: QIFDialogue,
     msg: Message,
+    manager_handle: Arc<ManagerHandle<TgManagerCommand>>,
     (filename, item, items_left, items_processed): (
         String,
         String,
@@ -348,7 +407,22 @@ async fn handle_category(
 
     let version = version.unwrap();
 
-    let user = User::new(msg.chat.id.0, &None);
+    let tx = &manager_handle.tx;
+    let (response_tx, response_rx) = oneshot::channel();
+
+    tx.send(TgManagerCommand::Get {
+        user_id: msg.chat.id.0,
+        reply_to: response_tx,
+    })
+    .await?;
+
+    let user = response_rx.await.map_err(|_| {
+        log::warn!("No response for TgUserManager");
+        Box::new(UserManagerError::Request(
+            "No response for TgUserManager".to_string(),
+        ))
+    })?;
+
     let accounts = user
         .accounts
         .iter()
@@ -461,10 +535,26 @@ async fn handle_qif_ready(
     bot: Bot,
     dialogue: QIFDialogue,
     msg: Message,
+    manager_handle: Arc<ManagerHandle<TgManagerCommand>>,
     (filename, item_categories): (String, HashMap<String, String>), // Available from `State::Ready`.
 ) -> HandlerResult {
     log::debug!("QIF Ready state");
-    let mut user = User::new(msg.chat.id.0, &None);
+    let tx = &manager_handle.tx;
+    let (response_tx, response_rx) = oneshot::channel();
+
+    tx.send(TgManagerCommand::Get {
+        user_id: msg.chat.id.0,
+        reply_to: response_tx,
+    })
+    .await?;
+
+    let mut user = response_rx.await.map_err(|_| {
+        log::warn!("No response for TgUserManager");
+        Box::new(UserManagerError::Request(
+            "No response for TgUserManager".to_string(),
+        ))
+    })?;
+
     let memo: &str = msg.text().unwrap_or("purchase");
 
     let acc = Account::new()
@@ -596,10 +686,12 @@ async fn run() {
     let monitoring_handle = tokio::spawn(async move { monitoring::web_main().await });
 
     log::info!("Starting telegram bot");
-    let (_tx, mut rx) = mpsc::channel(32);
+    let (tx, mut rx) = mpsc::channel(32);
 
     let manager = tokio::spawn(async move { user_manager(&mut rx).await });
 
+    let manager_handle = Arc::new(ManagerHandle { tx });
+
     let bot = Bot::from_env();
 
     let handler = dptree::entry()
@@ -648,7 +740,7 @@ async fn run() {
                 .endpoint(callback_handler),
         );
     Dispatcher::builder(bot, handler)
-        .dependencies(dptree::deps![InMemStorage::<State>::new()])
+        .dependencies(dptree::deps![InMemStorage::<State>::new(), manager_handle])
         .enable_ctrlc_handler()
         .build()
         .dispatch()

+ 17 - 8
src/tgusermanager.rs

@@ -1,15 +1,23 @@
+use crate::user::User;
 use tokio::sync::{mpsc, oneshot};
 
 #[derive(Debug)]
 pub enum TgManagerCommand {
     #[allow(dead_code)]
     Get {
-        user_id: String,
-        reply_to: oneshot::Sender<String>,
+        user_id: i64,
+        reply_to: oneshot::Sender<User>,
     },
 }
 
-pub async fn user_manager(rx: &mut mpsc::Receiver<TgManagerCommand>) {
+#[derive(Debug)]
+pub enum TgUserManagerError {
+    SendError,
+}
+
+pub async fn user_manager(
+    rx: &mut mpsc::Receiver<TgManagerCommand>,
+) -> Result<(), TgUserManagerError> {
     log::info!("Request came");
     while let Some(cmd) = rx.recv().await {
         use TgManagerCommand::*;
@@ -19,11 +27,12 @@ pub async fn user_manager(rx: &mut mpsc::Receiver<TgManagerCommand>) {
             Get { user_id, reply_to } => {
                 log::info!("{}", format!("Get command found, sending {}", user_id));
                 reply_to
-                    .send(format!("You've requested {}", user_id))
-                    .unwrap();
+                    .send(User::new(user_id, &None))
+                    .map_err(|_| TgUserManagerError::SendError)?
             }
         }
     }
+    Ok(())
 }
 
 #[cfg(test)]
@@ -37,11 +46,11 @@ mod tgusertest {
         let (response_tx, response_rx) = oneshot::channel();
 
         tokio::spawn(async move {
-            user_manager(&mut rx).await;
+            user_manager(&mut rx).await.unwrap();
         });
 
         tx.send(TgManagerCommand::Get {
-            user_id: "0".to_string(),
+            user_id: 0,
             reply_to: response_tx,
         })
         .await
@@ -49,7 +58,7 @@ mod tgusertest {
 
         // Now, await the response from the user_manager function
         if let Ok(response) = response_rx.await {
-            println!("Received response: {}", response);
+            println!("Received response: {:?}", response);
         } else {
             println!("The sender dropped without sending a response");
         }

+ 14 - 0
src/user.rs

@@ -4,12 +4,16 @@ use pickledb::{PickleDb, PickleDbDumpPolicy, SerializationMethod};
 use radix_trie::Trie;
 use shellexpand::tilde;
 use std::collections::HashSet;
+use std::fmt;
 use std::path::PathBuf;
 use std::time::Duration;
 use thiserror::Error;
 
 /// Configuration for single user
 pub struct User {
+    /// User id
+    pub uid: i64,
+
     /// Categories statistics for the user
     pub catmap: CatStats,
 
@@ -38,6 +42,15 @@ impl Drop for User {
     }
 }
 
+impl fmt::Debug for User {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        f.debug_struct("User")
+            .field("uid", &self.uid)
+            .field("db", &format_args!("<PickleDb>"))
+            .finish()
+    }
+}
+
 impl User {
     pub fn new(uid: i64, dbfile: &Option<String>) -> Self {
         let ten_sec = Duration::from_secs(10);
@@ -74,6 +87,7 @@ impl User {
         };
 
         User {
+            uid,
             catmap,
             accounts,
             db,