Skip to main content

DevelopmentNodeEnvironment_MicrosoftVSCodeDependency_22NodeVersion_Bundle_Clean_Debug_ElectronProfile_EsbuildCompiler_Mountain/IPC/DevLog/
EmitOTLPSpan.rs

1//! Fire-and-forget OTLP span exporter. Sends a single
2//! `resourceSpans` payload over plain HTTP to the collector at
3//! `Pipe` (default `127.0.0.1:4318`, configurable via
4//! `.env.Land.PostHog`). Stops trying after the first failure
5//! (`OTLP_AVAILABLE` flips to `false`) so a missing collector
6//! doesn't tax every IPC call. Release builds are compiled out
7//! via `cfg!(debug_assertions)`. Honors the `Capture` master
8//! telemetry kill switch and the per-pipe `Emit` toggle.
9
10use std::{
11	collections::hash_map::DefaultHasher,
12	hash::{Hash, Hasher},
13	sync::{
14		OnceLock,
15		atomic::{AtomicBool, Ordering},
16	},
17};
18
19use crate::{Binary::Build::PostHogPlugin::Constants, IPC::DevLog::NowNano};
20
21static OTLP_AVAILABLE:AtomicBool = AtomicBool::new(true);
22
23static OTLP_TRACE_ID:OnceLock<String> = OnceLock::new();
24
25fn GetTraceId() -> &'static str {
26	OTLP_TRACE_ID.get_or_init(|| {
27		let mut H = DefaultHasher::new();
28		std::process::id().hash(&mut H);
29		NowNano::Fn().hash(&mut H);
30		format!("{:032x}", H.finish() as u128)
31	})
32}
33
34fn RandU64() -> u64 {
35	let mut H = DefaultHasher::new();
36
37	std::thread::current().id().hash(&mut H);
38
39	NowNano::Fn().hash(&mut H);
40
41	H.finish()
42}
43
44pub fn Fn(Name:&str, StartNano:u64, EndNano:u64, Attributes:&[(&str, &str)]) {
45	if !cfg!(debug_assertions) {
46		return;
47	}
48
49	if matches!(Constants::TELEMETRY_CAPTURE, "false" | "0" | "off") {
50		return;
51	}
52
53	if matches!(Constants::OTLP_ENABLED, "false" | "0" | "off") {
54		return;
55	}
56
57	if !OTLP_AVAILABLE.load(Ordering::Relaxed) {
58		return;
59	}
60
61	let SpanId = format!("{:016x}", RandU64());
62
63	let TraceId = GetTraceId().to_string();
64
65	let SpanName = Name.to_string();
66
67	let AttributesJson:Vec<String> = Attributes
68		.iter()
69		.map(|(K, V)| {
70			format!(
71				r#"{{"key":"{}","value":{{"stringValue":"{}"}}}}"#,
72				K,
73				V.replace('\\', "\\\\").replace('"', "\\\"")
74			)
75		})
76		.collect();
77
78	let IsError = SpanName.contains("error");
79
80	let StatusCode = if IsError { 2 } else { 1 };
81
82	let Payload = format!(
83		concat!(
84			r#"{{"resourceSpans":[{{"resource":{{"attributes":["#,
85			r#"{{"key":"service.name","value":{{"stringValue":"land-editor-mountain"}}}},"#,
86			r#"{{"key":"service.version","value":{{"stringValue":"0.0.1"}}}}"#,
87			r#"]}},"scopeSpans":[{{"scope":{{"name":"mountain.ipc","version":"1.0.0"}},"#,
88			r#""spans":[{{"traceId":"{}","spanId":"{}","name":"{}","kind":1,"#,
89			r#""startTimeUnixNano":"{}","endTimeUnixNano":"{}","#,
90			r#""attributes":[{}],"status":{{"code":{}}}}}]}}]}}]}}"#,
91		),
92		TraceId,
93		SpanId,
94		SpanName,
95		StartNano,
96		EndNano,
97		AttributesJson.join(","),
98		StatusCode,
99	);
100
101	// Resolve `Pipe` (e.g. `http://127.0.0.1:4318`) → host:port + path.
102	// Strip scheme, split on `/` for the path component if any, default to
103	// `/v1/traces`.
104	let (HostAddress, PathSegment) = ParseEndpoint(Constants::OTLP_ENDPOINT);
105
106	std::thread::spawn(move || {
107		use std::{
108			io::{Read as IoRead, Write as IoWrite},
109			net::TcpStream,
110			time::Duration,
111		};
112
113		let Ok(SocketAddress) = HostAddress.parse() else {
114			OTLP_AVAILABLE.store(false, Ordering::Relaxed);
115			return;
116		};
117		let Ok(mut Stream) = TcpStream::connect_timeout(&SocketAddress, Duration::from_millis(200)) else {
118			OTLP_AVAILABLE.store(false, Ordering::Relaxed);
119			return;
120		};
121		let _ = Stream.set_write_timeout(Some(Duration::from_millis(200)));
122		let _ = Stream.set_read_timeout(Some(Duration::from_millis(200)));
123
124		let HttpReq = format!(
125			"POST {} HTTP/1.1\r\nHost: {}\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: \
126			 close\r\n\r\n",
127			PathSegment,
128			HostAddress,
129			Payload.len()
130		);
131		if Stream.write_all(HttpReq.as_bytes()).is_err() {
132			return;
133		}
134		if Stream.write_all(Payload.as_bytes()).is_err() {
135			return;
136		}
137		let mut Buf = [0u8; 32];
138		let _ = Stream.read(&mut Buf);
139		if !(Buf.starts_with(b"HTTP/1.1 2") || Buf.starts_with(b"HTTP/1.0 2")) {
140			OTLP_AVAILABLE.store(false, Ordering::Relaxed);
141		}
142	});
143}
144
145/// Split `http://host:port/path` into `(host:port, /path)`. Defaults the
146/// path to `/v1/traces` when the endpoint has none. Returns owned `String`s
147/// so the spawned thread does not borrow the build-time `&'static str`.
148fn ParseEndpoint(Endpoint:&str) -> (String, String) {
149	let WithoutScheme = Endpoint
150		.strip_prefix("http://")
151		.or_else(|| Endpoint.strip_prefix("https://"))
152		.unwrap_or(Endpoint);
153
154	let (HostPort, Path) = match WithoutScheme.split_once('/') {
155		Some((HP, Rest)) => (HP.to_string(), format!("/{}", Rest.trim_start_matches('/'))),
156
157		None => (WithoutScheme.to_string(), "/v1/traces".to_string()),
158	};
159
160	let PathFinal = if Path == "/" { "/v1/traces".to_string() } else { Path };
161
162	(HostPort, PathFinal)
163}