Skip to main content

Mountain/IPC/Enhanced/MessageCompressor/
Compressor.rs

1#![allow(non_snake_case)]
2
3//! `Compressor::Struct` - message batching + compression
4//! engine. Buffers messages until size or time triggers a
5//! flush, then emits a `CompressedBatch::Struct` using the
6//! configured algorithm. Struct + 14-method impl + utility
7//! functions stay in one file - tightly coupled cluster.
8
9use std::{
10	collections::VecDeque,
11	io::{Read, Write},
12};
13
14use bincode::serde::{decode_from_slice, encode_to_vec};
15use brotli::{CompressorReader, CompressorWriter, enc::BrotliEncoderParams};
16use flate2::{
17	Compression,
18	write::{GzEncoder, ZlibEncoder},
19};
20use tokio::time::Instant;
21
22use crate::IPC::Enhanced::MessageCompressor::{
23	BatchConfig::Struct as BatchConfig,
24	BatchStats::Struct as BatchStats,
25	CompressedBatch::Struct as CompressedBatch,
26	CompressionAlgorithm::Enum as CompressionAlgorithm,
27	CompressionInfo::Struct as CompressionInfo,
28	CompressionLevel::Enum as CompressionLevel,
29};
30
/// Message batching and compression engine state.
///
/// Holds the batching configuration together with the messages buffered since
/// the last flush and the bookkeeping used to decide when a flush is due.
pub struct Struct {
	/// Batching and compression settings supplied at construction.
	pub(super) Config:BatchConfig,

	/// Copies of the message payloads buffered for the next flush, in
	/// arrival order.
	pub(super) CurrentBatch:VecDeque<Vec<u8>>,

	/// Moment the first message of the current batch was accepted; `None`
	/// while no batch is in progress.
	pub(super) BatchStartTime:Option<Instant>,

