twilight_standby/lib.rs
1#![doc = include_str!("../README.md")]
2#![warn(
3 clippy::missing_const_for_fn,
4 clippy::missing_docs_in_private_items,
5 clippy::pedantic,
6 missing_docs,
7 unsafe_code
8)]
9#![allow(
10 clippy::module_name_repetitions,
11 clippy::must_use_candidate,
12 clippy::unnecessary_wraps
13)]
14
15pub mod future;
16
17use self::future::{
18 WaitForComponentFuture, WaitForComponentStream, WaitForEventFuture, WaitForEventStream,
19 WaitForGuildEventFuture, WaitForGuildEventStream, WaitForMessageFuture, WaitForMessageStream,
20 WaitForReactionFuture, WaitForReactionStream,
21};
22use dashmap::DashMap;
23use std::{
24 fmt::{Debug, Display, Formatter, Result as FmtResult},
25 hash::Hash,
26 sync::atomic::{AtomicU64, Ordering},
27};
28use tokio::sync::{
29 mpsc::{self, UnboundedReceiver, UnboundedSender as MpscSender},
30 oneshot::{self, Receiver, Sender as OneshotSender},
31};
32use twilight_model::{
33 application::interaction::{Interaction, InteractionType},
34 gateway::{
35 event::Event,
36 payload::incoming::{MessageCreate, ReactionAdd},
37 },
38 id::{
39 marker::{ChannelMarker, GuildMarker, MessageMarker},
40 Id,
41 },
42};
43
44/// Map keyed by an ID - such as a channel ID or message ID - storing a list of
45/// bystanders.
46type BystanderMap<K, V> = DashMap<K, Vec<Bystander<V>>>;
47
48/// Sender to a caller that may be for a future bystander or a stream bystander.
49#[derive(Debug)]
50enum Sender<E> {
51 /// Bystander is a future and the sender is a oneshot.
52 Future(OneshotSender<E>),
53 /// Bystander is a stream and the sender is an MPSC.
54 Stream(MpscSender<E>),
55}
56
57impl<E> Sender<E> {
58 /// Whether the channel is closed.
59 fn is_closed(&self) -> bool {
60 match self {
61 Self::Future(sender) => sender.is_closed(),
62 Self::Stream(sender) => sender.is_closed(),
63 }
64 }
65}
66
67/// Registration for a caller to wait for an event based on a predicate
68/// function.
69struct Bystander<T> {
70 /// Predicate check to perform on an event.
71 func: Box<dyn Fn(&T) -> bool + Send + Sync>,
72 /// [`Sender::Future`]s consume themselves once upon sending so the sender
73 /// needs to be able to be taken out separately.
74 sender: Option<Sender<T>>,
75}
76
77impl<T: Debug> Debug for Bystander<T> {
78 fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
79 f.debug_struct("Bystander")
80 .field("func", &"<dyn Fn(&T) -> bool>")
81 .field("sender", &self.sender)
82 .finish()
83 }
84}
85
86/// The `Standby` struct, used by the main event loop to process events and by
87/// tasks to wait for an event.
88///
89/// Refer to the crate-level documentation for more information.
90///
91/// # Using Standby in multiple tasks
92///
93/// To use a Standby instance in multiple tasks, consider wrapping it in an
94/// [`std::sync::Arc`] or [`std::rc::Rc`].
95///
96/// # Examples
97///
98/// ## Timeouts
99///
100/// Futures can be timed out by passing the future returned by Standby to
101/// functions such as [`tokio::time::timeout`]:
102///
103/// ```rust,no_run
104/// # #[tokio::main] async fn main() -> Result<(), Box<dyn std::error::Error>> {
105/// use std::time::Duration;
106/// use twilight_model::gateway::event::{Event, EventType};
107/// use twilight_standby::Standby;
108///
109/// let standby = Standby::new();
110/// let future = standby.wait_for_event(|event: &Event| event.kind() == EventType::Ready);
111/// let event = tokio::time::timeout(Duration::from_secs(1), future).await?;
112/// # Ok(()) }
113/// ```
114///
115/// [`tokio::time::timeout`]: https://docs.rs/tokio/latest/tokio/time/fn.timeout.html
116#[derive(Debug, Default)]
117pub struct Standby {
118 /// List of component bystanders where the ID of the message is known
119 /// beforehand.
120 components: DashMap<Id<MessageMarker>, Vec<Bystander<Interaction>>>,
121 /// Bystanders for any event that may not be in any particular guild.
122 ///
123 /// The key is generated via [`event_counter`].
124 ///
125 /// [`event_counter`]: Self::event_counter
126 events: DashMap<u64, Bystander<Event>>,
127 /// Event counter to be used as the key of [`events`].
128 ///
129 /// [`events`]: Self::events
130 event_counter: AtomicU64,
131 /// List of bystanders where the ID of the guild is known beforehand.
132 guilds: DashMap<Id<GuildMarker>, Vec<Bystander<Event>>>,
133 /// List of message bystanders where the ID of the channel is known
134 /// beforehand.
135 messages: DashMap<Id<ChannelMarker>, Vec<Bystander<MessageCreate>>>,
136 /// List of reaction bystanders where the ID of the message is known
137 /// beforehand.
138 reactions: DashMap<Id<MessageMarker>, Vec<Bystander<ReactionAdd>>>,
139}
140
141impl Standby {
142 /// Create a new instance of `Standby`.
143 ///
144 /// Once a `Standby` has been created it must process gateway events via
145 /// [`process`]. Awaiting an event can start via methods such as
146 /// [`wait_for`] and [`wait_for_message_stream`].
147 ///
148 /// [`process`]: Self::process
149 /// [`wait_for`]: Self::wait_for
150 /// [`wait_for_message_stream`]: Self::wait_for_message_stream
151 #[must_use = "must process events to be useful"]
152 pub fn new() -> Self {
153 Self::default()
154 }
155
156 /// Process an event, calling any bystanders that might be waiting on it.
157 ///
158 /// Returns statistics about matched [`Standby`] calls and how they were
159 /// processed. For example, by using [`ProcessResults::matched`] you can
160 /// determine how many calls were sent an event.
161 ///
162 /// When a bystander checks to see if an event is what it's waiting for, it
163 /// will receive the event by cloning it.
164 ///
165 /// This function must be called when events are received in order for
166 /// futures returned by methods to fulfill.
167 pub fn process(&self, event: &Event) -> ProcessResults {
168 tracing::trace!(event_type = ?event.kind(), ?event, "processing event");
169
170 let mut completions = ProcessResults::new();
171
172 match event {
173 Event::InteractionCreate(e) => {
174 if e.kind == InteractionType::MessageComponent {
175 if let Some(message) = &e.message {
176 completions.add_with(&Self::process_specific_event(
177 &self.components,
178 message.id,
179 e,
180 ));
181 }
182 }
183 }
184 Event::MessageCreate(e) => {
185 completions.add_with(&Self::process_specific_event(
186 &self.messages,
187 e.0.channel_id,
188 e,
189 ));
190 }
191 Event::ReactionAdd(e) => {
192 completions.add_with(&Self::process_specific_event(
193 &self.reactions,
194 e.0.message_id,
195 e,
196 ));
197 }
198 _ => {}
199 }
200
201 if let Some(guild_id) = event.guild_id() {
202 completions.add_with(&Self::process_specific_event(&self.guilds, guild_id, event));
203 }
204
205 completions.add_with(&Self::process_event(&self.events, event));
206
207 completions
208 }
209
210 /// Wait for an event in a certain guild.
211 ///
212 /// To wait for multiple guild events matching the given predicate use
213 /// [`wait_for_stream`].
214 ///
215 /// # Examples
216 ///
217 /// Wait for a [`BanAdd`] event in guild 123:
218 ///
219 /// ```no_run
220 /// # #[tokio::main]
221 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
222 /// use twilight_model::{
223 /// gateway::event::{Event, EventType},
224 /// id::Id,
225 /// };
226 /// use twilight_standby::Standby;
227 ///
228 /// let standby = Standby::new();
229 ///
230 /// let guild_id = Id::new(123);
231 ///
/// let event = standby
233 /// .wait_for(guild_id, |event: &Event| event.kind() == EventType::BanAdd)
234 /// .await?;
235 /// # Ok(()) }
236 /// ```
237 ///
238 /// # Errors
239 ///
240 /// The returned future resolves to a [`Canceled`] error if the associated
241 /// [`Standby`] instance is dropped.
242 ///
243 /// [`BanAdd`]: twilight_model::gateway::payload::incoming::BanAdd
244 /// [`Canceled`]: future::Canceled
245 /// [`wait_for_stream`]: Self::wait_for_stream
246 pub fn wait_for<F: Fn(&Event) -> bool + Send + Sync + 'static>(
247 &self,
248 guild_id: Id<GuildMarker>,
249 check: impl Into<Box<F>>,
250 ) -> WaitForGuildEventFuture {
251 tracing::trace!(%guild_id, "waiting for event in guild");
252
253 WaitForGuildEventFuture {
254 rx: Self::insert_future(&self.guilds, guild_id, check),
255 }
256 }
257
258 /// Wait for a stream of events in a certain guild.
259 ///
260 /// To wait for only one guild event matching the given predicate use
261 /// [`wait_for`].
262 ///
263 /// # Examples
264 ///
265 /// Wait for multiple [`BanAdd`] events in guild 123:
266 ///
267 /// ```no_run
268 /// # #[tokio::main]
269 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
270 /// use tokio_stream::StreamExt;
271 /// use twilight_model::{
272 /// gateway::event::{Event, EventType},
273 /// id::Id,
274 /// };
275 /// use twilight_standby::Standby;
276 ///
277 /// let standby = Standby::new();
278 ///
279 /// let guild_id = Id::new(123);
280 ///
281 /// let mut stream =
282 /// standby.wait_for_stream(guild_id, |event: &Event| event.kind() == EventType::BanAdd);
283 ///
284 /// while let Some(event) = stream.next().await {
285 /// if let Event::BanAdd(ban) = event {
286 /// println!("user {} was banned in guild {}", ban.user.id, ban.guild_id);
287 /// }
288 /// }
289 /// # Ok(()) }
290 /// ```
291 ///
292 /// # Errors
293 ///
294 /// The returned stream ends when the associated [`Standby`] instance is
295 /// dropped.
296 ///
297 /// [`BanAdd`]: twilight_model::gateway::payload::incoming::BanAdd
298 /// [`wait_for`]: Self::wait_for
299 pub fn wait_for_stream<F: Fn(&Event) -> bool + Send + Sync + 'static>(
300 &self,
301 guild_id: Id<GuildMarker>,
302 check: impl Into<Box<F>>,
303 ) -> WaitForGuildEventStream {
304 tracing::trace!(%guild_id, "waiting for event in guild");
305
306 WaitForGuildEventStream {
307 rx: Self::insert_stream(&self.guilds, guild_id, check),
308 }
309 }
310
311 /// Wait for an event not in a certain guild. This must be filtered by an
312 /// event type.
313 ///
314 /// To wait for multiple events matching the given predicate use
315 /// [`wait_for_event_stream`].
316 ///
317 /// # Examples
318 ///
319 /// Wait for a [`Ready`] event for shard 5:
320 ///
321 /// ```no_run
322 /// # #[tokio::main]
323 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
324 /// use twilight_model::gateway::event::{Event, EventType};
325 /// use twilight_standby::Standby;
326 ///
327 /// let standby = Standby::new();
328 ///
329 /// let ready = standby
330 /// .wait_for_event(|event: &Event| {
331 /// if let Event::Ready(ready) = event {
332 /// ready.shard.map_or(false, |id| id.number() == 5)
333 /// } else {
334 /// false
335 /// }
336 /// })
337 /// .await?;
338 /// # Ok(()) }
339 /// ```
340 ///
341 /// # Errors
342 ///
343 /// The returned future resolves to a [`Canceled`] error if the associated
344 /// [`Standby`] instance is dropped.
345 ///
346 /// [`Canceled`]: future::Canceled
347 /// [`Ready`]: twilight_model::gateway::payload::incoming::Ready
348 /// [`wait_for_event_stream`]: Self::wait_for_event_stream
349 pub fn wait_for_event<F: Fn(&Event) -> bool + Send + Sync + 'static>(
350 &self,
351 check: impl Into<Box<F>>,
352 ) -> WaitForEventFuture {
353 tracing::trace!("waiting for event");
354
355 let (tx, rx) = oneshot::channel();
356
357 self.events.insert(
358 self.next_event_id(),
359 Bystander {
360 func: check.into(),
361 sender: Some(Sender::Future(tx)),
362 },
363 );
364
365 WaitForEventFuture { rx }
366 }
367
368 /// Wait for a stream of events not in a certain guild. This must be
369 /// filtered by an event type.
370 ///
371 /// To wait for only one event matching the given predicate use
372 /// [`wait_for_event`].
373 ///
374 /// # Examples
375 ///
376 /// Wait for multiple [`Ready`] events on shard 5:
377 ///
378 /// ```no_run
379 /// # #[tokio::main]
380 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
381 /// use tokio_stream::StreamExt;
382 /// use twilight_model::gateway::event::{Event, EventType};
383 /// use twilight_standby::Standby;
384 ///
385 /// let standby = Standby::new();
386 ///
387 /// let mut events = standby.wait_for_event_stream(|event: &Event| {
388 /// if let Event::Ready(ready) = event {
389 /// ready.shard.map_or(false, |id| id.number() == 5)
390 /// } else {
391 /// false
392 /// }
393 /// });
394 ///
395 /// while let Some(event) = events.next().await {
396 /// println!("got event with type {:?}", event.kind());
397 /// }
398 /// # Ok(()) }
399 /// ```
400 ///
401 /// # Errors
402 ///
403 /// The returned stream ends when the associated [`Standby`] instance is
404 /// dropped.
405 ///
406 /// [`Ready`]: twilight_model::gateway::payload::incoming::Ready
407 /// [`wait_for_event`]: Self::wait_for_event
408 pub fn wait_for_event_stream<F: Fn(&Event) -> bool + Send + Sync + 'static>(
409 &self,
410 check: impl Into<Box<F>>,
411 ) -> WaitForEventStream {
412 tracing::trace!("waiting for event");
413
414 let (tx, rx) = mpsc::unbounded_channel();
415
416 self.events.insert(
417 self.next_event_id(),
418 Bystander {
419 func: check.into(),
420 sender: Some(Sender::Stream(tx)),
421 },
422 );
423
424 WaitForEventStream { rx }
425 }
426
427 /// Wait for a message in a certain channel.
428 ///
429 /// To wait for multiple messages matching the given predicate use
430 /// [`wait_for_message_stream`].
431 ///
432 /// # Examples
433 ///
434 /// Wait for a message in channel 123 by user 456 with the content "test":
435 ///
436 /// ```no_run
437 /// # #[tokio::main]
438 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
439 /// use twilight_model::{gateway::payload::incoming::MessageCreate, id::Id};
440 /// use twilight_standby::Standby;
441 ///
442 /// let standby = Standby::new();
443 ///
444 /// let author_id = Id::new(456);
445 /// let channel_id = Id::new(123);
446 ///
447 /// let message = standby
448 /// .wait_for_message(channel_id, move |event: &MessageCreate| {
449 /// event.author.id == author_id && event.content == "test"
450 /// })
451 /// .await?;
452 /// # Ok(()) }
453 /// ```
454 ///
455 /// # Errors
456 ///
457 /// The returned future resolves to a [`Canceled`] error if the associated
458 /// [`Standby`] instance is dropped.
459 ///
460 /// [`Canceled`]: future::Canceled
461 /// [`wait_for_message_stream`]: Self::wait_for_message_stream
462 pub fn wait_for_message<F: Fn(&MessageCreate) -> bool + Send + Sync + 'static>(
463 &self,
464 channel_id: Id<ChannelMarker>,
465 check: impl Into<Box<F>>,
466 ) -> WaitForMessageFuture {
467 tracing::trace!(%channel_id, "waiting for message in channel");
468
469 WaitForMessageFuture {
470 rx: Self::insert_future(&self.messages, channel_id, check),
471 }
472 }
473
/// Wait for a stream of messages in a certain channel.
475 ///
476 /// To wait for only one message matching the given predicate use
477 /// [`wait_for_message`].
478 ///
479 /// # Examples
480 ///
481 /// Wait for multiple messages in channel 123 by user 456 with the content
482 /// "test":
483 ///
484 /// ```no_run
485 /// # #[tokio::main]
486 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
487 /// use tokio_stream::StreamExt;
488 /// use twilight_model::{gateway::payload::incoming::MessageCreate, id::Id};
489 /// use twilight_standby::Standby;
490 ///
491 /// let standby = Standby::new();
492 ///
493 /// let author_id = Id::new(456);
494 /// let channel_id = Id::new(123);
495 ///
496 /// let mut messages = standby.wait_for_message_stream(channel_id, move |event: &MessageCreate| {
497 /// event.author.id == author_id && event.content == "test"
498 /// });
499 ///
500 /// while let Some(message) = messages.next().await {
501 /// println!("got message by {}", message.author.id);
502 /// }
503 /// # Ok(()) }
504 /// ```
505 ///
506 /// # Errors
507 ///
508 /// The returned stream ends when the associated [`Standby`] instance is
509 /// dropped.
510 ///
511 /// [`wait_for_message`]: Self::wait_for_message
512 pub fn wait_for_message_stream<F: Fn(&MessageCreate) -> bool + Send + Sync + 'static>(
513 &self,
514 channel_id: Id<ChannelMarker>,
515 check: impl Into<Box<F>>,
516 ) -> WaitForMessageStream {
517 tracing::trace!(%channel_id, "waiting for message in channel");
518
519 WaitForMessageStream {
520 rx: Self::insert_stream(&self.messages, channel_id, check),
521 }
522 }
523
524 /// Wait for a reaction on a certain message.
525 ///
526 /// To wait for multiple reactions matching the given predicate use
527 /// [`wait_for_reaction_stream`].
528 ///
529 /// # Examples
530 ///
531 /// Wait for a reaction on message 123 by user 456:
532 ///
533 /// ```no_run
534 /// # #[tokio::main]
535 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
536 /// use twilight_model::{gateway::payload::incoming::ReactionAdd, id::Id};
537 /// use twilight_standby::Standby;
538 ///
539 /// let standby = Standby::new();
540 ///
541 /// let message_id = Id::new(123);
542 /// let user_id = Id::new(456);
543 ///
544 /// let reaction = standby
545 /// .wait_for_reaction(message_id, move |event: &ReactionAdd| {
546 /// event.user_id == user_id
547 /// })
548 /// .await?;
549 /// # Ok(()) }
550 /// ```
551 ///
552 /// # Errors
553 ///
554 /// The returned future resolves to a [`Canceled`] error if the associated
555 /// [`Standby`] instance is dropped.
556 ///
557 /// [`Canceled`]: future::Canceled
558 /// [`wait_for_reaction_stream`]: Self::wait_for_reaction_stream
559 pub fn wait_for_reaction<F: Fn(&ReactionAdd) -> bool + Send + Sync + 'static>(
560 &self,
561 message_id: Id<MessageMarker>,
562 check: impl Into<Box<F>>,
563 ) -> WaitForReactionFuture {
564 tracing::trace!(%message_id, "waiting for reaction on message");
565
566 WaitForReactionFuture {
567 rx: Self::insert_future(&self.reactions, message_id, check),
568 }
569 }
570
571 /// Wait for a stream of reactions on a certain message.
572 ///
573 /// To wait for only one reaction matching the given predicate use
574 /// [`wait_for_reaction`].
575 ///
576 /// # Examples
577 ///
578 /// Wait for multiple reactions on message 123 with unicode reaction "🤠":
579 ///
580 /// ```no_run
581 /// # #[tokio::main]
582 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
583 /// use tokio_stream::StreamExt;
584 /// use twilight_model::{
585 /// channel::message::EmojiReactionType,
586 /// gateway::payload::incoming::ReactionAdd,
587 /// id::Id,
588 /// };
589 /// use twilight_standby::Standby;
590 ///
591 /// let standby = Standby::new();
592 ///
593 /// let message_id = Id::new(123);
594 ///
595 /// let mut reactions = standby.wait_for_reaction_stream(message_id, |event: &ReactionAdd| {
596 /// matches!(&event.emoji, EmojiReactionType::Unicode { name } if name == "🤠")
597 /// });
598 ///
599 /// while let Some(reaction) = reactions.next().await {
600 /// println!("got a reaction by {}", reaction.user_id);
601 /// }
602 /// # Ok(()) }
603 /// ```
604 ///
605 /// # Errors
606 ///
607 /// The returned stream ends when the associated [`Standby`] instance is
608 /// dropped.
609 ///
610 /// [`wait_for_reaction`]: Self::wait_for_reaction
611 pub fn wait_for_reaction_stream<F: Fn(&ReactionAdd) -> bool + Send + Sync + 'static>(
612 &self,
613 message_id: Id<MessageMarker>,
614 check: impl Into<Box<F>>,
615 ) -> WaitForReactionStream {
616 tracing::trace!(%message_id, "waiting for reaction on message");
617
618 WaitForReactionStream {
619 rx: Self::insert_stream(&self.reactions, message_id, check),
620 }
621 }
622
623 /// Wait for a component on a certain message.
624 ///
625 /// Returns a `Canceled` error if the `Standby` struct was dropped.
626 ///
627 /// If you need to wait for multiple components matching the given predicate,
628 /// use [`wait_for_component_stream`].
629 ///
630 /// # Examples
631 ///
632 /// Wait for a component on message 123 by user 456:
633 ///
634 /// ```no_run
635 /// # #[tokio::main] async fn main() -> Result<(), Box<dyn std::error::Error>> {
636 /// use twilight_model::{application::interaction::Interaction, id::Id};
637 /// use twilight_standby::Standby;
638 ///
639 /// let standby = Standby::new();
640 /// let message_id = Id::new(123);
641 ///
642 /// let component = standby
643 /// .wait_for_component(message_id, |event: &Interaction| {
644 /// event.author_id() == Some(Id::new(456))
645 /// })
646 /// .await?;
647 /// # Ok(()) }
648 /// ```
649 ///
650 /// [`wait_for_component_stream`]: Self::wait_for_component_stream
651 pub fn wait_for_component<F: Fn(&Interaction) -> bool + Send + Sync + 'static>(
652 &self,
653 message_id: Id<MessageMarker>,
654 check: impl Into<Box<F>>,
655 ) -> WaitForComponentFuture {
656 tracing::trace!(%message_id, "waiting for component on message");
657
658 WaitForComponentFuture {
659 rx: Self::insert_future(&self.components, message_id, check),
660 }
661 }
662
663 /// Wait for a stream of components on a certain message.
664 ///
/// The returned stream ends when the associated `Standby` instance is dropped.
666 ///
667 /// If you need to wait for only one component matching the given predicate,
668 /// use [`wait_for_component`].
669 ///
670 /// # Examples
671 ///
672 /// Wait for multiple button components on message 123 with a `custom_id` of
673 /// "Click":
674 ///
675 /// ```no_run
676 /// # #[tokio::main] async fn main() -> Result<(), Box<dyn std::error::Error>> {
677 /// use tokio_stream::StreamExt;
678 /// use twilight_model::{
679 /// application::interaction::{Interaction, InteractionData},
680 /// id::Id,
681 /// };
682 /// use twilight_standby::Standby;
683 ///
684 /// let standby = Standby::new();
685 /// let message_id = Id::new(123);
686 ///
687 /// let mut components = standby.wait_for_component_stream(message_id, |event: &Interaction| {
688 /// if let Some(InteractionData::MessageComponent(data)) = &event.data {
689 /// data.custom_id == "Click".to_string()
690 /// } else {
691 /// false
692 /// }
693 /// });
694 ///
695 /// while let Some(component) = components.next().await {
696 /// println!("got a component by {}", component.author_id().unwrap());
697 /// }
698 /// # Ok(()) }
699 /// ```
700 ///
701 /// [`wait_for_component`]: Self::wait_for_component
702 pub fn wait_for_component_stream<F: Fn(&Interaction) -> bool + Send + Sync + 'static>(
703 &self,
704 message_id: Id<MessageMarker>,
705 check: impl Into<Box<F>>,
706 ) -> WaitForComponentStream {
707 tracing::trace!(%message_id, "waiting for component on message");
708
709 WaitForComponentStream {
710 rx: Self::insert_stream(&self.components, message_id, check),
711 }
712 }
713
714 /// Next event ID in [`Standby::event_counter`].
715 fn next_event_id(&self) -> u64 {
716 self.event_counter.fetch_add(1, Ordering::SeqCst)
717 }
718
719 /// Append a new future bystander into a map according to the ID.
720 fn insert_future<F: Fn(&V) -> bool + Send + Sync + 'static, K: Eq + Hash, V>(
721 map: &BystanderMap<K, V>,
722 id: K,
723 check: impl Into<Box<F>>,
724 ) -> Receiver<V> {
725 let (tx, rx) = oneshot::channel();
726
727 let mut entry = map.entry(id).or_default();
728 entry.push(Bystander {
729 func: check.into(),
730 sender: Some(Sender::Future(tx)),
731 });
732
733 rx
734 }
735
736 /// Append a new stream bystander into a map according to the ID.
737 fn insert_stream<F: Fn(&V) -> bool + Send + Sync + 'static, K: Eq + Hash, V>(
738 map: &BystanderMap<K, V>,
739 id: K,
740 check: impl Into<Box<F>>,
741 ) -> UnboundedReceiver<V> {
742 let (tx, rx) = mpsc::unbounded_channel();
743
744 let mut entry = map.entry(id).or_default();
745 entry.push(Bystander {
746 func: check.into(),
747 sender: Some(Sender::Stream(tx)),
748 });
749
750 rx
751 }
752
753 /// Process a general event that is not of any particular type or in any
754 /// particular guild.
755 #[tracing::instrument(level = "trace")]
756 fn process_event<K: Debug + Display + Eq + Hash + PartialEq + 'static, V: Clone + Debug>(
757 map: &DashMap<K, Bystander<V>>,
758 event: &V,
759 ) -> ProcessResults {
760 tracing::trace!(?event, "processing event");
761
762 let mut results = ProcessResults::new();
763
764 map.retain(|id, bystander| {
765 let result = Self::bystander_process(bystander, event);
766 results.handle(result);
767
768 tracing::trace!(bystander_id = %id, ?result, "event bystander processed");
769
770 // We want to retain bystanders that are *incomplete* and remove
771 // bystanders that are *complete*.
772 !result.is_complete()
773 });
774
775 results
776 }
777
778 /// Process a general event that is either of a particular type or in a
779 /// particular guild.
780 #[tracing::instrument(level = "trace")]
781 fn process_specific_event<
782 K: Debug + Display + Eq + Hash + PartialEq + 'static,
783 V: Clone + Debug,
784 >(
785 map: &DashMap<K, Vec<Bystander<V>>>,
786 guild_id: K,
787 event: &V,
788 ) -> ProcessResults {
789 // Iterate over a guild's bystanders and mark it for removal if there
790 // are no bystanders remaining.
791 let (remove_guild, results) = if let Some(mut bystanders) = map.get_mut(&guild_id) {
792 let results = Self::bystander_iter(&mut bystanders, event);
793
794 (bystanders.is_empty(), results)
795 } else {
796 tracing::trace!(%guild_id, "guild has no event bystanders");
797
798 return ProcessResults::new();
799 };
800
801 if remove_guild {
802 tracing::trace!(%guild_id, "removing guild from map");
803
804 map.remove(&guild_id);
805 }
806
807 results
808 }
809
810 /// Iterate over bystanders and remove the ones that match the predicate.
811 #[tracing::instrument(level = "trace")]
812 fn bystander_iter<E: Clone + Debug>(
813 bystanders: &mut Vec<Bystander<E>>,
814 event: &E,
815 ) -> ProcessResults {
816 tracing::trace!(?bystanders, "iterating over bystanders");
817
818 // Iterate over the list of bystanders by using an index and manually
819 // indexing in to the list.
820 //
821 // # Logic
822 //
823 // In each iteration we decide whether to retain a bystander: if we do
824 // then we can increment our index and move on, but if we opt to instead
825 // remove it then we do so and don't increment the index. The reason we
826 // don't increment the index is because when we remove an element the
827 // index does not become empty and instead everything to the right is
828 // shifted to the left, illustrated as such:
829 //
830 // |---|
831 // v |
832 // A - B - C - D
833 // | ^ |
834 // | |---|
835 // |
836 // Remove B
837 //
838 // After: A - C - D
839 //
840 // # Reasons not to use alternatives
841 //
842 // **`Vec::retain`** we need to mutate the entries in order to take out
843 // the sender and `Vec::retain` only gives us immutable references.
844 //
845 // A form of enumeration can't be used because sometimes the index
846 // doesn't advance; iterators would continue to provide incrementing
847 // enumeration indexes while we sometimes want to reuse an index.
848 let mut index = 0;
849 let mut results = ProcessResults::new();
850
851 while index < bystanders.len() {
852 tracing::trace!(%index, "checking bystander");
853
854 let status = Self::bystander_process(&mut bystanders[index], event);
855 results.handle(status);
856
857 tracing::trace!(%index, ?status, "checked bystander");
858
859 if status.is_complete() {
860 bystanders.remove(index);
861 } else {
862 index += 1;
863 }
864 }
865
866 results
867 }
868
869 /// Process a bystander, sending the event if the sender is active and the
870 /// predicate matches. Returns whether the bystander has fulfilled.
871 ///
872 /// Returns whether the bystander is fulfilled; if the bystander has been
873 /// fulfilled then the channel is now closed.
874 #[tracing::instrument(level = "trace")]
875 fn bystander_process<T: Clone + Debug>(
876 bystander: &mut Bystander<T>,
877 event: &T,
878 ) -> ProcessStatus {
879 // We need to take the sender out because `OneshotSender`s consume
880 // themselves when calling `OneshotSender::send`.
881 let Some(sender) = bystander.sender.take() else {
882 tracing::trace!("bystander has no sender, indicating for removal");
883
884 return ProcessStatus::AlreadyComplete;
885 };
886
887 // The channel may have closed due to the receiver dropping their end,
888 // in which case we can say we're done.
889 if sender.is_closed() {
890 tracing::trace!("bystander's rx dropped, indicating for removal");
891
892 return ProcessStatus::Dropped;
893 }
894
895 // Lastly check to see if the predicate matches the event. If it doesn't
896 // then we can short-circuit.
897 if !(bystander.func)(event) {
898 tracing::trace!("bystander check doesn't match, not removing");
899
900 // Put the sender back into its bystander since we'll still need it
901 // next time around.
902 bystander.sender.replace(sender);
903
904 return ProcessStatus::Skip;
905 }
906
907 match sender {
908 Sender::Future(tx) => {
909 // We don't care if the event successfully sends or not since
910 // we're going to be tossing out the bystander anyway.
911 drop(tx.send(event.clone()));
912
913 tracing::trace!("bystander matched event, indicating for removal");
914
915 ProcessStatus::SentFuture
916 }
917 Sender::Stream(tx) => {
918 // If we can send an event to the receiver and the channel is
919 // still open then we need to retain the bystander, otherwise we
920 // need to mark it for removal.
921 if tx.send(event.clone()).is_ok() {
922 tracing::trace!("bystander is a stream, retaining in map");
923
924 bystander.sender.replace(Sender::Stream(tx));
925
926 ProcessStatus::SentStream
927 } else {
928 ProcessStatus::Dropped
929 }
930 }
931 }
932 }
933}
934/// Number of [`Standby`] calls that were completed.
935#[derive(Clone, Debug, Eq, Hash, PartialEq)]
936pub struct ProcessResults {
937 /// Number of bystanders that were dropped due to the receiving end
938 /// dropping.
939 dropped: usize,
940 /// Number of future bystanders that were open and were sent an event.
941 fulfilled: usize,
942 /// Number of stream bystanders that were open and were sent an event.
943 sent: usize,
944}
945
946impl ProcessResults {
947 /// Create a new set of zeroed out results.
948 const fn new() -> Self {
949 Self {
950 dropped: 0,
951 fulfilled: 0,
952 sent: 0,
953 }
954 }
955
956 /// Number of [`Standby`] calls where the receiver had already dropped their
957 /// end.
958 ///
959 /// This may happen when a caller calls into [`Standby`] but then times out
960 /// or otherwise cancels their request.
961 pub const fn dropped(&self) -> usize {
962 self.dropped
963 }
964
965 /// Number of [`Standby`] calls that were fulfilled.
966 ///
967 /// Calls for futures via methods such as [`Standby::wait_for`] will be
968 /// marked as fulfilled once matched and an event is sent over the channel.
969 ///
970 /// **Caveat**: although an event has been sent over the channel to the
971 /// receiver it is not guaranteed whether the receiver end actually received
972 /// it; the receiver end may drop shortly after an event is sent. In this
973 /// case the call is considered to be fulfilled.
974 pub const fn fulfilled(&self) -> usize {
975 self.fulfilled
976 }
977
978 /// Number of calls that were matched and were sent an event.
979 ///
980 /// This is the sum of [`fulfilled`] and [`sent`].
981 ///
982 /// See the caveats for both methods.
983 ///
984 /// [`fulfilled`]: Self::fulfilled
985 /// [`sent`]: Self::sent
986 pub const fn matched(&self) -> usize {
987 self.fulfilled() + self.sent()
988 }
989
990 /// Number of [`Standby`] streaming calls that were matched and had an event
991 /// sent to them.
992 ///
993 /// **Caveat**: although an event has been sent over the channel to the
994 /// receiver it is not guaranteed whether the receiver end actually received
995 /// it; the receiver end may drop shortly after an event is sent. In this
996 /// case the call is considered to be sent. Checks over this call will in
997 /// the future be considered [`dropped`].
998 ///
999 /// [`dropped`]: Self::dropped
1000 pub const fn sent(&self) -> usize {
1001 self.sent
1002 }
1003
1004 /// Add another set of results to this set.
1005 fn add_with(&mut self, other: &Self) {
1006 self.dropped = self.dropped.saturating_add(other.dropped);
1007 self.fulfilled = self.fulfilled.saturating_add(other.fulfilled);
1008 self.sent = self.sent.saturating_add(other.sent);
1009 }
1010
1011 /// Handle a process status.
1012 fn handle(&mut self, status: ProcessStatus) {
1013 match status {
1014 ProcessStatus::Dropped => {
1015 self.dropped += 1;
1016 }
1017 ProcessStatus::SentFuture => {
1018 self.fulfilled += 1;
1019 }
1020 ProcessStatus::SentStream => {
1021 self.sent += 1;
1022 }
1023 ProcessStatus::AlreadyComplete | ProcessStatus::Skip => {}
1024 }
1025 }
1026}
1027
/// Outcome of running one bystander through [`Standby::bystander_process`].
#[derive(Clone, Copy, Debug)]
enum ProcessStatus {
    /// The call had already been matched previously but was not removed, so
    /// the subject must be removed now and is not counted towards results.
    AlreadyComplete,
    /// The receiver dropped their end of the channel; tallied as dropped.
    Dropped,
    /// The call matched and the event went to a oneshot (future) receiver;
    /// tallied as fulfilled.
    SentFuture,
    /// The call matched and the event went to a stream receiver; tallied as
    /// sent. This is not a completed call, as the stream stays registered.
    SentStream,
    /// The call was not matched; nothing is tallied.
    Skip,
}
1043
1044impl ProcessStatus {
1045 /// Whether the call is complete.
1046 const fn is_complete(self) -> bool {
1047 matches!(
1048 self,
1049 Self::AlreadyComplete | Self::Dropped | Self::SentFuture
1050 )
1051 }
1052}
1053
1054#[cfg(test)]
1055mod tests {
1056 #![allow(clippy::non_ascii_literal)]
1057
1058 use crate::Standby;
1059 use static_assertions::assert_impl_all;
1060 use std::fmt::Debug;
1061 use tokio_stream::StreamExt;
1062 use twilight_gateway::{Event, EventType};
1063 use twilight_model::{
1064 application::interaction::{
1065 message_component::MessageComponentInteractionData, Interaction, InteractionData,
1066 InteractionType,
1067 },
1068 channel::{
1069 message::{component::ComponentType, EmojiReactionType, Message, MessageType},
1070 Channel, ChannelType,
1071 },
1072 gateway::{
1073 payload::incoming::{InteractionCreate, MessageCreate, ReactionAdd, Ready, RoleDelete},
1074 GatewayReaction, ShardId,
1075 },
1076 guild::Permissions,
1077 id::{marker::GuildMarker, Id},
1078 oauth::{ApplicationFlags, ApplicationIntegrationMap, PartialApplication},
1079 user::{CurrentUser, User},
1080 util::Timestamp,
1081 };
1082
1083 assert_impl_all!(Standby: Debug, Default, Send, Sync);
1084
1085 #[allow(deprecated)]
1086 fn message() -> Message {
1087 Message {
1088 activity: None,
1089 application: None,
1090 application_id: None,
1091 attachments: Vec::new(),
1092 author: User {
1093 accent_color: None,
1094 avatar: None,
1095 avatar_decoration: None,
1096 avatar_decoration_data: None,
1097 banner: None,
1098 bot: false,
1099 discriminator: 1,
1100 email: None,
1101 flags: None,
1102 global_name: Some("test".to_owned()),
1103 id: Id::new(2),
1104 locale: None,
1105 mfa_enabled: None,
1106 name: "twilight".to_owned(),
1107 premium_type: None,
1108 public_flags: None,
1109 system: None,
1110 verified: None,
1111 },
1112 call: None,
1113 channel_id: Id::new(1),
1114 components: Vec::new(),
1115 content: "test".to_owned(),
1116 edited_timestamp: None,
1117 embeds: Vec::new(),
1118 flags: None,
1119 guild_id: Some(Id::new(4)),
1120 id: Id::new(3),
1121 interaction: None,
1122 interaction_metadata: None,
1123 kind: MessageType::Regular,
1124 member: None,
1125 mention_channels: Vec::new(),
1126 mention_everyone: false,
1127 mention_roles: Vec::new(),
1128 mentions: Vec::new(),
1129 message_snapshots: Vec::new(),
1130 pinned: false,
1131 poll: None,
1132 reactions: Vec::new(),
1133 reference: None,
1134 referenced_message: None,
1135 role_subscription_data: None,
1136 sticker_items: Vec::new(),
1137 timestamp: Timestamp::from_secs(1_632_072_645).expect("non zero"),
1138 thread: None,
1139 tts: false,
1140 webhook_id: None,
1141 }
1142 }
1143
1144 fn reaction() -> GatewayReaction {
1145 GatewayReaction {
1146 burst: false,
1147 burst_colors: Vec::new(),
1148 channel_id: Id::new(2),
1149 emoji: EmojiReactionType::Unicode {
1150 name: "🍎".to_owned(),
1151 },
1152 guild_id: Some(Id::new(1)),
1153 member: None,
1154 message_author_id: None,
1155 message_id: Id::new(4),
1156 user_id: Id::new(3),
1157 }
1158 }
1159
1160 #[allow(deprecated)]
1161 fn button() -> Interaction {
1162 Interaction {
1163 app_permissions: Some(Permissions::SEND_MESSAGES),
1164 application_id: Id::new(1),
1165 authorizing_integration_owners: ApplicationIntegrationMap {
1166 guild: None,
1167 user: None,
1168 },
1169 channel: Some(Channel {
1170 bitrate: None,
1171 guild_id: None,
1172 id: Id::new(400),
1173 kind: ChannelType::GuildText,
1174 last_message_id: None,
1175 last_pin_timestamp: None,
1176 name: None,
1177 nsfw: None,
1178 owner_id: None,
1179 parent_id: None,
1180 permission_overwrites: None,
1181 position: None,
1182 rate_limit_per_user: None,
1183 recipients: None,
1184 rtc_region: None,
1185 topic: None,
1186 user_limit: None,
1187 application_id: None,
1188 applied_tags: None,
1189 available_tags: None,
1190 default_auto_archive_duration: None,
1191 default_forum_layout: None,
1192 default_reaction_emoji: None,
1193 default_sort_order: None,
1194 default_thread_rate_limit_per_user: None,
1195 flags: None,
1196 icon: None,
1197 invitable: None,
1198 managed: None,
1199 member: None,
1200 member_count: None,
1201 message_count: None,
1202 newly_created: None,
1203 thread_metadata: None,
1204 video_quality_mode: None,
1205 }),
1206 channel_id: None,
1207 context: None,
1208 data: Some(InteractionData::MessageComponent(Box::new(
1209 MessageComponentInteractionData {
1210 custom_id: String::from("Click"),
1211 component_type: ComponentType::Button,
1212 resolved: None,
1213 values: Vec::new(),
1214 },
1215 ))),
1216 entitlements: Vec::new(),
1217 guild: None,
1218 guild_id: Some(Id::new(3)),
1219 guild_locale: None,
1220 id: Id::new(4),
1221 kind: InteractionType::MessageComponent,
1222 locale: Some("en-GB".to_owned()),
1223 member: None,
1224 message: Some(message()),
1225 token: String::from("token"),
1226 user: Some(User {
1227 accent_color: None,
1228 avatar: None,
1229 avatar_decoration: None,
1230 avatar_decoration_data: None,
1231 banner: None,
1232 bot: false,
1233 discriminator: 1,
1234 email: None,
1235 flags: None,
1236 global_name: Some("test".to_owned()),
1237 id: Id::new(2),
1238 locale: None,
1239 mfa_enabled: None,
1240 name: "twilight".to_owned(),
1241 premium_type: None,
1242 public_flags: None,
1243 system: None,
1244 verified: None,
1245 }),
1246 }
1247 }
1248
1249 /// Test that if a receiver drops their end, the result properly counts the
1250 /// statistic.
1251 #[tokio::test]
1252 async fn test_dropped() {
1253 let standby = Standby::new();
1254 let guild_id = Id::new(1);
1255
1256 {
1257 let _rx = standby.wait_for(guild_id, move |_: &Event| false);
1258 }
1259
1260 let results = standby.process(&Event::RoleDelete(RoleDelete {
1261 guild_id,
1262 role_id: Id::new(2),
1263 }));
1264
1265 assert_eq!(1, results.dropped());
1266 assert_eq!(0, results.fulfilled());
1267 assert_eq!(0, results.sent());
1268 }
1269
1270 /// Test that both events in guild 1 is matched but the event in guild 2 is
1271 /// not matched by testing the returned matched amount.
1272 #[tokio::test]
1273 async fn test_matched() {
1274 fn check(event: &Event, guild_id: Id<GuildMarker>) -> bool {
1275 matches!(event, Event::RoleDelete(e) if e.guild_id == guild_id)
1276 }
1277
1278 let standby = Standby::new();
1279 let guild_id_one = Id::new(1);
1280 let guild_id_two = Id::new(2);
1281 let _one = standby.wait_for(guild_id_one, move |event: &Event| {
1282 check(event, guild_id_one)
1283 });
1284 let _two = standby.wait_for(guild_id_one, move |event: &Event| {
1285 check(event, guild_id_one)
1286 });
1287 let _three = standby.wait_for(guild_id_two, move |event: &Event| {
1288 check(event, guild_id_two)
1289 });
1290
1291 let results = standby.process(&Event::RoleDelete(RoleDelete {
1292 guild_id: Id::new(1),
1293 role_id: Id::new(2),
1294 }));
1295
1296 assert_eq!(0, results.dropped());
1297 assert_eq!(2, results.fulfilled());
1298 assert_eq!(0, results.sent());
1299 }
1300
1301 /// Test that the [`ProcessResults::sent`] counter increments if a match is
1302 /// sent to it.
1303 #[tokio::test]
1304 async fn test_sent() {
1305 let standby = Standby::new();
1306 let guild_id = Id::new(1);
1307
1308 let _rx = standby.wait_for_stream(guild_id, move |_: &Event| true);
1309
1310 let results = standby.process(&Event::RoleDelete(RoleDelete {
1311 guild_id,
1312 role_id: Id::new(2),
1313 }));
1314
1315 assert_eq!(0, results.dropped());
1316 assert_eq!(0, results.fulfilled());
1317 assert_eq!(1, results.sent());
1318 }
1319
1320 /// Test basic functionality of the [`Standby::wait_for`] method.
1321 #[tokio::test]
1322 async fn test_wait_for() {
1323 let standby = Standby::new();
1324 let wait = standby.wait_for(
1325 Id::new(1),
1326 |event: &Event| matches!(event, Event::RoleDelete(e) if e.guild_id.get() == 1),
1327 );
1328 standby.process(&Event::RoleDelete(RoleDelete {
1329 guild_id: Id::new(1),
1330 role_id: Id::new(2),
1331 }));
1332
1333 assert_eq!(
1334 wait.await.unwrap(),
1335 Event::RoleDelete(RoleDelete {
1336 guild_id: Id::new(1),
1337 role_id: Id::new(2),
1338 })
1339 );
1340 assert!(standby.guilds.is_empty());
1341 }
1342
1343 /// Test basic functionality of the [`Standby::wait_for_stream`] method.
1344 #[tokio::test]
1345 async fn test_wait_for_stream() {
1346 let standby = Standby::new();
1347 let mut stream = standby.wait_for_stream(
1348 Id::new(1),
1349 |event: &Event| matches!(event, Event::RoleDelete(e) if e.guild_id.get() == 1),
1350 );
1351 standby.process(&Event::RoleDelete(RoleDelete {
1352 guild_id: Id::new(1),
1353 role_id: Id::new(2),
1354 }));
1355 standby.process(&Event::RoleDelete(RoleDelete {
1356 guild_id: Id::new(1),
1357 role_id: Id::new(3),
1358 }));
1359
1360 assert_eq!(
1361 stream.next().await,
1362 Some(Event::RoleDelete(RoleDelete {
1363 guild_id: Id::new(1),
1364 role_id: Id::new(2)
1365 }))
1366 );
1367 assert_eq!(
1368 stream.next().await,
1369 Some(Event::RoleDelete(RoleDelete {
1370 guild_id: Id::new(1),
1371 role_id: Id::new(3)
1372 }))
1373 );
1374 assert!(!standby.guilds.is_empty());
1375 drop(stream);
1376 standby.process(&Event::RoleDelete(RoleDelete {
1377 guild_id: Id::new(1),
1378 role_id: Id::new(4),
1379 }));
1380 assert!(standby.guilds.is_empty());
1381 }
1382
1383 /// Test basic functionality of the [`Standby::wait_for_event`] method.
1384 #[tokio::test]
1385 async fn test_wait_for_event() {
1386 let ready = Ready {
1387 application: PartialApplication {
1388 flags: ApplicationFlags::empty(),
1389 id: Id::new(1),
1390 },
1391 guilds: Vec::new(),
1392 resume_gateway_url: "wss://gateway.discord.gg".into(),
1393 session_id: String::new(),
1394 shard: Some(ShardId::new(5, 7)),
1395 user: CurrentUser {
1396 accent_color: None,
1397 avatar: None,
1398 banner: None,
1399 bot: false,
1400 discriminator: 1,
1401 email: None,
1402 id: Id::new(1),
1403 mfa_enabled: true,
1404 name: "twilight".to_owned(),
1405 verified: Some(false),
1406 premium_type: None,
1407 public_flags: None,
1408 flags: None,
1409 locale: None,
1410 global_name: None,
1411 },
1412 version: 6,
1413 };
1414 let event = Event::Ready(Box::new(ready));
1415
1416 let standby = Standby::new();
1417 let wait = standby.wait_for_event(|event: &Event| match event {
1418 Event::Ready(ready) => ready.shard.is_some_and(|id| id.number() == 5),
1419 _ => false,
1420 });
1421 assert!(!standby.events.is_empty());
1422 standby.process(&event);
1423
1424 assert_eq!(event, wait.await.unwrap());
1425 assert!(standby.events.is_empty());
1426 }
1427
1428 /// Test basic functionality of the [`Standby::wait_for_event_stream`]
1429 /// method.
1430 #[tokio::test]
1431 async fn test_wait_for_event_stream() {
1432 let standby = Standby::new();
1433 let mut stream =
1434 standby.wait_for_event_stream(|event: &Event| event.kind() == EventType::Resumed);
1435 standby.process(&Event::Resumed);
1436 assert_eq!(stream.next().await, Some(Event::Resumed));
1437 assert!(!standby.events.is_empty());
1438 drop(stream);
1439 standby.process(&Event::Resumed);
1440 assert!(standby.events.is_empty());
1441 }
1442
1443 /// Test basic functionality of the [`Standby::wait_for_message`] method.
1444 #[tokio::test]
1445 async fn test_wait_for_message() {
1446 let message = message();
1447 let event = Event::MessageCreate(Box::new(MessageCreate(message)));
1448
1449 let standby = Standby::new();
1450 let wait = standby.wait_for_message(Id::new(1), |message: &MessageCreate| {
1451 message.author.id.get() == 2
1452 });
1453 standby.process(&event);
1454
1455 assert_eq!(3, wait.await.map(|msg| msg.id.get()).unwrap());
1456 assert!(standby.messages.is_empty());
1457 }
1458
1459 /// Test basic functionality of the [`Standby::wait_for_message_stream`]
1460 /// method.
1461 #[tokio::test]
1462 async fn test_wait_for_message_stream() {
1463 let standby = Standby::new();
1464 let mut stream = standby.wait_for_message_stream(Id::new(1), |_: &MessageCreate| true);
1465 standby.process(&Event::MessageCreate(Box::new(MessageCreate(message()))));
1466 standby.process(&Event::MessageCreate(Box::new(MessageCreate(message()))));
1467
1468 assert!(stream.next().await.is_some());
1469 assert!(stream.next().await.is_some());
1470 drop(stream);
1471 assert_eq!(1, standby.messages.len());
1472 standby.process(&Event::MessageCreate(Box::new(MessageCreate(message()))));
1473 assert!(standby.messages.is_empty());
1474 }
1475
1476 /// Test basic functionality of the [`Standby::wait_for_reaction`] method.
1477 #[tokio::test]
1478 async fn test_wait_for_reaction() {
1479 let event = Event::ReactionAdd(Box::new(ReactionAdd(reaction())));
1480
1481 let standby = Standby::new();
1482 let wait = standby.wait_for_reaction(Id::new(4), |reaction: &ReactionAdd| {
1483 reaction.user_id.get() == 3
1484 });
1485
1486 standby.process(&event);
1487
1488 assert_eq!(
1489 Id::new(3),
1490 wait.await.map(|reaction| reaction.user_id).unwrap()
1491 );
1492 assert!(standby.reactions.is_empty());
1493 }
1494
1495 /// Test basic functionality of the [`Standby::wait_for_reaction_stream`]
1496 /// method.
1497 #[tokio::test]
1498 async fn test_wait_for_reaction_stream() {
1499 let standby = Standby::new();
1500 let mut stream = standby.wait_for_reaction_stream(Id::new(4), |_: &ReactionAdd| true);
1501 standby.process(&Event::ReactionAdd(Box::new(ReactionAdd(reaction()))));
1502 standby.process(&Event::ReactionAdd(Box::new(ReactionAdd(reaction()))));
1503
1504 assert!(stream.next().await.is_some());
1505 assert!(stream.next().await.is_some());
1506 drop(stream);
1507 assert_eq!(1, standby.reactions.len());
1508 standby.process(&Event::ReactionAdd(Box::new(ReactionAdd(reaction()))));
1509 assert!(standby.reactions.is_empty());
1510 }
1511
1512 /// Assert that Standby processing some non-matching events will not affect
1513 /// the matching of a later event.
1514 #[tokio::test]
1515 async fn test_wait_for_component() {
1516 let event = Event::InteractionCreate(Box::new(InteractionCreate(button())));
1517
1518 let standby = Standby::new();
1519 let wait = standby.wait_for_component(Id::new(3), |button: &Interaction| {
1520 button.author_id() == Some(Id::new(2))
1521 });
1522
1523 standby.process(&event);
1524
1525 assert_eq!(
1526 Some(Id::new(2)),
1527 wait.await.map(|button| button.author_id()).unwrap()
1528 );
1529 assert!(standby.components.is_empty());
1530 }
1531
1532 #[tokio::test]
1533 async fn test_wait_for_component_stream() {
1534 let standby = Standby::new();
1535 let mut stream = standby.wait_for_component_stream(Id::new(3), |_: &Interaction| true);
1536 standby.process(&Event::InteractionCreate(Box::new(InteractionCreate(
1537 button(),
1538 ))));
1539 standby.process(&Event::InteractionCreate(Box::new(InteractionCreate(
1540 button(),
1541 ))));
1542
1543 assert!(stream.next().await.is_some());
1544 assert!(stream.next().await.is_some());
1545 drop(stream);
1546 assert_eq!(1, standby.components.len());
1547 standby.process(&Event::InteractionCreate(Box::new(InteractionCreate(
1548 button(),
1549 ))));
1550 assert!(standby.components.is_empty());
1551 }
1552
1553 #[tokio::test]
1554 async fn test_handles_wrong_events() {
1555 let standby = Standby::new();
1556 let wait = standby.wait_for_event(|event: &Event| event.kind() == EventType::Resumed);
1557
1558 standby.process(&Event::GatewayHeartbeatAck);
1559 standby.process(&Event::GatewayHeartbeatAck);
1560 standby.process(&Event::Resumed);
1561
1562 assert_eq!(Event::Resumed, wait.await.unwrap());
1563 }
1564
1565 /// Test that generic event handlers will be given the opportunity to
1566 /// process events with specific handlers (message creates, reaction adds)
1567 /// and guild events. Similarly, guild handlers should be able to process
1568 /// specific handler events as well.
1569 #[tokio::test]
1570 async fn test_process_nonspecific_handling() {
1571 let standby = Standby::new();
1572
1573 // generic event handler gets message creates
1574 let wait = standby.wait_for_event(|event: &Event| event.kind() == EventType::MessageCreate);
1575 standby.process(&Event::MessageCreate(Box::new(MessageCreate(message()))));
1576 assert!(matches!(wait.await, Ok(Event::MessageCreate(_))));
1577
1578 // generic event handler gets reaction adds
1579 let wait = standby.wait_for_event(|event: &Event| event.kind() == EventType::ReactionAdd);
1580 standby.process(&Event::ReactionAdd(Box::new(ReactionAdd(reaction()))));
1581 assert!(matches!(wait.await, Ok(Event::ReactionAdd(_))));
1582
1583 // generic event handler gets other guild events
1584 let wait = standby.wait_for_event(|event: &Event| event.kind() == EventType::RoleDelete);
1585 standby.process(&Event::RoleDelete(RoleDelete {
1586 guild_id: Id::new(1),
1587 role_id: Id::new(2),
1588 }));
1589 assert!(matches!(wait.await, Ok(Event::RoleDelete(_))));
1590
1591 // guild event handler gets message creates or reaction events
1592 let wait = standby.wait_for(Id::new(1), |event: &Event| {
1593 event.kind() == EventType::ReactionAdd
1594 });
1595 standby.process(&Event::ReactionAdd(Box::new(ReactionAdd(reaction()))));
1596 assert!(matches!(wait.await, Ok(Event::ReactionAdd(_))));
1597 }
1598}