tracing_tunnel/receiver/
mod.rs

1//! `TracingEvent` receiver.
2
3use std::{
4    collections::{HashMap, HashSet},
5    error, fmt, mem,
6};
7
8use serde::{Deserialize, Serialize};
9use tracing_core::{
10    dispatcher::{self, Dispatch},
11    field::{self, FieldSet, Value, ValueSet},
12    span::{Attributes, Id, Record},
13    Event, Field, Metadata,
14};
15
16use self::arena::ARENA;
17use crate::{CallSiteData, MetadataId, RawSpanId, TracedValue, TracedValues, TracingEvent};
18
19mod arena;
20#[cfg(test)]
21mod tests;
22
23enum CowValue<'a> {
24    Borrowed(&'a dyn Value),
25    Owned(Box<dyn Value + 'a>),
26}
27
28impl fmt::Debug for CowValue<'_> {
29    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
30        match self {
31            Self::Borrowed(_) => formatter.debug_struct("Borrowed").finish_non_exhaustive(),
32            Self::Owned(_) => formatter.debug_struct("Owned").finish_non_exhaustive(),
33        }
34    }
35}
36
37impl<'a> CowValue<'a> {
38    fn as_ref(&self) -> &(dyn Value + 'a) {
39        match self {
40            Self::Borrowed(value) => value,
41            Self::Owned(boxed) => boxed.as_ref(),
42        }
43    }
44}
45
46impl TracedValue {
47    fn as_value(&self) -> CowValue<'_> {
48        CowValue::Borrowed(match self {
49            Self::Bool(value) => value,
50            Self::Int(value) => value,
51            Self::UInt(value) => value,
52            Self::Float(value) => value,
53            Self::String(value) => value,
54            Self::Object(value) => return CowValue::Owned(Box::new(field::debug(value))),
55            Self::Error(err) => {
56                let err = err as &(dyn error::Error + 'static);
57                return CowValue::Owned(Box::new(err));
58            }
59        })
60    }
61}
62
/// Persisted information about a single alive span reported by the traced execution.
///
/// NOTE: this type is (de)serialized as part of `PersistedSpans`; field names and order
/// are part of the persisted format.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct SpanData {
    /// ID of the call site metadata the span was created with.
    metadata_id: MetadataId,
    /// ID of the parent span (if any), as reported in the `NewSpan` event.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    parent_id: Option<RawSpanId>,
    /// Reference count maintained from `SpanCloned` / `SpanDropped` events;
    /// the span is removed once it drops to 0.
    ref_count: usize,
    /// Values from span creation, extended by subsequent `ValuesRecorded` events.
    values: TracedValues<String>,
}
71
72/// Information about span / event [`Metadata`] that is [serializable] and thus
73/// can be persisted across multiple [`TracingEventReceiver`] lifetimes.
74///
75/// `PersistedMetadata` logically corresponds to a program executable (e.g., a WASM module),
76/// not to its particular execution (e.g., a WASM module instance).
77/// Multiple executions of the same executable can (and optimally should)
78/// share `PersistedMetadata`.
79///
80/// [serializable]: https://docs.rs/serde/1/serde
81#[derive(Debug, Clone, Default, Serialize, Deserialize)]
82#[serde(transparent)]
83pub struct PersistedMetadata {
84    inner: HashMap<MetadataId, CallSiteData>,
85}
86
87impl PersistedMetadata {
88    /// Returns the number of metadata entries.
89    pub fn len(&self) -> usize {
90        self.inner.len()
91    }
92
93    /// Checks whether this metadata collection is empty (i.e., no metadata was recorded yet).
94    pub fn is_empty(&self) -> bool {
95        self.inner.is_empty()
96    }
97
98    /// Iterates over contained call site metadata together with the corresponding
99    /// [`MetadataId`]s.
100    pub fn iter(&self) -> impl Iterator<Item = (MetadataId, &CallSiteData)> + '_ {
101        self.inner.iter().map(|(id, data)| (*id, data))
102    }
103
104    /// Merges entries from another `PersistedMetadata` instance.
105    pub fn extend(&mut self, other: Self) {
106        self.inner.extend(other.inner);
107    }
108}
109
110/// Information about alive tracing spans for a particular execution that is (de)serializable and
111/// can be persisted across multiple [`TracingEventReceiver`] lifetimes.
112///
113/// Unlike [`PersistedMetadata`], `PersistedSpans` are specific to an executable invocation
114/// (e.g., a WASM module instance). Compared to [`LocalSpans`], `PersistedSpans` have
115/// the lifetime of the execution and not the host [`Subscriber`].
116///
117/// [`Subscriber`]: tracing_core::Subscriber
118#[derive(Debug, Clone, Default, Serialize, Deserialize)]
119#[serde(transparent)]
120pub struct PersistedSpans {
121    inner: HashMap<RawSpanId, SpanData>,
122}
123
124impl PersistedSpans {
125    /// Returns the number of alive spans.
126    pub fn len(&self) -> usize {
127        self.inner.len()
128    }
129
130    /// Checks whether this span collection is empty (i.e., no spans were recorded yet).
131    pub fn is_empty(&self) -> bool {
132        self.inner.is_empty()
133    }
134}
135
/// [`Subscriber`]-specific information about tracing spans for a particular execution
/// (e.g., a WASM module instance).
///
/// Unlike [`PersistedSpans`], this information is not serializable and lives along with
/// the host [`Subscriber`]. It is intended to be placed in something like
/// (an initially empty) `HashMap<K, LocalSpans>`, where `K` denotes the execution ID.
///
/// [`Subscriber`]: tracing_core::Subscriber
#[derive(Debug, Default)]
pub struct LocalSpans {
    /// Maps span IDs reported by the execution to span IDs allocated by the host subscriber.
    /// Entries are added when a local span is created (on `NewSpan`, or lazily on the first
    /// `SpanEntered`) and removed when the span's reference count drops to zero.
    inner: HashMap<RawSpanId, Id>,
}
148
149/// Error processing a [`TracingEvent`] by a [`TracingEventReceiver`].
150#[derive(Debug)]
151#[non_exhaustive]
152pub enum ReceiveError {
153    /// The event contains a reference to an unknown [`Metadata`] ID.
154    UnknownMetadataId(MetadataId),
155    /// The event contains a reference to an unknown span ID.
156    UnknownSpanId(RawSpanId),
157    /// The event contains too many values.
158    TooManyValues {
159        /// Maximum supported number of values per event.
160        max: usize,
161        /// Actual number of values.
162        actual: usize,
163    },
164}
165
166impl fmt::Display for ReceiveError {
167    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
168        match self {
169            Self::UnknownMetadataId(id) => write!(formatter, "unknown metadata ID: {id}"),
170            Self::UnknownSpanId(id) => write!(formatter, "unknown span ID: {id}"),
171            Self::TooManyValues { max, actual } => write!(
172                formatter,
173                "too many values provided ({actual}), should be no more than {max}"
174            ),
175        }
176    }
177}
178
179impl error::Error for ReceiveError {}
180
/// Builds a value set from the slice of `(field, value)` pairs named by `$pairs`.
///
/// `FieldSet::value_set()` only accepts fixed-size arrays, so the macro dispatches on the
/// slice length: an empty slice is handled separately, and each length listed in brackets
/// (each followed by a comma) gets its own `match` arm converting the slice into an array.
///
/// # Panics
///
/// Panics (via `unreachable!()`) if the slice is longer than every listed length.
macro_rules! create_value_set {
    ($field_set:ident, $pairs:ident, [$($len:expr,)+]) => {
        match $pairs.len() {
            0 => $field_set.value_set(&[]),
            $(
                $len => {
                    let array = <&[_; $len]>::try_from($pairs).unwrap();
                    $field_set.value_set(array)
                }
            )+
            _ => unreachable!(),
        }
    };
}
192
193/// Container for non-persisted information specific to a particular traced execution.
194#[derive(Debug, Default)]
195struct CurrentExecution {
196    uncommitted_span_ids: HashSet<RawSpanId>,
197    entered_span_ids: HashSet<RawSpanId>,
198}
199
200impl CurrentExecution {
201    fn remove_span(&mut self, id: RawSpanId) {
202        self.entered_span_ids.remove(&id);
203        self.uncommitted_span_ids.remove(&id);
204    }
205
206    fn finalize(&mut self, local_spans: &LocalSpans) {
207        for id in mem::take(&mut self.entered_span_ids) {
208            if let Some(local_id) = local_spans.inner.get(&id) {
209                TracingEventReceiver::dispatch(|dispatch| dispatch.exit(local_id));
210            }
211        }
212        for id in mem::take(&mut self.uncommitted_span_ids) {
213            if let Some(local_id) = local_spans.inner.get(&id) {
214                TracingEventReceiver::dispatch(|dispatch| dispatch.try_close(local_id.clone()));
215            }
216        }
217    }
218}
219
/// Receiver of [`TracingEvent`]s produced by [`TracingEventSender`] that relays them
/// to the tracing infrastructure.
///
/// The receiver takes care of persisting [`Metadata`] / spans that can outlive
/// the lifetime of the host program (not just the `TracingEventReceiver` instance!).
/// As an example, in [the Tardigrade runtime], a receiver instance is created each time
/// a workflow is executed. It relays tracing events from the workflow logic (executed in WASM)
/// to the host.
///
/// In some cases, the execution tracked by `TracingEventReceiver` may finish abnormally.
/// (E.g., a WASM module instance panics while it has the `panic = abort` set
/// in the compilation options.) In these cases, all entered
/// spans are force-exited when the receiver is dropped. Additionally, spans created
/// by the execution are closed on drop as long as they are not [persisted](Self::persist()).
/// That is, persistence acts as a commitment of the execution results, while the default
/// behavior is rollback.
///
/// # ⚠ Resource consumption
///
/// To fit the API of the [`tracing-core`] crate, the receiver leaks string parts
/// of [`CallSiteData`]: we need a `&'static str` when we only have a `String`. Steps are taken
/// to limit the amount of leaked memory; we use a `static` string arena which checks whether
/// a particular string was already leaked, and reuses the existing `&'static str` if possible.
/// Still, this has negative implications regarding both memory consumption and performance,
/// so you probably should limit the number of executables to use with a `TracingEventReceiver`.
/// The number of *executions* of each executable is not a limiting factor.
///
/// # Examples
///
/// See [crate-level docs](index.html) for an example of usage.
///
/// [`TracingEventSender`]: crate::TracingEventSender
/// [the Tardigrade runtime]: https://github.com/slowli/tardigrade
/// [`tracing-core`]: https://docs.rs/tracing-core/
#[derive(Debug, Default)]
pub struct TracingEventReceiver {
    /// Call site metadata (leaked via the string arena) keyed by the sender-side ID.
    metadata: HashMap<MetadataId, &'static Metadata<'static>>,
    /// Alive spans; returned from `persist()` so that they outlive this receiver.
    spans: PersistedSpans,
    /// Mapping of alive spans to span IDs allocated by the host subscriber.
    local_spans: LocalSpans,
    /// State of the current execution, rolled back on drop unless persisted.
    current_execution: CurrentExecution,
}
261
262impl TracingEventReceiver {
263    /// Maximum supported number of values in a span or event.
264    const MAX_VALUES: usize = 32;
265
266    /// Restores the receiver from the persisted metadata and tracing spans.
267    ///
268    /// A receiver will work fine if `local_spans` information is lost (e.g., reset to the default
269    /// empty value). However, this is likely to result in span leakage
270    /// in the underlying [`Subscriber`]. On the other hand, mismatch between `metadata` / `spans`
271    /// and the execution producing [`TracingEvent`]s is **bad**; it will most likely result
272    /// in errors returned from [`Self::try_receive()`].
273    ///
274    /// [`Subscriber`]: tracing_core::Subscriber
275    pub fn new(
276        metadata: PersistedMetadata,
277        spans: PersistedSpans,
278        local_spans: LocalSpans,
279    ) -> Self {
280        let mut this = Self {
281            metadata: HashMap::new(),
282            spans,
283            local_spans,
284            current_execution: CurrentExecution::default(),
285        };
286
287        for (id, data) in metadata.inner {
288            this.on_new_call_site(id, data);
289        }
290        this
291    }
292
293    fn dispatch<T>(dispatch_fn: impl FnOnce(&Dispatch) -> T) -> T {
294        dispatch_fn(&dispatcher::get_default(Dispatch::clone))
295    }
296
297    fn metadata(&self, id: MetadataId) -> Result<&'static Metadata<'static>, ReceiveError> {
298        self.metadata
299            .get(&id)
300            .copied()
301            .ok_or(ReceiveError::UnknownMetadataId(id))
302    }
303
304    fn span(&self, id: RawSpanId) -> Result<&SpanData, ReceiveError> {
305        self.spans
306            .inner
307            .get(&id)
308            .ok_or(ReceiveError::UnknownSpanId(id))
309    }
310
311    fn span_mut(&mut self, id: RawSpanId) -> Result<&mut SpanData, ReceiveError> {
312        self.spans
313            .inner
314            .get_mut(&id)
315            .ok_or(ReceiveError::UnknownSpanId(id))
316    }
317
318    /// Returns `Ok(None)` if the local span ID is (validly) not set yet, and `Err(_)`
319    /// if it must have been set by this point.
320    fn map_span_id(&self, remote_id: RawSpanId) -> Result<Option<&Id>, ReceiveError> {
321        match self.local_spans.inner.get(&remote_id) {
322            Some(local_id) => Ok(Some(local_id)),
323            None => {
324                // Check if the the referenced span is alive.
325                if self.spans.inner.contains_key(&remote_id) {
326                    Ok(None)
327                } else {
328                    Err(ReceiveError::UnknownSpanId(remote_id))
329                }
330            }
331        }
332    }
333
334    fn ensure_values_len(values: &TracedValues<String>) -> Result<(), ReceiveError> {
335        if values.len() > Self::MAX_VALUES {
336            return Err(ReceiveError::TooManyValues {
337                actual: values.len(),
338                max: Self::MAX_VALUES,
339            });
340        }
341        Ok(())
342    }
343
344    fn generate_fields<'a>(
345        metadata: &'static Metadata<'static>,
346        values: &'a TracedValues<String>,
347    ) -> Vec<(Field, CowValue<'a>)> {
348        let fields = metadata.fields();
349        values
350            .iter()
351            .filter_map(|(field_name, value)| {
352                fields
353                    .field(field_name)
354                    .map(|field| (field, value.as_value()))
355            })
356            .collect()
357    }
358
359    fn expand_fields<'a>(
360        values: &'a [(Field, CowValue<'_>)],
361    ) -> Vec<(&'a Field, Option<&'a dyn Value>)> {
362        values
363            .iter()
364            .map(|(field, value)| (field, Some(value.as_ref())))
365            .collect()
366    }
367
368    fn create_values<'a>(
369        fields: &'a FieldSet,
370        values: &'a [(&Field, Option<&dyn Value>)],
371    ) -> ValueSet<'a> {
372        create_value_set!(
373            fields,
374            values,
375            [
376                1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23,
377                24, 25, 26, 27, 28, 29, 30, 31, 32,
378            ]
379        )
380    }
381
382    fn on_new_call_site(&mut self, id: MetadataId, data: CallSiteData) {
383        let (metadata, is_new) = ARENA.alloc_metadata(data);
384        self.metadata.insert(id, metadata);
385        if is_new {
386            Self::dispatch(|dispatch| dispatch.register_callsite(metadata));
387        }
388    }
389
390    fn create_local_span(&self, data: &SpanData) -> Result<Id, ReceiveError> {
391        let metadata = self.metadata(data.metadata_id)?;
392        let local_parent_id = data
393            .parent_id
394            .map(|parent_id| self.map_span_id(parent_id))
395            .transpose()?
396            .flatten();
397
398        let value_set = Self::generate_fields(metadata, &data.values);
399        let value_set = Self::expand_fields(&value_set);
400        let value_set = Self::create_values(metadata.fields(), &value_set);
401        let attributes = if let Some(local_parent_id) = local_parent_id {
402            Attributes::child_of(local_parent_id.clone(), metadata, &value_set)
403        } else {
404            Attributes::new(metadata, &value_set)
405        };
406
407        Ok(Self::dispatch(|dispatch| dispatch.new_span(&attributes)))
408    }
409
410    /// Tries to consume an event and relays it to the tracing infrastructure.
411    ///
412    /// # Errors
413    ///
414    /// Fails if the event contains a bogus reference to a call site or a span, or if it contains
415    /// too many values. In general, an error can mean that the consumer was restored
416    /// from an incorrect persisted state, or that the event generator is bogus (e.g.,
417    /// not a [`TracingEventSender`]).
418    ///
419    /// [`TracingEventSender`]: crate::TracingEventSender
420    #[allow(clippy::missing_panics_doc, clippy::map_entry)] // false positive
421    pub fn try_receive(&mut self, event: TracingEvent) -> Result<(), ReceiveError> {
422        match event {
423            TracingEvent::NewCallSite { id, data } => {
424                self.on_new_call_site(id, data);
425            }
426
427            TracingEvent::NewSpan {
428                id,
429                parent_id,
430                metadata_id,
431                values,
432            } => {
433                Self::ensure_values_len(&values)?;
434
435                let data = SpanData {
436                    metadata_id,
437                    parent_id,
438                    ref_count: 1,
439                    values,
440                };
441                if !self.local_spans.inner.contains_key(&id) {
442                    let local_id = self.create_local_span(&data)?;
443                    self.local_spans.inner.insert(id, local_id);
444                }
445                self.spans.inner.insert(id, data);
446                self.current_execution.uncommitted_span_ids.insert(id);
447            }
448
449            TracingEvent::FollowsFrom { id, follows_from } => {
450                let local_id = self.map_span_id(id)?;
451                let local_follows_from = self.map_span_id(follows_from)?;
452
453                // TODO: properly handle remaining cases
454                if let (Some(id), Some(follows_from)) = (local_id, local_follows_from) {
455                    Self::dispatch(|dispatch| {
456                        dispatch.record_follows_from(id, follows_from);
457                    });
458                }
459            }
460
461            TracingEvent::SpanEntered { id } => {
462                let local_id = if let Some(id) = self.map_span_id(id)? {
463                    id.clone()
464                } else {
465                    let data = self.span(id)?;
466                    let local_id = self.create_local_span(data)?;
467                    self.local_spans.inner.insert(id, local_id.clone());
468                    local_id
469                };
470                self.current_execution.entered_span_ids.insert(id);
471                Self::dispatch(|dispatch| dispatch.enter(&local_id));
472            }
473            TracingEvent::SpanExited { id } => {
474                if let Some(local_id) = self.map_span_id(id)? {
475                    Self::dispatch(|dispatch| dispatch.exit(local_id));
476                }
477                self.current_execution.entered_span_ids.remove(&id);
478            }
479
480            TracingEvent::SpanCloned { id } => {
481                let span = self.span_mut(id)?;
482                span.ref_count += 1;
483                // Dispatcher is intentionally not called: we handle ref counting locally.
484            }
485            TracingEvent::SpanDropped { id } => {
486                let span = self.span_mut(id)?;
487                span.ref_count -= 1;
488                if span.ref_count == 0 {
489                    self.spans.inner.remove(&id);
490                    self.current_execution.remove_span(id);
491                    if let Some(local_id) = self.local_spans.inner.remove(&id) {
492                        Self::dispatch(|dispatch| dispatch.try_close(local_id.clone()));
493                    }
494                }
495            }
496
497            TracingEvent::ValuesRecorded { id, values } => {
498                Self::ensure_values_len(&values)?;
499
500                if let Some(local_id) = self.map_span_id(id)? {
501                    let metadata = self.metadata(self.spans.inner[&id].metadata_id)?;
502                    let values = Self::generate_fields(metadata, &values);
503                    let values = Self::expand_fields(&values);
504                    let values = Self::create_values(metadata.fields(), &values);
505                    let values = Record::new(&values);
506                    Self::dispatch(|dispatch| dispatch.record(local_id, &values));
507                }
508                let span = self.span_mut(id)?;
509                span.values.extend(values);
510            }
511
512            TracingEvent::NewEvent {
513                metadata_id,
514                parent,
515                values,
516            } => {
517                Self::ensure_values_len(&values)?;
518
519                let metadata = self.metadata(metadata_id)?;
520                let values = Self::generate_fields(metadata, &values);
521                let values = Self::expand_fields(&values);
522                let values = Self::create_values(metadata.fields(), &values);
523                let parent = parent.map(|id| self.map_span_id(id)).transpose()?.flatten();
524                let event = if let Some(parent) = parent {
525                    Event::new_child_of(parent.clone(), metadata, &values)
526                } else {
527                    Event::new(metadata, &values)
528                };
529                Self::dispatch(|dispatch| dispatch.event(&event));
530            }
531        }
532        Ok(())
533    }
534
535    /// Consumes an event and relays it to the tracing infrastructure.
536    ///
537    /// # Panics
538    ///
539    /// Panics in the same cases when [`Self::try_receive()`] returns an error.
540    pub fn receive(&mut self, event: TracingEvent) {
541        self.try_receive(event)
542            .expect("received bogus tracing event");
543    }
544
545    /// Persists [`Metadata`] produced by the previously consumed events. The returned
546    /// metadata should be merged into the metadata provided to [`Self::new()`].
547    pub fn persist_metadata(&self) -> PersistedMetadata {
548        let inner = self
549            .metadata
550            .iter()
551            .map(|(&id, &metadata)| (id, CallSiteData::from(metadata)))
552            .collect();
553        PersistedMetadata { inner }
554    }
555
556    /// Returns persisted and local spans.
557    pub fn persist(mut self) -> (PersistedSpans, LocalSpans) {
558        self.current_execution.uncommitted_span_ids.clear();
559        let spans = mem::take(&mut self.spans);
560        let local_spans = mem::take(&mut self.local_spans);
561        self.current_execution.finalize(&local_spans);
562        (spans, local_spans)
563    }
564}
565
566impl Drop for TracingEventReceiver {
567    fn drop(&mut self) {
568        self.current_execution.finalize(&self.local_spans);
569    }
570}