eventually/aggregate/
mod.rs

1//! Module containing support for the Aggregate pattern.
2//!
3//! ## What is an Aggregate?
4//!
5//! An [Aggregate] is the most important concept in your domain.
6//!
7//! It represents the entities your business domain is composed of,
8//! and the business logic your domain is exposing.
9//!
10//! For example: in an Order Management bounded-context (e.g. a
11//! microservice), the concepts of Order or Customer are two potential
12//! [Aggregate]s.
13//!
14//! Aggregates expose mutations with the concept of **commands**:
15//! from the previous example, an Order might expose some commands such as
16//! _"Add Order Item"_, or _"Remove Order Item"_, or _"Place Order"_
17//! to close the transaction.
18//!
//! In Event Sourcing, the Aggregate state is never modified directly:
//! it changes only through **Domain Events**, each of which carries some
//! or all of the state's fields, together with the business meaning of the change.
22//!
23//! As such, commands in Event Sourcing will **produce** Domain Events.
24//!
25//! Aggregates should provide a way to **fold** Domain Events on the
26//! current value of the state, to produce the next state.
27
28use crate::version::Version;
29use crate::{event, message};
30
31pub mod repository;
32pub mod test;
33
34use futures::TryStreamExt;
35pub use repository::{EventSourced as EventSourcedRepository, Repository};
36
37/// An Aggregate represents a Domain Model that, through an Aggregate [Root],
38/// acts as a _transactional boundary_.
39///
40/// Aggregates are also used to enforce Domain invariants
41/// (i.e. certain constraints or rules that are unique to a specific Domain).
42///
43/// Since this is an Event-sourced version of the Aggregate pattern,
44/// any change to the Aggregate state must be represented through
45/// a Domain Event, which is then applied to the current state
46/// using the [`Aggregate::apply`] method.
47///
48/// More on Aggregates can be found here: `<https://www.dddcommunity.org/library/vernon_2011/>`
49pub trait Aggregate: Sized + Send + Sync + Clone {
50    /// The type used to uniquely identify the Aggregate.
51    type Id: Send + Sync;
52
53    /// The type of Domain Events that interest this Aggregate.
54    /// Usually, this type should be an `enum`.
55    type Event: message::Message + Send + Sync + Clone;
56
57    /// The error type that can be returned by [`Aggregate::apply`] when
58    /// mutating the Aggregate state.
59    type Error: Send + Sync;
60
61    /// A unique name identifier for this Aggregate type.
62    fn type_name() -> &'static str;
63
64    /// Returns the unique identifier for the Aggregate instance.
65    fn aggregate_id(&self) -> &Self::Id;
66
67    /// Mutates the state of an Aggregate through a Domain Event.
68    ///
69    /// # Errors
70    ///
71    /// The method can return an error if the event to apply is unexpected
72    /// given the current state of the Aggregate.
73    fn apply(state: Option<Self>, event: Self::Event) -> Result<Self, Self::Error>;
74}
75
76/// An Aggregate Root represents the Domain Entity object used to
77/// load and save an [Aggregate] from and to a [Repository], and
78/// to perform actions that may result in new Domain Events
79/// to change the state of the Aggregate.
80///
81/// The Aggregate state and list of Domain Events recorded
82/// are handled by the [Root] object itself.
83///
84/// ```text
85/// #[derive(Debug, Clone)]
86/// struct MyAggregate {
87///     // Here goes the state of the Aggregate.
88/// };
89///
90/// #[derive(Debug, Clone, PartialEq, Eq)]
91/// enum MyAggregateEvent {
92///     // Here we list the Domain Events for the Aggregate.
93///     EventHasHappened,
94/// }
95///
96/// impl Aggregate for MyAggregate {
97///     type Id = i64; // Just for the sake of the example.
98///     type Event = MyAggregateEvent;
99///     type Error = (); // Just for the sake of the example. Use a proper error here.
100///
101///     fn aggregate_id(&self) -> &Self::Id {
102///         todo!()
103///     }
104///
105///     fn apply(this: Option<Self>, event: Self::Event) -> Result<Self, Self::Error> {
106///         todo!()
107///     }
108/// }
109///
110/// // This type is necessary in order to create a new vtable
111/// // for the method implementations in the block below.
112/// #[derive(Debug, Clone)]
113/// struct MyAggregateRoot(Root<MyAggregate>)
114///
115/// impl MyAggregateRoot {
116///     pub fn do_something() -> Result<MyAggregate, ()> {
117///         // Here, we record a new Domain Event through the Root<MyAggregate> object.
118///         //
119///         // This will record the new Domain Event in a list of events to commit,
120///         // and call the `MyAggregate::apply` method to create the Aggregate state.
121///         Root::<MyAggregate>::record_new(MyAggregateEvent::EventHasHappened)
122///             .map(MyAggregateRoot)
123///     }
124/// }
125/// ```
126#[derive(Debug, Clone, PartialEq)]
127#[must_use]
128pub struct Root<T>
129where
130    T: Aggregate,
131{
132    aggregate: T,
133    version: Version,
134    recorded_events: Vec<event::Envelope<T::Event>>,
135}
136
137impl<T> std::ops::Deref for Root<T>
138where
139    T: Aggregate,
140{
141    type Target = T;
142
143    fn deref(&self) -> &Self::Target {
144        &self.aggregate
145    }
146}
147
148impl<T> Root<T>
149where
150    T: Aggregate,
151{
152    /// Returns the current version for the [Aggregate].
153    pub fn version(&self) -> Version {
154        self.version
155    }
156
157    /// Returns the unique identifier of the [Aggregate].
158    pub fn aggregate_id(&self) -> &T::Id {
159        self.aggregate.aggregate_id()
160    }
161
162    /// Maps the [Aggregate] value contained within [Root]
163    /// to a different type, that can be converted through [From] trait.
164    ///
165    /// Useful to convert an [Aggregate] type to a data transfer object to use
166    /// for database storage.
167    pub fn to_aggregate_type<K>(&self) -> K
168    where
169        K: From<T>,
170    {
171        K::from(self.aggregate.clone())
172    }
173
174    /// Returns the list of uncommitted, recorded Domain [Event]s from the [Root]
175    /// and resets the internal list to its default value.
176    #[doc(hidden)]
177    pub fn take_uncommitted_events(&mut self) -> Vec<event::Envelope<T::Event>> {
178        std::mem::take(&mut self.recorded_events)
179    }
180
181    /// Creates a new [Aggregate] [Root] instance by applying the specified
182    /// Domain Event.
183    ///
184    /// Example of usage:
185    /// ```text
186    /// use eventually::{
187    ///     event,
188    ///     aggregate::Root,
189    ///     aggregate,
190    /// };
191    ///
192    /// let my_aggregate_root = MyAggregateRoot::record_new(
193    ///     event::Envelope::from(MyDomainEvent { /* something */ })
194    ///  )?;
195    /// ```
196    ///
197    /// # Errors
198    ///
199    /// The method can return an error if the event to apply is unexpected
200    /// given the current state of the Aggregate.
201    pub fn record_new(event: event::Envelope<T::Event>) -> Result<Self, T::Error> {
202        Ok(Root {
203            version: 1,
204            aggregate: T::apply(None, event.message.clone())?,
205            recorded_events: vec![event],
206        })
207    }
208
209    /// Records a change to the [Aggregate] [Root], expressed by the specified
210    /// Domain Event.
211    ///
212    /// Example of usage:
213    /// ```text
214    /// use eventually::{
215    ///     event,
216    ///     aggregate::Root,
217    /// };
218    ///
219    /// impl MyAggregateRoot {
220    ///     pub fn update_name(&mut self, name: String) -> Result<(), MyAggregateError> {
221    ///         if name.is_empty() {
222    ///             return Err(MyAggregateError::NameIsEmpty);
223    ///         }
224    ///
225    ///         self.record_that(
226    ///             event::Envelope::from(MyAggergateEvent::NameWasChanged { name })
227    ///         )
228    ///     }
229    /// }
230    /// ```
231    ///
232    /// # Errors
233    ///
234    /// The method can return an error if the event to apply is unexpected
235    /// given the current state of the Aggregate.
236    pub fn record_that(&mut self, event: event::Envelope<T::Event>) -> Result<(), T::Error> {
237        self.aggregate = T::apply(Some(self.aggregate.clone()), event.message.clone())?;
238        self.recorded_events.push(event);
239        self.version += 1;
240
241        Ok(())
242    }
243}
244
245/// List of possible errors that can be returned by [`Root::rehydrate_async`].
246#[derive(Debug, thiserror::Error)]
247pub enum RehydrateError<T, I> {
248    /// Error returned during rehydration when the [Aggregate Root][Root]
249    /// is applying a Domain Event using [`Aggregate::apply`].
250    ///
251    /// This usually implies the Event Stream for the [Aggregate]
252    /// contains corrupted or unexpected data.
253    #[error("failed to apply domain event while rehydrating aggregate: {0}")]
254    Domain(#[source] T),
255
256    /// This error is returned by [`Root::rehydrate_async`] when the underlying
257    /// [`futures::TryStream`] has returned an error.
258    #[error("failed to rehydrate aggregate from event stream: {0}")]
259    Inner(#[source] I),
260}
261
262impl<T> Root<T>
263where
264    T: Aggregate,
265{
266    /// Rehydrates an [Aggregate] Root from its state and version.
267    /// Useful for [Repository] implementations outside the [EventSourcedRepository] one.
268    #[doc(hidden)]
269    pub fn rehydrate_from_state(version: Version, aggregate: T) -> Root<T> {
270        Root {
271            version,
272            aggregate,
273            recorded_events: Vec::default(),
274        }
275    }
276
277    /// Rehydrates an [Aggregate Root][Root] from a stream of Domain Events.
278    #[doc(hidden)]
279    pub(crate) fn rehydrate(
280        mut stream: impl Iterator<Item = event::Envelope<T::Event>>,
281    ) -> Result<Option<Root<T>>, T::Error> {
282        stream.try_fold(None, |ctx: Option<Root<T>>, event| {
283            let new_ctx_result = match ctx {
284                None => Root::<T>::rehydrate_from(event),
285                Some(ctx) => ctx.apply_rehydrated_event(event),
286            };
287
288            Ok(Some(new_ctx_result?))
289        })
290    }
291
292    /// Rehydrates an [Aggregate Root][Root] from a stream of Domain Events.
293    #[doc(hidden)]
294    pub(crate) async fn rehydrate_async<Err>(
295        stream: impl futures::TryStream<Ok = event::Envelope<T::Event>, Error = Err>,
296    ) -> Result<Option<Root<T>>, RehydrateError<T::Error, Err>> {
297        stream
298            .map_err(RehydrateError::Inner)
299            .try_fold(None, |ctx: Option<Root<T>>, event| async {
300                let new_ctx_result = match ctx {
301                    None => Root::<T>::rehydrate_from(event),
302                    Some(ctx) => ctx.apply_rehydrated_event(event),
303                };
304
305                Ok(Some(new_ctx_result.map_err(RehydrateError::Domain)?))
306            })
307            .await
308    }
309
310    /// Creates a new [Root] instance from a Domain [Event]
311    /// while rehydrating an [Aggregate].
312    ///
313    /// # Errors
314    ///
315    /// The method can return an error if the event to apply is unexpected
316    /// given the current state of the Aggregate.
317    #[doc(hidden)]
318    pub(crate) fn rehydrate_from(event: event::Envelope<T::Event>) -> Result<Root<T>, T::Error> {
319        Ok(Root {
320            version: 1,
321            aggregate: T::apply(None, event.message)?,
322            recorded_events: Vec::default(),
323        })
324    }
325
326    /// Applies a new Domain [Event] to the [Root] while rehydrating
327    /// an [Aggregate].
328    ///
329    /// # Errors
330    ///
331    /// The method can return an error if the event to apply is unexpected
332    /// given the current state of the Aggregate.
333    #[doc(hidden)]
334    pub(crate) fn apply_rehydrated_event(
335        mut self,
336        event: event::Envelope<T::Event>,
337    ) -> Result<Root<T>, T::Error> {
338        self.aggregate = T::apply(Some(self.aggregate), event.message)?;
339        self.version += 1;
340
341        Ok(self)
342    }
343}
344
// Sample "User" domain shared by the unit tests of the crate.
//
// Everything in here is only compiled under #[cfg(test)], and not every item
// is used by every test: hence the dead_code allowance.
#[allow(dead_code)]
#[doc(hidden)]
#[cfg(test)]
pub(crate) mod test_user_domain {
    use crate::{aggregate, message};

    /// State of the User Aggregate. The email doubles as the Aggregate id.
    #[derive(Debug, Clone)]
    pub(crate) struct User {
        email: String,
        password: String,
    }

    /// Domain Events of the User Aggregate.
    #[derive(Debug, Clone, PartialEq, Eq)]
    pub(crate) enum UserEvent {
        WasCreated { email: String, password: String },
        PasswordWasChanged { password: String },
    }

    impl message::Message for UserEvent {
        fn name(&self) -> &'static str {
            match self {
                Self::WasCreated { .. } => "UserWasCreated",
                Self::PasswordWasChanged { .. } => "UserPasswordWasChanged",
            }
        }
    }

    /// Errors returned by the User Aggregate and its Root methods.
    #[derive(Debug, thiserror::Error)]
    pub(crate) enum UserError {
        #[error("provided email was empty")]
        EmptyEmail,
        #[error("provided password was empty")]
        EmptyPassword,
        #[error("user was not yet created")]
        NotYetCreated,
        #[error("user was already created")]
        AlreadyCreated,
    }

    impl aggregate::Aggregate for User {
        type Id = String;
        type Event = UserEvent;
        type Error = UserError;

        fn type_name() -> &'static str {
            "User"
        }

        fn aggregate_id(&self) -> &Self::Id {
            &self.email
        }

        fn apply(state: Option<Self>, event: Self::Event) -> Result<Self, Self::Error> {
            match (state, event) {
                // Creation is only valid when there is no state yet.
                (None, UserEvent::WasCreated { email, password }) => Ok(User { email, password }),
                (Some(_), UserEvent::WasCreated { .. }) => Err(UserError::AlreadyCreated),
                // A password change requires an existing User.
                (None, UserEvent::PasswordWasChanged { .. }) => Err(UserError::NotYetCreated),
                (Some(user), UserEvent::PasswordWasChanged { password }) => {
                    Ok(User { password, ..user })
                },
            }
        }
    }

    impl aggregate::Root<User> {
        /// Creates a new User, rejecting an empty email or an empty password.
        pub(crate) fn create(email: String, password: String) -> Result<Self, UserError> {
            if email.is_empty() {
                return Err(UserError::EmptyEmail);
            }

            if password.is_empty() {
                return Err(UserError::EmptyPassword);
            }

            let created = UserEvent::WasCreated { email, password };

            Self::record_new(created.into())
        }

        /// Changes the password of the User, rejecting an empty password.
        pub(crate) fn change_password(&mut self, password: String) -> Result<(), UserError> {
            if password.is_empty() {
                return Err(UserError::EmptyPassword);
            }

            let changed = UserEvent::PasswordWasChanged { password };

            self.record_that(changed.into())
        }
    }
}
439
#[allow(clippy::semicolon_if_nothing_returned)] // False positives :shrugs:
#[cfg(test)]
mod tests {
    use std::error::Error;

