Mountain/IPC/Enhanced/ConnectionPool/
Pool.rs1#![allow(non_snake_case)]
2
3use std::{
11 collections::HashMap,
12 sync::Arc,
13 time::{Duration, Instant},
14};
15
16use tokio::{
17 sync::{Mutex as AsyncMutex, Notify, RwLock, Semaphore},
18 time::{interval, timeout},
19};
20
21use crate::{
22 IPC::Enhanced::ConnectionPool::{
23 ConnectionHandle::Struct as ConnectionHandle,
24 HealthChecker::Struct as HealthChecker,
25 PoolConfig::Struct as PoolConfig,
26 PoolStats::Struct as PoolStats,
27 },
28 dev_log,
29};
30
31pub struct Struct {
32 pub config:PoolConfig,
33
34 pub connections:Arc<AsyncMutex<HashMap<String, ConnectionHandle>>>,
35
36 pub semaphore:Arc<Semaphore>,
37
38 pub wait_queue:Arc<AsyncMutex<Vec<Arc<Notify>>>>,
39
40 pub stats:Arc<RwLock<PoolStats>>,
41
42 pub health_checker:Arc<AsyncMutex<HealthChecker>>,
43
44 pub is_running:Arc<AsyncMutex<bool>>,
45}
46
47impl Struct {
48 pub fn new(config:PoolConfig) -> Self {
49 let max_connections = config.max_connections;
50
51 let min_connections = config.min_connections;
52
53 let pool = Self {
54 config:config.clone(),
55
56 connections:Arc::new(AsyncMutex::new(HashMap::new())),
57
58 semaphore:Arc::new(Semaphore::new(max_connections)),
59
60 wait_queue:Arc::new(AsyncMutex::new(Vec::new())),
61
62 stats:Arc::new(RwLock::new(PoolStats {
63 total_connections:0,
64 active_connections:0,
65 idle_connections:0,
66 healthy_connections:0,
67 max_connections,
68 min_connections,
69 wait_queue_size:0,
70 average_wait_time_ms:0.0,
71 total_operations:0,
72 successful_operations:0,
73 error_rate:0.0,
74 })),
75
76 health_checker:Arc::new(AsyncMutex::new(HealthChecker::new())),
77
78 is_running:Arc::new(AsyncMutex::new(false)),
79 };
80
81 dev_log!("ipc", "[ConnectionPool] Created pool with max {} connections", max_connections);
82
83 pool
84 }
85
86 pub async fn start(&self) -> Result<(), String> {
87 {
88 let mut running = self.is_running.lock().await;
89
90 if *running {
91 return Ok(());
92 }
93
94 *running = true;
95 }
96
97 self.start_health_monitoring().await;
98
99 self.start_connection_cleanup().await;
100
101 self.initialize_min_connections().await;
102
103 dev_log!("ipc", "[ConnectionPool] Started connection pool");
104
105 Ok(())
106 }
107
108 pub async fn stop(&self) -> Result<(), String> {
109 {
110 let mut running = self.is_running.lock().await;
111
112 if !*running {
113 return Ok(());
114 }
115
116 *running = false;
117 }
118
119 {
120 let mut connections = self.connections.lock().await;
121
122 connections.clear();
123 }
124
125 {
126 let mut wait_queue = self.wait_queue.lock().await;
127
128 for notifier in wait_queue.drain(..) {
129 notifier.notify_one();
130 }
131 }
132
133 dev_log!("ipc", "[ConnectionPool] Stopped connection pool");
134
135 Ok(())
136 }
137
138 pub async fn get_connection(&self) -> Result<ConnectionHandle, String> {
139 let start_time = Instant::now();
140
141 let _permit = timeout(
142 Duration::from_millis(self.config.connection_timeout_ms),
143 self.semaphore.acquire(),
144 )
145 .await
146 .map_err(|_| "Connection timeout".to_string())?
147 .map_err(|e| format!("Failed to acquire connection: {}", e))?;
148
149 let wait_time = start_time.elapsed().as_millis() as f64;
150
151 {
152 let mut stats = self.stats.write().await;
153
154 stats.average_wait_time_ms = (stats.average_wait_time_ms * stats.total_operations as f64 + wait_time)
155 / (stats.total_operations as f64 + 1.0);
156 }
157
158 let connection = self.find_or_create_connection().await?;
159
160 {
161 let mut stats = self.stats.write().await;
162
163 stats.active_connections += 1;
164
165 stats.total_operations += 1;
166 }
167
168 dev_log!("ipc", "[ConnectionPool] Connection acquired: {}", connection.id);
169
170 Ok(connection)
171 }
172
173 pub async fn release_connection(&self, mut handle:ConnectionHandle) {
174 let connection_id = handle.id.clone();
175
176 handle.last_used = Instant::now();
177
178 {
179 let mut connections = self.connections.lock().await;
180
181 connections.insert(handle.id.clone(), handle.clone());
182 }
183
184 {
185 let mut stats = self.stats.write().await;
186
187 stats.active_connections = stats.active_connections.saturating_sub(1);
188
189 stats.idle_connections += 1;
190 }
191
192 drop(handle);
193
194 dev_log!("ipc", "[ConnectionPool] Connection released: {}", connection_id);
195 }
196
197 async fn find_or_create_connection(&self) -> Result<ConnectionHandle, String> {
198 let mut connections = self.connections.lock().await;
199
200 for (_id, handle) in connections.iter_mut() {
201 if handle.is_healthy() && handle.idle_time().as_millis() < self.config.idle_timeout_ms as u128 {
202 handle.last_used = Instant::now();
203
204 return Ok(handle.clone());
205 }
206 }
207
208 let new_handle = ConnectionHandle::new();
209
210 connections.insert(new_handle.id.clone(), new_handle.clone());
211
212 {
213 let mut stats = self.stats.write().await;
214
215 stats.total_connections += 1;
216
217 stats.healthy_connections += 1;
218 }
219
220 Ok(new_handle)
221 }
222
223 async fn start_health_monitoring(&self) {
224 let pool = Arc::new(self.clone());
225
226 tokio::spawn(async move {
227 let mut interval = interval(Duration::from_millis(pool.config.health_check_interval_ms));
228
229 while *pool.is_running.lock().await {
230 interval.tick().await;
231
232 if let Err(e) = pool.check_connection_health().await {
233 dev_log!("ipc", "error: [ConnectionPool] Health check failed: {}", e);
234 }
235 }
236 });
237 }
238
239 async fn start_connection_cleanup(&self) {
240 let pool = Arc::new(self.clone());
241
242 tokio::spawn(async move {
243 let mut interval = interval(Duration::from_secs(60));
244
245 while *pool.is_running.lock().await {
246 interval.tick().await;
247
248 let cleaned_count = pool.cleanup_stale_connections().await;
249 if cleaned_count > 0 {
250 dev_log!("ipc", "[ConnectionPool] Cleaned {} stale connections", cleaned_count);
251 }
252 }
253 });
254 }
255
256 async fn initialize_min_connections(&self) {
257 let current_count = self.connections.lock().await.len();
258
259 if current_count < self.config.min_connections {
260 let needed = self.config.min_connections - current_count;
261
262 for _ in 0..needed {
263 let handle = ConnectionHandle::new();
264
265 let mut connections = self.connections.lock().await;
266
267 connections.insert(handle.id.clone(), handle);
268 }
269
270 dev_log!("ipc", "[ConnectionPool] Initialized {} minimum connections", needed);
271 }
272 }
273
274 async fn check_connection_health(&self) -> Result<(), String> {
275 let mut connections = self.connections.lock().await;
276
277 let mut _health_checker = self.health_checker.lock().await;
278
279 let mut healthy_count = 0;
280
281 for (_id, handle) in connections.iter_mut() {
282 let is_healthy = _health_checker.check_connection_health(handle).await;
283
284 handle.update_health(is_healthy);
285
286 if handle.is_healthy() {
287 healthy_count += 1;
288 }
289 }
290
291 {
292 let mut stats = self.stats.write().await;
293
294 stats.healthy_connections = healthy_count;
295
296 stats.idle_connections = connections.len().saturating_sub(stats.active_connections);
297
298 if stats.total_operations > 0 {
299 stats.error_rate = 1.0 - (stats.successful_operations as f64 / stats.total_operations as f64);
300 }
301 }
302
303 Ok(())
304 }
305
306 pub async fn cleanup_stale_connections(&self) -> usize {
307 let mut connections = self.connections.lock().await;
308
309 let stale_ids:Vec<String> = connections
310 .iter()
311 .filter(|(_, handle)| {
312 handle.age().as_millis() > self.config.max_lifetime_ms as u128
313 || handle.idle_time().as_millis() > self.config.idle_timeout_ms as u128
314 || !handle.is_healthy()
315 })
316 .map(|(id, _)| id.clone())
317 .collect();
318
319 for id in &stale_ids {
320 connections.remove(id);
321 }
322
323 {
324 let mut stats = self.stats.write().await;
325
326 stats.total_connections = connections.len();
327
328 stats.healthy_connections = connections.values().filter(|h| h.is_healthy()).count();
329 }
330
331 stale_ids.len()
332 }
333
334 pub async fn get_stats(&self) -> PoolStats { self.stats.read().await.clone() }
335
336 pub async fn get_active_count(&self) -> usize { self.stats.read().await.active_connections }
337
338 pub async fn get_healthy_count(&self) -> usize { self.stats.read().await.healthy_connections }
339
340 pub async fn is_running(&self) -> bool { *self.is_running.lock().await }
341
342 pub fn default_pool() -> Self { Self::new(PoolConfig::default()) }
343
344 pub fn high_performance_pool() -> Self {
345 Self::new(PoolConfig {
346 max_connections:50,
347 min_connections:10,
348 connection_timeout_ms:10000,
349 max_lifetime_ms:180000,
350 idle_timeout_ms:30000,
351 health_check_interval_ms:15000,
352 })
353 }
354
355 pub fn conservative_pool() -> Self {
356 Self::new(PoolConfig {
357 max_connections:5,
358 min_connections:1,
359 connection_timeout_ms:60000,
360 max_lifetime_ms:600000,
361 idle_timeout_ms:120000,
362 health_check_interval_ms:60000,
363 })
364 }
365
366 pub fn calculate_optimal_pool_size() -> usize {
367 let num_cpus = num_cpus::get();
368
369 (num_cpus * 2).max(4).min(50)
370 }
371}
372
373impl Clone for Struct {
374 fn clone(&self) -> Self {
375 Self {
376 config:self.config.clone(),
377
378 connections:self.connections.clone(),
379
380 semaphore:self.semaphore.clone(),
381
382 wait_queue:self.wait_queue.clone(),
383
384 stats:self.stats.clone(),
385
386 health_checker:self.health_checker.clone(),
387
388 is_running:self.is_running.clone(),
389 }
390 }
391}