	/// Sum of the byte lengths of the payloads held in `CurrentBatch`.
	pub(super) BatchSizeBytes:usize,
}
40
41impl Struct {
42	pub fn new(config:BatchConfig) -> Self {
43		Self {
44			Config:config,
45
46			CurrentBatch:VecDeque::new(),
47
48			BatchStartTime:None,
49
50			BatchSizeBytes:0,
51		}
52	}
53
54	pub fn add_message(&mut self, MessageData:&[u8]) -> bool {
55		let MessageSize = MessageData.len();
56
57		let _should_compress = MessageSize >= self.Config.CompressionThresholdBytes;
58
59		if self.BatchSizeBytes + MessageSize > self.Config.MaxBatchSize * 1024 {
60			return false;
61		}
62
63		self.CurrentBatch.push_back(MessageData.to_vec());
64
65		self.BatchSizeBytes += MessageSize;
66
67		if self.BatchStartTime.is_none() {
68			self.BatchStartTime = Some(Instant::now());
69		}
70
71		true
72	}
73
74	pub fn should_flush(&self) -> bool {
75		if self.CurrentBatch.is_empty() {
76			return false;
77		}
78
79		if self.CurrentBatch.len() >= self.Config.MaxBatchSize {
80			return true;
81		}
82
83		if let Some(start_time) = self.BatchStartTime {
84			let elapsed = start_time.elapsed();
85
86			if elapsed.as_millis() >= self.Config.MaxBatchDelayMs as u128 {
87				return true;
88			}
89		}
90
91		false
92	}
93
94	pub fn flush_batch(&mut self) -> Result<CompressedBatch, String> {
95		if self.CurrentBatch.is_empty() {
96			return Err("No messages in batch to flush".to_string());
97		}
98
99		let BatchMessages:Vec<Vec<u8>> = self.CurrentBatch.drain(..).collect();
100
101		let total_size = self.BatchSizeBytes;
102
103		self.BatchStartTime = None;
104
105		self.BatchSizeBytes = 0;
106
107		let config = bincode::config::standard();
108
109		let serialized_batch =
110			encode_to_vec(&BatchMessages, config).map_err(|e| format!("Failed to serialize batch: {}", e))?;
111
112		let (CompressedData, compression_info) = if total_size >= self.Config.CompressionThresholdBytes {
113			self.compress_data(&serialized_batch).map(|(data, info)| (Some(data), info))
114		} else {
115			Ok((None, CompressionInfo::none()))
116		}?;
117
118		Ok(CompressedBatch {
119			messages_count:BatchMessages.len(),
120			original_size:total_size,
121			compressed_size:CompressedData.as_ref().map(|d| d.len()).unwrap_or(total_size),
122			compressed_data:CompressedData,
123			compression_info,
124			timestamp:std::time::SystemTime::now()
125				.duration_since(std::time::UNIX_EPOCH)
126				.unwrap_or_default()
127				.as_millis() as u64,
128		})
129	}
130
131	fn compress_data(&self, data:&[u8]) -> Result<(Vec<u8>, CompressionInfo), String> {
132		match self.Config.Algorithm {
133			CompressionAlgorithm::Brotli => self.compress_brotli(data),
134
135			CompressionAlgorithm::Gzip => self.compress_gzip(data),
136
137			CompressionAlgorithm::Zlib => self.compress_zlib(data),
138		}
139	}
140
141	fn compress_brotli(&self, data:&[u8]) -> Result<(Vec<u8>, CompressionInfo), String> {
142		let mut params = BrotliEncoderParams::default();
143
144		params.quality = self.Config.CompressionLevel as i32;
145
146		let mut compressed = Vec::new();
147
148		{
149			let mut writer = CompressorWriter::with_params(&mut compressed, data.len().try_into().unwrap(), &params);
150
151			std::io::Write::write_all(&mut writer, data).map_err(|e| format!("Brotli compression failed: {}", e))?;
152
153			writer.flush().map_err(|e| format!("Brotli flush failed: {}", e))?;
154		}
155
156		let ratio = data.len() as f64 / compressed.len() as f64;
157
158		Ok((
159			compressed,
160			CompressionInfo { algorithm:"brotli".to_string(), level:self.Config.CompressionLevel as u32, ratio },
161		))
162	}
163
164	fn compress_gzip(&self, data:&[u8]) -> Result<(Vec<u8>, CompressionInfo), String> {
165		let mut encoder = GzEncoder::new(Vec::new(), Compression::new(self.Config.CompressionLevel as u32));
166
167		encoder.write_all(data).map_err(|e| format!("Gzip compression failed: {}", e))?;
168
169		let compressed = encoder.finish().map_err(|e| format!("Gzip finish failed: {}", e))?;
170
171		let ratio = data.len() as f64 / compressed.len() as f64;
172
173		Ok((
174			compressed,
175			CompressionInfo { algorithm:"gzip".to_string(), level:self.Config.CompressionLevel as u32, ratio },
176		))
177	}
178
179	fn compress_zlib(&self, data:&[u8]) -> Result<(Vec<u8>, CompressionInfo), String> {
180		let mut encoder = ZlibEncoder::new(Vec::new(), Compression::new(self.Config.CompressionLevel as u32));
181
182		encoder.write_all(data).map_err(|e| format!("Zlib compression failed: {}", e))?;
183
184		let compressed = encoder.finish().map_err(|e| format!("Zlib finish failed: {}", e))?;
185
186		let ratio = data.len() as f64 / compressed.len() as f64;
187
188		Ok((
189			compressed,
190			CompressionInfo { algorithm:"zlib".to_string(), level:self.Config.CompressionLevel as u32, ratio },
191		))
192	}
193
194	pub fn decompress_batch(&self, batch:&CompressedBatch) -> Result<Vec<Vec<u8>>, String> {
195		let data = if let Some(ref compressed_data) = batch.compressed_data {
196			self.decompress_data(compressed_data, &batch.compression_info.algorithm)?
197		} else {
198			encode_to_vec(&batch, bincode::config::standard()).map_err(|e| format!("Serialization failed: {}", e))?
199		};
200
201		let (decoded, _) = decode_from_slice::<Vec<Vec<u8>>, _>(&data, bincode::config::standard())
202			.map_err(|e| format!("Failed to deserialize batch: {}", e))?;
203
204		Ok(decoded)
205	}
206
207	fn decompress_data(&self, data:&[u8], algorithm:&str) -> Result<Vec<u8>, String> {
208		match algorithm {
209			"brotli" => self.decompress_brotli(data),
210
211			"gzip" => self.decompress_gzip(data),
212
213			"zlib" => self.decompress_zlib(data),
214
215			_ => Err(format!("Unsupported compression algorithm: {}", algorithm)),
216		}
217	}
218
219	fn decompress_brotli(&self, data:&[u8]) -> Result<Vec<u8>, String> {
220		let mut decompressed = Vec::new();
221
222		let mut reader = CompressorReader::new(data, 0, data.len().try_into().unwrap(), data.len().try_into().unwrap());
223
224		std::io::Read::read_to_end(&mut reader, &mut decompressed)
225			.map_err(|e| format!("Brotli decompression failed: {}", e))?;
226
227		Ok(decompressed)
228	}
229
230	fn decompress_gzip(&self, data:&[u8]) -> Result<Vec<u8>, String> {
231		use flate2::read::GzDecoder;
232
233		let mut decoder = GzDecoder::new(data);
234
235		let mut decompressed = Vec::new();
236
237		decoder
238			.read_to_end(&mut decompressed)
239			.map_err(|e| format!("Gzip decompression failed: {}", e))?;
240
241		Ok(decompressed)
242	}
243
244	fn decompress_zlib(&self, data:&[u8]) -> Result<Vec<u8>, String> {
245		use flate2::read::ZlibDecoder;
246
247		let mut decoder = ZlibDecoder::new(data);
248
249		let mut decompressed = Vec::new();
250
251		decoder
252			.read_to_end(&mut decompressed)
253			.map_err(|e| format!("Zlib decompression failed: {}", e))?;
254
255		Ok(decompressed)
256	}
257
258	pub fn get_batch_stats(&self) -> BatchStats {
259		BatchStats {
260			messages_count:self.CurrentBatch.len(),
261
262			total_size_bytes:self.BatchSizeBytes,
263
264			batch_age_ms:self.BatchStartTime.map(|t| t.elapsed().as_millis() as u64).unwrap_or(0),
265		}
266	}
267
268	pub fn clear_batch(&mut self) {
269		self.CurrentBatch.clear();
270
271		self.BatchStartTime = None;
272
273		self.BatchSizeBytes = 0;
274	}
275
276	pub fn compress_single_message(
277		message_data:&[u8],
278
279		algorithm:CompressionAlgorithm,
280
281		level:CompressionLevel,
282	) -> Result<(Vec<u8>, CompressionInfo), String> {
283		let config = BatchConfig { Algorithm:algorithm, CompressionLevel:level, ..Default::default() };
284
285		let compressor = Self::new(config);
286
287		compressor.compress_data(message_data)
288	}
289
290	pub fn calculate_compression_ratio(original_size:usize, compressed_size:usize) -> f64 {
291		if compressed_size == 0 {
292			return 0.0;
293		}
294
295		original_size as f64 / compressed_size as f64
296	}
297
298	pub fn estimate_savings(original_size:usize, expected_ratio:f64) -> usize {
299		(original_size as f64 * (1.0 - 1.0 / expected_ratio)) as usize
300	}
301}