1use std::fmt::Debug;
5use std::marker::PhantomData;
6
7use async_trait::async_trait;
8use tracing::instrument;
9
10use crate::aggregate::Aggregate;
11use crate::version::{self, Version};
12use crate::{aggregate, event, message};
13
14#[derive(Debug, Clone)]
17pub struct InstrumentedAggregateRepository<T, Inner>
18where
19 T: Aggregate + Debug,
20 <T as Aggregate>::Id: Debug,
21 <T as Aggregate>::Event: Debug,
22 Inner: aggregate::Repository<T>,
23{
24 inner: Inner,
25 t: PhantomData<T>,
26}
27
28#[async_trait]
29impl<T, Inner> aggregate::repository::Getter<T> for InstrumentedAggregateRepository<T, Inner>
30where
31 T: Aggregate + Debug,
32 <T as Aggregate>::Id: Debug,
33 <T as Aggregate>::Event: Debug,
34 Inner: aggregate::Repository<T>,
35{
36 #[allow(clippy::blocks_in_conditions)] #[instrument(name = "aggregate::repository::Getter.get", ret, err, skip(self))]
38 async fn get(&self, id: &T::Id) -> Result<aggregate::Root<T>, aggregate::repository::GetError> {
39 self.inner.get(id).await
40 }
41}
42
43#[async_trait]
44impl<T, Inner> aggregate::repository::Saver<T> for InstrumentedAggregateRepository<T, Inner>
45where
46 T: Aggregate + Debug,
47 <T as Aggregate>::Id: Debug,
48 <T as Aggregate>::Event: Debug,
49 Inner: aggregate::Repository<T>,
50{
51 #[allow(clippy::blocks_in_conditions)] #[instrument(name = "aggregate::repository::Saver.save", ret, err, skip(self))]
53 async fn save(
54 &self,
55 root: &mut aggregate::Root<T>,
56 ) -> Result<(), aggregate::repository::SaveError> {
57 self.inner.save(root).await
58 }
59}
60
61pub trait AggregateRepositoryExt<T>: aggregate::Repository<T> + Sized
64where
65 T: Aggregate + Debug,
66 <T as Aggregate>::Id: Debug,
67 <T as Aggregate>::Event: Debug,
68{
69 fn with_tracing(self) -> InstrumentedAggregateRepository<T, Self> {
71 InstrumentedAggregateRepository {
72 inner: self,
73 t: PhantomData,
74 }
75 }
76}
77
78impl<R, T> AggregateRepositoryExt<T> for R
79where
80 R: aggregate::Repository<T>,
81 T: Aggregate + Debug,
82 <T as Aggregate>::Id: Debug,
83 <T as Aggregate>::Event: Debug,
84{
85}
86
87#[derive(Debug, Clone)]
90pub struct InstrumentedEventStore<T, StreamId, Event>
91where
92 T: event::Store<StreamId, Event> + Send + Sync,
93 StreamId: Debug + Send + Sync,
94 Event: message::Message + Debug + Send + Sync,
95{
96 store: T,
97 stream_id: PhantomData<StreamId>,
98 event: PhantomData<Event>,
99}
100
101impl<T, StreamId, Event> event::store::Streamer<StreamId, Event>
102 for InstrumentedEventStore<T, StreamId, Event>
103where
104 T: event::Store<StreamId, Event> + Send + Sync,
105 StreamId: Debug + Send + Sync,
106 Event: message::Message + Debug + Send + Sync,
107{
108 type Error = <T as event::store::Streamer<StreamId, Event>>::Error;
109
110 #[instrument(name = "event::Store.stream", skip(self))]
111 fn stream(
112 &self,
113 id: &StreamId,
114 select: event::VersionSelect,
115 ) -> event::Stream<StreamId, Event, Self::Error> {
116 self.store.stream(id, select)
117 }
118}
119
120#[async_trait]
121impl<T, StreamId, Event> event::store::Appender<StreamId, Event>
122 for InstrumentedEventStore<T, StreamId, Event>
123where
124 T: event::Store<StreamId, Event> + Send + Sync,
125 StreamId: Debug + Send + Sync,
126 Event: message::Message + Debug + Send + Sync,
127{
128 #[allow(clippy::blocks_in_conditions)] #[instrument(name = "event::Store.append", ret, err, skip(self))]
130 async fn append(
131 &self,
132 id: StreamId,
133 version_check: version::Check,
134 events: Vec<event::Envelope<Event>>,
135 ) -> Result<Version, event::store::AppendError> {
136 self.store.append(id, version_check, events).await
137 }
138}
139
140pub trait EventStoreExt<StreamId, Event>: event::Store<StreamId, Event> + Sized
143where
144 StreamId: Debug + Send + Sync,
145 Event: message::Message + Debug + Send + Sync,
146{
147 fn with_tracing(self) -> InstrumentedEventStore<Self, StreamId, Event> {
149 InstrumentedEventStore {
150 store: self,
151 stream_id: PhantomData,
152 event: PhantomData,
153 }
154 }
155}
156
157impl<T, StreamId, Event> EventStoreExt<StreamId, Event> for T
158where
159 T: event::Store<StreamId, Event> + Send + Sync,
160 StreamId: Debug + Send + Sync,
161 Event: message::Message + Debug + Send + Sync,
162{
163}