eventually/
tracing.rs

1//! Module containing some extension traits to support code instrumentation
2//! using the `tracing` crate.
3
4use std::fmt::Debug;
5use std::marker::PhantomData;
6
7use async_trait::async_trait;
8use tracing::instrument;
9
10use crate::aggregate::Aggregate;
11use crate::version::{self, Version};
12use crate::{aggregate, event, message};
13
14/// [`aggregate::Repository`] type wrapper that provides instrumentation
15/// features through the `tracing` crate.
16#[derive(Debug, Clone)]
17pub struct InstrumentedAggregateRepository<T, Inner>
18where
19    T: Aggregate + Debug,
20    <T as Aggregate>::Id: Debug,
21    <T as Aggregate>::Event: Debug,
22    Inner: aggregate::Repository<T>,
23{
24    inner: Inner,
25    t: PhantomData<T>,
26}
27
28#[async_trait]
29impl<T, Inner> aggregate::repository::Getter<T> for InstrumentedAggregateRepository<T, Inner>
30where
31    T: Aggregate + Debug,
32    <T as Aggregate>::Id: Debug,
33    <T as Aggregate>::Event: Debug,
34    Inner: aggregate::Repository<T>,
35{
36    #[allow(clippy::blocks_in_conditions)] // NOTE(ar3s3ru): seems to be a false positive.
37    #[instrument(name = "aggregate::repository::Getter.get", ret, err, skip(self))]
38    async fn get(&self, id: &T::Id) -> Result<aggregate::Root<T>, aggregate::repository::GetError> {
39        self.inner.get(id).await
40    }
41}
42
43#[async_trait]
44impl<T, Inner> aggregate::repository::Saver<T> for InstrumentedAggregateRepository<T, Inner>
45where
46    T: Aggregate + Debug,
47    <T as Aggregate>::Id: Debug,
48    <T as Aggregate>::Event: Debug,
49    Inner: aggregate::Repository<T>,
50{
51    #[allow(clippy::blocks_in_conditions)] // NOTE(ar3s3ru): seems to be a false positive.
52    #[instrument(name = "aggregate::repository::Saver.save", ret, err, skip(self))]
53    async fn save(
54        &self,
55        root: &mut aggregate::Root<T>,
56    ) -> Result<(), aggregate::repository::SaveError> {
57        self.inner.save(root).await
58    }
59}
60
61/// Extension trait for any [`aggregate::Repository`] type to provide
62/// instrumentation features through the `tracing` crate.
63pub trait AggregateRepositoryExt<T>: aggregate::Repository<T> + Sized
64where
65    T: Aggregate + Debug,
66    <T as Aggregate>::Id: Debug,
67    <T as Aggregate>::Event: Debug,
68{
69    /// Returns an instrumented version of the [`aggregate::Repository`] instance.
70    fn with_tracing(self) -> InstrumentedAggregateRepository<T, Self> {
71        InstrumentedAggregateRepository {
72            inner: self,
73            t: PhantomData,
74        }
75    }
76}
77
78impl<R, T> AggregateRepositoryExt<T> for R
79where
80    R: aggregate::Repository<T>,
81    T: Aggregate + Debug,
82    <T as Aggregate>::Id: Debug,
83    <T as Aggregate>::Event: Debug,
84{
85}
86
87/// [`event::Store`] type wrapper that provides instrumentation
88/// features through the `tracing` crate.
89#[derive(Debug, Clone)]
90pub struct InstrumentedEventStore<T, StreamId, Event>
91where
92    T: event::Store<StreamId, Event> + Send + Sync,
93    StreamId: Debug + Send + Sync,
94    Event: message::Message + Debug + Send + Sync,
95{
96    store: T,
97    stream_id: PhantomData<StreamId>,
98    event: PhantomData<Event>,
99}
100
101impl<T, StreamId, Event> event::store::Streamer<StreamId, Event>
102    for InstrumentedEventStore<T, StreamId, Event>
103where
104    T: event::Store<StreamId, Event> + Send + Sync,
105    StreamId: Debug + Send + Sync,
106    Event: message::Message + Debug + Send + Sync,
107{
108    type Error = <T as event::store::Streamer<StreamId, Event>>::Error;
109
110    #[instrument(name = "event::Store.stream", skip(self))]
111    fn stream(
112        &self,
113        id: &StreamId,
114        select: event::VersionSelect,
115    ) -> event::Stream<StreamId, Event, Self::Error> {
116        self.store.stream(id, select)
117    }
118}
119
120#[async_trait]
121impl<T, StreamId, Event> event::store::Appender<StreamId, Event>
122    for InstrumentedEventStore<T, StreamId, Event>
123where
124    T: event::Store<StreamId, Event> + Send + Sync,
125    StreamId: Debug + Send + Sync,
126    Event: message::Message + Debug + Send + Sync,
127{
128    #[allow(clippy::blocks_in_conditions)] // NOTE(ar3s3ru): seems to be a false positive.
129    #[instrument(name = "event::Store.append", ret, err, skip(self))]
130    async fn append(
131        &self,
132        id: StreamId,
133        version_check: version::Check,
134        events: Vec<event::Envelope<Event>>,
135    ) -> Result<Version, event::store::AppendError> {
136        self.store.append(id, version_check, events).await
137    }
138}
139
140/// Extension trait for any [`event::Store`] type to provide
141/// instrumentation features through the `tracing` crate.
142pub trait EventStoreExt<StreamId, Event>: event::Store<StreamId, Event> + Sized
143where
144    StreamId: Debug + Send + Sync,
145    Event: message::Message + Debug + Send + Sync,
146{
147    /// Returns an instrumented version of the [`event::Store`] instance.
148    fn with_tracing(self) -> InstrumentedEventStore<Self, StreamId, Event> {
149        InstrumentedEventStore {
150            store: self,
151            stream_id: PhantomData,
152            event: PhantomData,
153        }
154    }
155}
156
157impl<T, StreamId, Event> EventStoreExt<StreamId, Event> for T
158where
159    T: event::Store<StreamId, Event> + Send + Sync,
160    StreamId: Debug + Send + Sync,
161    Event: message::Message + Debug + Send + Sync,
162{
163}