Mountain/Vine/Client/
SendNotification.rs1#![allow(non_snake_case)]
2
3use serde_json::{Value, to_vec};
11
12use crate::{
13 Vine::{
14 Client::{
15 IsShuttingDown,
16 PublishNotification,
17 Shared::{RecordSideCarFailure, SIDECAR_CLIENTS, UpdateSideCarActivity, ValidateMessageSize},
18 },
19 Error::VineError,
20 Generated::GenericNotification,
21 },
22 dev_log,
23};
24
25pub async fn Fn(SideCarIdentifier:String, Method:String, Parameters:Value) -> Result<(), VineError> {
26 if IsShuttingDown::Fn() {
27 return Ok(());
28 }
29
30 if Method.is_empty() || Method.len() > 128 {
31 return Err(VineError::RPCError(
32 "Method name must be between 1 and 128 characters".to_string(),
33 ));
34 }
35
36 if std::env::var("LAND_VINE_STREAMING").as_deref() == Ok("1") {
37 if let Some(Mux) = crate::Vine::Multiplexer::Multiplexer::Lookup(&SideCarIdentifier) {
38 if !Mux.IsClosed() {
39 let MethodForPublish = Method.clone();
40
41 let ParametersForPublish = Parameters.clone();
42
43 match Mux.Notify(Method.clone(), Parameters.clone()).await {
44 Ok(()) => {
45 UpdateSideCarActivity(&SideCarIdentifier);
46
47 PublishNotification::Fn(&SideCarIdentifier, &MethodForPublish, &ParametersForPublish);
48
49 return Ok(());
50 },
51
52 Err(Error) => {
53 dev_log!(
54 "grpc",
55 "warn: [VineClient::SendNotification] streaming send failed for '{}' ({}); falling back \
56 to unary",
57 SideCarIdentifier,
58 Error
59 );
60 },
61 }
62 }
63 }
64 }
65
66 let ParameterBytes = to_vec(&Parameters)?;
67
68 ValidateMessageSize(&ParameterBytes)?;
69
70 let mut Client = {
71 let Pool = SIDECAR_CLIENTS.lock();
72
73 Pool.get(&SideCarIdentifier).cloned()
74 };
75
76 if let Some(ref mut Client) = Client {
77 let MethodForPublish = Method.clone();
78
79 let Request = GenericNotification { method:Method, parameter:ParameterBytes };
80
81 match Client.send_mountain_notification(Request).await {
82 Ok(_) => {
83 UpdateSideCarActivity(&SideCarIdentifier);
84
85 dev_log!(
86 "grpc",
87 "[VineClient] Notification sent successfully to sidecar '{}'",
88 SideCarIdentifier
89 );
90
91 PublishNotification::Fn(&SideCarIdentifier, &MethodForPublish, &Parameters);
92
93 Ok(())
94 },
95
96 Err(Status) => {
97 RecordSideCarFailure(&SideCarIdentifier);
98
99 dev_log!(
100 "grpc",
101 "error: [VineClient] Failed to send notification to sidecar '{}': {}",
102 SideCarIdentifier,
103 Status
104 );
105
106 Err(VineError::from(Status))
107 },
108 }
109 } else {
110 Err(VineError::ClientNotConnected(SideCarIdentifier))
111 }
112}