Skip to main content

tracing_tunnel/sender/
mod.rs

1//! Client-side subscriber.
2
3use core::{
4    ptr,
5    sync::atomic::{AtomicU32, Ordering},
6};
7
8use tracing_core::{
9    Event, Interest, Metadata, Subscriber,
10    span::{Attributes, Id, Record},
11};
12
13#[cfg(feature = "std")]
14pub use self::sync::Synced;
15use crate::{CallSiteData, MetadataId, RawSpanId, TracedValues, TracingEvent};
16
17#[cfg(feature = "std")]
18mod sync;
19
20impl TracingEvent {
21    fn new_span(span: &Attributes<'_>, metadata_id: MetadataId, id: RawSpanId) -> Self {
22        Self::NewSpan {
23            id,
24            parent_id: span.parent().map(Id::into_u64),
25            metadata_id,
26            values: TracedValues::from_values(span.values()),
27        }
28    }
29
30    fn values_recorded(id: RawSpanId, values: &Record<'_>) -> Self {
31        Self::ValuesRecorded {
32            id,
33            values: TracedValues::from_record(values),
34        }
35    }
36
37    fn new_event(event: &Event<'_>, metadata_id: MetadataId) -> Self {
38        Self::NewEvent {
39            metadata_id,
40            parent: event.parent().map(Id::into_u64),
41            values: TracedValues::from_event(event),
42        }
43    }
44}
45
46/// Event synchronization used by [`TracingEventSender`].
47///
48/// Synchronization might be necessary in a multithreaded environments, where events may arrive from
49/// different threads out of order. This functionality is encapsulated in the (`std`-dependent) [`Synced`]
50/// implementation.
51///
52/// For single-threaded environments (e.g., WASM), there is the no-op `()` implementation.
53pub trait EventSync: 'static + Send + Sync {
54    /// Called when a new callsite event arrives.
55    #[doc(hidden)] // implementation detail
56    fn register_callsite(
57        &self,
58        metadata: &'static Metadata<'static>,
59        sender: impl Fn(TracingEvent),
60    );
61
62    /// Called when a new span or event arrives.
63    #[doc(hidden)] // implementation detail
64    fn ensure_callsite_registered(
65        &self,
66        metadata: &'static Metadata<'static>,
67        sender: impl Fn(TracingEvent),
68    );
69}
70
71/// Default implementation that does not perform any synchronization.
72impl EventSync for () {
73    fn register_callsite(
74        &self,
75        metadata: &'static Metadata<'static>,
76        sender: impl Fn(TracingEvent),
77    ) {
78        sender(TracingEvent::NewCallSite {
79            id: metadata_id(metadata),
80            data: CallSiteData::from(metadata),
81        });
82    }
83
84    fn ensure_callsite_registered(
85        &self,
86        _metadata: &'static Metadata<'static>,
87        _sender: impl Fn(TracingEvent),
88    ) {
89        // Do nothing
90    }
91}
92
93fn metadata_id(metadata: &'static Metadata<'static>) -> MetadataId {
94    ptr::from_ref(metadata) as MetadataId
95}
96
97/// Tracing [`Subscriber`] that converts tracing events into (de)serializable [presentation]
98/// that can be sent elsewhere using a customizable hook.
99///
100/// As an example, this subscriber is used in the [Tardigrade client library] to send
101/// workflow traces to the host via a WASM import function.
102///
103/// # Examples
104///
105/// See [crate-level docs](index.html) for an example of usage.
106///
107/// [presentation]: TracingEvent
108/// [Tardigrade client library]: https://github.com/slowli/tardigrade
109#[derive(Debug)]
110pub struct TracingEventSender<F = fn(TracingEvent), S = ()> {
111    next_span_id: AtomicU32,
112    on_event: F,
113    sync: S,
114}
115
116impl<F: Fn(TracingEvent) + 'static> TracingEventSender<F> {
117    /// Creates a subscriber with the specified "on event" hook.
118    pub fn new(on_event: F) -> Self {
119        Self {
120            next_span_id: AtomicU32::new(1), // 0 is invalid span ID
121            on_event,
122            sync: (),
123        }
124    }
125}
126
127#[cfg(feature = "std")]
128impl<F: Fn(TracingEvent) + 'static> TracingEventSender<F, Synced> {
129    /// Creates a subscriber with the specified "on event" hook and synchronized event processing.
130    pub fn sync(on_event: F) -> Self {
131        Self {
132            next_span_id: AtomicU32::new(1), // 0 is invalid span ID
133            on_event,
134            sync: Synced::default(),
135        }
136    }
137}
138
139impl<F: Fn(TracingEvent) + 'static, S: EventSync> TracingEventSender<F, S> {
140    fn send(&self, event: TracingEvent) {
141        (self.on_event)(event);
142    }
143}
144
145impl<F: Fn(TracingEvent) + 'static, S: EventSync> Subscriber for TracingEventSender<F, S> {
146    fn register_callsite(&self, metadata: &'static Metadata<'static>) -> Interest {
147        // Ensure callsite is registered synchronously
148        self.sync.register_callsite(metadata, &self.on_event);
149        Interest::always()
150    }
151
152    fn enabled(&self, _metadata: &Metadata<'_>) -> bool {
153        true
154    }
155
156    fn new_span(&self, span: &Attributes<'_>) -> Id {
157        // Ensure callsite is registered before sending NewSpan event.
158        // Practice shows that the caller may not synchronize its register_callsite calls,
159        // allowing a new_span call to take effect before the registration completes.
160        // We guarantee that references are valid, even when multithreaded.
161        self.sync
162            .ensure_callsite_registered(span.metadata(), &self.on_event);
163
164        let metadata_id = metadata_id(span.metadata());
165        let span_id = u64::from(self.next_span_id.fetch_add(1, Ordering::SeqCst));
166        self.send(TracingEvent::new_span(span, metadata_id, span_id));
167        Id::from_u64(span_id)
168    }
169
170    fn record(&self, span: &Id, values: &Record<'_>) {
171        self.send(TracingEvent::values_recorded(span.into_u64(), values));
172    }
173
174    fn record_follows_from(&self, span: &Id, follows: &Id) {
175        self.send(TracingEvent::FollowsFrom {
176            id: span.into_u64(),
177            follows_from: follows.into_u64(),
178        });
179    }
180
181    fn event(&self, event: &Event<'_>) {
182        // Ensure callsite is registered before sending NewEvent.
183        // Practice shows that the caller may not synchronize its register_callsite calls,
184        // allowing an event call to take effect before the registration completes.
185        // We guarantee that references are valid, even when multi-threaded.
186        self.sync
187            .ensure_callsite_registered(event.metadata(), &self.on_event);
188
189        let metadata_id = metadata_id(event.metadata());
190        self.send(TracingEvent::new_event(event, metadata_id));
191    }
192
193    fn enter(&self, span: &Id) {
194        self.send(TracingEvent::SpanEntered {
195            id: span.into_u64(),
196        });
197    }
198
199    fn exit(&self, span: &Id) {
200        self.send(TracingEvent::SpanExited {
201            id: span.into_u64(),
202        });
203    }
204
205    fn clone_span(&self, span: &Id) -> Id {
206        self.send(TracingEvent::SpanCloned {
207            id: span.into_u64(),
208        });
209        span.clone()
210    }
211
212    fn try_close(&self, span: Id) -> bool {
213        self.send(TracingEvent::SpanDropped {
214            id: span.into_u64(),
215        });
216        false
217    }
218}