    use crate::aggregate::repository::{Getter, Saver};
    use crate::aggregate::test_user_domain::{User, UserEvent};
    use crate::event::store::EventStoreExt;
    use crate::{aggregate, event, version};

    /// Saving a freshly created Aggregate Root must append its creation
    /// event to the Event Store, at version 1.
    #[tokio::test]
    async fn repository_persists_new_aggregate_root() {
        let event_store = event::store::InMemory::<String, UserEvent>::default();
        let tracking_event_store = event_store.with_recorded_events_tracking();
        let user_repository =
            aggregate::EventSourcedRepository::<User, _>::from(tracking_event_store.clone());

        let email = "test@email.com".to_owned();
        let password = "not-a-secret".to_owned();

        let mut user = aggregate::Root::<User>::create(email.clone(), password.clone())
            .expect("user should be created successfully");

        user_repository
            .save(&mut user)
            .await
            .expect("user should be saved successfully");

        let expected_events = vec![event::Persisted {
            stream_id: email.clone(),
            version: 1,
            event: event::Envelope::from(UserEvent::WasCreated { email, password }),
        }];

        assert_eq!(expected_events, tracking_event_store.recorded_events());
    }

    /// An Aggregate Root loaded from the Repository must be saved back with
    /// only the newly recorded events, at the next version.
    #[tokio::test]
    async fn repository_retrieves_the_aggregate_root_and_stores_new_events() {
        let event_store = event::store::InMemory::<String, UserEvent>::default();
        let tracking_event_store = event_store.with_recorded_events_tracking();
        let user_repository =
            aggregate::EventSourcedRepository::<User, _>::from(tracking_event_store.clone());

        let email = "test@email.com".to_owned();
        let password = "not-a-secret".to_owned();

        let mut user = aggregate::Root::<User>::create(email.clone(), password.clone())
            .expect("user should be created successfully");

        user_repository
            .save(&mut user)
            .await
            .expect("user should be saved successfully");

        // Reset the event recorded while storing the User for the first time.
        tracking_event_store.reset_recorded_events();

        let mut user = user_repository
            .get(&email)
            .await
            .expect("user should be retrieved from the repository");

        let new_password = "new-password".to_owned();

        user.change_password(new_password.clone())
            .expect("user password should be changed successfully");

        user_repository
            .save(&mut user)
            .await
            .expect("new user version should be saved successfully");

        let expected_events = vec![event::Persisted {
            stream_id: email.clone(),
            version: 2,
            event: event::Envelope::from(UserEvent::PasswordWasChanged {
                password: new_password,
            }),
        }];

        assert_eq!(expected_events, tracking_event_store.recorded_events());
    }

