Skip to main content

Mountain/Vine/Client/
SendNotification.rs

1#![allow(non_snake_case)]
2
3//! Fire-and-forget notification to a sidecar. No response, no per-call
4//! timeout. Prefers the streaming multiplexer under
5//! `LAND_VINE_STREAMING=1`; falls through to unary on any failure.
6//! After a successful wire send, fans out via `PublishNotification::Fn`
7//! so broadcast subscribers (Effect-TS fibers, OTel emitters, future
8//! Mist-WS bridge, dev log) can observe the same flow concurrently.
9
10use serde_json::{Value, to_vec};
11
12use crate::{
13	Vine::{
14		Client::{
15			IsShuttingDown,
16			PublishNotification,
17			Shared::{RecordSideCarFailure, SIDECAR_CLIENTS, UpdateSideCarActivity, ValidateMessageSize},
18		},
19		Error::VineError,
20		Generated::GenericNotification,
21	},
22	dev_log,
23};
24
25pub async fn Fn(SideCarIdentifier:String, Method:String, Parameters:Value) -> Result<(), VineError> {
26	if IsShuttingDown::Fn() {
27		return Ok(());
28	}
29
30	if Method.is_empty() || Method.len() > 128 {
31		return Err(VineError::RPCError(
32			"Method name must be between 1 and 128 characters".to_string(),
33		));
34	}
35
36	if std::env::var("LAND_VINE_STREAMING").as_deref() == Ok("1") {
37		if let Some(Mux) = crate::Vine::Multiplexer::Multiplexer::Lookup(&SideCarIdentifier) {
38			if !Mux.IsClosed() {
39				let MethodForPublish = Method.clone();
40
41				let ParametersForPublish = Parameters.clone();
42
43				match Mux.Notify(Method.clone(), Parameters.clone()).await {
44					Ok(()) => {
45						UpdateSideCarActivity(&SideCarIdentifier);
46
47						PublishNotification::Fn(&SideCarIdentifier, &MethodForPublish, &ParametersForPublish);
48
49						return Ok(());
50					},
51
52					Err(Error) => {
53						dev_log!(
54							"grpc",
55							"warn: [VineClient::SendNotification] streaming send failed for '{}' ({}); falling back \
56							 to unary",
57							SideCarIdentifier,
58							Error
59						);
60					},
61				}
62			}
63		}
64	}
65
66	let ParameterBytes = to_vec(&Parameters)?;
67
68	ValidateMessageSize(&ParameterBytes)?;
69
70	let mut Client = {
71		let Pool = SIDECAR_CLIENTS.lock();
72
73		Pool.get(&SideCarIdentifier).cloned()
74	};
75
76	if let Some(ref mut Client) = Client {
77		let MethodForPublish = Method.clone();
78
79		let Request = GenericNotification { method:Method, parameter:ParameterBytes };
80
81		match Client.send_mountain_notification(Request).await {
82			Ok(_) => {
83				UpdateSideCarActivity(&SideCarIdentifier);
84
85				dev_log!(
86					"grpc",
87					"[VineClient] Notification sent successfully to sidecar '{}'",
88					SideCarIdentifier
89				);
90
91				PublishNotification::Fn(&SideCarIdentifier, &MethodForPublish, &Parameters);
92
93				Ok(())
94			},
95
96			Err(Status) => {
97				RecordSideCarFailure(&SideCarIdentifier);
98
99				dev_log!(
100					"grpc",
101					"error: [VineClient] Failed to send notification to sidecar '{}': {}",
102					SideCarIdentifier,
103					Status
104				);
105
106				Err(VineError::from(Status))
107			},
108		}
109	} else {
110		Err(VineError::ClientNotConnected(SideCarIdentifier))
111	}
112}