tracing_tunnel/sender/
mod.rs

1//! Client-side subscriber.
2
3use core::sync::atomic::{AtomicU32, Ordering};
4
5use tracing_core::{
6    span::{Attributes, Id, Record},
7    Event, Interest, Metadata, Subscriber,
8};
9
10#[cfg(feature = "std")]
11pub use self::sync::Synced;
12use crate::{CallSiteData, MetadataId, RawSpanId, TracedValues, TracingEvent};
13
14#[cfg(feature = "std")]
15mod sync;
16
17impl TracingEvent {
18    fn new_span(span: &Attributes<'_>, metadata_id: MetadataId, id: RawSpanId) -> Self {
19        Self::NewSpan {
20            id,
21            parent_id: span.parent().map(Id::into_u64),
22            metadata_id,
23            values: TracedValues::from_values(span.values()),
24        }
25    }
26
27    fn values_recorded(id: RawSpanId, values: &Record<'_>) -> Self {
28        Self::ValuesRecorded {
29            id,
30            values: TracedValues::from_record(values),
31        }
32    }
33
34    fn new_event(event: &Event<'_>, metadata_id: MetadataId) -> Self {
35        Self::NewEvent {
36            metadata_id,
37            parent: event.parent().map(Id::into_u64),
38            values: TracedValues::from_event(event),
39        }
40    }
41}
42
43/// Event synchronization used by [`TracingEventSender`].
44///
45/// Synchronization might be necessary in a multithreaded environments, where events may arrive from
46/// different threads out of order. This functionality is encapsulated in the (`std`-dependent) [`Synced`]
47/// implementation.
48///
49/// For single-threaded environments (e.g., WASM), there is the no-op `()` implementation.
50pub trait EventSync: 'static + Send + Sync {
51    /// Called when a new callsite event arrives.
52    #[doc(hidden)] // implementation detail
53    fn register_callsite(
54        &self,
55        metadata: &'static Metadata<'static>,
56        sender: impl Fn(TracingEvent),
57    );
58
59    /// Called when a new span or event arrives.
60    #[doc(hidden)] // implementation detail
61    fn ensure_callsite_registered(
62        &self,
63        metadata: &'static Metadata<'static>,
64        sender: impl Fn(TracingEvent),
65    );
66}
67
68/// Default implementation that does not perform any synchronization.
69impl EventSync for () {
70    fn register_callsite(
71        &self,
72        metadata: &'static Metadata<'static>,
73        sender: impl Fn(TracingEvent),
74    ) {
75        sender(TracingEvent::NewCallSite {
76            id: metadata_id(metadata),
77            data: CallSiteData::from(metadata),
78        });
79    }
80
81    fn ensure_callsite_registered(
82        &self,
83        _metadata: &'static Metadata<'static>,
84        _sender: impl Fn(TracingEvent),
85    ) {
86        // Do nothing
87    }
88}
89
90fn metadata_id(metadata: &'static Metadata<'static>) -> MetadataId {
91    metadata as *const _ as MetadataId
92}
93
94/// Tracing [`Subscriber`] that converts tracing events into (de)serializable [presentation]
95/// that can be sent elsewhere using a customizable hook.
96///
97/// As an example, this subscriber is used in the [Tardigrade client library] to send
98/// workflow traces to the host via a WASM import function.
99///
100/// # Examples
101///
102/// See [crate-level docs](index.html) for an example of usage.
103///
104/// [presentation]: TracingEvent
105/// [Tardigrade client library]: https://github.com/slowli/tardigrade
106#[derive(Debug)]
107pub struct TracingEventSender<F = fn(TracingEvent), S = ()> {
108    next_span_id: AtomicU32,
109    on_event: F,
110    sync: S,
111}
112
113impl<F: Fn(TracingEvent) + 'static> TracingEventSender<F> {
114    /// Creates a subscriber with the specified "on event" hook.
115    pub fn new(on_event: F) -> Self {
116        Self {
117            next_span_id: AtomicU32::new(1), // 0 is invalid span ID
118            on_event,
119            sync: (),
120        }
121    }
122}
123
124#[cfg(feature = "std")]
125impl<F: Fn(TracingEvent) + 'static> TracingEventSender<F, Synced> {
126    /// Creates a subscriber with the specified "on event" hook and synchronized event processing.
127    pub fn sync(on_event: F) -> Self {
128        Self {
129            next_span_id: AtomicU32::new(1), // 0 is invalid span ID
130            on_event,
131            sync: Synced::default(),
132        }
133    }
134}
135
136impl<F: Fn(TracingEvent) + 'static, S: EventSync> TracingEventSender<F, S> {
137    fn send(&self, event: TracingEvent) {
138        (self.on_event)(event);
139    }
140}
141
142impl<F: Fn(TracingEvent) + 'static, S: EventSync> Subscriber for TracingEventSender<F, S> {
143    fn register_callsite(&self, metadata: &'static Metadata<'static>) -> Interest {
144        // Ensure callsite is registered synchronously
145        self.sync.register_callsite(metadata, &self.on_event);
146        Interest::always()
147    }
148
149    fn enabled(&self, _metadata: &Metadata<'_>) -> bool {
150        true
151    }
152
153    fn new_span(&self, span: &Attributes<'_>) -> Id {
154        // Ensure callsite is registered before sending NewSpan event.
155        // Practice shows that the caller may not synchronize its register_callsite calls,
156        // allowing a new_span call to take effect before the registration completes.
157        // We guarantee that references are valid, even when multithreaded.
158        self.sync
159            .ensure_callsite_registered(span.metadata(), &self.on_event);
160
161        let metadata_id = metadata_id(span.metadata());
162        let span_id = u64::from(self.next_span_id.fetch_add(1, Ordering::SeqCst));
163        self.send(TracingEvent::new_span(span, metadata_id, span_id));
164        Id::from_u64(span_id)
165    }
166
167    fn record(&self, span: &Id, values: &Record<'_>) {
168        self.send(TracingEvent::values_recorded(span.into_u64(), values));
169    }
170
171    fn record_follows_from(&self, span: &Id, follows: &Id) {
172        self.send(TracingEvent::FollowsFrom {
173            id: span.into_u64(),
174            follows_from: follows.into_u64(),
175        });
176    }
177
178    fn event(&self, event: &Event<'_>) {
179        // Ensure callsite is registered before sending NewEvent.
180        // Practice shows that the caller may not synchronize its register_callsite calls,
181        // allowing an event call to take effect before the registration completes.
182        // We guarantee that references are valid, even when multi-threaded.
183        self.sync
184            .ensure_callsite_registered(event.metadata(), &self.on_event);
185
186        let metadata_id = metadata_id(event.metadata());
187        self.send(TracingEvent::new_event(event, metadata_id));
188    }
189
190    fn enter(&self, span: &Id) {
191        self.send(TracingEvent::SpanEntered {
192            id: span.into_u64(),
193        });
194    }
195
196    fn exit(&self, span: &Id) {
197        self.send(TracingEvent::SpanExited {
198            id: span.into_u64(),
199        });
200    }
201
202    fn clone_span(&self, span: &Id) -> Id {
203        self.send(TracingEvent::SpanCloned {
204            id: span.into_u64(),
205        });
206        span.clone()
207    }
208
209    fn try_close(&self, span: Id) -> bool {
210        self.send(TracingEvent::SpanDropped {
211            id: span.into_u64(),
212        });
213        false
214    }
215}