eventually/command/
mod.rs

//! Module containing support for Domain [Command]s.
//!
//! Following the Domain-driven Design definition, a [Command] expresses the
//! intent of an Actor (e.g. a Customer, a User, a System, etc.) to modify
//! the state of the system in some way.
//!
//! To modify the state of the system through a [Command], you must
//! implement a Command [Handler] which, in an Event-sourced system,
//! should make use of an [Aggregate] to evaluate the validity of the Command
//! submitted, and emit Domain [Event]s as a result (through the Event [Store]).
//!
//! Check out the documentation of the types exported in this module.
13
14pub mod test;
15
16use std::future::Future;
17
18use async_trait::async_trait;
19
20use crate::message;
21
/// A Command represents an intent by an Actor (e.g. a User, or a System)
/// to mutate the state of the system.
///
/// In an event-sourced system, a Command is represented as a [Message].
///
/// This is a plain type alias of [`message::Envelope`], so a Command envelope
/// and a Message envelope are the very same type.
pub type Envelope<T> = message::Envelope<T>;
27
/// A software component that is able to handle [Command]s of a certain type,
/// and mutate the state as a result of the command handling, or fail.
///
/// In an event-sourced system, the [Command] Handler
/// should use an [Aggregate][crate::aggregate::Aggregate] to evaluate
/// a [Command] to ensure business invariants are respected.
///
/// Implementors are required to be `Send + Sync`, and the command payload `T`
/// must implement [`message::Message`].
#[async_trait]
pub trait Handler<T>: Send + Sync
where
    T: message::Message,
{
    /// The error type returned by the Handler while handling a [Command].
    ///
    /// The error type is required to be `Send + Sync`.
    type Error: Send + Sync;

    /// Handles a [Command] and returns an error if the handling has failed.
    ///
    /// Since [Command]s are solely modifying the state of the system,
    /// they do not return anything to the caller but the result of the operation
    /// (expressed by a [Result] type).
    ///
    /// The command is received wrapped in its [`Envelope`] and is taken by value.
    async fn handle(&self, command: Envelope<T>) -> Result<(), Self::Error>;
}
49
50#[async_trait]
51impl<T, Err, F, Fut> Handler<T> for F
52where
53    T: message::Message + Send + Sync + 'static,
54    Err: Send + Sync,
55    F: Send + Sync + Fn(Envelope<T>) -> Fut,
56    Fut: Send + Sync + Future<Output = Result<(), Err>>,
57{
58    type Error = Err;
59
60    async fn handle(&self, command: Envelope<T>) -> Result<(), Self::Error> {
61        self(command).await
62    }
63}
64
#[cfg(test)]
mod test_user_domain {
    use std::sync::Arc;

    use async_trait::async_trait;

    use crate::aggregate::test_user_domain::{User, UserEvent};
    use crate::{aggregate, command, event, message};

    /// Email used by every fixture below; the same value is used as the event stream id.
    const EMAIL: &str = "test@test.com";

    /// Password the user is created with.
    const PASSWORD: &str = "not-a-secret";

    /// Password submitted by the `ChangeUserPassword` command.
    const NEW_PASSWORD: &str = "new-password";

    /// Command-handling service for the `User` aggregate, backed by a shared,
    /// type-erased aggregate repository.
    struct UserService(Arc<dyn aggregate::Repository<User>>);

    /// Builds a [`UserService`] out of any `'static` `User` repository.
    impl<R> From<R> for UserService
    where
        R: aggregate::Repository<User> + 'static,
    {
        fn from(repository: R) -> Self {
            let shared: Arc<dyn aggregate::Repository<User>> = Arc::new(repository);

            UserService(shared)
        }
    }

    /// Command asking for a new user to be created.
    struct CreateUser {
        email: String,
        password: String,
    }

