Mountain/IPC/DevLog/
EmitOTLPSpan.rs1#![allow(non_snake_case)]
2
3use std::{
13 collections::hash_map::DefaultHasher,
14 hash::{Hash, Hasher},
15 sync::{
16 OnceLock,
17 atomic::{AtomicBool, Ordering},
18 },
19};
20
21use crate::{Binary::Build::PostHogPlugin::Constants, IPC::DevLog::NowNano};
22
/// Process-wide switch consulted before every span export. It starts out
/// `true` and is flipped to `false` by the exporter when the collector
/// cannot be reached or does not answer with a 2xx status.
static OTLP_AVAILABLE:AtomicBool = AtomicBool::new(true);

/// Trace identifier shared by every span emitted from this process. It is
/// filled in lazily on first use and never changes afterwards.
static OTLP_TRACE_ID:OnceLock<String> = OnceLock::new();
26
27fn GetTraceId() -> &'static str {
28 OTLP_TRACE_ID.get_or_init(|| {
29 let mut H = DefaultHasher::new();
30 std::process::id().hash(&mut H);
31 NowNano::Fn().hash(&mut H);
32 format!("{:032x}", H.finish() as u128)
33 })
34}
35
36fn RandU64() -> u64 {
37 let mut H = DefaultHasher::new();
38
39 std::thread::current().id().hash(&mut H);
40
41 NowNano::Fn().hash(&mut H);
42
43 H.finish()
44}
45
46pub fn Fn(Name:&str, StartNano:u64, EndNano:u64, Attributes:&[(&str, &str)]) {
47 if !cfg!(debug_assertions) {
48 return;
49 }
50
51 if matches!(Constants::TELEMETRY_CAPTURE, "false" | "0" | "off") {
52 return;
53 }
54
55 if matches!(Constants::OTLP_ENABLED, "false" | "0" | "off") {
56 return;
57 }
58
59 if !OTLP_AVAILABLE.load(Ordering::Relaxed) {
60 return;
61 }
62
63 let SpanId = format!("{:016x}", RandU64());
64
65 let TraceId = GetTraceId().to_string();
66
67 let SpanName = Name.to_string();
68
69 let AttributesJson:Vec<String> = Attributes
70 .iter()
71 .map(|(K, V)| {
72 format!(
73 r#"{{"key":"{}","value":{{"stringValue":"{}"}}}}"#,
74 K,
75 V.replace('\\', "\\\\").replace('"', "\\\"")
76 )
77 })
78 .collect();
79
80 let IsError = SpanName.contains("error");
81
82 let StatusCode = if IsError { 2 } else { 1 };
83
84 let Payload = format!(
85 concat!(
86 r#"{{"resourceSpans":[{{"resource":{{"attributes":["#,
87 r#"{{"key":"service.name","value":{{"stringValue":"land-editor-mountain"}}}},"#,
88 r#"{{"key":"service.version","value":{{"stringValue":"0.0.1"}}}}"#,
89 r#"]}},"scopeSpans":[{{"scope":{{"name":"mountain.ipc","version":"1.0.0"}},"#,
90 r#""spans":[{{"traceId":"{}","spanId":"{}","name":"{}","kind":1,"#,
91 r#""startTimeUnixNano":"{}","endTimeUnixNano":"{}","#,
92 r#""attributes":[{}],"status":{{"code":{}}}}}]}}]}}]}}"#,
93 ),
94 TraceId,
95 SpanId,
96 SpanName,
97 StartNano,
98 EndNano,
99 AttributesJson.join(","),
100 StatusCode,
101 );
102
103 let (HostAddress, PathSegment) = ParseEndpoint(Constants::OTLP_ENDPOINT);
107
108 std::thread::spawn(move || {
109 use std::{
110 io::{Read as IoRead, Write as IoWrite},
111 net::TcpStream,
112 time::Duration,
113 };
114
115 let Ok(SocketAddress) = HostAddress.parse() else {
116 OTLP_AVAILABLE.store(false, Ordering::Relaxed);
117 return;
118 };
119 let Ok(mut Stream) = TcpStream::connect_timeout(&SocketAddress, Duration::from_millis(200)) else {
120 OTLP_AVAILABLE.store(false, Ordering::Relaxed);
121 return;
122 };
123 let _ = Stream.set_write_timeout(Some(Duration::from_millis(200)));
124 let _ = Stream.set_read_timeout(Some(Duration::from_millis(200)));
125
126 let HttpReq = format!(
127 "POST {} HTTP/1.1\r\nHost: {}\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: \
128 close\r\n\r\n",
129 PathSegment,
130 HostAddress,
131 Payload.len()
132 );
133 if Stream.write_all(HttpReq.as_bytes()).is_err() {
134 return;
135 }
136 if Stream.write_all(Payload.as_bytes()).is_err() {
137 return;
138 }
139 let mut Buf = [0u8; 32];
140 let _ = Stream.read(&mut Buf);
141 if !(Buf.starts_with(b"HTTP/1.1 2") || Buf.starts_with(b"HTTP/1.0 2")) {
142 OTLP_AVAILABLE.store(false, Ordering::Relaxed);
143 }
144 });
145}
146
/// Splits an endpoint URL into `(host:port, request path)`.
///
/// A leading `http://` or `https://` is removed. Everything up to the first
/// `/` is returned as the host part; the rest becomes the path, with any run
/// of leading slashes collapsed to one. When no path is present, or the path
/// is empty after trimming, `/v1/traces` is used.
fn ParseEndpoint(Endpoint:&str) -> (String, String) {
    const DEFAULT_PATH:&str = "/v1/traces";

    // Try the schemes in order; text without a known scheme is used as-is.
    let mut Remainder = Endpoint;

    for Scheme in ["http://", "https://"] {
        if let Some(Stripped) = Endpoint.strip_prefix(Scheme) {
            Remainder = Stripped;

            break;
        }
    }

    // First piece is the authority, the optional second piece is the path.
    let mut Pieces = Remainder.splitn(2, '/');

    let Authority = Pieces.next().unwrap_or_default();

    let Route = Pieces
        .next()
        .map(|Tail| Tail.trim_start_matches('/'))
        .filter(|Tail| !Tail.is_empty())
        .map_or_else(|| DEFAULT_PATH.to_owned(), |Tail| format!("/{}", Tail));

    (Authority.to_owned(), Route)
}