eventually/command/
mod.rs1pub mod test;
15
16use std::future::Future;
17
18use async_trait::async_trait;
19
20use crate::message;
21
22pub type Envelope<T> = message::Envelope<T>;
27
28#[async_trait]
35pub trait Handler<T>: Send + Sync
36where
37 T: message::Message,
38{
39 type Error: Send + Sync;
41
42 async fn handle(&self, command: Envelope<T>) -> Result<(), Self::Error>;
48}
49
50#[async_trait]
51impl<T, Err, F, Fut> Handler<T> for F
52where
53 T: message::Message + Send + Sync + 'static,
54 Err: Send + Sync,
55 F: Send + Sync + Fn(Envelope<T>) -> Fut,
56 Fut: Send + Sync + Future<Output = Result<(), Err>>,
57{
58 type Error = Err;
59
60 async fn handle(&self, command: Envelope<T>) -> Result<(), Self::Error> {
61 self(command).await
62 }
63}
64
65#[cfg(test)]
66mod test_user_domain {
67 use std::sync::Arc;
68
69 use async_trait::async_trait;
70
71 use crate::aggregate::test_user_domain::{User, UserEvent};
72 use crate::{aggregate, command, event, message};
73
74 struct UserService(Arc<dyn aggregate::Repository<User>>);
75
76 impl<R> From<R> for UserService
77 where
78 R: aggregate::Repository<User> + 'static,
79 {
80 fn from(repository: R) -> Self {
81 Self(Arc::new(repository))
82 }
83 }
84
85 struct CreateUser {
86 email: String,
87 password: String,
88 }
89
90 impl message::Message for CreateUser {
91 fn name(&self) -> &'static str {
92 "CreateUser"
93 }
94 }
95
96 #[async_trait]
97 impl command::Handler<CreateUser> for UserService {
98 type Error = anyhow::Error;
99
100 async fn handle(&self, command: command::Envelope<CreateUser>) -> Result<(), Self::Error> {
101 let command = command.message;
102 let mut user = aggregate::Root::<User>::create(command.email, command.password)?;
103
104 self.0.save(&mut user).await?;
105
106 Ok(())
107 }
108 }
109
110 struct ChangeUserPassword {
111 email: String,
112 password: String,
113 }
114
115 impl message::Message for ChangeUserPassword {
116 fn name(&self) -> &'static str {
117 "ChangeUserPassword"
118 }
119 }
120
121 #[async_trait]
122 impl command::Handler<ChangeUserPassword> for UserService {
123 type Error = anyhow::Error;
124
125 async fn handle(
126 &self,
127 command: command::Envelope<ChangeUserPassword>,
128 ) -> Result<(), Self::Error> {
129 let command = command.message;
130
131 let mut user = self.0.get(&command.email).await?;
132
133 user.change_password(command.password)?;
134
135 self.0.save(&mut user).await?;
136
137 Ok(())
138 }
139 }
140
141 #[tokio::test]
142 async fn it_creates_a_new_user_successfully() {
143 command::test::Scenario
144 .when(command::Envelope::from(CreateUser {
145 email: "test@test.com".to_owned(),
146 password: "not-a-secret".to_owned(),
147 }))
148 .then(vec![event::Persisted {
149 stream_id: "test@test.com".to_owned(),
150 version: 1,
151 event: event::Envelope::from(UserEvent::WasCreated {
152 email: "test@test.com".to_owned(),
153 password: "not-a-secret".to_owned(),
154 }),
155 }])
156 .assert_on(|event_store| {
157 UserService::from(aggregate::EventSourcedRepository::from(event_store))
158 })
159 .await;
160 }
161
162 #[tokio::test]
163 async fn it_fails_to_create_an_user_if_it_still_exists() {
164 command::test::Scenario
165 .given(vec![event::Persisted {
166 stream_id: "test@test.com".to_owned(),
167 version: 1,
168 event: event::Envelope::from(UserEvent::WasCreated {
169 email: "test@test.com".to_owned(),
170 password: "not-a-secret".to_owned(),
171 }),
172 }])
173 .when(command::Envelope::from(CreateUser {
174 email: "test@test.com".to_owned(),
175 password: "not-a-secret".to_owned(),
176 }))
177 .then_fails()
178 .assert_on(|event_store| {
179 UserService::from(aggregate::EventSourcedRepository::from(event_store))
180 })
181 .await;
182 }
183
184 #[tokio::test]
185 async fn it_updates_the_password_of_an_existing_user() {
186 command::test::Scenario
187 .given(vec![event::Persisted {
188 stream_id: "test@test.com".to_owned(),
189 version: 1,
190 event: event::Envelope::from(UserEvent::WasCreated {
191 email: "test@test.com".to_owned(),
192 password: "not-a-secret".to_owned(),
193 }),
194 }])
195 .when(command::Envelope::from(ChangeUserPassword {
196 email: "test@test.com".to_owned(),
197 password: "new-password".to_owned(),
198 }))
199 .then(vec![event::Persisted {
200 stream_id: "test@test.com".to_owned(),
201 version: 2,
202 event: event::Envelope::from(UserEvent::PasswordWasChanged {
203 password: "new-password".to_owned(),
204 }),
205 }])
206 .assert_on(|event_store| {
207 UserService::from(aggregate::EventSourcedRepository::from(event_store))
208 })
209 .await;
210 }
211
212 #[tokio::test]
213 async fn it_fails_to_update_the_password_if_the_user_does_not_exist() {
214 command::test::Scenario
215 .when(command::Envelope::from(ChangeUserPassword {
216 email: "test@test.com".to_owned(),
217 password: "new-password".to_owned(),
218 }))
219 .then_fails()
220 .assert_on(|event_store| {
221 UserService::from(aggregate::EventSourcedRepository::from(event_store))
222 })
223 .await;
224 }
225}