use std::fmt::Debug;
use std::marker::PhantomData;
use async_trait::async_trait;
use futures::TryStreamExt;
use crate::aggregate::Aggregate;
use crate::{aggregate, event, version};
#[derive(Debug, thiserror::Error)]
pub enum GetError {
#[error("failed to get aggregate root: not found")]
NotFound,
#[error("failed to get aggregate root, an error occurred: {0}")]
Internal(#[from] anyhow::Error),
}
#[async_trait]
pub trait Getter<T>: Send + Sync
where
T: Aggregate,
{
async fn get(&self, id: &T::Id) -> Result<aggregate::Root<T>, GetError>;
}
#[derive(Debug, thiserror::Error)]
pub enum SaveError {
#[error("failed to save aggregate root: {0}")]
Conflict(#[from] version::ConflictError),
#[error("failed to save aggregate root, an error occurred: {0}")]
Internal(#[from] anyhow::Error),
}
#[async_trait]
pub trait Saver<T>: Send + Sync
where
T: Aggregate,
{
async fn save(&self, root: &mut aggregate::Root<T>) -> Result<(), SaveError>;
}
pub trait Repository<T>: Getter<T> + Saver<T> + Send + Sync
where
T: Aggregate,
{
}
impl<T, R> Repository<T> for R
where
T: Aggregate,
R: Getter<T> + Saver<T> + Send + Sync,
{
}
#[derive(Debug, Clone)]
pub struct EventSourced<T, S>
where
T: Aggregate,
S: event::Store<T::Id, T::Event>,
{
store: S,
aggregate: PhantomData<T>,
}
impl<T, S> From<S> for EventSourced<T, S>
where
T: Aggregate,
S: event::Store<T::Id, T::Event>,
{
fn from(store: S) -> Self {
Self {
store,
aggregate: PhantomData,
}
}
}
#[async_trait]
impl<T, S> Getter<T> for EventSourced<T, S>
where
T: Aggregate,
T::Id: Clone,
T::Error: std::error::Error + Send + Sync + 'static,
S: event::Store<T::Id, T::Event>,
<S as event::store::Streamer<T::Id, T::Event>>::Error:
std::error::Error + Send + Sync + 'static,
{
async fn get(&self, id: &T::Id) -> Result<aggregate::Root<T>, GetError> {
let stream = self
.store
.stream(id, event::VersionSelect::All)
.map_ok(|persisted| persisted.event);
let ctx = aggregate::Root::<T>::rehydrate_async(stream)
.await
.map_err(anyhow::Error::from)
.map_err(GetError::Internal)?;
ctx.ok_or(GetError::NotFound)
}
}
#[async_trait]
impl<T, S> Saver<T> for EventSourced<T, S>
where
T: Aggregate,
T::Id: Clone,
S: event::Store<T::Id, T::Event>,
{
async fn save(&self, root: &mut aggregate::Root<T>) -> Result<(), SaveError> {
let events_to_commit = root.take_uncommitted_events();
let aggregate_id = root.aggregate_id();
if events_to_commit.is_empty() {
return Ok(());
}
let current_event_stream_version =
root.version() - (events_to_commit.len() as version::Version);
self.store
.append(
aggregate_id.clone(),
version::Check::MustBe(current_event_stream_version),
events_to_commit,
)
.await
.map_err(|err| match err {
event::store::AppendError::Conflict(err) => SaveError::Conflict(err),
event::store::AppendError::Internal(err) => SaveError::Internal(err),
})?;
Ok(())
}
}