Skip to main content

DevelopmentNodeEnvironment_MicrosoftVSCodeDependency_22NodeVersion_Bundle_Clean_Debug_ElectronProfile_EsbuildCompiler_Mountain/Vine/Server/Notification/
OutputChannelCoalesce.rs

1//! Per-channel coalescing buffer for `outputChannel.append` notifications.
2//!
3//! Cocoon's Git extension emits 30+ `append` notifications per `git status`
4//! (one per `[trace] [OperationManager][...]` line, one per executed
5//! sub-command). Each one previously:
6//!
7//!   1. Crossed the Cocoon → Mountain gRPC notification boundary.
8//!   2. Fired its own `Tauri::Emitter::emit("sky://output/append")` round-trip.
9//!   3. Wrote its own dev_log entry.
10//!
11//! For a workspace with the Git extension actively probing repo state on
12//! file changes, the volume of `[OperationManager]` traces alone accounted
13//! for ~1.9k lines of one 28k-line session log.
14//!
15//! This atom buffers appends per-channel for a short window
16//! (`COALESCE_WINDOW`) and flushes the concatenated payload as a single
17//! Sky emit + a single dev_log line. The downstream Output panel still
18//! sees identical text - just delivered in larger chunks - which matches
19//! the user-perceived UX of an output channel (it scrolls in chunks, not
20//! character-by-character).
21//!
22//! ## Why this is safe
23//!
24//! - Per-channel buffer means ordering is preserved within a channel.
25//! - Append-only semantics mean partial-payload visibility cannot expose torn
26//!   writes - the buffered text is always a prefix of the eventual full
27//!   payload.
28//! - `Tauri::Emitter` serialises emits per channel; the flush task running on
29//!   the tokio runtime keeps the same back-pressure shape the per-call path
30//!   had.
31//!
32//! ## Disable hook
33//!
34//! `OutputCoalesce=0` reverts to per-append emit (debugging
35//! synchronisation issues where a single append must be flushed
36//! immediately to disk).
37
38use std::{
39	collections::HashMap,
40	sync::{Mutex as StandardMutex, OnceLock},
41	time::Duration,
42};
43
44use serde_json::{Value, json};
45use tauri::{AppHandle, Emitter};
46use tokio::sync::mpsc::{UnboundedSender, unbounded_channel};
47
48use crate::dev_log;
49
50/// Maximum delay between an append arriving and its flush to Sky. Tuned
51/// against the FSEvents / Git-extension 16ms tick - one frame is enough
52/// for a `git status` burst to fully accumulate without introducing a
53/// human-perceptible scroll lag.
54const COALESCE_WINDOW:Duration = Duration::from_millis(50);
55
56/// Maximum buffered bytes per channel before a forced flush. Caps memory
57/// for any channel emitting unbounded text (a build extension piping
58/// `cargo build` stdout) before the timer fires.
59const MAX_BUFFERED_BYTES:usize = 64 * 1024;
60
61struct PendingAppend {
62	Channel:String,
63
64	Value:String,
65}
66
67struct CoalesceChannel {
68	Sender:UnboundedSender<(AppHandle, PendingAppend)>,
69}
70
71static COALESCE_CHANNEL:OnceLock<CoalesceChannel> = OnceLock::new();
72
73fn IsDisabled() -> bool { matches!(std::env::var("OutputCoalesce").as_deref(), Ok("0") | Ok("false")) }
74
75fn GetOrInitChannel() -> &'static CoalesceChannel {
76	COALESCE_CHANNEL.get_or_init(|| {
77		let (Tx, mut Rx) = unbounded_channel::<(AppHandle, PendingAppend)>();
78
79		tokio::spawn(async move {
80			// Per-channel pending buffer. Two-level map: channel name →
81			// accumulated text. AppHandle is shared across channels (one
82			// per process) so we stash it alongside.
83			let Buffers:StandardMutex<HashMap<String, String>> = StandardMutex::new(HashMap::new());
84
85			loop {
86				let Received = Rx.recv().await;
87
88				let (Handle, First) = match Received {
89					None => break,
90					Some(Pair) => Pair,
91				};
92
93				// Append to per-channel buffer.
94				{
95					let mut Guard = match Buffers.lock() {
96						Ok(G) => G,
97						Err(_) => continue,
98					};
99
100					let Slot = Guard.entry(First.Channel.clone()).or_default();
101
102					Slot.push_str(&First.Value);
103
104					if Slot.len() >= MAX_BUFFERED_BYTES {
105						let Payload = std::mem::take(Slot);
106
107						drop(Guard);
108
109						FlushOne(&Handle, &First.Channel, &Payload);
110
111						continue;
112					}
113				}
114
115				// Drain everything already queued without blocking.
116				let mut Drain:Vec<(AppHandle, PendingAppend)> = Vec::new();
117
118				let Drained = Rx.recv_many(&mut Drain, 4096).await;
119
120				let _ = Drained;
121
122				for (_, Pending) in Drain.drain(..) {
123					if let Ok(mut Guard) = Buffers.lock() {
124						let Slot = Guard.entry(Pending.Channel).or_default();
125						Slot.push_str(&Pending.Value);
126					}
127				}
128
129				// Window pause - let stragglers accumulate inside the
130				// 50 ms frame. Anything received during the sleep is
131				// drained again before flush.
132				tokio::time::sleep(COALESCE_WINDOW).await;
133
134				let mut LateDrain:Vec<(AppHandle, PendingAppend)> = Vec::new();
135
136				let _ = Rx.recv_many(&mut LateDrain, 4096).await;
137
138				for (_, Pending) in LateDrain.drain(..) {
139					if let Ok(mut Guard) = Buffers.lock() {
140						let Slot = Guard.entry(Pending.Channel).or_default();
141						Slot.push_str(&Pending.Value);
142					}
143				}
144
145				// Flush every non-empty channel.
146				let HandleForFlush = Handle.clone();
147
148				let Snapshots = {
149					match Buffers.lock() {
150						Ok(mut Guard) => {
151							Guard
152								.iter_mut()
153								.filter(|(_, V)| !V.is_empty())
154								.map(|(K, V)| (K.clone(), std::mem::take(V)))
155								.collect::<Vec<_>>()
156						},
157						Err(_) => continue,
158					}
159				};
160
161				for (Channel, Payload) in Snapshots {
162					FlushOne(&HandleForFlush, &Channel, &Payload);
163				}
164			}
165		});
166
167		CoalesceChannel { Sender:Tx }
168	})
169}
170
171fn FlushOne(Handle:&AppHandle, Channel:&str, Payload:&str) {
172	let _ = Handle.emit(
173		"sky://output/append",
174		json!({
175			"channel": Channel,
176			"value": Payload,
177		}),
178	);
179
180	// One log line per flush instead of one per append. The git
181	// channel keeps its `grpc`-tag visibility for SCM activation
182	// diagnosis; other channels stay under `output-verbose`.
183	let IsGitFamily = Channel.eq_ignore_ascii_case("git")
184		|| Channel.eq_ignore_ascii_case("source control")
185		|| Channel.eq_ignore_ascii_case("scm");
186
187	let LineCount = Payload.matches('\n').count();
188
189	if IsGitFamily {
190		dev_log!(
191			"grpc",
192			"[OutputChannel:{}] flush bytes={} lines~{}",
193			Channel,
194			Payload.len(),
195			LineCount
196		);
197	} else {
198		dev_log!(
199			"output-verbose",
200			"[OutputChannel] flush channel={} bytes={} lines~{}",
201			Channel,
202			Payload.len(),
203			LineCount
204		);
205	}
206}
207
208/// Submit a pending append for coalescing. Returns `true` when the
209/// item was enqueued (the coalescer will flush within `COALESCE_WINDOW`),
210/// `false` when coalescing is disabled and the caller must flush
211/// inline.
212pub fn TryEnqueue(Handle:&AppHandle, Channel:String, Value:String) -> bool {
213	if IsDisabled() {
214		return false;
215	}
216
217	let Ch = GetOrInitChannel();
218
219	let _ = Ch.Sender.send((Handle.clone(), PendingAppend { Channel, Value }));
220
221	true
222}