Mountain/IPC/Enhanced/PerformanceDashboard/Dashboard.rs

#![allow(non_snake_case)]

use std::{
    collections::{HashMap, VecDeque},
    sync::Arc,
    time::{Duration, SystemTime},
};

use tokio::{
    sync::{Mutex as AsyncMutex, RwLock},
    time::interval,
};

use crate::{
    IPC::Enhanced::PerformanceDashboard::{
        AlertSeverity::Enum as AlertSeverity,
        DashboardConfig::Struct as DashboardConfig,
        DashboardStatistics::Struct as DashboardStatistics,
        LogLevel::Enum as LogLevel,
        MetricType::Enum as MetricType,
        PerformanceAlert::Struct as PerformanceAlert,
        PerformanceMetric::Struct as PerformanceMetric,
        TraceLog::Struct as TraceLog,
        TraceSpan::Struct as TraceSpan,
    },
    dev_log,
};

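/// In-process performance dashboard for the enhanced IPC layer. It
/// buffers raw metrics, trace spans, and triggered alerts, and keeps
/// rolled-up statistics; every store sits behind an `Arc` so the
/// dashboard can be cloned cheaply into background tasks.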
pub struct Struct {
    pub(super) config:DashboardConfig,

    pub(super) metrics:Arc<RwLock<VecDeque<PerformanceMetric>>>,

    pub(super) traces:Arc<RwLock<HashMap<String, TraceSpan>>>,

    pub(super) alerts:Arc<RwLock<VecDeque<PerformanceAlert>>>,

    pub(super) statistics:Arc<RwLock<DashboardStatistics>>,

    pub(super) is_running:Arc<AsyncMutex<bool>>,
}

impl Struct {
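    /// Builds a dashboard from `config` with empty metric, trace, and
    /// alert stores and zeroed statistics. Background tasks are not
    /// spawned here; call `start()` for that.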
    pub fn new(config:DashboardConfig) -> Self {
        let dashboard = Self {
            config,

            metrics:Arc::new(RwLock::new(VecDeque::new())),

            traces:Arc::new(RwLock::new(HashMap::new())),

            alerts:Arc::new(RwLock::new(VecDeque::new())),

            statistics:Arc::new(RwLock::new(DashboardStatistics {
                total_metrics_collected:0,
                total_traces_collected:0,
                total_alerts_triggered:0,
                average_processing_time_ms:0.0,
                peak_processing_time_ms:0,
                error_rate_percentage:0.0,
                throughput_messages_per_second:0.0,
                memory_usage_mb:0.0,
                last_update:SystemTime::now()
                    .duration_since(SystemTime::UNIX_EPOCH)
                    .unwrap_or_default()
                    .as_secs(),
            })),

            is_running:Arc::new(AsyncMutex::new(false)),
        };

        dev_log!(
            "ipc",
            "[PerformanceDashboard] Created dashboard with {}ms update interval",
            dashboard.config.update_interval_ms
        );

        dashboard
    }

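    /// Marks the dashboard running and spawns the metrics-collection,
    /// alert-monitoring, and data-cleanup loops. Idempotent: returns
    /// `Ok(())` immediately if already running.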
    pub async fn start(&self) -> Result<(), String> {
        {
            let mut running = self.is_running.lock().await;

            if *running {
                return Ok(());
            }

            *running = true;
        }

        self.start_metrics_collection().await;

        self.start_alert_monitoring().await;

        self.start_data_cleanup().await;

        dev_log!("ipc", "[PerformanceDashboard] Performance dashboard started");

        Ok(())
    }

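    /// Clears the running flag (the background loops exit on their next
    /// tick) and drops all buffered metrics, traces, and alerts.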
    pub async fn stop(&self) -> Result<(), String> {
        {
            let mut running = self.is_running.lock().await;

            if !*running {
                return Ok(());
            }

            *running = false;
        }

        {
            let mut metrics = self.metrics.write().await;

            metrics.clear();
        }

        {
            let mut traces = self.traces.write().await;

            traces.clear();
        }

        {
            let mut alerts = self.alerts.write().await;

            alerts.clear();
        }

        dev_log!("ipc", "[PerformanceDashboard] Performance dashboard stopped");

        Ok(())
    }

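    /// Appends a metric to the buffer, bumps the collection counter,
    /// refreshes the rolled-up statistics, and checks the metric against
    /// its alert threshold.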
    pub async fn record_metric(&self, metric:PerformanceMetric) {
        {
            let mut metrics = self.metrics.write().await;

            metrics.push_back(metric.clone());
        }

        {
            // Count the metric here; `update_statistics` only recomputes
            // aggregates and never touches this counter.
            let mut stats = self.statistics.write().await;

            stats.total_metrics_collected += 1;
        }

        self.update_statistics().await;

        self.check_alerts(&metric).await;

        dev_log!("ipc", "[PerformanceDashboard] Recorded metric: {:?}", metric.metric_type);
    }

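    /// Opens a root span (no parent) for `operation_name`, stores it
    /// keyed by its span id, and returns a copy to the caller.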
    pub async fn start_trace_span(&self, operation_name:String) -> TraceSpan {
        let trace_id = Self::generate_trace_id();

        let span_id = Self::generate_span_id();

        let span = TraceSpan {
            trace_id,

            span_id:span_id.clone(),

            parent_span_id:None,

            operation_name,

            start_time:SystemTime::now()
                .duration_since(SystemTime::UNIX_EPOCH)
                .unwrap_or_default()
                .as_millis() as u64,

            end_time:None,

            duration_ms:None,

            tags:HashMap::new(),

            logs:Vec::new(),
        };

        {
            let mut traces = self.traces.write().await;

            traces.insert(span_id.clone(), span.clone());
        }

        {
            let mut stats = self.statistics.write().await;

            stats.total_traces_collected += 1;
        }

        span
    }

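    /// Closes the span identified by `span_id`, stamping its end time
    /// and duration. Fails if the span is unknown, e.g. already removed
    /// by cleanup.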
    pub async fn end_trace_span(&self, span_id:&str) -> Result<(), String> {
        let mut traces = self.traces.write().await;

        if let Some(span) = traces.get_mut(span_id) {
            let end_time = SystemTime::now()
                .duration_since(SystemTime::UNIX_EPOCH)
                .unwrap_or_default()
                .as_millis() as u64;

            span.end_time = Some(end_time);

            span.duration_ms = Some(end_time.saturating_sub(span.start_time));

            dev_log!(
                "ipc",
                "[PerformanceDashboard] Ended trace span: {} (duration: {}ms)",
                span.operation_name,
                span.duration_ms.unwrap_or(0)
            );

            Ok(())
        } else {
            Err(format!("Trace span not found: {}", span_id))
        }
    }

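    /// Attaches a structured log entry to an open span.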
    pub async fn add_trace_log(&self, span_id:&str, log:TraceLog) -> Result<(), String> {
        let mut traces = self.traces.write().await;

        if let Some(span) = traces.get_mut(span_id) {
            span.logs.push(log);

            Ok(())
        } else {
            Err(format!("Trace span not found: {}", span_id))
        }
    }

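    // The three `start_*` helpers below move a clone of `self` into a
    // spawned task. `Clone` shares every `Arc`, including `is_running`,
    // so each loop observes `stop()` and exits on its next tick.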
    async fn start_metrics_collection(&self) {
        let dashboard = self.clone();

        tokio::spawn(async move {
            let mut interval = interval(Duration::from_millis(dashboard.config.update_interval_ms));

            while *dashboard.is_running.lock().await {
                interval.tick().await;
                dashboard.collect_system_metrics().await;
                dashboard.update_statistics().await;
            }
        });
    }

    async fn start_alert_monitoring(&self) {
        let dashboard = self.clone();

        tokio::spawn(async move {
            let mut interval = interval(Duration::from_secs(10));

            while *dashboard.is_running.lock().await {
                interval.tick().await;
                dashboard.check_performance_alerts().await;
            }
        });
    }

    async fn start_data_cleanup(&self) {
        let dashboard = self.clone();

        tokio::spawn(async move {
            let mut interval = interval(Duration::from_secs(3600));

            while *dashboard.is_running.lock().await {
                interval.tick().await;
                dashboard.cleanup_old_data().await;
            }
        });
    }

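    /// Samples memory and CPU usage and records them as metrics. The
    /// probes it calls are placeholders that return fixed values.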
    async fn collect_system_metrics(&self) {
        // Both samples reuse `create_metric`, which stamps the current
        // time in milliseconds, matching the literals it replaces.
        if let Ok(memory_usage) = Self::get_memory_usage() {
            self.record_metric(Self::create_metric(MetricType::MemoryUsage, memory_usage, None, HashMap::new()))
                .await;
        }

        if let Ok(cpu_usage) = Self::get_cpu_usage() {
            self.record_metric(Self::create_metric(MetricType::CpuUsage, cpu_usage, None, HashMap::new()))
                .await;
        }
    }

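    /// Recomputes averages and peaks per metric type over everything
    /// currently retained in the buffer, then stamps `last_update`
    /// (seconds, unlike the millisecond metric timestamps).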
    async fn update_statistics(&self) {
        let metrics = self.metrics.read().await;

        let mut stats = self.statistics.write().await;

        let processing_metrics:Vec<&PerformanceMetric> = metrics
            .iter()
            .filter(|m| matches!(m.metric_type, MetricType::MessageProcessingTime))
            .collect();

        if !processing_metrics.is_empty() {
            let total_time:f64 = processing_metrics.iter().map(|m| m.value).sum();

            stats.average_processing_time_ms = total_time / processing_metrics.len() as f64;

            stats.peak_processing_time_ms = processing_metrics.iter().map(|m| m.value as u64).max().unwrap_or(0);
        }

        let error_metrics:Vec<&PerformanceMetric> = metrics
            .iter()
            .filter(|m| matches!(m.metric_type, MetricType::ErrorRate))
            .collect();

        if !error_metrics.is_empty() {
            let total_errors:f64 = error_metrics.iter().map(|m| m.value).sum();

            stats.error_rate_percentage = total_errors / error_metrics.len() as f64;
        }

        let throughput_metrics:Vec<&PerformanceMetric> = metrics
            .iter()
            .filter(|m| matches!(m.metric_type, MetricType::NetworkThroughput))
            .collect();

        if !throughput_metrics.is_empty() {
            let total_throughput:f64 = throughput_metrics.iter().map(|m| m.value).sum();

            stats.throughput_messages_per_second = total_throughput / throughput_metrics.len() as f64;
        }

        let memory_metrics:Vec<&PerformanceMetric> = metrics
            .iter()
            .filter(|m| matches!(m.metric_type, MetricType::MemoryUsage))
            .collect();

        if !memory_metrics.is_empty() {
            let total_memory:f64 = memory_metrics.iter().map(|m| m.value).sum();

            stats.memory_usage_mb = total_memory / memory_metrics.len() as f64;
        }

        stats.last_update = SystemTime::now()
            .duration_since(SystemTime::UNIX_EPOCH)
            .unwrap_or_default()
            .as_secs();
    }

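    /// Compares a metric against its per-type threshold and, when
    /// exceeded, stores an alert whose severity scales with the
    /// value/threshold ratio: >5x critical, >3x high, >2x medium,
    /// otherwise low. Types without a threshold are ignored.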
    async fn check_alerts(&self, metric:&PerformanceMetric) {
        let threshold = match metric.metric_type {
            MetricType::MessageProcessingTime => self.config.alert_threshold_ms as f64,

            MetricType::ErrorRate => 5.0,

            MetricType::MemoryUsage => 1024.0,

            MetricType::CpuUsage => 90.0,

            _ => return,
        };

        if metric.value > threshold {
            let severity = match metric.value / threshold {
                ratio if ratio > 5.0 => AlertSeverity::Critical,

                ratio if ratio > 3.0 => AlertSeverity::High,

                ratio if ratio > 2.0 => AlertSeverity::Medium,

                _ => AlertSeverity::Low,
            };

            let alert = PerformanceAlert {
                alert_id:Self::generate_alert_id(),

                metric_type:metric.metric_type.clone(),

                threshold,

                current_value:metric.value,

                timestamp:metric.timestamp,

                channel:metric.channel.clone(),

                severity,

                message:format!(
                    "{} exceeded threshold: {} > {}",
                    Self::metric_type_name(&metric.metric_type),
                    metric.value,
                    threshold
                ),
            };

            {
                let mut alerts = self.alerts.write().await;

                alerts.push_back(alert.clone());
            }

            {
                let mut stats = self.statistics.write().await;

                stats.total_alerts_triggered += 1;
            }

            dev_log!("ipc", "warn: [PerformanceDashboard] Alert triggered: {}", alert.message);
        }
    }

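    /// Periodic alert sweep. Currently a placeholder that only logs.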
    async fn check_performance_alerts(&self) {
        dev_log!("ipc", "[PerformanceDashboard] Checking performance alerts");
    }

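    /// Drops metrics, traces, and alerts that fall outside the retention
    /// window and caps stored traces at `max_traces_stored`.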
    async fn cleanup_old_data(&self) {
        let now_ms = SystemTime::now()
            .duration_since(SystemTime::UNIX_EPOCH)
            .unwrap_or_default()
            .as_millis() as u64;

        // Metric, trace, and alert timestamps are all in milliseconds,
        // so the retention window must be converted from hours to
        // milliseconds as well; comparing against a seconds-based
        // threshold would never expire anything.
        let retention_threshold = now_ms.saturating_sub(self.config.metrics_retention_hours * 3_600_000);

        {
            let mut metrics = self.metrics.write().await;

            metrics.retain(|m| m.timestamp >= retention_threshold);
        }

        {
            let mut traces = self.traces.write().await;

            traces.retain(|_, span| span.start_time >= retention_threshold);

            // HashMap iteration order is arbitrary, so the excess traces
            // evicted here are not necessarily the oldest ones.
            if traces.len() > self.config.max_traces_stored {
                let excess = traces.len() - self.config.max_traces_stored;

                let keys_to_remove:Vec<String> = traces.keys().take(excess).cloned().collect();

                for key in keys_to_remove {
                    traces.remove(&key);
                }
            }
        }

        {
            let mut alerts = self.alerts.write().await;

            alerts.retain(|a| a.timestamp >= retention_threshold);
        }

        dev_log!("ipc", "[PerformanceDashboard] Cleaned up old data");
    }

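    // Stubbed system probes and id generators. The fixed readings stand
    // in for real OS queries; the ids are plain v4 UUIDs.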
    fn get_memory_usage() -> Result<f64, String> { Ok(100.0) }

    fn get_cpu_usage() -> Result<f64, String> { Ok(25.0) }

    fn generate_trace_id() -> String { uuid::Uuid::new_v4().to_string() }

    fn generate_span_id() -> String { uuid::Uuid::new_v4().to_string() }

    fn generate_alert_id() -> String { uuid::Uuid::new_v4().to_string() }

    fn metric_type_name(metric_type:&MetricType) -> &'static str {
        match metric_type {
            MetricType::MessageProcessingTime => "Message Processing Time",

            MetricType::ConnectionLatency => "Connection Latency",

            MetricType::MemoryUsage => "Memory Usage",

            MetricType::CpuUsage => "CPU Usage",

            MetricType::NetworkThroughput => "Network Throughput",

            MetricType::ErrorRate => "Error Rate",

            MetricType::QueueSize => "Queue Size",
        }
    }

    pub async fn get_statistics(&self) -> DashboardStatistics { self.statistics.read().await.clone() }

    pub async fn get_recent_metrics(&self, limit:usize) -> Vec<PerformanceMetric> {
        let metrics = self.metrics.read().await;

        metrics.iter().rev().take(limit).cloned().collect()
    }

    pub async fn get_active_alerts(&self) -> Vec<PerformanceAlert> {
        let alerts = self.alerts.read().await;

        alerts.iter().rev().cloned().collect()
    }

    pub async fn get_trace(&self, trace_id:&str) -> Option<TraceSpan> {
        let traces = self.traces.read().await;

        // Spans are keyed by span id, so this scans for the first span
        // belonging to `trace_id`.
        traces.values().find(|span| span.trace_id == trace_id).cloned()
    }

    pub fn default_dashboard() -> Self { Self::new(DashboardConfig::default()) }

    pub fn high_frequency_dashboard() -> Self {
        Self::new(DashboardConfig {
            update_interval_ms:1000,
            metrics_retention_hours:1,
            alert_threshold_ms:500,
            trace_sampling_rate:1.0,
            max_traces_stored:5000,
        })
    }

    pub fn create_metric(
        metric_type:MetricType,

        value:f64,

        channel:Option<String>,

        tags:HashMap<String, String>,
    ) -> PerformanceMetric {
        PerformanceMetric {
            metric_type,

            value,

            timestamp:SystemTime::now()
                .duration_since(SystemTime::UNIX_EPOCH)
                .unwrap_or_default()
                .as_millis() as u64,

            channel,

            tags,
        }
    }

    pub fn create_trace_log(message:String, level:LogLevel, fields:HashMap<String, String>) -> TraceLog {
        TraceLog {
            timestamp:SystemTime::now()
                .duration_since(SystemTime::UNIX_EPOCH)
                .unwrap_or_default()
                .as_millis() as u64,

            message,

            level,

            fields,
        }
    }

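    /// Weighted 0-100 health score: 40% of a latency score that decays
    /// hyperbolically (50 points at a 100ms average), 40% of a linear
    /// error-rate score, and 20% of throughput scaled down by 1000,
    /// clamped to [0, 100].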
    pub fn calculate_performance_score(average_processing_time:f64, error_rate:f64, throughput:f64) -> f64 {
        let time_score = 100.0 / (1.0 + average_processing_time / 100.0);

        let error_score = 100.0 * (1.0 - error_rate / 100.0);

        let throughput_score = throughput / 1000.0;

        (time_score * 0.4 + error_score * 0.4 + throughput_score * 0.2).clamp(0.0, 100.0)
    }

    pub fn format_metric_value(metric_type:&MetricType, value:f64) -> String {
        match metric_type {
            MetricType::MessageProcessingTime => format!("{:.2}ms", value),

            MetricType::ConnectionLatency => format!("{:.2}ms", value),

            MetricType::MemoryUsage => format!("{:.2}MB", value),

            MetricType::CpuUsage => format!("{:.2}%", value),

            MetricType::NetworkThroughput => format!("{:.2} msg/s", value),

            MetricType::ErrorRate => format!("{:.2}%", value),

            MetricType::QueueSize => format!("{:.0}", value),
        }
    }
}

impl Clone for Struct {
    fn clone(&self) -> Self {
        Self {
            config:self.config.clone(),

            metrics:self.metrics.clone(),

            traces:self.traces.clone(),

            alerts:self.alerts.clone(),

            statistics:self.statistics.clone(),

            // Share the running flag instead of creating a fresh one:
            // the background loops poll this flag through a clone, so a
            // new `false` mutex here would stop every loop immediately
            // and hide `start()`/`stop()` from clones.
            is_running:self.is_running.clone(),
        }
    }
}
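
#[cfg(test)]
mod tests {
    use super::*;

    // A minimal usage sketch, not part of the original file: it assumes
    // the `tokio` test macro is available as a dev-dependency and leans
    // on `DashboardConfig: Default`, which `default_dashboard()` above
    // already relies on.
    #[tokio::test]
    async fn record_metric_updates_statistics() {
        // `record_metric` works without `start()`, which keeps this test
        // from racing the background system-metric collection.
        let dashboard = Struct::default_dashboard();

        // QueueSize has no alert threshold, so no alert is triggered.
        let metric = Struct::create_metric(MetricType::QueueSize, 3.0, None, HashMap::new());

        dashboard.record_metric(metric).await;

        let stats = dashboard.get_statistics().await;

        assert_eq!(stats.total_metrics_collected, 1);
    }
}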