DevelopmentNodeEnvironment_MicrosoftVSCodeDependency_22NodeVersion_Bundle_Clean_Debug_ElectronProfile_EsbuildCompiler_Mountain/Vine/Server/Notification/
OutputChannelCoalesce.rs1use std::{
39 collections::HashMap,
40 sync::{Mutex as StandardMutex, OnceLock},
41 time::Duration,
42};
43
44use serde_json::{Value, json};
45use tauri::{AppHandle, Emitter};
46use tokio::sync::mpsc::{UnboundedSender, unbounded_channel};
47
48use crate::dev_log;
49
/// How long the coalescing worker sleeps after picking up an append before it
/// flushes the text accumulated for every channel.
const COALESCE_WINDOW:Duration = Duration::from_millis(50);

/// Per-channel buffer size, in bytes (64 KiB), at which the worker flushes
/// that channel immediately instead of waiting out [`COALESCE_WINDOW`].
const MAX_BUFFERED_BYTES:usize = 64 * 1024;
60
/// One queued output append: a chunk of text destined for a named channel.
struct PendingAppend {
	/// Name of the output channel the text belongs to; used as the buffer key.
	Channel:String,

	/// Text to append to that channel.
	Value:String,
}
66
67struct CoalesceChannel {
68 Sender:UnboundedSender<(AppHandle, PendingAppend)>,
69}
70
71static COALESCE_CHANNEL:OnceLock<CoalesceChannel> = OnceLock::new();
72
/// Reports whether coalescing has been switched off through the environment.
///
/// Returns `true` only when the `OutputCoalesce` variable is set to exactly
/// `0` or `false` (case-sensitive). An unset variable, a non-Unicode value, or
/// any other text leaves coalescing enabled. The variable is read on every
/// call.
fn IsDisabled() -> bool {
	match std::env::var("OutputCoalesce") {
		Ok(Setting) => Setting == "0" || Setting == "false",
		Err(_) => false,
	}
}
74
75fn GetOrInitChannel() -> &'static CoalesceChannel {
76 COALESCE_CHANNEL.get_or_init(|| {
77 let (Tx, mut Rx) = unbounded_channel::<(AppHandle, PendingAppend)>();
78
79 tokio::spawn(async move {
80 let Buffers:StandardMutex<HashMap<String, String>> = StandardMutex::new(HashMap::new());
84
85 loop {
86 let Received = Rx.recv().await;
87
88 let (Handle, First) = match Received {
89 None => break,
90 Some(Pair) => Pair,
91 };
92
93 {
95 let mut Guard = match Buffers.lock() {
96 Ok(G) => G,
97 Err(_) => continue,
98 };
99
100 let Slot = Guard.entry(First.Channel.clone()).or_default();
101
102 Slot.push_str(&First.Value);
103
104 if Slot.len() >= MAX_BUFFERED_BYTES {
105 let Payload = std::mem::take(Slot);
106
107 drop(Guard);
108
109 FlushOne(&Handle, &First.Channel, &Payload);
110
111 continue;
112 }
113 }
114
115 let mut Drain:Vec<(AppHandle, PendingAppend)> = Vec::new();
117
118 let Drained = Rx.recv_many(&mut Drain, 4096).await;
119
120 let _ = Drained;
121
122 for (_, Pending) in Drain.drain(..) {
123 if let Ok(mut Guard) = Buffers.lock() {
124 let Slot = Guard.entry(Pending.Channel).or_default();
125 Slot.push_str(&Pending.Value);
126 }
127 }
128
129 tokio::time::sleep(COALESCE_WINDOW).await;
133
134 let mut LateDrain:Vec<(AppHandle, PendingAppend)> = Vec::new();
135
136 let _ = Rx.recv_many(&mut LateDrain, 4096).await;
137
138 for (_, Pending) in LateDrain.drain(..) {
139 if let Ok(mut Guard) = Buffers.lock() {
140 let Slot = Guard.entry(Pending.Channel).or_default();
141 Slot.push_str(&Pending.Value);
142 }
143 }
144
145 let HandleForFlush = Handle.clone();
147
148 let Snapshots = {
149 match Buffers.lock() {
150 Ok(mut Guard) => {
151 Guard
152 .iter_mut()
153 .filter(|(_, V)| !V.is_empty())
154 .map(|(K, V)| (K.clone(), std::mem::take(V)))
155 .collect::<Vec<_>>()
156 },
157 Err(_) => continue,
158 }
159 };
160
161 for (Channel, Payload) in Snapshots {
162 FlushOne(&HandleForFlush, &Channel, &Payload);
163 }
164 }
165 });
166
167 CoalesceChannel { Sender:Tx }
168 })
169}
170
171fn FlushOne(Handle:&AppHandle, Channel:&str, Payload:&str) {
172 let _ = Handle.emit(
173 "sky://output/append",
174 json!({
175 "channel": Channel,
176 "value": Payload,
177 }),
178 );
179
180 let IsGitFamily = Channel.eq_ignore_ascii_case("git")
184 || Channel.eq_ignore_ascii_case("source control")
185 || Channel.eq_ignore_ascii_case("scm");
186
187 let LineCount = Payload.matches('\n').count();
188
189 if IsGitFamily {
190 dev_log!(
191 "grpc",
192 "[OutputChannel:{}] flush bytes={} lines~{}",
193 Channel,
194 Payload.len(),
195 LineCount
196 );
197 } else {
198 dev_log!(
199 "output-verbose",
200 "[OutputChannel] flush channel={} bytes={} lines~{}",
201 Channel,
202 Payload.len(),
203 LineCount
204 );
205 }
206}
207
208pub fn TryEnqueue(Handle:&AppHandle, Channel:String, Value:String) -> bool {
213 if IsDisabled() {
214 return false;
215 }
216
217 let Ch = GetOrInitChannel();
218
219 let _ = Ch.Sender.send((Handle.clone(), PendingAppend { Channel, Value }));
220
221 true
222}