telegram.rs 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534
  1. use crate::categories::{get_category_from_tg, CatStats};
  2. use crate::convert::{convert, non_cat_items};
  3. use crate::tgusermanager::{user_manager, TgManagerCommand};
  4. use crate::user::User;
  5. use std::error::Error as StdError;
  6. use derive_more::From;
  7. use qif_generator::{account::Account, account::AccountType};
  8. use std::fmt::Debug;
  9. use std::str::FromStr;
  10. use std::sync::{
  11. atomic::{AtomicBool, Ordering},
  12. Arc,
  13. };
  14. use teloxide::types::*;
  15. use teloxide::{
  16. dispatching::dialogue::{InMemStorage, Storage},
  17. DownloadError, RequestError,
  18. };
  19. use teloxide::{net::Download, types::File as TgFile, Bot};
  20. use teloxide::{prelude::*, utils::command::BotCommand};
  21. use thiserror::Error;
  22. use tokio::fs::File;
  23. use tokio::io::AsyncWriteExt;
  24. use tokio::sync::{mpsc, oneshot};
  25. use tokio_stream::wrappers::UnboundedReceiverStream;
  /// Synchronous entry point of the Telegram bot.
  ///
  /// `#[tokio::main]` wraps the body in a Tokio runtime, so callers invoke this
  /// as a plain blocking function; it returns once `run` has finished.
  #[cfg(feature = "telegram")]
  #[tokio::main]
  pub async fn bot() {
      run().await
  }
  /// Failures that can occur while fetching a file from Telegram and storing
  /// it on disk.
  #[cfg(feature = "telegram")]
  #[derive(Debug, Error)]
  enum FileReceiveError {
      /// The Telegram API request itself failed.
      #[error("Web request error: {0}")]
      Request(#[from] RequestError),
      /// The local file could not be created or written.
      #[error("An I/O error: {0}")]
      Io(#[from] std::io::Error),
      /// Streaming the file contents from Telegram failed.
      #[error("File download error: {0}")]
      Download(#[from] DownloadError),
  }
  /// Possible errors while converting a downloaded JSON receipt into a QIF file.
  #[cfg(feature = "telegram")]
  #[derive(Debug, Error, From)]
  enum FileConvertError {
      /// Conversion failure, carried as a plain message and reported as a
      /// "JSON conversion error". (The variant name is historical; it does not
      /// wrap a Telegram request error.)
      #[error("JSON conversion error: {0}")]
      Request(String),
      /// I/O error while creating or writing the output file.
      #[error("An I/O error: {0}")]
      Io(#[source] std::io::Error),
  }
  // Text commands understood by the bot. `handle_message` parses them with
  // `Command::parse(.., "tgqif")` while the dialogue is idle; the descriptions
  // below are what `Command::descriptions()` sends back for `/help`.
  #[derive(BotCommand, Debug)]
  #[command(rename = "lowercase", description = "These commands are supported:")]
  enum Command {
      // Replies with the list of supported commands.
      #[command(description = "display this text.")]
      Help,
      // Replies with a registration confirmation for the sender.
      #[command(description = "Register new user in bot.")]
      Start,
      // Replies that the sender's data is being deleted.
      #[command(description = "Delete account.")]
      Delete,
      // Forwards a `TgManagerCommand::Get` to the user manager and relays its answer.
      #[command(description = "Request something")]
      Request,
  }
  /// Process-wide flag: set to `true` by `run` when the bot starts and back to
  /// `false` when dispatching ends; read through `bot_is_running`.
  #[cfg(feature = "telegram")]
  static IS_RUNNING: AtomicBool = AtomicBool::new(false);
  /// Downloads the Telegram file identified by `file_id` and stores it as
  /// `/tmp/<file_id>`.
  ///
  /// Returns the path of the written file.
  ///
  /// # Errors
  ///
  /// * [`FileReceiveError::Request`] if the `getFile` request fails,
  /// * [`FileReceiveError::Io`] if the local file cannot be created or flushed,
  /// * [`FileReceiveError::Download`] if streaming the contents fails.
  #[cfg(feature = "telegram")]
  async fn download_file(downloader: &Bot, file_id: &str) -> Result<String, FileReceiveError> {
      let TgFile {
          file_id, file_path, ..
      } = downloader.get_file(file_id).send().await?;
      let filepath = format!("/tmp/{}", file_id);
      let mut file = File::create(&filepath).await?;
      downloader.download_file(&file_path, &mut file).await?;
      // `tokio::fs::File` completes writes in the background; without an explicit
      // flush the caller could open the path before all data has reached the file.
      file.flush().await?;
      Ok(filepath)
  }
  /// Converts the JSON receipt at `jsonfile` into a QIF file written next to it
  /// as `<jsonfile>.qif`, and returns that path.
  ///
  /// Before converting, every item reported by `non_cat_items` is announced to
  /// the chat via `input_category_from_tg`.
  ///
  /// # Errors
  ///
  /// Returns [`FileConvertError`] when the output file cannot be created or
  /// written, or when `convert` fails. Failing to send a progress message to
  /// the chat is logged and does not abort the conversion.
  #[cfg(feature = "telegram")]
  async fn convert_file(
      jsonfile: &str,
      user: &mut User,
      ctx: &UpdateWithCx<AutoSend<Bot>, Message>,
  ) -> Result<String, FileConvertError> {
      let filepath = format!("{}.qif", jsonfile);
      log::info!("Converting file into {}", filepath);
      let mut file = File::create(&filepath).await?;
      log::info!("Got file");

      for i in non_cat_items(jsonfile, user) {
          log::info!("Message about {}", i);
          let newcat = input_category_from_tg(&i, &user.catmap, &user.accounts, ctx).await;
          // A failed notification must not take the whole conversion down.
          if let Err(err) = ctx.answer(format!("{} is set to {}", i, newcat)).await {
              log::warn!("Failed to report category for {}: {}", i, err);
          }
      }

      let acc = Account::new()
          .name("Wallet")
          .account_type(AccountType::Cash)
          .build();
      let cat = &|item: &str, stats: &mut CatStats, accounts: &[String]| -> String {
          get_category_from_tg(item, stats, accounts, ctx)
      };
      let t = convert(jsonfile, "Test", user, &acc, cat)?;

      // `write` may stop after a partial write; `write_all` keeps going until the
      // whole buffer is out, and the flush makes sure it reached the file before
      // the path is handed back.
      file.write_all(acc.to_string().as_bytes()).await?;
      file.write_all(t.to_string().as_bytes()).await?;
      file.flush().await?;
      Ok(filepath)
  }
  /// Reports whether the bot is currently running, i.e. whether `run` has set
  /// the `IS_RUNNING` flag and not yet cleared it.
  #[cfg(feature = "telegram")]
  pub fn bot_is_running() -> bool {
      AtomicBool::load(&IS_RUNNING, Ordering::SeqCst)
  }
  /// Asks the chat to pick a category for `item`.
  ///
  /// Sends an "Input category for …" message with a single-row inline keyboard
  /// holding one button per entry of `accounts` that starts with `Expenses:`.
  /// The button label is the account name without that prefix; the callback
  /// data is the full account name.
  ///
  /// `_cats` is currently unused. The function does not wait for the user's
  /// choice and always returns an empty string. A failure to send the prompt is
  /// logged instead of panicking.
  #[cfg(feature = "telegram")]
  pub async fn input_category_from_tg(
      item: &str,
      _cats: &CatStats,
      accounts: &[String],
      cx: &UpdateWithCx<AutoSend<Bot>, Message>,
  ) -> String {
      log::info!("{:?}", accounts);

      // `strip_prefix` both selects the expense accounts and yields the label.
      let buttons = accounts.iter().filter_map(|line| {
          line.strip_prefix("Expenses:").map(|label| {
              InlineKeyboardButton::new(
                  label,
                  InlineKeyboardButtonKind::CallbackData(line.clone()),
              )
          })
      });
      let keyboard = InlineKeyboardMarkup::default().append_row(buttons);

      let sent = cx
          .answer(format!("Input category for {}", item))
          .reply_markup(ReplyMarkup::InlineKeyboard(keyboard))
          .await;
      if let Err(err) = sent {
          log::error!("Failed to ask category for {}: {}", item, err);
      }

      String::new()
  }
  /// State machine of a single chat's conversation with the bot.
  ///
  /// Each variant wraps the state struct of the same step. `Idle` is the
  /// default; `handle_message` handles it directly and delegates every other
  /// state to `Dialogue::react`.
  #[derive(Transition, From, Clone)]
  pub enum Dialogue {
      Idle(IdleState),
      NewJson(NewJsonState),
      CategorySelect(CategorySelectState),
      SubCategorySelect(SubCategorySelectState),
      ItemReady(ItemReadyState),
      Ready(QIFReadyState),
  }
  147. impl Default for Dialogue {
  148. fn default() -> Self {
  149. Self::Idle(IdleState)
  150. }
  151. }
  /// Initial dialogue state: no receipt is being processed.
  #[derive(Clone)]
  pub struct IdleState;
  /// Dialogue state entered after a document was received while idle.
  #[derive(Clone)]
  pub struct NewJsonState {
      /// Despite the name, `handle_message` stores the Telegram `file_id` of the
      /// received document here, not a local file name.
      pub filename: String,
  }
  /// Dialogue state while a category is being chosen for one receipt item.
  #[derive(Clone)]
  pub struct CategorySelectState {
      /// Carried over unchanged from `NewJsonState::filename`.
      pub filename: String,
      /// The item that still has no category.
      pub item: String,
  }
  /// Dialogue state while a subcategory is being chosen for one receipt item.
  #[derive(Clone)]
  pub struct SubCategorySelectState {
      /// Carried over unchanged from the previous state.
      pub filename: String,
      /// The item being categorized.
      pub item: String,
      /// Text received in the category-selection step.
      pub category: String,
  }
  /// Dialogue state once an item has both a category and a subcategory.
  #[derive(Clone)]
  pub struct ItemReadyState {
      /// Carried over unchanged from the previous state.
      pub filename: String,
      /// The categorized item.
      pub item: String,
      /// Full category, built by `subcategory_select` as `<category>:<subcategory>`.
      pub fullcat: String,
  }
  /// Final dialogue state before returning to idle; `item_ready` transitions here.
  #[derive(Clone)]
  pub struct QIFReadyState;
  177. #[teloxide(subtransition)]
  178. async fn new_json(
  179. state: NewJsonState,
  180. cx: TransitionIn<AutoSend<Bot>>,
  181. item: String,
  182. ) -> TransitionOut<Dialogue> {
  183. log::info!("File {}", &state.filename);
  184. let mut is_file = false;
  185. let mut file_id: String = "".to_string();
  186. {
  187. let update = &cx.update;
  188. if let MessageKind::Common(msg) = &update.kind {
  189. if let MediaKind::Document(doc) = &msg.media_kind {
  190. is_file = true;
  191. file_id = String::from_str(&state.filename).unwrap_or("".to_string());
  192. }
  193. }
  194. }
  195. if is_file {
  196. log::info!("File {} received", file_id);
  197. cx.answer(format!("New file received!!!111 {}", file_id))
  198. .await?;
  199. } else {
  200. cx.answer(format!("Unsupported media provided")).await?;
  201. }
  202. if let Ok(newfile) = download_file(cx.requester.inner(), &file_id).await {
  203. cx.answer(format!("File received: {:} ", newfile)).await?;
  204. if let Some(tguser) = cx.update.from() {
  205. let user = User::new(tguser.id, &None);
  206. cx.answer(format!("Active user: {:} ", tguser.id)).await?;
  207. let filepath = format!("{}.qif", &newfile);
  208. log::info!("Received file {}", &filepath);
  209. let mut i = non_cat_items(&newfile, &user);
  210. if let Some(item) = i.pop() {
  211. log::info!("No category for {}", &item);
  212. cx.answer(format!("Select category for {}", item)).await?;
  213. next(CategorySelectState {
  214. filename: state.filename,
  215. item,
  216. })
  217. } else {
  218. log::info!("Empty state");
  219. next(state)
  220. }
  221. /* if let Ok(result) = convert_file(&newfile, &mut user, &cx).await {
  222. cx.answer(format!("File converted into: {:} ", result))
  223. .await?;
  224. next(CategorySelectState { item: file_id })
  225. } else {
  226. next(state)
  227. }
  228. */
  229. } else {
  230. log::info!("Empty state 2");
  231. next(state)
  232. }
  233. } else {
  234. log::info!("Newfile {} fail", item);
  235. cx.answer("Waiting for a JSON receipt in new_json").await?;
  236. next(state)
  237. }
  238. }
  239. #[teloxide(subtransition)]
  240. async fn category_select(
  241. state: CategorySelectState,
  242. cx: TransitionIn<AutoSend<Bot>>,
  243. ans: String,
  244. ) -> TransitionOut<Dialogue> {
  245. let accounts = [
  246. "Expenses:Alco".to_string(),
  247. "Expenses:Groceries".to_string(),
  248. ];
  249. let keyboard = InlineKeyboardMarkup::default().append_row(
  250. accounts
  251. .iter()
  252. .filter(|l| l.starts_with("Expenses:"))
  253. .map(|line| {
  254. InlineKeyboardButton::new(
  255. line.strip_prefix("Expenses:").unwrap(),
  256. InlineKeyboardButtonKind::CallbackData(line.into()),
  257. )
  258. }),
  259. );
  260. cx.answer(format!("Input category for {}", state.item))
  261. .reply_markup(ReplyMarkup::InlineKeyboard(keyboard))
  262. .await?;
  263. next(SubCategorySelectState {
  264. filename: state.filename,
  265. item: state.item,
  266. category: ans,
  267. })
  268. }
  269. #[teloxide(subtransition)]
  270. async fn subcategory_select(
  271. state: SubCategorySelectState,
  272. cx: TransitionIn<AutoSend<Bot>>,
  273. subcategory: String,
  274. ) -> TransitionOut<Dialogue> {
  275. cx.answer(format!("Select subcategory for {}", state.item))
  276. .await?;
  277. next(ItemReadyState {
  278. filename: state.filename,
  279. item: state.item,
  280. fullcat: format!("{}:{}", state.category, subcategory),
  281. })
  282. }
  283. #[teloxide(subtransition)]
  284. async fn item_ready(
  285. state: ItemReadyState,
  286. cx: TransitionIn<AutoSend<Bot>>,
  287. item: String,
  288. ) -> TransitionOut<Dialogue> {
  289. cx.answer(format!(
  290. "Item {} is ready for caterogy {}",
  291. state.item, state.fullcat
  292. ))
  293. .await?;
  294. next(QIFReadyState)
  295. }
  296. #[teloxide(subtransition)]
  297. async fn qif_ready(
  298. state: QIFReadyState,
  299. cx: TransitionIn<AutoSend<Bot>>,
  300. item: String,
  301. ) -> TransitionOut<Dialogue> {
  302. cx.answer(format!("QIF is ready for {}", item)).await?;
  303. next(IdleState)
  304. }
  305. #[teloxide(subtransition)]
  306. async fn idling(
  307. state: IdleState,
  308. cx: TransitionIn<AutoSend<Bot>>,
  309. item: String,
  310. ) -> TransitionOut<Dialogue> {
  311. cx.answer(format!("Waiting for json or command")).await?;
  312. next(state)
  313. }
  /// Error type of the in-memory dialogue storage, named once so that `In` can
  /// refer to it.
  type StorageError = <InMemStorage<Dialogue> as Storage<Dialogue>>::Error;
  /// Bot-level error wrapper.
  #[derive(Debug, Error)]
  enum Error {
      /// A Telegram API request failed; convertible from `RequestError` via `?`.
      #[error("error from Telegram: {0}")]
      TelegramError(#[from] RequestError),
  }
  /// Input of the message handler in `run`: the incoming message with its
  /// context, plus the dialogue state loaded from storage.
  type In = DialogueWithCx<AutoSend<Bot>, Message, Dialogue, StorageError>;
  321. async fn handle_message(
  322. cx: UpdateWithCx<AutoSend<Bot>, Message>,
  323. dialogue: Dialogue,
  324. tx: mpsc::Sender<TgManagerCommand>,
  325. ) -> TransitionOut<Dialogue> {
  326. let ans = cx.update.text().map(ToOwned::to_owned);
  327. match dialogue {
  328. Dialogue::Idle(_) => {
  329. match ans {
  330. None => {
  331. log::info!("No text");
  332. let mut is_file = false;
  333. let mut file_id: String = "".to_string();
  334. {
  335. let update = &cx.update;
  336. if let MessageKind::Common(msg) = &update.kind {
  337. if let MediaKind::Document(doc) = &msg.media_kind {
  338. is_file = true;
  339. file_id = doc.document.file_id.clone();
  340. }
  341. }
  342. }
  343. if is_file {
  344. log::info!("File {} received", file_id);
  345. next(NewJsonState { filename: file_id })
  346. // dialogue.react(cx, file_id).await
  347. } else {
  348. cx.answer(format!("Unsupported media provided")).await?;
  349. next(dialogue)
  350. }
  351. }
  352. Some(ans) => {
  353. if let Ok(command) = Command::parse(&ans, "tgqif") {
  354. match command {
  355. Command::Help => {
  356. cx.answer(Command::descriptions()).send().await?;
  357. next(dialogue)
  358. }
  359. Command::Start => {
  360. if let Some(user) = cx.update.from() {
  361. cx.answer(format!(
  362. "You registered as @{} with id {}.",
  363. user.first_name, user.id
  364. ))
  365. .await?;
  366. }
  367. next(dialogue)
  368. }
  369. Command::Delete => {
  370. if let Some(user) = cx.update.from() {
  371. cx.answer(format!("Deleting data for user {}", user.id))
  372. .await?;
  373. }
  374. next(dialogue)
  375. }
  376. Command::Request => {
  377. let (send, recv) = oneshot::channel();
  378. if tx
  379. .send(TgManagerCommand::Get {
  380. user_id: ans.clone(),
  381. reply_to: send,
  382. })
  383. .await
  384. .is_err()
  385. {
  386. cx.answer("Can't request data").await?;
  387. };
  388. match recv.await {
  389. Ok(value) => {
  390. cx.answer(format!("I have an answer: {} ", value)).await?
  391. }
  392. Err(_) => cx.answer("No data available").await?,
  393. };
  394. next(dialogue)
  395. }
  396. }
  397. } else {
  398. next(dialogue)
  399. }
  400. }
  401. }
  402. }
  403. _ => dialogue.react(cx, ans.unwrap_or(String::new())).await, //next(dialogue)
  404. // dialogue.react(cx, ans).await
  405. }
  406. }
  407. /// When it receives a callback from a button it edits the message with all
  408. /// those buttons writing a text with the selected Debian version.
  409. async fn callback_handler(
  410. cx: UpdateWithCx<AutoSend<Bot>, CallbackQuery>,
  411. stor: Arc<InMemStorage<Dialogue>>,
  412. ) -> Result<(), Box<dyn StdError + Send + Sync>>
  413. where
  414. {
  415. let UpdateWithCx {
  416. requester: bot,
  417. update: query,
  418. } = cx;
  419. if let Some(version) = query.data {
  420. let text = format!("{}", version);
  421. match query.message {
  422. Some(Message { id, chat, .. }) => {
  423. // bot.edit_message_text(chat.id, id, text).await?;
  424. bot.send_message(chat.id, text).await?;
  425. }
  426. None => {
  427. if let Some(id) = query.inline_message_id {
  428. // bot.edit_message_text_inline(dbg!(id), text).await?;
  429. bot.send_message(id, text).await?;
  430. }
  431. }
  432. }
  433. log::info!("You chose: {}", version);
  434. }
  435. Ok(())
  436. }
  /// Runs the bot until the dispatcher stops.
  ///
  /// Sets up logging, marks the bot as running, spawns the user-manager task
  /// fed by an mpsc channel, and dispatches:
  /// * messages to `handle_message`, with dialogue state kept in an in-memory
  ///   storage;
  /// * callback queries (inline-keyboard presses) to `callback_handler`,
  ///   processed concurrently, with errors logged.
  ///
  /// The running flag is cleared again once dispatching returns.
  ///
  /// # Panics
  ///
  /// The message handler panics if `handle_message` returns an error.
  #[cfg(feature = "telegram")]
  async fn run() {
      teloxide::enable_logging!();
      log::info!("Starting telegram bot");
      IS_RUNNING.store(true, Ordering::SeqCst);

      let (tx, mut rx) = mpsc::channel(32);
      let manager = tokio::spawn(async move { user_manager(&mut rx).await });

      let storage: Arc<InMemStorage<Dialogue>> = InMemStorage::new();
      let storage_callback = Arc::clone(&storage);
      let bot = Bot::from_env().auto_send();

      // TODO: Add Dispatcher to process UpdateKinds
      Dispatcher::new(bot)
          .messages_handler(DialogueDispatcher::with_storage(
              move |DialogueWithCx { cx, dialogue }: In| {
                  let tx = tx.clone();
                  async move {
                      let dialogue = dialogue.expect("std::convert::Infallible");
                      handle_message(cx, dialogue, tx)
                          .await
                          .expect("Something wrong with the bot!")
                  }
              },
              storage,
          ))
          .callback_queries_handler(
              move |rx: DispatcherHandlerRx<AutoSend<Bot>, CallbackQuery>| {
                  UnboundedReceiverStream::new(rx).for_each_concurrent(None, move |cx| {
                      // Clone per update: the closure is called repeatedly, so
                      // it must keep its own handle and give each future a copy.
                      let storage = Arc::clone(&storage_callback);
                      async move {
                          callback_handler(cx, storage).await.log_on_error().await;
                      }
                  })
              },
          )
          .dispatch()
          .await;

      // Dropping the handle only detaches the manager task; it does not cancel it.
      drop(manager);
      IS_RUNNING.store(false, Ordering::SeqCst);
  }