Skip to main content

DevelopmentNodeEnvironment_MicrosoftVSCodeDependency_22NodeVersion_Bundle_Clean_Debug_ElectronProfile_EsbuildCompiler_Mountain/Vine/Client/
SendNotification.rs

//! Fire-and-forget notification to a sidecar. No response, no per-call
//! timeout. When `LAND_VINE_STREAMING=1` is set, the streaming multiplexer
//! is tried first; if no open multiplexer exists for the sidecar or the
//! streaming send fails, the call falls back to the unary RPC.
//! After a successful wire send, fans out via `PublishNotification::Fn`
//! so broadcast subscribers (Effect-TS fibers, OTel emitters, future
//! Mist-WS bridge, dev log) can observe the same flow concurrently.
7
8use serde_json::{Value, to_vec};
9
10use crate::{
11	Vine::{
12		Client::{
13			IsShuttingDown,
14			PublishNotification,
15			Shared::{RecordSideCarFailure, SIDECAR_CLIENTS, UpdateSideCarActivity, ValidateMessageSize},
16		},
17		Error::VineError,
18		Generated::GenericNotification,
19	},
20	dev_log,
21};
22
23pub async fn Fn(SideCarIdentifier:String, Method:String, Parameters:Value) -> Result<(), VineError> {
24	if IsShuttingDown::Fn() {
25		return Ok(());
26	}
27
28	if Method.is_empty() || Method.len() > 128 {
29		return Err(VineError::RPCError(
30			"Method name must be between 1 and 128 characters".to_string(),
31		));
32	}
33
34	if std::env::var("LAND_VINE_STREAMING").as_deref() == Ok("1") {
35		if let Some(Mux) = crate::Vine::Multiplexer::Multiplexer::Lookup(&SideCarIdentifier) {
36			if !Mux.IsClosed() {
37				let MethodForPublish = Method.clone();
38
39				let ParametersForPublish = Parameters.clone();
40
41				match Mux.Notify(Method.clone(), Parameters.clone()).await {
42					Ok(()) => {
43						UpdateSideCarActivity(&SideCarIdentifier);
44
45						PublishNotification::Fn(&SideCarIdentifier, &MethodForPublish, &ParametersForPublish);
46
47						return Ok(());
48					},
49
50					Err(Error) => {
51						dev_log!(
52							"grpc",
53							"warn: [VineClient::SendNotification] streaming send failed for '{}' ({}); falling back \
54							 to unary",
55							SideCarIdentifier,
56							Error
57						);
58					},
59				}
60			}
61		}
62	}
63
64	let ParameterBytes = to_vec(&Parameters)?;
65
66	ValidateMessageSize(&ParameterBytes)?;
67
68	let mut Client = {
69		let Pool = SIDECAR_CLIENTS.lock();
70
71		Pool.get(&SideCarIdentifier).cloned()
72	};
73
74	if let Some(ref mut Client) = Client {
75		let MethodForPublish = Method.clone();
76
77		let Request = GenericNotification { method:Method, parameter:ParameterBytes };
78
79		match Client.send_mountain_notification(Request).await {
80			Ok(_) => {
81				UpdateSideCarActivity(&SideCarIdentifier);
82
83				dev_log!(
84					"grpc",
85					"[VineClient] Notification sent successfully to sidecar '{}'",
86					SideCarIdentifier
87				);
88
89				PublishNotification::Fn(&SideCarIdentifier, &MethodForPublish, &Parameters);
90
91				Ok(())
92			},
93
94			Err(Status) => {
95				RecordSideCarFailure(&SideCarIdentifier);
96
97				dev_log!(
98					"grpc",
99					"error: [VineClient] Failed to send notification to sidecar '{}': {}",
100					SideCarIdentifier,
101					Status
102				);
103
104				Err(VineError::from(Status))
105			},
106		}
107	} else {
108		Err(VineError::ClientNotConnected(SideCarIdentifier))
109	}
110}