Skip to main content

Mountain/IPC/Enhanced/ConnectionPool/
Pool.rs

1#![allow(non_snake_case)]
2
//! `Pool::Struct` - bounded connection pool with health
//! monitoring, idle/lifetime cleanup, wait-queue timeouts, and
//! statistics. Acquire via `get_connection` (which waits for a permit
//! on the inner `Semaphore` and holds it only for the duration of the
//! call); return via `release_connection`.
//! The struct, its 19-method impl, Clone, and tests stay in one
//! file because they form a tightly coupled cluster.
9
10use std::{
11	collections::HashMap,
12	sync::Arc,
13	time::{Duration, Instant},
14};
15
16use tokio::{
17	sync::{Mutex as AsyncMutex, Notify, RwLock, Semaphore},
18	time::{interval, timeout},
19};
20
21use crate::{
22	IPC::Enhanced::ConnectionPool::{
23		ConnectionHandle::Struct as ConnectionHandle,
24		HealthChecker::Struct as HealthChecker,
25		PoolConfig::Struct as PoolConfig,
26		PoolStats::Struct as PoolStats,
27	},
28	dev_log,
29};
30
31pub struct Struct {
32	pub config:PoolConfig,
33
34	pub connections:Arc<AsyncMutex<HashMap<String, ConnectionHandle>>>,
35
36	pub semaphore:Arc<Semaphore>,
37
38	pub wait_queue:Arc<AsyncMutex<Vec<Arc<Notify>>>>,
39
40	pub stats:Arc<RwLock<PoolStats>>,
41
42	pub health_checker:Arc<AsyncMutex<HealthChecker>>,
43
44	pub is_running:Arc<AsyncMutex<bool>>,
45}
46
47impl Struct {
48	pub fn new(config:PoolConfig) -> Self {
49		let max_connections = config.max_connections;
50
51		let min_connections = config.min_connections;
52
53		let pool = Self {
54			config:config.clone(),
55
56			connections:Arc::new(AsyncMutex::new(HashMap::new())),
57
58			semaphore:Arc::new(Semaphore::new(max_connections)),
59
60			wait_queue:Arc::new(AsyncMutex::new(Vec::new())),
61
62			stats:Arc::new(RwLock::new(PoolStats {
63				total_connections:0,
64				active_connections:0,
65				idle_connections:0,
66				healthy_connections:0,
67				max_connections,
68				min_connections,
69				wait_queue_size:0,
70				average_wait_time_ms:0.0,
71				total_operations:0,
72				successful_operations:0,
73				error_rate:0.0,
74			})),
75
76			health_checker:Arc::new(AsyncMutex::new(HealthChecker::new())),
77
78			is_running:Arc::new(AsyncMutex::new(false)),
79		};
80
81		dev_log!("ipc", "[ConnectionPool] Created pool with max {} connections", max_connections);
82
83		pool
84	}
85
86	pub async fn start(&self) -> Result<(), String> {
87		{
88			let mut running = self.is_running.lock().await;
89
90			if *running {
91				return Ok(());
92			}
93
94			*running = true;
95		}
96
97		self.start_health_monitoring().await;
98
99		self.start_connection_cleanup().await;
100
101		self.initialize_min_connections().await;
102
103		dev_log!("ipc", "[ConnectionPool] Started connection pool");
104
105		Ok(())
106	}
107
108	pub async fn stop(&self) -> Result<(), String> {
109		{
110			let mut running = self.is_running.lock().await;
111
112			if !*running {
113				return Ok(());
114			}
115
116			*running = false;
117		}
118
119		{
120			let mut connections = self.connections.lock().await;
121
122			connections.clear();
123		}
124
125		{
126			let mut wait_queue = self.wait_queue.lock().await;
127
128			for notifier in wait_queue.drain(..) {
129				notifier.notify_one();
130			}
131		}
132
133		dev_log!("ipc", "[ConnectionPool] Stopped connection pool");
134
135		Ok(())
136	}
137
138	pub async fn get_connection(&self) -> Result<ConnectionHandle, String> {
139		let start_time = Instant::now();
140
141		let _permit = timeout(
142			Duration::from_millis(self.config.connection_timeout_ms),
143			self.semaphore.acquire(),
144		)
145		.await
146		.map_err(|_| "Connection timeout".to_string())?
147		.map_err(|e| format!("Failed to acquire connection: {}", e))?;
148
149		let wait_time = start_time.elapsed().as_millis() as f64;
150
151		{
152			let mut stats = self.stats.write().await;
153
154			stats.average_wait_time_ms = (stats.average_wait_time_ms * stats.total_operations as f64 + wait_time)
155				/ (stats.total_operations as f64 + 1.0);
156		}
157
158		let connection = self.find_or_create_connection().await?;
159
160		{
161			let mut stats = self.stats.write().await;
162
163			stats.active_connections += 1;
164
165			stats.total_operations += 1;
166		}
167
168		dev_log!("ipc", "[ConnectionPool] Connection acquired: {}", connection.id);
169
170		Ok(connection)
171	}
172
173	pub async fn release_connection(&self, mut handle:ConnectionHandle) {
174		let connection_id = handle.id.clone();
175
176		handle.last_used = Instant::now();
177
178		{
179			let mut connections = self.connections.lock().await;
180
181			connections.insert(handle.id.clone(), handle.clone());
182		}
183
184		{
185			let mut stats = self.stats.write().await;
186
187			stats.active_connections = stats.active_connections.saturating_sub(1);
188
189			stats.idle_connections += 1;
190		}
191
192		drop(handle);
193
194		dev_log!("ipc", "[ConnectionPool] Connection released: {}", connection_id);
195	}
196
197	async fn find_or_create_connection(&self) -> Result<ConnectionHandle, String> {
198		let mut connections = self.connections.lock().await;
199
200		for (_id, handle) in connections.iter_mut() {
201			if handle.is_healthy() && handle.idle_time().as_millis() < self.config.idle_timeout_ms as u128 {
202				handle.last_used = Instant::now();
203
204				return Ok(handle.clone());
205			}
206		}
207
208		let new_handle = ConnectionHandle::new();
209
210		connections.insert(new_handle.id.clone(), new_handle.clone());
211
212		{
213			let mut stats = self.stats.write().await;
214
215			stats.total_connections += 1;
216
217			stats.healthy_connections += 1;
218		}
219
220		Ok(new_handle)
221	}
222
223	async fn start_health_monitoring(&self) {
224		let pool = Arc::new(self.clone());
225
226		tokio::spawn(async move {
227			let mut interval = interval(Duration::from_millis(pool.config.health_check_interval_ms));
228
229			while *pool.is_running.lock().await {
230				interval.tick().await;
231
232				if let Err(e) = pool.check_connection_health().await {
233					dev_log!("ipc", "error: [ConnectionPool] Health check failed: {}", e);
234				}
235			}
236		});
237	}
238
239	async fn start_connection_cleanup(&self) {
240		let pool = Arc::new(self.clone());
241
242		tokio::spawn(async move {
243			let mut interval = interval(Duration::from_secs(60));
244
245			while *pool.is_running.lock().await {
246				interval.tick().await;
247
248				let cleaned_count = pool.cleanup_stale_connections().await;
249				if cleaned_count > 0 {
250					dev_log!("ipc", "[ConnectionPool] Cleaned {} stale connections", cleaned_count);
251				}
252			}
253		});
254	}
255
256	async fn initialize_min_connections(&self) {
257		let current_count = self.connections.lock().await.len();
258
259		if current_count < self.config.min_connections {
260			let needed = self.config.min_connections - current_count;
261
262			for _ in 0..needed {
263				let handle = ConnectionHandle::new();
264
265				let mut connections = self.connections.lock().await;
266
267				connections.insert(handle.id.clone(), handle);
268			}
269
270			dev_log!("ipc", "[ConnectionPool] Initialized {} minimum connections", needed);
271		}
272	}
273
274	async fn check_connection_health(&self) -> Result<(), String> {
275		let mut connections = self.connections.lock().await;
276
277		let mut _health_checker = self.health_checker.lock().await;
278
279		let mut healthy_count = 0;
280
281		for (_id, handle) in connections.iter_mut() {
282			let is_healthy = _health_checker.check_connection_health(handle).await;
283
284			handle.update_health(is_healthy);
285
286			if handle.is_healthy() {
287				healthy_count += 1;
288			}
289		}
290
291		{
292			let mut stats = self.stats.write().await;
293
294			stats.healthy_connections = healthy_count;
295
296			stats.idle_connections = connections.len().saturating_sub(stats.active_connections);
297
298			if stats.total_operations > 0 {
299				stats.error_rate = 1.0 - (stats.successful_operations as f64 / stats.total_operations as f64);
300			}
301		}
302
303		Ok(())
304	}
305
306	pub async fn cleanup_stale_connections(&self) -> usize {
307		let mut connections = self.connections.lock().await;
308
309		let stale_ids:Vec<String> = connections
310			.iter()
311			.filter(|(_, handle)| {
312				handle.age().as_millis() > self.config.max_lifetime_ms as u128
313					|| handle.idle_time().as_millis() > self.config.idle_timeout_ms as u128
314					|| !handle.is_healthy()
315			})
316			.map(|(id, _)| id.clone())
317			.collect();
318
319		for id in &stale_ids {
320			connections.remove(id);
321		}
322
323		{
324			let mut stats = self.stats.write().await;
325
326			stats.total_connections = connections.len();
327
328			stats.healthy_connections = connections.values().filter(|h| h.is_healthy()).count();
329		}
330
331		stale_ids.len()
332	}
333
334	pub async fn get_stats(&self) -> PoolStats { self.stats.read().await.clone() }
335
336	pub async fn get_active_count(&self) -> usize { self.stats.read().await.active_connections }
337
338	pub async fn get_healthy_count(&self) -> usize { self.stats.read().await.healthy_connections }
339
340	pub async fn is_running(&self) -> bool { *self.is_running.lock().await }
341
342	pub fn default_pool() -> Self { Self::new(PoolConfig::default()) }
343
344	pub fn high_performance_pool() -> Self {
345		Self::new(PoolConfig {
346			max_connections:50,
347			min_connections:10,
348			connection_timeout_ms:10000,
349			max_lifetime_ms:180000,
350			idle_timeout_ms:30000,
351			health_check_interval_ms:15000,
352		})
353	}
354
355	pub fn conservative_pool() -> Self {
356		Self::new(PoolConfig {
357			max_connections:5,
358			min_connections:1,
359			connection_timeout_ms:60000,
360			max_lifetime_ms:600000,
361			idle_timeout_ms:120000,
362			health_check_interval_ms:60000,
363		})
364	}
365
366	pub fn calculate_optimal_pool_size() -> usize {
367		let num_cpus = num_cpus::get();
368
369		(num_cpus * 2).max(4).min(50)
370	}
371}
372
373impl Clone for Struct {
374	fn clone(&self) -> Self {
375		Self {
376			config:self.config.clone(),
377
378			connections:self.connections.clone(),
379
380			semaphore:self.semaphore.clone(),
381
382			wait_queue:self.wait_queue.clone(),
383
384			stats:self.stats.clone(),
385
386			health_checker:self.health_checker.clone(),
387
388			is_running:self.is_running.clone(),
389		}
390	}
391}