eventually/aggregate/mod.rs
1//! Module containing support for the Aggregate pattern.
2//!
3//! ## What is an Aggregate?
4//!
5//! An [Aggregate] is the most important concept in your domain.
6//!
7//! It represents the entities your business domain is composed of,
8//! and the business logic your domain is exposing.
9//!
10//! For example: in an Order Management bounded-context (e.g. a
11//! microservice), the concepts of Order or Customer are two potential
12//! [Aggregate]s.
13//!
14//! Aggregates expose mutations with the concept of **commands**:
15//! from the previous example, an Order might expose some commands such as
16//! _"Add Order Item"_, or _"Remove Order Item"_, or _"Place Order"_
17//! to close the transaction.
18//!
19//! In Event Sourcing, the Aggregate state is modified by the usage of
20//! **Domain Events**, which carry some or all the fields in the state
21//! in a certain logical meaning.
22//!
23//! As such, commands in Event Sourcing will **produce** Domain Events.
24//!
25//! Aggregates should provide a way to **fold** Domain Events on the
26//! current value of the state, to produce the next state.
27
28use crate::version::Version;
29use crate::{event, message};
30
31pub mod repository;
32pub mod test;
33
34use futures::TryStreamExt;
35pub use repository::{EventSourced as EventSourcedRepository, Repository};
36
37/// An Aggregate represents a Domain Model that, through an Aggregate [Root],
38/// acts as a _transactional boundary_.
39///
40/// Aggregates are also used to enforce Domain invariants
41/// (i.e. certain constraints or rules that are unique to a specific Domain).
42///
43/// Since this is an Event-sourced version of the Aggregate pattern,
44/// any change to the Aggregate state must be represented through
45/// a Domain Event, which is then applied to the current state
46/// using the [`Aggregate::apply`] method.
47///
48/// More on Aggregates can be found here: `<https://www.dddcommunity.org/library/vernon_2011/>`
49pub trait Aggregate: Sized + Send + Sync + Clone {
50 /// The type used to uniquely identify the Aggregate.
51 type Id: Send + Sync;
52
53 /// The type of Domain Events that interest this Aggregate.
54 /// Usually, this type should be an `enum`.
55 type Event: message::Message + Send + Sync + Clone;
56
57 /// The error type that can be returned by [`Aggregate::apply`] when
58 /// mutating the Aggregate state.
59 type Error: Send + Sync;
60
61 /// A unique name identifier for this Aggregate type.
62 fn type_name() -> &'static str;
63
64 /// Returns the unique identifier for the Aggregate instance.
65 fn aggregate_id(&self) -> &Self::Id;
66
67 /// Mutates the state of an Aggregate through a Domain Event.
68 ///
69 /// # Errors
70 ///
71 /// The method can return an error if the event to apply is unexpected
72 /// given the current state of the Aggregate.
73 fn apply(state: Option<Self>, event: Self::Event) -> Result<Self, Self::Error>;
74}
75
76/// An Aggregate Root represents the Domain Entity object used to
77/// load and save an [Aggregate] from and to a [Repository], and
78/// to perform actions that may result in new Domain Events
79/// to change the state of the Aggregate.
80///
81/// The Aggregate state and list of Domain Events recorded
82/// are handled by the [Root] object itself.
83///
84/// ```text
85/// #[derive(Debug, Clone)]
86/// struct MyAggregate {
87/// // Here goes the state of the Aggregate.
88/// };
89///
90/// #[derive(Debug, Clone, PartialEq, Eq)]
91/// enum MyAggregateEvent {
92/// // Here we list the Domain Events for the Aggregate.
93/// EventHasHappened,
94/// }
95///
96/// impl Aggregate for MyAggregate {
97/// type Id = i64; // Just for the sake of the example.
98/// type Event = MyAggregateEvent;
99/// type Error = (); // Just for the sake of the example. Use a proper error here.
100///
101/// fn aggregate_id(&self) -> &Self::Id {
102/// todo!()
103/// }
104///
105/// fn apply(this: Option<Self>, event: Self::Event) -> Result<Self, Self::Error> {
106/// todo!()
107/// }
108/// }
109///
110/// // This type is necessary in order to create a new vtable
111/// // for the method implementations in the block below.
112/// #[derive(Debug, Clone)]
113/// struct MyAggregateRoot(Root<MyAggregate>)
114///
115/// impl MyAggregateRoot {
116/// pub fn do_something() -> Result<MyAggregate, ()> {
117/// // Here, we record a new Domain Event through the Root<MyAggregate> object.
118/// //
119/// // This will record the new Domain Event in a list of events to commit,
120/// // and call the `MyAggregate::apply` method to create the Aggregate state.
121/// Root::<MyAggregate>::record_new(MyAggregateEvent::EventHasHappened)
122/// .map(MyAggregateRoot)
123/// }
124/// }
125/// ```
126#[derive(Debug, Clone, PartialEq)]
127#[must_use]
128pub struct Root<T>
129where
130 T: Aggregate,
131{
132 aggregate: T,
133 version: Version,
134 recorded_events: Vec<event::Envelope<T::Event>>,
135}
136
137impl<T> std::ops::Deref for Root<T>
138where
139 T: Aggregate,
140{
141 type Target = T;
142
143 fn deref(&self) -> &Self::Target {
144 &self.aggregate
145 }
146}
147
148impl<T> Root<T>
149where
150 T: Aggregate,
151{
152 /// Returns the current version for the [Aggregate].
153 pub fn version(&self) -> Version {
154 self.version
155 }
156
157 /// Returns the unique identifier of the [Aggregate].
158 pub fn aggregate_id(&self) -> &T::Id {
159 self.aggregate.aggregate_id()
160 }
161
162 /// Maps the [Aggregate] value contained within [Root]
163 /// to a different type, that can be converted through [From] trait.
164 ///
165 /// Useful to convert an [Aggregate] type to a data transfer object to use
166 /// for database storage.
167 pub fn to_aggregate_type<K>(&self) -> K
168 where
169 K: From<T>,
170 {
171 K::from(self.aggregate.clone())
172 }
173
174 /// Returns the list of uncommitted, recorded Domain [Event]s from the [Root]
175 /// and resets the internal list to its default value.
176 #[doc(hidden)]
177 pub fn take_uncommitted_events(&mut self) -> Vec<event::Envelope<T::Event>> {
178 std::mem::take(&mut self.recorded_events)
179 }
180
181 /// Creates a new [Aggregate] [Root] instance by applying the specified
182 /// Domain Event.
183 ///
184 /// Example of usage:
185 /// ```text
186 /// use eventually::{
187 /// event,
188 /// aggregate::Root,
189 /// aggregate,
190 /// };
191 ///
192 /// let my_aggregate_root = MyAggregateRoot::record_new(
193 /// event::Envelope::from(MyDomainEvent { /* something */ })
194 /// )?;
195 /// ```
196 ///
197 /// # Errors
198 ///
199 /// The method can return an error if the event to apply is unexpected
200 /// given the current state of the Aggregate.
201 pub fn record_new(event: event::Envelope<T::Event>) -> Result<Self, T::Error> {
202 Ok(Root {
203 version: 1,
204 aggregate: T::apply(None, event.message.clone())?,
205 recorded_events: vec![event],
206 })
207 }
208
209 /// Records a change to the [Aggregate] [Root], expressed by the specified
210 /// Domain Event.
211 ///
212 /// Example of usage:
213 /// ```text
214 /// use eventually::{
215 /// event,
216 /// aggregate::Root,
217 /// };
218 ///
219 /// impl MyAggregateRoot {
220 /// pub fn update_name(&mut self, name: String) -> Result<(), MyAggregateError> {
221 /// if name.is_empty() {
222 /// return Err(MyAggregateError::NameIsEmpty);
223 /// }
224 ///
225 /// self.record_that(
226 /// event::Envelope::from(MyAggergateEvent::NameWasChanged { name })
227 /// )
228 /// }
229 /// }
230 /// ```
231 ///
232 /// # Errors
233 ///
234 /// The method can return an error if the event to apply is unexpected
235 /// given the current state of the Aggregate.
236 pub fn record_that(&mut self, event: event::Envelope<T::Event>) -> Result<(), T::Error> {
237 self.aggregate = T::apply(Some(self.aggregate.clone()), event.message.clone())?;
238 self.recorded_events.push(event);
239 self.version += 1;
240
241 Ok(())
242 }
243}
244
245/// List of possible errors that can be returned by [`Root::rehydrate_async`].
246#[derive(Debug, thiserror::Error)]
247pub enum RehydrateError<T, I> {
248 /// Error returned during rehydration when the [Aggregate Root][Root]
249 /// is applying a Domain Event using [`Aggregate::apply`].
250 ///
251 /// This usually implies the Event Stream for the [Aggregate]
252 /// contains corrupted or unexpected data.
253 #[error("failed to apply domain event while rehydrating aggregate: {0}")]
254 Domain(#[source] T),
255
256 /// This error is returned by [`Root::rehydrate_async`] when the underlying
257 /// [`futures::TryStream`] has returned an error.
258 #[error("failed to rehydrate aggregate from event stream: {0}")]
259 Inner(#[source] I),
260}
261
262impl<T> Root<T>
263where
264 T: Aggregate,
265{
266 /// Rehydrates an [Aggregate] Root from its state and version.
267 /// Useful for [Repository] implementations outside the [EventSourcedRepository] one.
268 #[doc(hidden)]
269 pub fn rehydrate_from_state(version: Version, aggregate: T) -> Root<T> {
270 Root {
271 version,
272 aggregate,
273 recorded_events: Vec::default(),
274 }
275 }
276
277 /// Rehydrates an [Aggregate Root][Root] from a stream of Domain Events.
278 #[doc(hidden)]
279 pub(crate) fn rehydrate(
280 mut stream: impl Iterator<Item = event::Envelope<T::Event>>,
281 ) -> Result<Option<Root<T>>, T::Error> {
282 stream.try_fold(None, |ctx: Option<Root<T>>, event| {
283 let new_ctx_result = match ctx {
284 None => Root::<T>::rehydrate_from(event),
285 Some(ctx) => ctx.apply_rehydrated_event(event),
286 };
287
288 Ok(Some(new_ctx_result?))
289 })
290 }
291
292 /// Rehydrates an [Aggregate Root][Root] from a stream of Domain Events.
293 #[doc(hidden)]
294 pub(crate) async fn rehydrate_async<Err>(
295 stream: impl futures::TryStream<Ok = event::Envelope<T::Event>, Error = Err>,
296 ) -> Result<Option<Root<T>>, RehydrateError<T::Error, Err>> {
297 stream
298 .map_err(RehydrateError::Inner)
299 .try_fold(None, |ctx: Option<Root<T>>, event| async {
300 let new_ctx_result = match ctx {
301 None => Root::<T>::rehydrate_from(event),
302 Some(ctx) => ctx.apply_rehydrated_event(event),
303 };
304
305 Ok(Some(new_ctx_result.map_err(RehydrateError::Domain)?))
306 })
307 .await
308 }
309
310 /// Creates a new [Root] instance from a Domain [Event]
311 /// while rehydrating an [Aggregate].
312 ///
313 /// # Errors
314 ///
315 /// The method can return an error if the event to apply is unexpected
316 /// given the current state of the Aggregate.
317 #[doc(hidden)]
318 pub(crate) fn rehydrate_from(event: event::Envelope<T::Event>) -> Result<Root<T>, T::Error> {
319 Ok(Root {
320 version: 1,
321 aggregate: T::apply(None, event.message)?,
322 recorded_events: Vec::default(),
323 })
324 }
325
326 /// Applies a new Domain [Event] to the [Root] while rehydrating
327 /// an [Aggregate].
328 ///
329 /// # Errors
330 ///
331 /// The method can return an error if the event to apply is unexpected
332 /// given the current state of the Aggregate.
333 #[doc(hidden)]
334 pub(crate) fn apply_rehydrated_event(
335 mut self,
336 event: event::Envelope<T::Event>,
337 ) -> Result<Root<T>, T::Error> {
338 self.aggregate = T::apply(Some(self.aggregate), event.message)?;
339 self.version += 1;
340
341 Ok(self)
342 }
343}
344
345// The warnings are happening due to usage of the methods only inside #[cfg(test)]
346#[allow(dead_code)]
347#[doc(hidden)]
348#[cfg(test)]
349pub(crate) mod test_user_domain {
350 use crate::{aggregate, message};
351
352 #[derive(Debug, Clone)]
353 pub(crate) struct User {
354 email: String,
355 password: String,
356 }
357
358 #[derive(Debug, Clone, PartialEq, Eq)]
359 pub(crate) enum UserEvent {
360 WasCreated { email: String, password: String },
361 PasswordWasChanged { password: String },
362 }
363
364 impl message::Message for UserEvent {
365 fn name(&self) -> &'static str {
366 match self {
367 UserEvent::WasCreated { .. } => "UserWasCreated",
368 UserEvent::PasswordWasChanged { .. } => "UserPasswordWasChanged",
369 }
370 }
371 }
372
373 #[derive(Debug, thiserror::Error)]
374 pub(crate) enum UserError {
375 #[error("provided email was empty")]
376 EmptyEmail,
377 #[error("provided password was empty")]
378 EmptyPassword,
379 #[error("user was not yet created")]
380 NotYetCreated,
381 #[error("user was already created")]
382 AlreadyCreated,
383 }
384
385 impl aggregate::Aggregate for User {
386 type Id = String;
387 type Event = UserEvent;
388 type Error = UserError;
389
390 fn type_name() -> &'static str {
391 "User"
392 }
393
394 fn aggregate_id(&self) -> &Self::Id {
395 &self.email
396 }
397
398 fn apply(state: Option<Self>, event: Self::Event) -> Result<Self, Self::Error> {
399 match state {
400 None => match event {
401 UserEvent::WasCreated { email, password } => Ok(User { email, password }),
402 UserEvent::PasswordWasChanged { .. } => Err(UserError::NotYetCreated),
403 },
404 Some(mut state) => match event {
405 UserEvent::PasswordWasChanged { password } => {
406 state.password = password;
407 Ok(state)
408 },
409 UserEvent::WasCreated { .. } => Err(UserError::AlreadyCreated),
410 },
411 }
412 }
413 }
414
415 impl aggregate::Root<User> {
416 pub(crate) fn create(email: String, password: String) -> Result<Self, UserError> {
417 if email.is_empty() {
418 return Err(UserError::EmptyEmail);
419 }
420
421 if password.is_empty() {
422 return Err(UserError::EmptyPassword);
423 }
424
425 Self::record_new(UserEvent::WasCreated { email, password }.into())
426 }
427
428 pub(crate) fn change_password(&mut self, password: String) -> Result<(), UserError> {
429 if password.is_empty() {
430 return Err(UserError::EmptyPassword);
431 }
432
433 self.record_that(UserEvent::PasswordWasChanged { password }.into())?;
434
435 Ok(())
436 }
437 }
438}
439
440#[allow(clippy::semicolon_if_nothing_returned)] // False positives :shrugs:
441#[cfg(test)]
442mod tests {
443 use std::error::Error;
444
445 use crate::aggregate::repository::{Getter, Saver};
446 use crate::aggregate::test_user_domain::{User, UserEvent};
447 use crate::event::store::EventStoreExt;
448 use crate::{aggregate, event, version};
449
450 #[tokio::test]
451 async fn repository_persists_new_aggregate_root() {
452 let event_store = event::store::InMemory::<String, UserEvent>::default();
453 let tracking_event_store = event_store.with_recorded_events_tracking();
454 let user_repository =
455 aggregate::EventSourcedRepository::<User, _>::from(tracking_event_store.clone());
456
457 let email = "test@email.com".to_owned();
458 let password = "not-a-secret".to_owned();
459
460 let mut user = aggregate::Root::<User>::create(email.clone(), password.clone())
461 .expect("user should be created successfully");
462
463 user_repository
464 .save(&mut user)
465 .await
466 .expect("user should be saved successfully");
467
468 let expected_events = vec![event::Persisted {
469 stream_id: email.clone(),
470 version: 1,
471 event: event::Envelope::from(UserEvent::WasCreated { email, password }),
472 }];
473
474 assert_eq!(expected_events, tracking_event_store.recorded_events());
475 }
476
477 #[tokio::test]
478 async fn repository_retrieves_the_aggregate_root_and_stores_new_events() {
479 let event_store = event::store::InMemory::<String, UserEvent>::default();
480 let tracking_event_store = event_store.with_recorded_events_tracking();
481 let user_repository =
482 aggregate::EventSourcedRepository::<User, _>::from(tracking_event_store.clone());
483
484 let email = "test@email.com".to_owned();
485 let password = "not-a-secret".to_owned();
486
487 let mut user = aggregate::Root::<User>::create(email.clone(), password.clone())
488 .expect("user should be created successfully");
489
490 user_repository
491 .save(&mut user)
492 .await
493 .expect("user should be saved successfully");
494
495 // Reset the event recorded while storing the User for the first time.
496 tracking_event_store.reset_recorded_events();
497
498 let mut user = user_repository
499 .get(&email)
500 .await
501 .expect("user should be retrieved from the repository");
502
503 let new_password = "new-password".to_owned();
504
505 user.change_password(new_password.clone())
506 .expect("user password should be changed successfully");
507
508 user_repository
509 .save(&mut user)
510 .await
511 .expect("new user version should be saved successfully");
512
513 let expected_events = vec![event::Persisted {
514 stream_id: email.clone(),
515 version: 2,
516 event: event::Envelope::from(UserEvent::PasswordWasChanged {
517 password: new_password,
518 }),
519 }];
520
521 assert_eq!(expected_events, tracking_event_store.recorded_events());
522 }
523
524 #[tokio::test]
525 async fn repository_returns_conflict_error_from_store_when_data_race_happens() {
526 let event_store = event::store::InMemory::<String, UserEvent>::default();
527 let user_repository =
528 aggregate::EventSourcedRepository::<User, _>::from(event_store.clone());
529
530 let email = "test@email.com".to_owned();
531 let password = "not-a-secret".to_owned();
532
533 let mut user = aggregate::Root::<User>::create(email.clone(), password.clone())
534 .expect("user should be created successfully");
535
536 // We need to clone the User Aggregate Root instance to get the list
537 // of uncommitted events from the Root context twice.
538 let mut cloned_user = user.clone();
539
540 // Saving the first User to the Repository.
541 user_repository
542 .save(&mut user)
543 .await
544 .expect("user should be saved successfully");
545
546 // Simulating data race by duplicating the call to the Repository
547 // with the same UserRoot instance that has already been committeed.
548 let error = user_repository.save(&mut cloned_user).await.expect_err(
549 "the repository should fail on the second .save() call with the cloned user",
550 );
551
552 let error: Box<dyn Error> = error.into();
553
554 // Have no idea how to fix this one...
555 #[allow(clippy::redundant_closure_for_method_calls)]
556 {
557 assert!(error
558 .source()
559 .is_some_and(|src| src.is::<version::ConflictError>()));
560 }
561 }
562}