1use std::{
4 collections::{HashMap, HashSet},
5 error, fmt, mem,
6};
7
8use serde::{Deserialize, Serialize};
9use tracing_core::{
10 dispatcher::{self, Dispatch},
11 field::{self, FieldSet, Value, ValueSet},
12 span::{Attributes, Id, Record},
13 Event, Field, Metadata,
14};
15
16use self::arena::ARENA;
17use crate::{CallSiteData, MetadataId, RawSpanId, TracedValue, TracedValues, TracingEvent};
18
19mod arena;
20#[cfg(test)]
21mod tests;
22
/// Tracing value that is either borrowed from another structure or owned on the heap.
///
/// Both variants are exposed uniformly as a `dyn Value` reference via `CowValue::as_ref()`.
enum CowValue<'a> {
    /// Reference to a value stored elsewhere.
    Borrowed(&'a dyn Value),
    /// Boxed value; used when a wrapper has to be created on the fly
    /// (see `TracedValue::as_value()`).
    Owned(Box<dyn Value + 'a>),
}
27
28impl fmt::Debug for CowValue<'_> {
29 fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
30 match self {
31 Self::Borrowed(_) => formatter.debug_struct("Borrowed").finish_non_exhaustive(),
32 Self::Owned(_) => formatter.debug_struct("Owned").finish_non_exhaustive(),
33 }
34 }
35}
36
37impl<'a> CowValue<'a> {
38 fn as_ref(&self) -> &(dyn Value + 'a) {
39 match self {
40 Self::Borrowed(value) => value,
41 Self::Owned(boxed) => boxed.as_ref(),
42 }
43 }
44}
45
46impl TracedValue {
47 fn as_value(&self) -> CowValue<'_> {
48 CowValue::Borrowed(match self {
49 Self::Bool(value) => value,
50 Self::Int(value) => value,
51 Self::UInt(value) => value,
52 Self::Float(value) => value,
53 Self::String(value) => value,
54 Self::Object(value) => return CowValue::Owned(Box::new(field::debug(value))),
55 Self::Error(err) => {
56 let err = err as &(dyn error::Error + 'static);
57 return CowValue::Owned(Box::new(err));
58 }
59 })
60 }
61}
62
/// Serializable state that the receiver tracks for a single remote span.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct SpanData {
    /// ID of the call site metadata the span was created with.
    metadata_id: MetadataId,
    /// Remote ID of the parent span, if any. Omitted from the serialized form when `None`
    /// and defaults to `None` when absent on deserialization.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    parent_id: Option<RawSpanId>,
    /// Number of references to the span: starts at 1 on `NewSpan`, incremented on
    /// `SpanCloned`, decremented on `SpanDropped`. The span is removed once it reaches 0.
    ref_count: usize,
    /// Values of the span: the ones supplied on creation, extended by each
    /// `ValuesRecorded` event.
    values: TracedValues<String>,
}
71
/// Serializable collection of call site data keyed by metadata ID.
///
/// Consumed by `TracingEventReceiver::new()` and produced by
/// `TracingEventReceiver::persist_metadata()`. Serialized transparently as the inner map.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
#[serde(transparent)]
pub struct PersistedMetadata {
    /// Call site data for each known metadata ID.
    inner: HashMap<MetadataId, CallSiteData>,
}
86
87impl PersistedMetadata {
88 pub fn len(&self) -> usize {
90 self.inner.len()
91 }
92
93 pub fn is_empty(&self) -> bool {
95 self.inner.is_empty()
96 }
97
98 pub fn iter(&self) -> impl Iterator<Item = (MetadataId, &CallSiteData)> + '_ {
101 self.inner.iter().map(|(id, data)| (*id, data))
102 }
103
104 pub fn extend(&mut self, other: Self) {
106 self.inner.extend(other.inner);
107 }
108}
109
/// Serializable information about the spans tracked by a `TracingEventReceiver`,
/// keyed by the remote span ID.
///
/// Consumed by `TracingEventReceiver::new()` and returned from
/// `TracingEventReceiver::persist()`. Serialized transparently as the inner map.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
#[serde(transparent)]
pub struct PersistedSpans {
    /// Tracked data for each remote span.
    inner: HashMap<RawSpanId, SpanData>,
}
123
124impl PersistedSpans {
125 pub fn len(&self) -> usize {
127 self.inner.len()
128 }
129
130 pub fn is_empty(&self) -> bool {
132 self.inner.is_empty()
133 }
134}
135
/// Mapping from remote span IDs to the IDs of the corresponding spans created
/// in the local dispatcher.
///
/// Unlike `PersistedSpans`, this type does not implement `Serialize` / `Deserialize`.
#[derive(Debug, Default)]
pub struct LocalSpans {
    /// Local span ID for each remote span that currently has a local counterpart.
    inner: HashMap<RawSpanId, Id>,
}
148
/// Error that can occur when a `TracingEventReceiver` processes a `TracingEvent`.
#[derive(Debug)]
#[non_exhaustive]
pub enum ReceiveError {
    /// The event refers to a metadata ID that is not known to the receiver.
    UnknownMetadataId(MetadataId),
    /// The event refers to a span ID that is not known to the receiver.
    UnknownSpanId(RawSpanId),
    /// The event carries more values than the receiver accepts.
    TooManyValues {
        /// Maximum number of values accepted by the receiver.
        max: usize,
        /// Number of values actually provided in the event.
        actual: usize,
    },
}
165
166impl fmt::Display for ReceiveError {
167 fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
168 match self {
169 Self::UnknownMetadataId(id) => write!(formatter, "unknown metadata ID: {id}"),
170 Self::UnknownSpanId(id) => write!(formatter, "unknown span ID: {id}"),
171 Self::TooManyValues { max, actual } => write!(
172 formatter,
173 "too many values provided ({actual}), should be no more than {max}"
174 ),
175 }
176 }
177}
178
// `ReceiveError` wraps no underlying error, so the default `Error` methods are sufficient.
impl error::Error for ReceiveError {}
180
/// Builds a value set by calling `$fields.value_set(..)` with the contents of
/// the `$values` slice.
///
/// The slice is converted into a reference to a fixed-size array, with one `match` arm
/// generated per length listed in the bracketed argument (plus an arm for the empty slice).
///
/// # Panics
///
/// Hits `unreachable!()` if the slice length is neither 0 nor one of the listed lengths;
/// callers must bound the length beforehand.
macro_rules! create_value_set {
    ($fields:ident, $values:ident, [$($i:expr,)+]) => {
        match $values.len() {
            0 => $fields.value_set(&[]),
            $(
                // The `unwrap()` cannot fail: this arm is only reached when the slice
                // length equals the array length being converted to.
                $i => $fields.value_set(<&[_; $i]>::try_from($values).unwrap()),
            )+
            _ => unreachable!(),
        }
    };
}
192
/// Bookkeeping for spans touched since the receiver was created, used to clean up
/// the local dispatcher state when the receiver is persisted or dropped.
#[derive(Debug, Default)]
struct CurrentExecution {
    /// Remote IDs of spans created via `NewSpan` events. Cleared by
    /// `TracingEventReceiver::persist()`; otherwise the corresponding local spans are
    /// closed in `finalize()`.
    uncommitted_span_ids: HashSet<RawSpanId>,
    /// Remote IDs of spans that were entered and not exited yet.
    entered_span_ids: HashSet<RawSpanId>,
}
199
200impl CurrentExecution {
201 fn remove_span(&mut self, id: RawSpanId) {
202 self.entered_span_ids.remove(&id);
203 self.uncommitted_span_ids.remove(&id);
204 }
205
206 fn finalize(&mut self, local_spans: &LocalSpans) {
207 for id in mem::take(&mut self.entered_span_ids) {
208 if let Some(local_id) = local_spans.inner.get(&id) {
209 TracingEventReceiver::dispatch(|dispatch| dispatch.exit(local_id));
210 }
211 }
212 for id in mem::take(&mut self.uncommitted_span_ids) {
213 if let Some(local_id) = local_spans.inner.get(&id) {
214 TracingEventReceiver::dispatch(|dispatch| dispatch.try_close(local_id.clone()));
215 }
216 }
217 }
218}
219
/// Receiver of `TracingEvent`s that replays them against the default `tracing`
/// dispatcher.
///
/// Span state can be extracted with `persist()` / `persist_metadata()` and supplied
/// back to `new()` to continue processing with another receiver instance.
#[derive(Debug, Default)]
pub struct TracingEventReceiver {
    /// Metadata for each known call site, as allocated in `ARENA`.
    metadata: HashMap<MetadataId, &'static Metadata<'static>>,
    /// Serializable data for all tracked spans.
    spans: PersistedSpans,
    /// Local span IDs for the tracked spans that have a local counterpart.
    local_spans: LocalSpans,
    /// Spans created / entered by this receiver instance; used for cleanup.
    current_execution: CurrentExecution,
}
261
262impl TracingEventReceiver {
263 const MAX_VALUES: usize = 32;
265
266 pub fn new(
276 metadata: PersistedMetadata,
277 spans: PersistedSpans,
278 local_spans: LocalSpans,
279 ) -> Self {
280 let mut this = Self {
281 metadata: HashMap::new(),
282 spans,
283 local_spans,
284 current_execution: CurrentExecution::default(),
285 };
286
287 for (id, data) in metadata.inner {
288 this.on_new_call_site(id, data);
289 }
290 this
291 }
292
293 fn dispatch<T>(dispatch_fn: impl FnOnce(&Dispatch) -> T) -> T {
294 dispatch_fn(&dispatcher::get_default(Dispatch::clone))
295 }
296
297 fn metadata(&self, id: MetadataId) -> Result<&'static Metadata<'static>, ReceiveError> {
298 self.metadata
299 .get(&id)
300 .copied()
301 .ok_or(ReceiveError::UnknownMetadataId(id))
302 }
303
304 fn span(&self, id: RawSpanId) -> Result<&SpanData, ReceiveError> {
305 self.spans
306 .inner
307 .get(&id)
308 .ok_or(ReceiveError::UnknownSpanId(id))
309 }
310
311 fn span_mut(&mut self, id: RawSpanId) -> Result<&mut SpanData, ReceiveError> {
312 self.spans
313 .inner
314 .get_mut(&id)
315 .ok_or(ReceiveError::UnknownSpanId(id))
316 }
317
318 fn map_span_id(&self, remote_id: RawSpanId) -> Result<Option<&Id>, ReceiveError> {
321 match self.local_spans.inner.get(&remote_id) {
322 Some(local_id) => Ok(Some(local_id)),
323 None => {
324 if self.spans.inner.contains_key(&remote_id) {
326 Ok(None)
327 } else {
328 Err(ReceiveError::UnknownSpanId(remote_id))
329 }
330 }
331 }
332 }
333
334 fn ensure_values_len(values: &TracedValues<String>) -> Result<(), ReceiveError> {
335 if values.len() > Self::MAX_VALUES {
336 return Err(ReceiveError::TooManyValues {
337 actual: values.len(),
338 max: Self::MAX_VALUES,
339 });
340 }
341 Ok(())
342 }
343
344 fn generate_fields<'a>(
345 metadata: &'static Metadata<'static>,
346 values: &'a TracedValues<String>,
347 ) -> Vec<(Field, CowValue<'a>)> {
348 let fields = metadata.fields();
349 values
350 .iter()
351 .filter_map(|(field_name, value)| {
352 fields
353 .field(field_name)
354 .map(|field| (field, value.as_value()))
355 })
356 .collect()
357 }
358
359 fn expand_fields<'a>(
360 values: &'a [(Field, CowValue<'_>)],
361 ) -> Vec<(&'a Field, Option<&'a dyn Value>)> {
362 values
363 .iter()
364 .map(|(field, value)| (field, Some(value.as_ref())))
365 .collect()
366 }
367
368 fn create_values<'a>(
369 fields: &'a FieldSet,
370 values: &'a [(&Field, Option<&dyn Value>)],
371 ) -> ValueSet<'a> {
372 create_value_set!(
373 fields,
374 values,
375 [
376 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23,
377 24, 25, 26, 27, 28, 29, 30, 31, 32,
378 ]
379 )
380 }
381
382 fn on_new_call_site(&mut self, id: MetadataId, data: CallSiteData) {
383 let (metadata, is_new) = ARENA.alloc_metadata(data);
384 self.metadata.insert(id, metadata);
385 if is_new {
386 Self::dispatch(|dispatch| dispatch.register_callsite(metadata));
387 }
388 }
389
390 fn create_local_span(&self, data: &SpanData) -> Result<Id, ReceiveError> {
391 let metadata = self.metadata(data.metadata_id)?;
392 let local_parent_id = data
393 .parent_id
394 .map(|parent_id| self.map_span_id(parent_id))
395 .transpose()?
396 .flatten();
397
398 let value_set = Self::generate_fields(metadata, &data.values);
399 let value_set = Self::expand_fields(&value_set);
400 let value_set = Self::create_values(metadata.fields(), &value_set);
401 let attributes = if let Some(local_parent_id) = local_parent_id {
402 Attributes::child_of(local_parent_id.clone(), metadata, &value_set)
403 } else {
404 Attributes::new(metadata, &value_set)
405 };
406
407 Ok(Self::dispatch(|dispatch| dispatch.new_span(&attributes)))
408 }
409
410 #[allow(clippy::missing_panics_doc, clippy::map_entry)] pub fn try_receive(&mut self, event: TracingEvent) -> Result<(), ReceiveError> {
422 match event {
423 TracingEvent::NewCallSite { id, data } => {
424 self.on_new_call_site(id, data);
425 }
426
427 TracingEvent::NewSpan {
428 id,
429 parent_id,
430 metadata_id,
431 values,
432 } => {
433 Self::ensure_values_len(&values)?;
434
435 let data = SpanData {
436 metadata_id,
437 parent_id,
438 ref_count: 1,
439 values,
440 };
441 if !self.local_spans.inner.contains_key(&id) {
442 let local_id = self.create_local_span(&data)?;
443 self.local_spans.inner.insert(id, local_id);
444 }
445 self.spans.inner.insert(id, data);
446 self.current_execution.uncommitted_span_ids.insert(id);
447 }
448
449 TracingEvent::FollowsFrom { id, follows_from } => {
450 let local_id = self.map_span_id(id)?;
451 let local_follows_from = self.map_span_id(follows_from)?;
452
453 if let (Some(id), Some(follows_from)) = (local_id, local_follows_from) {
455 Self::dispatch(|dispatch| {
456 dispatch.record_follows_from(id, follows_from);
457 });
458 }
459 }
460
461 TracingEvent::SpanEntered { id } => {
462 let local_id = if let Some(id) = self.map_span_id(id)? {
463 id.clone()
464 } else {
465 let data = self.span(id)?;
466 let local_id = self.create_local_span(data)?;
467 self.local_spans.inner.insert(id, local_id.clone());
468 local_id
469 };
470 self.current_execution.entered_span_ids.insert(id);
471 Self::dispatch(|dispatch| dispatch.enter(&local_id));
472 }
473 TracingEvent::SpanExited { id } => {
474 if let Some(local_id) = self.map_span_id(id)? {
475 Self::dispatch(|dispatch| dispatch.exit(local_id));
476 }
477 self.current_execution.entered_span_ids.remove(&id);
478 }
479
480 TracingEvent::SpanCloned { id } => {
481 let span = self.span_mut(id)?;
482 span.ref_count += 1;
483 }
485 TracingEvent::SpanDropped { id } => {
486 let span = self.span_mut(id)?;
487 span.ref_count -= 1;
488 if span.ref_count == 0 {
489 self.spans.inner.remove(&id);
490 self.current_execution.remove_span(id);
491 if let Some(local_id) = self.local_spans.inner.remove(&id) {
492 Self::dispatch(|dispatch| dispatch.try_close(local_id.clone()));
493 }
494 }
495 }
496
497 TracingEvent::ValuesRecorded { id, values } => {
498 Self::ensure_values_len(&values)?;
499
500 if let Some(local_id) = self.map_span_id(id)? {
501 let metadata = self.metadata(self.spans.inner[&id].metadata_id)?;
502 let values = Self::generate_fields(metadata, &values);
503 let values = Self::expand_fields(&values);
504 let values = Self::create_values(metadata.fields(), &values);
505 let values = Record::new(&values);
506 Self::dispatch(|dispatch| dispatch.record(local_id, &values));
507 }
508 let span = self.span_mut(id)?;
509 span.values.extend(values);
510 }
511
512 TracingEvent::NewEvent {
513 metadata_id,
514 parent,
515 values,
516 } => {
517 Self::ensure_values_len(&values)?;
518
519 let metadata = self.metadata(metadata_id)?;
520 let values = Self::generate_fields(metadata, &values);
521 let values = Self::expand_fields(&values);
522 let values = Self::create_values(metadata.fields(), &values);
523 let parent = parent.map(|id| self.map_span_id(id)).transpose()?.flatten();
524 let event = if let Some(parent) = parent {
525 Event::new_child_of(parent.clone(), metadata, &values)
526 } else {
527 Event::new(metadata, &values)
528 };
529 Self::dispatch(|dispatch| dispatch.event(&event));
530 }
531 }
532 Ok(())
533 }
534
535 pub fn receive(&mut self, event: TracingEvent) {
541 self.try_receive(event)
542 .expect("received bogus tracing event");
543 }
544
545 pub fn persist_metadata(&self) -> PersistedMetadata {
548 let inner = self
549 .metadata
550 .iter()
551 .map(|(&id, &metadata)| (id, CallSiteData::from(metadata)))
552 .collect();
553 PersistedMetadata { inner }
554 }
555
556 pub fn persist(mut self) -> (PersistedSpans, LocalSpans) {
558 self.current_execution.uncommitted_span_ids.clear();
559 let spans = mem::take(&mut self.spans);
560 let local_spans = mem::take(&mut self.local_spans);
561 self.current_execution.finalize(&local_spans);
562 (spans, local_spans)
563 }
564}
565
566impl Drop for TracingEventReceiver {
567 fn drop(&mut self) {
568 self.current_execution.finalize(&self.local_spans);
569 }
570}