    /// Saving the same uncommitted events twice must fail the second time,
    /// with a version conflict as the source of the error.
    #[tokio::test]
    async fn repository_returns_conflict_error_from_store_when_data_race_happens() {
        let event_store = event::store::InMemory::<String, UserEvent>::default();
        let user_repository =
            aggregate::EventSourcedRepository::<User, _>::from(event_store.clone());

        let email = "test@email.com".to_owned();
        let password = "not-a-secret".to_owned();

        let mut user = aggregate::Root::<User>::create(email.clone(), password.clone())
            .expect("user should be created successfully");

        // We need to clone the User Aggregate Root instance to get the list
        // of uncommitted events from the Root context twice.
        let mut cloned_user = user.clone();

        // Saving the first User to the Repository.
        user_repository
            .save(&mut user)
            .await
            .expect("user should be saved successfully");

        // Simulating data race by duplicating the call to the Repository
        // with the same UserRoot instance that has already been committed.
        let error = user_repository.save(&mut cloned_user).await.expect_err(
            "the repository should fail on the second .save() call with the cloned user",
        );

        let error: Box<dyn Error> = error.into();

        // Unwrapping the source first lets the type check be a plain method
        // call, so no closure (and no clippy suppression) is needed.
        let source = error
            .source()
            .expect("the repository error should expose the underlying cause");

        assert!(source.is::<version::ConflictError>());
    }
}