    impl message::Message for CreateUser {
        fn name(&self) -> &'static str {
            "CreateUser"
        }
    }

    #[async_trait]
    impl command::Handler<CreateUser> for UserService {
        type Error = anyhow::Error;

        /// Creates a new `User` aggregate root from the command payload
        /// and saves it through the repository.
        async fn handle(&self, command: command::Envelope<CreateUser>) -> Result<(), Self::Error> {
            let CreateUser { email, password } = command.message;

            let mut new_user = aggregate::Root::<User>::create(email, password)?;
            self.0.save(&mut new_user).await?;

            Ok(())
        }
    }

    /// Command asking to change the password of the user with the given email.
    struct ChangeUserPassword {
        email: String,
        password: String,
    }

    impl message::Message for ChangeUserPassword {
        fn name(&self) -> &'static str {
            "ChangeUserPassword"
        }
    }

    #[async_trait]
    impl command::Handler<ChangeUserPassword> for UserService {
        type Error = anyhow::Error;

        /// Loads the user by email, changes its password and saves it back
        /// through the repository.
        async fn handle(
            &self,
            command: command::Envelope<ChangeUserPassword>,
        ) -> Result<(), Self::Error> {
            let ChangeUserPassword { email, password } = command.message;

            let mut existing_user = self.0.get(&email).await?;
            existing_user.change_password(password)?;
            self.0.save(&mut existing_user).await?;

            Ok(())
        }
    }

    /// With no prior events, `CreateUser` is expected to record a single
    /// `WasCreated` event at version 1.
    #[tokio::test]
    async fn it_creates_a_new_user_successfully() {
        let create_user = command::Envelope::from(CreateUser {
            email: EMAIL.to_owned(),
            password: PASSWORD.to_owned(),
        });

        let expected_events = vec![event::Persisted {
            stream_id: EMAIL.to_owned(),
            version: 1,
            event: event::Envelope::from(UserEvent::WasCreated {
                email: EMAIL.to_owned(),
                password: PASSWORD.to_owned(),
            }),
        }];

        command::test::Scenario
            .when(create_user)
            .then(expected_events)
            .assert_on(|store| UserService::from(aggregate::EventSourcedRepository::from(store)))
            .await;
    }

    /// The stream already holds a `WasCreated` event for the same email,
    /// so a second `CreateUser` is expected to fail.
    #[tokio::test]
    async fn it_fails_to_create_an_user_if_it_still_exists() {
        let already_created = vec![event::Persisted {
            stream_id: EMAIL.to_owned(),
            version: 1,
            event: event::Envelope::from(UserEvent::WasCreated {
                email: EMAIL.to_owned(),
                password: PASSWORD.to_owned(),
            }),
        }];

        let create_user = command::Envelope::from(CreateUser {
            email: EMAIL.to_owned(),
            password: PASSWORD.to_owned(),
        });

        command::test::Scenario
            .given(already_created)
            .when(create_user)
            .then_fails()
            .assert_on(|store| UserService::from(aggregate::EventSourcedRepository::from(store)))
            .await;
    }

    /// Given an existing user, `ChangeUserPassword` is expected to record
    /// a `PasswordWasChanged` event at version 2.
    #[tokio::test]
    async fn it_updates_the_password_of_an_existing_user() {
        let already_created = vec![event::Persisted {
            stream_id: EMAIL.to_owned(),
            version: 1,
            event: event::Envelope::from(UserEvent::WasCreated {
                email: EMAIL.to_owned(),
                password: PASSWORD.to_owned(),
            }),
        }];

        let change_password = command::Envelope::from(ChangeUserPassword {
            email: EMAIL.to_owned(),
            password: NEW_PASSWORD.to_owned(),
        });

        let expected_events = vec![event::Persisted {
            stream_id: EMAIL.to_owned(),
            version: 2,
            event: event::Envelope::from(UserEvent::PasswordWasChanged {
                password: NEW_PASSWORD.to_owned(),
            }),
        }];

        command::test::Scenario
            .given(already_created)
            .when(change_password)
            .then(expected_events)
            .assert_on(|store| UserService::from(aggregate::EventSourcedRepository::from(store)))
            .await;
    }

    /// With no prior events there is no user to load, so `ChangeUserPassword`
    /// is expected to fail.
    #[tokio::test]
    async fn it_fails_to_update_the_password_if_the_user_does_not_exist() {
        let change_password = command::Envelope::from(ChangeUserPassword {
            email: EMAIL.to_owned(),
            password: NEW_PASSWORD.to_owned(),
        });

        command::test::Scenario
            .when(change_password)
            .then_fails()
            .assert_on(|store| UserService::from(aggregate::EventSourcedRepository::from(store)))
            .await;
    }
}