eventually/aggregate/
repository.rs

1//! Module containing the definition of a [Repository], to fetch and store
2//! Aggregate Roots from a data store.
3//!
4//! If you are looking for the Event-sourced implementation of an Aggregate Repository,
5//! take a look at [`EventSourced`].
6
7use std::fmt::Debug;
8use std::marker::PhantomData;
9
10use async_trait::async_trait;
11use futures::TryStreamExt;
12
13use crate::aggregate::Aggregate;
14use crate::{aggregate, event, version};
15
16/// All possible errors returned by [`Getter::get`].
17#[derive(Debug, thiserror::Error)]
18pub enum GetError {
19    /// Error returned when the [Aggregate Root][aggregate::Root] could not be found in the data store.
20    #[error("failed to get aggregate root: not found")]
21    NotFound,
22    /// Error returned when the [Getter] implementation has encountered an error.
23    #[error("failed to get aggregate root, an error occurred: {0}")]
24    Internal(#[from] anyhow::Error),
25}
26
27/// Trait used to implement read access to a data store from which
28/// to load an [aggregate::Root] instance, given its id.
29#[async_trait]
30pub trait Getter<T>: Send + Sync
31where
32    T: Aggregate,
33{
34    /// Loads an [aggregate::Root] instance from the data store,
35    /// referenced by its unique identifier.
36    async fn get(&self, id: &T::Id) -> Result<aggregate::Root<T>, GetError>;
37}
38
39/// All possible errors returned by [`Saver::save`].
40#[derive(Debug, thiserror::Error)]
41pub enum SaveError {
42    /// Error returned when [`Saver::save`] encounters a conflict error while saving the new Aggregate Root.
43    #[error("failed to save aggregate root: {0}")]
44    Conflict(#[from] version::ConflictError),
45    /// Error returned when the [Saver] implementation has encountered an error.
46    #[error("failed to save aggregate root, an error occurred: {0}")]
47    Internal(#[from] anyhow::Error),
48}
49
50/// Trait used to implement write access to a data store, which can be used
51/// to save the latest state of an [aggregate::Root] instance.
52#[async_trait]
53pub trait Saver<T>: Send + Sync
54where
55    T: Aggregate,
56{
57    /// Saves a new version of an [aggregate::Root] instance to the data store.
58    async fn save(&self, root: &mut aggregate::Root<T>) -> Result<(), SaveError>;
59}
60
61/// A Repository is an object that allows to load and save
62/// an [Aggregate Root][aggregate::Root] from and to a persistent data store.
63pub trait Repository<T>: Getter<T> + Saver<T> + Send + Sync
64where
65    T: Aggregate,
66{
67}
68
69impl<T, R> Repository<T> for R
70where
71    T: Aggregate,
72    R: Getter<T> + Saver<T> + Send + Sync,
73{
74}
75
76/// An Event-sourced implementation of the [Repository] interface.
77///
78/// It uses an [Event Store][event::Store] instance to stream Domain Events
79/// for a particular Aggregate, and append uncommitted Domain Events
80/// recorded by an Aggregate Root.
81#[derive(Debug, Clone)]
82pub struct EventSourced<T, S>
83where
84    T: Aggregate,
85    S: event::Store<T::Id, T::Event>,
86{
87    store: S,
88    aggregate: PhantomData<T>,
89}
90
91impl<T, S> From<S> for EventSourced<T, S>
92where
93    T: Aggregate,
94    S: event::Store<T::Id, T::Event>,
95{
96    fn from(store: S) -> Self {
97        Self {
98            store,
99            aggregate: PhantomData,
100        }
101    }
102}
103
104#[async_trait]
105impl<T, S> Getter<T> for EventSourced<T, S>
106where
107    T: Aggregate,
108    T::Id: Clone,
109    T::Error: std::error::Error + Send + Sync + 'static,
110    S: event::Store<T::Id, T::Event>,
111    <S as event::store::Streamer<T::Id, T::Event>>::Error:
112        std::error::Error + Send + Sync + 'static,
113{
114    async fn get(&self, id: &T::Id) -> Result<aggregate::Root<T>, GetError> {
115        let stream = self
116            .store
117            .stream(id, event::VersionSelect::All)
118            .map_ok(|persisted| persisted.event);
119
120        let ctx = aggregate::Root::<T>::rehydrate_async(stream)
121            .await
122            .map_err(anyhow::Error::from)
123            .map_err(GetError::Internal)?;
124
125        ctx.ok_or(GetError::NotFound)
126    }
127}
128
129#[async_trait]
130impl<T, S> Saver<T> for EventSourced<T, S>
131where
132    T: Aggregate,
133    T::Id: Clone,
134    S: event::Store<T::Id, T::Event>,
135{
136    async fn save(&self, root: &mut aggregate::Root<T>) -> Result<(), SaveError> {
137        let events_to_commit = root.take_uncommitted_events();
138        let aggregate_id = root.aggregate_id();
139
140        if events_to_commit.is_empty() {
141            return Ok(());
142        }
143
144        let current_event_stream_version =
145            root.version() - (events_to_commit.len() as version::Version);
146
147        self.store
148            .append(
149                aggregate_id.clone(),
150                version::Check::MustBe(current_event_stream_version),
151                events_to_commit,
152            )
153            .await
154            .map_err(|err| match err {
155                event::store::AppendError::Conflict(err) => SaveError::Conflict(err),
156                event::store::AppendError::Internal(err) => SaveError::Internal(err),
157            })?;
158
159        Ok(())
160    }
161}