monitoring.rs 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161
  1. use lazy_static::lazy_static;
  2. use prometheus::{IntCounter, Registry};
  3. use std::result::Result;
  4. #[cfg(test)]
  5. use std::sync::atomic::{AtomicBool, Ordering};
  6. use warp::{Filter, Rejection, Reply};
  7. lazy_static! {
  8. pub static ref INCOMING_REQUESTS: IntCounter =
  9. IntCounter::new("incoming_requests", "Incoming Requests").expect("metric can be created");
  10. pub static ref REGISTRY: Registry = Registry::new();
  11. }
  12. #[cfg(test)]
  13. lazy_static! {
  14. static ref METRICS_REGISTERED: AtomicBool = AtomicBool::new(false);
  15. }
  16. #[cfg(test)]
  17. fn register_custom_metrics() {
  18. if METRICS_REGISTERED
  19. .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
  20. .is_ok()
  21. {
  22. REGISTRY
  23. .register(Box::new(INCOMING_REQUESTS.clone()))
  24. .expect("collector can be registered");
  25. // Add more metrics here as needed
  26. }
  27. }
  28. #[cfg(not(test))]
  29. fn register_custom_metrics() {
  30. REGISTRY
  31. .register(Box::new(INCOMING_REQUESTS.clone()))
  32. .expect("collector can be registered");
  33. // Add more metrics here as needed
  34. }
  35. #[cfg(not(tarpaulin_include))]
  36. pub async fn web_main() {
  37. register_custom_metrics();
  38. let metrics_route = warp::path!("metrics").and_then(metrics_handler);
  39. let status_route = warp::path!("status").and_then(status_handler);
  40. warp::serve(metrics_route.or(status_route))
  41. .run(([0, 0, 0, 0], 9090))
  42. .await;
  43. }
  44. async fn status_handler() -> Result<impl Reply, Rejection> {
  45. Ok("ok")
  46. }
  47. async fn metrics_handler() -> Result<impl Reply, Rejection> {
  48. use prometheus::Encoder;
  49. let encoder = prometheus::TextEncoder::new();
  50. let mut buffer = Vec::new();
  51. if let Err(e) = encoder.encode(&REGISTRY.gather(), &mut buffer) {
  52. eprintln!("could not encode custom metrics: {}", e);
  53. };
  54. let mut res = match String::from_utf8(buffer.clone()) {
  55. Ok(v) => v,
  56. Err(e) => {
  57. eprintln!("custom metrics could not be from_utf8'd: {}", e);
  58. String::default()
  59. }
  60. };
  61. buffer.clear();
  62. let mut buffer = Vec::new();
  63. if let Err(e) = encoder.encode(&prometheus::gather(), &mut buffer) {
  64. eprintln!("could not encode prometheus metrics: {}", e);
  65. };
  66. let res_custom = match String::from_utf8(buffer.clone()) {
  67. Ok(v) => v,
  68. Err(e) => {
  69. eprintln!("prometheus metrics could not be from_utf8'd: {}", e);
  70. String::default()
  71. }
  72. };
  73. buffer.clear();
  74. res.push_str(&res_custom);
  75. Ok(res)
  76. }
  77. #[cfg(test)]
  78. mod tests {
  79. use super::*;
  80. use reqwest::{Client, StatusCode};
  81. use warp::Filter;
  82. // Start a Warp server for testing
  83. async fn setup() -> String {
  84. register_custom_metrics();
  85. let metrics_route = warp::path!("metrics").and_then(metrics_handler);
  86. let status_route = warp::path!("status").and_then(status_handler);
  87. let routes = metrics_route.or(status_route);
  88. let (addr, server) = warp::serve(routes).bind_ephemeral(([127, 0, 0, 1], 0)); // Bind to a random port
  89. tokio::spawn(async move {
  90. server.await;
  91. }); // Spawn the server in a background task
  92. format!("http://{}", addr) // Return the address
  93. }
  94. #[tokio::test]
  95. async fn test_status_handler() {
  96. let base_url = setup().await;
  97. println!("{}", base_url);
  98. let client = Client::builder().no_proxy().build().unwrap();
  99. let response = client
  100. .get(format!("{}/status", base_url))
  101. .send()
  102. .await
  103. .unwrap();
  104. assert_eq!(response.status(), StatusCode::OK);
  105. assert_eq!(response.text().await.unwrap(), "ok");
  106. }
  107. #[tokio::test]
  108. async fn test_metrics_handler() {
  109. let base_url = setup().await;
  110. let client = Client::builder().no_proxy().build().unwrap();
  111. let response = client
  112. .get(format!("{}/metrics", base_url))
  113. .send()
  114. .await
  115. .unwrap();
  116. assert_eq!(response.status(), StatusCode::OK);
  117. assert!(response
  118. .text()
  119. .await
  120. .unwrap()
  121. .contains("incoming_requests 0"));
  122. INCOMING_REQUESTS.inc();
  123. let response = client
  124. .get(format!("{}/metrics", base_url))
  125. .send()
  126. .await
  127. .unwrap();
  128. assert!(response
  129. .text()
  130. .await
  131. .unwrap()
  132. .contains("incoming_requests 1"));
  133. }
  134. }