eventually/aggregate/
repository.rs1use std::fmt::Debug;
8use std::marker::PhantomData;
9
10use async_trait::async_trait;
11use futures::TryStreamExt;
12
13use crate::aggregate::Aggregate;
14use crate::{aggregate, event, version};
15
16#[derive(Debug, thiserror::Error)]
18pub enum GetError {
19 #[error("failed to get aggregate root: not found")]
21 NotFound,
22 #[error("failed to get aggregate root, an error occurred: {0}")]
24 Internal(#[from] anyhow::Error),
25}
26
27#[async_trait]
30pub trait Getter<T>: Send + Sync
31where
32 T: Aggregate,
33{
34 async fn get(&self, id: &T::Id) -> Result<aggregate::Root<T>, GetError>;
37}
38
39#[derive(Debug, thiserror::Error)]
41pub enum SaveError {
42 #[error("failed to save aggregate root: {0}")]
44 Conflict(#[from] version::ConflictError),
45 #[error("failed to save aggregate root, an error occurred: {0}")]
47 Internal(#[from] anyhow::Error),
48}
49
50#[async_trait]
53pub trait Saver<T>: Send + Sync
54where
55 T: Aggregate,
56{
57 async fn save(&self, root: &mut aggregate::Root<T>) -> Result<(), SaveError>;
59}
60
61pub trait Repository<T>: Getter<T> + Saver<T> + Send + Sync
64where
65 T: Aggregate,
66{
67}
68
69impl<T, R> Repository<T> for R
70where
71 T: Aggregate,
72 R: Getter<T> + Saver<T> + Send + Sync,
73{
74}
75
76#[derive(Debug, Clone)]
82pub struct EventSourced<T, S>
83where
84 T: Aggregate,
85 S: event::Store<T::Id, T::Event>,
86{
87 store: S,
88 aggregate: PhantomData<T>,
89}
90
91impl<T, S> From<S> for EventSourced<T, S>
92where
93 T: Aggregate,
94 S: event::Store<T::Id, T::Event>,
95{
96 fn from(store: S) -> Self {
97 Self {
98 store,
99 aggregate: PhantomData,
100 }
101 }
102}
103
104#[async_trait]
105impl<T, S> Getter<T> for EventSourced<T, S>
106where
107 T: Aggregate,
108 T::Id: Clone,
109 T::Error: std::error::Error + Send + Sync + 'static,
110 S: event::Store<T::Id, T::Event>,
111 <S as event::store::Streamer<T::Id, T::Event>>::Error:
112 std::error::Error + Send + Sync + 'static,
113{
114 async fn get(&self, id: &T::Id) -> Result<aggregate::Root<T>, GetError> {
115 let stream = self
116 .store
117 .stream(id, event::VersionSelect::All)
118 .map_ok(|persisted| persisted.event);
119
120 let ctx = aggregate::Root::<T>::rehydrate_async(stream)
121 .await
122 .map_err(anyhow::Error::from)
123 .map_err(GetError::Internal)?;
124
125 ctx.ok_or(GetError::NotFound)
126 }
127}
128
129#[async_trait]
130impl<T, S> Saver<T> for EventSourced<T, S>
131where
132 T: Aggregate,
133 T::Id: Clone,
134 S: event::Store<T::Id, T::Event>,
135{
136 async fn save(&self, root: &mut aggregate::Root<T>) -> Result<(), SaveError> {
137 let events_to_commit = root.take_uncommitted_events();
138 let aggregate_id = root.aggregate_id();
139
140 if events_to_commit.is_empty() {
141 return Ok(());
142 }
143
144 let current_event_stream_version =
145 root.version() - (events_to_commit.len() as version::Version);
146
147 self.store
148 .append(
149 aggregate_id.clone(),
150 version::Check::MustBe(current_event_stream_version),
151 events_to_commit,
152 )
153 .await
154 .map_err(|err| match err {
155 event::store::AppendError::Conflict(err) => SaveError::Conflict(err),
156 event::store::AppendError::Internal(err) => SaveError::Internal(err),
157 })?;
158
159 Ok(())
160 }
161}