Skip to main content

Mountain/IPC/AdvancedFeatures/
Features.rs

1#![allow(non_snake_case)]
2
3//! `AdvancedFeatures` aggregator - holds the runtime handle,
4//! cumulative `PerformanceStats::Struct`, the realtime
5//! collaboration-session map, and the
6//! `MessageCache::Struct`. Spawns three monitor tasks
7//! (`monitor_performance`, `cleanup_cache`,
8//! `monitor_collaboration_sessions`) on `start_monitoring`.
//! The 16-method impl is kept in one file - tightly-coupled
//! cluster.
11
12use std::{
13	collections::HashMap,
14	sync::{Arc, Mutex},
15	time::{Duration, SystemTime},
16};
17
18use tauri::Emitter;
19use tokio::time::interval;
20
21use crate::{
22	IPC::AdvancedFeatures::{
23		CachedMessage::Struct as CachedMessage,
24		CollaborationPermissions::Struct as CollaborationPermissions,
25		CollaborationSession::Struct as CollaborationSession,
26		MessageCache::Struct as MessageCache,
27		PerformanceStats::Struct as PerformanceStats,
28	},
29	RunTime::ApplicationRunTime::ApplicationRunTime,
30	dev_log,
31};
32
33#[derive(Clone)]
34pub struct Struct {
35	pub(super) runtime:Arc<ApplicationRunTime>,
36
37	pub(super) performance_stats:Arc<Mutex<PerformanceStats>>,
38
39	pub(super) collaboration_sessions:Arc<Mutex<HashMap<String, CollaborationSession>>>,
40
41	pub(super) message_cache:Arc<Mutex<MessageCache>>,
42}
43
44impl Struct {
45	pub fn new(runtime:Arc<ApplicationRunTime>) -> Self {
46		dev_log!("lifecycle", "Initializing advanced IPC features");
47
48		Self {
49			runtime,
50
51			performance_stats:Arc::new(Mutex::new(PerformanceStats {
52				total_messages_sent:0,
53				total_messages_received:0,
54				average_processing_time_ms:0.0,
55				peak_message_rate:0,
56				error_count:0,
57				last_update:SystemTime::now()
58					.duration_since(SystemTime::UNIX_EPOCH)
59					.unwrap_or_default()
60					.as_secs(),
61				connection_uptime:0,
62			})),
63
64			collaboration_sessions:Arc::new(Mutex::new(HashMap::new())),
65
66			message_cache:Arc::new(Mutex::new(MessageCache {
67				cached_messages:HashMap::new(),
68				cache_hits:0,
69				cache_misses:0,
70				cache_size:0,
71			})),
72		}
73	}
74
75	pub async fn start_monitoring(&self) -> Result<(), String> {
76		dev_log!("lifecycle", "Starting advanced monitoring");
77
78		let features1 = self.clone_features();
79
80		let features2 = self.clone_features();
81
82		let features3 = self.clone_features();
83
84		tokio::spawn(async move {
85			features1.monitor_performance().await;
86		});
87
88		tokio::spawn(async move {
89			features2.cleanup_cache().await;
90		});
91
92		tokio::spawn(async move {
93			features3.monitor_collaboration_sessions().await;
94		});
95
96		Ok(())
97	}
98
99	async fn monitor_performance(&self) {
100		let mut interval = interval(Duration::from_secs(10));
101
102		loop {
103			interval.tick().await;
104
105			let stats = self.calculate_performance_stats().await;
106
107			if let Err(e) = self.runtime.Environment.ApplicationHandle.emit("ipc-performance-stats", &stats) {
108				dev_log!("ipc", "error: [AdvancedFeatures] Failed to emit performance stats: {}", e);
109			}
110
111			dev_log!("lifecycle", "Performance stats updated");
112		}
113	}
114
115	async fn calculate_performance_stats(&self) -> PerformanceStats {
116		let mut stats = self.performance_stats.lock().unwrap();
117
118		stats.connection_uptime = SystemTime::now()
119			.duration_since(SystemTime::UNIX_EPOCH)
120			.unwrap_or_default()
121			.as_secs()
122			- stats.last_update;
123
124		stats.last_update = SystemTime::now()
125			.duration_since(SystemTime::UNIX_EPOCH)
126			.unwrap_or_default()
127			.as_secs();
128
129		stats.clone()
130	}
131
132	async fn cleanup_cache(&self) {
133		let mut interval = interval(Duration::from_secs(60));
134
135		loop {
136			interval.tick().await;
137
138			let current_time = SystemTime::now()
139				.duration_since(SystemTime::UNIX_EPOCH)
140				.unwrap_or_default()
141				.as_secs();
142
143			let mut cache = self.message_cache.lock().unwrap();
144
145			cache
146				.cached_messages
147				.retain(|_, cached_message| current_time < cached_message.timestamp + cached_message.ttl);
148
149			cache.cache_size = cache.cached_messages.len();
150
151			dev_log!("lifecycle", "Cache cleaned, {} entries remaining", cache.cache_size);
152		}
153	}
154
155	async fn monitor_collaboration_sessions(&self) {
156		let mut interval = interval(Duration::from_secs(30));
157
158		loop {
159			interval.tick().await;
160
161			let current_time = SystemTime::now()
162				.duration_since(SystemTime::UNIX_EPOCH)
163				.unwrap_or_default()
164				.as_secs();
165
166			let mut sessions = self.collaboration_sessions.lock().unwrap();
167
168			sessions.retain(|_, session| current_time - session.last_activity < 300);
169
170			let active_sessions:Vec<CollaborationSession> = sessions.values().cloned().collect();
171
172			if let Err(e) = self
173				.runtime
174				.Environment
175				.ApplicationHandle
176				.emit("collaboration-sessions-update", &active_sessions)
177			{
178				dev_log!("ipc", "error: [AdvancedFeatures] Failed to emit collaboration sessions: {}", e);
179			}
180
181			dev_log!("lifecycle", "Collaboration sessions monitored, {} active", sessions.len());
182		}
183	}
184
185	pub async fn cache_message(&self, message_id:String, data:serde_json::Value, ttl:u64) -> Result<(), String> {
186		let mut cache = self
187			.message_cache
188			.lock()
189			.map_err(|e| format!("Failed to access message cache: {}", e))?;
190
191		let cached_message = CachedMessage {
192			data,
193
194			timestamp:SystemTime::now()
195				.duration_since(SystemTime::UNIX_EPOCH)
196				.unwrap_or_default()
197				.as_secs(),
198
199			ttl,
200		};
201
202		cache.cached_messages.insert(message_id.clone(), cached_message);
203
204		cache.cache_size = cache.cached_messages.len();
205
206		dev_log!("lifecycle", "Message cached: {}, TTL: {}s", message_id, ttl);
207
208		Ok(())
209	}
210
211	pub async fn get_cached_message(&self, message_id:&str) -> Option<serde_json::Value> {
212		let mut cache = self.message_cache.lock().unwrap();
213
214		let result = cache
215			.cached_messages
216			.get(message_id)
217			.map(|cached_message| cached_message.data.clone());
218
219		if result.is_some() {
220			cache.cache_hits += 1;
221		} else {
222			cache.cache_misses += 1;
223		}
224
225		result
226	}
227
228	pub async fn create_collaboration_session(
229		&self,
230
231		session_id:String,
232
233		permissions:CollaborationPermissions,
234	) -> Result<(), String> {
235		let mut sessions = self
236			.collaboration_sessions
237			.lock()
238			.map_err(|e| format!("Failed to access collaboration sessions: {}", e))?;
239
240		let session = CollaborationSession {
241			session_id:session_id.clone(),
242
243			participants:Vec::new(),
244
245			active_documents:Vec::new(),
246
247			last_activity:SystemTime::now()
248				.duration_since(SystemTime::UNIX_EPOCH)
249				.unwrap_or_default()
250				.as_secs(),
251
252			permissions,
253		};
254
255		sessions.insert(session_id, session);
256
257		dev_log!("lifecycle", "Collaboration session created");
258
259		Ok(())
260	}
261
262	pub async fn add_participant(&self, session_id:&str, participant:String) -> Result<(), String> {
263		let mut sessions = self
264			.collaboration_sessions
265			.lock()
266			.map_err(|e| format!("Failed to access collaboration sessions: {}", e))?;
267
268		if let Some(session) = sessions.get_mut(session_id) {
269			if !session.participants.contains(&participant) {
270				session.participants.push(participant);
271
272				session.last_activity = SystemTime::now()
273					.duration_since(SystemTime::UNIX_EPOCH)
274					.unwrap_or_default()
275					.as_secs();
276
277				dev_log!("lifecycle", "Participant added to session: {}", session_id);
278			}
279		} else {
280			return Err(format!("Session not found: {}", session_id));
281		}
282
283		Ok(())
284	}
285
286	pub async fn record_message_statistics(&self, sent:bool, processing_time_ms:u64) {
287		let mut stats = self.performance_stats.lock().unwrap();
288
289		if sent {
290			stats.total_messages_sent += 1;
291		} else {
292			stats.total_messages_received += 1;
293		}
294
295		let total_messages = stats.total_messages_sent + stats.total_messages_received;
296
297		stats.average_processing_time_ms = (stats.average_processing_time_ms * (total_messages - 1) as f64
298			+ processing_time_ms as f64)
299			/ total_messages as f64;
300	}
301
302	pub async fn record_error(&self) {
303		let mut stats = self.performance_stats.lock().unwrap();
304
305		stats.error_count += 1;
306	}
307
308	pub async fn get_performance_stats(&self) -> Result<PerformanceStats, String> {
309		Ok(self.calculate_performance_stats().await)
310	}
311
312	pub async fn get_cache_stats(&self) -> Result<MessageCache, String> {
313		let cache = self.message_cache.lock().unwrap();
314
315		Ok(cache.clone())
316	}
317
318	pub async fn get_collaboration_sessions(&self) -> Vec<CollaborationSession> {
319		let sessions = self.collaboration_sessions.lock().unwrap();
320
321		sessions.values().cloned().collect()
322	}
323
324	pub(super) fn clone_features(&self) -> Self {
325		Self {
326			runtime:self.runtime.clone(),
327
328			performance_stats:self.performance_stats.clone(),
329
330			collaboration_sessions:self.collaboration_sessions.clone(),
331
332			message_cache:self.message_cache.clone(),
333		}
334	}
335}