Skip to main content

DevelopmentNodeEnvironment_MicrosoftVSCodeDependency_22NodeVersion_Bundle_Clean_Debug_ElectronProfile_EsbuildCompiler_Mountain/Vine/Server/Notification/
RegisterCommand.rs

//! Cocoon → Mountain `registerCommand` notification.
//! Stores the command as a `Proxied` handler in Mountain's
//! `CommandRegistry` so subsequent `commands.executeCommand` calls get
//! routed back to Cocoon via `$executeContributedCommand` gRPC.
//!
//! ## Batching
//!
//! Extension boot fires 1000+ `registerCommand` notifications in a tight
//! burst. Rather than spawning one short-lived tokio task per call (and
//! always sleeping 16 ms even for the last item), we push into a
//! `mpsc::unbounded_channel` and a single long-lived flusher task drains
//! it: it wakes as soon as the first item arrives, collects everything
//! already queued without waiting, sleeps 16 ms (one frame), drains a
//! second time to catch stragglers, and then emits one batch event.
//! The coalescing is the same as with the old per-call tasks, but it
//! avoids 1000+ task spawns. Note that the flusher wakes within
//! sub-millisecond time, yet every batch - including an isolated command
//! registered after boot - is emitted only after the 16 ms window has
//! elapsed.
18
19use std::sync::OnceLock;
20
21use serde_json::{Value, json};
22use tauri::{AppHandle, Emitter};
23use tokio::sync::mpsc::{UnboundedSender, unbounded_channel};
24
25use crate::{
26	Environment::CommandProvider::CommandHandler,
27	Vine::Server::MountainVinegRPCService::MountainVinegRPCService,
28	dev_log,
29};
30
31struct CommandBatchChannel {
32	Sender:UnboundedSender<(AppHandle, Value)>,
33}
34
35static CMD_CHANNEL:OnceLock<CommandBatchChannel> = OnceLock::new();
36
37fn GetOrInitChannel(Handle:&AppHandle) -> &'static CommandBatchChannel {
38	CMD_CHANNEL.get_or_init(|| {
39		let (Tx, mut Rx) = unbounded_channel::<(AppHandle, Value)>();
40
41		tokio::spawn(async move {
42			let mut Buf:Vec<(AppHandle, Value)> = Vec::with_capacity(128);
43
44			loop {
45				// Block until at least one item arrives.
46				match Rx.recv().await {
47					None => break,
48					Some(First) => Buf.push(First),
49				}
50
51				// Drain everything already queued without blocking.
52				Rx.recv_many(&mut Buf, 4096).await;
53
54				// One frame - let stragglers accumulate.
55				tokio::time::sleep(std::time::Duration::from_millis(16)).await;
56
57				// Drain again after the frame window.
58				Rx.recv_many(&mut Buf, 4096).await;
59
60				if Buf.is_empty() {
61					continue;
62				}
63
64				// Emit single batch; all items share the same AppHandle.
65				let Handle = Buf[0].0.clone();
66
67				let Commands:Vec<Value> = Buf.drain(..).map(|(_, V)| V).collect();
68
69				let Count = Commands.len();
70
71				match Handle.emit("sky://command/register", json!({ "commands": Commands })) {
72					Ok(()) => {
73						dev_log!("sky-emit", "[SkyEmit] ok channel=sky://command/register batch={}", Count);
74
75						// Summary line at the default-visible `commands` tag
76						// so `Trace=short` still surfaces the boot burst as
77						// `RegisterCommand batch=N` per 16ms window instead
78						// of N hidden per-command lines under
79						// `command-register`. One line per batch is the
80						// natural granularity - matches the rate of the
81						// downstream Sky emit.
82						dev_log!("commands", "[RegisterCommand] batch={}", Count);
83					},
84					Err(E) => {
85						dev_log!(
86							"sky-emit",
87							"[SkyEmit] fail channel=sky://command/register batch={} error={}",
88							Count,
89							E
90						);
91					},
92				}
93			}
94		});
95
96		CommandBatchChannel { Sender:Tx }
97	})
98}
99
100pub async fn RegisterCommand(Service:&MountainVinegRPCService, Parameter:&Value) {
101	let CommandId = Parameter.get("commandId").and_then(Value::as_str).unwrap_or("");
102
103	dev_log!(
104		"command-register",
105		"[MountainVinegRPCService] Cocoon registered command: {}",
106		CommandId
107	);
108
109	if CommandId.is_empty() {
110		return;
111	}
112
113	let Kind = Parameter.get("kind").and_then(Value::as_str).unwrap_or("command").to_string();
114
115	if let Ok(mut Registry) = Service
116		.RunTime()
117		.Environment
118		.ApplicationState
119		.Extension
120		.Registry
121		.CommandRegistry
122		.lock()
123	{
124		Registry.insert(
125			CommandId.to_string(),
126			CommandHandler::Proxied {
127				SideCarIdentifier:"cocoon-main".to_string(),
128				CommandIdentifier:CommandId.to_string(),
129			},
130		);
131	}
132
133	let Ch = GetOrInitChannel(Service.ApplicationHandle());
134
135	let _ = Ch.Sender.send((
136		Service.ApplicationHandle().clone(),
137		json!({ "id": CommandId, "commandId": CommandId, "kind": Kind }),
138	));
139}