twilight_standby/
lib.rs

1#![doc = include_str!("../README.md")]
2#![warn(
3    clippy::missing_const_for_fn,
4    clippy::missing_docs_in_private_items,
5    clippy::pedantic,
6    missing_docs,
7    unsafe_code
8)]
9#![allow(
10    clippy::module_name_repetitions,
11    clippy::must_use_candidate,
12    clippy::unnecessary_wraps
13)]
14
15pub mod future;
16
17use self::future::{
18    WaitForComponentFuture, WaitForComponentStream, WaitForEventFuture, WaitForEventStream,
19    WaitForGuildEventFuture, WaitForGuildEventStream, WaitForMessageFuture, WaitForMessageStream,
20    WaitForReactionFuture, WaitForReactionStream,
21};
22use dashmap::DashMap;
23use std::{
24    fmt::{Debug, Display, Formatter, Result as FmtResult},
25    hash::Hash,
26    sync::atomic::{AtomicU64, Ordering},
27};
28use tokio::sync::{
29    mpsc::{self, UnboundedReceiver, UnboundedSender as MpscSender},
30    oneshot::{self, Receiver, Sender as OneshotSender},
31};
32use twilight_model::{
33    application::interaction::{Interaction, InteractionType},
34    gateway::{
35        event::Event,
36        payload::incoming::{MessageCreate, ReactionAdd},
37    },
38    id::{
39        marker::{ChannelMarker, GuildMarker, MessageMarker},
40        Id,
41    },
42};
43
/// Map keyed by an ID - such as a channel ID or message ID - storing a list of
/// bystanders.
///
/// `K` is the type of the ID used as the key and `V` is the type of event the
/// stored bystanders are waiting for.
type BystanderMap<K, V> = DashMap<K, Vec<Bystander<V>>>;
47
/// Sender to a caller that may be for a future bystander or a stream bystander.
///
/// `E` is the type of event sent through the channel.
#[derive(Debug)]
enum Sender<E> {
    /// Bystander is a future and the sender is a oneshot.
    ///
    /// Sending consumes the oneshot sender, so at most one event can be
    /// delivered through this variant.
    Future(OneshotSender<E>),
    /// Bystander is a stream and the sender is an MPSC.
    ///
    /// The channel is unbounded, so sending never waits for capacity.
    Stream(MpscSender<E>),
}
56
57impl<E> Sender<E> {
58    /// Whether the channel is closed.
59    fn is_closed(&self) -> bool {
60        match self {
61            Self::Future(sender) => sender.is_closed(),
62            Self::Stream(sender) => sender.is_closed(),
63        }
64    }
65}
66
/// Registration for a caller to wait for an event based on a predicate
/// function.
///
/// `T` is the type of event the caller is waiting for.
struct Bystander<T> {
    /// Predicate check to perform on an event.
    ///
    /// Boxed as a trait object so that bystanders registered with different
    /// closures can be stored in the same collection.
    func: Box<dyn Fn(&T) -> bool + Send + Sync>,
    /// [`Sender::Future`]s consume themselves once upon sending so the sender
    /// needs to be able to be taken out separately.
    ///
    /// `None` means the sender has been taken out and not put back.
    sender: Option<Sender<T>>,
}
76
77impl<T: Debug> Debug for Bystander<T> {
78    fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
79        f.debug_struct("Bystander")
80            .field("func", &"<dyn Fn(&T) -> bool>")
81            .field("sender", &self.sender)
82            .finish()
83    }
84}
85
86/// The `Standby` struct, used by the main event loop to process events and by
87/// tasks to wait for an event.
88///
89/// Refer to the crate-level documentation for more information.
90///
91/// # Using Standby in multiple tasks
92///
93/// To use a Standby instance in multiple tasks, consider wrapping it in an
94/// [`std::sync::Arc`] or [`std::rc::Rc`].
95///
96/// # Examples
97///
98/// ## Timeouts
99///
100/// Futures can be timed out by passing the future returned by Standby to
101/// functions such as [`tokio::time::timeout`]:
102///
103/// ```rust,no_run
104/// # #[tokio::main] async fn main() -> Result<(), Box<dyn std::error::Error>> {
105/// use std::time::Duration;
106/// use twilight_model::gateway::event::{Event, EventType};
107/// use twilight_standby::Standby;
108///
109/// let standby = Standby::new();
110/// let future = standby.wait_for_event(|event: &Event| event.kind() == EventType::Ready);
111/// let event = tokio::time::timeout(Duration::from_secs(1), future).await?;
112/// # Ok(()) }
113/// ```
114///
115/// [`tokio::time::timeout`]: https://docs.rs/tokio/latest/tokio/time/fn.timeout.html
116#[derive(Debug, Default)]
117pub struct Standby {
118    /// List of component bystanders where the ID of the message is known
119    /// beforehand.
120    components: DashMap<Id<MessageMarker>, Vec<Bystander<Interaction>>>,
121    /// Bystanders for any event that may not be in any particular guild.
122    ///
123    /// The key is generated via [`event_counter`].
124    ///
125    /// [`event_counter`]: Self::event_counter
126    events: DashMap<u64, Bystander<Event>>,
127    /// Event counter to be used as the key of [`events`].
128    ///
129    /// [`events`]: Self::events
130    event_counter: AtomicU64,
131    /// List of bystanders where the ID of the guild is known beforehand.
132    guilds: DashMap<Id<GuildMarker>, Vec<Bystander<Event>>>,
133    /// List of message bystanders where the ID of the channel is known
134    /// beforehand.
135    messages: DashMap<Id<ChannelMarker>, Vec<Bystander<MessageCreate>>>,
136    /// List of reaction bystanders where the ID of the message is known
137    /// beforehand.
138    reactions: DashMap<Id<MessageMarker>, Vec<Bystander<ReactionAdd>>>,
139}
140
141impl Standby {
142    /// Create a new instance of `Standby`.
143    ///
144    /// Once a `Standby` has been created it must process gateway events via
145    /// [`process`]. Awaiting an event can start via methods such as
146    /// [`wait_for`] and [`wait_for_message_stream`].
147    ///
148    /// [`process`]: Self::process
149    /// [`wait_for`]: Self::wait_for
150    /// [`wait_for_message_stream`]: Self::wait_for_message_stream
151    #[must_use = "must process events to be useful"]
    pub fn new() -> Self {
        // The derived `Default` starts every bystander map out empty and the
        // event counter at its default value.
        Self::default()
    }
155
156    /// Process an event, calling any bystanders that might be waiting on it.
157    ///
158    /// Returns statistics about matched [`Standby`] calls and how they were
159    /// processed. For example, by using [`ProcessResults::matched`] you can
160    /// determine how many calls were sent an event.
161    ///
162    /// When a bystander checks to see if an event is what it's waiting for, it
163    /// will receive the event by cloning it.
164    ///
165    /// This function must be called when events are received in order for
166    /// futures returned by methods to fulfill.
167    pub fn process(&self, event: &Event) -> ProcessResults {
168        tracing::trace!(event_type = ?event.kind(), ?event, "processing event");
169
170        let mut completions = ProcessResults::new();
171
172        match event {
173            Event::InteractionCreate(e) => {
174                if e.kind == InteractionType::MessageComponent {
175                    if let Some(message) = &e.message {
176                        completions.add_with(&Self::process_specific_event(
177                            &self.components,
178                            message.id,
179                            e,
180                        ));
181                    }
182                }
183            }
184            Event::MessageCreate(e) => {
185                completions.add_with(&Self::process_specific_event(
186                    &self.messages,
187                    e.0.channel_id,
188                    e,
189                ));
190            }
191            Event::ReactionAdd(e) => {
192                completions.add_with(&Self::process_specific_event(
193                    &self.reactions,
194                    e.0.message_id,
195                    e,
196                ));
197            }
198            _ => {}
199        }
200
201        if let Some(guild_id) = event.guild_id() {
202            completions.add_with(&Self::process_specific_event(&self.guilds, guild_id, event));
203        }
204
205        completions.add_with(&Self::process_event(&self.events, event));
206
207        completions
208    }
209
210    /// Wait for an event in a certain guild.
211    ///
212    /// To wait for multiple guild events matching the given predicate use
213    /// [`wait_for_stream`].
214    ///
215    /// # Examples
216    ///
217    /// Wait for a [`BanAdd`] event in guild 123:
218    ///
219    /// ```no_run
220    /// # #[tokio::main]
221    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
222    /// use twilight_model::{
223    ///     gateway::event::{Event, EventType},
224    ///     id::Id,
225    /// };
226    /// use twilight_standby::Standby;
227    ///
228    /// let standby = Standby::new();
229    ///
230    /// let guild_id = Id::new(123);
231    ///
232    /// let reaction = standby
233    ///     .wait_for(guild_id, |event: &Event| event.kind() == EventType::BanAdd)
234    ///     .await?;
235    /// # Ok(()) }
236    /// ```
237    ///
238    /// # Errors
239    ///
240    /// The returned future resolves to a [`Canceled`] error if the associated
241    /// [`Standby`] instance is dropped.
242    ///
243    /// [`BanAdd`]: twilight_model::gateway::payload::incoming::BanAdd
244    /// [`Canceled`]: future::Canceled
245    /// [`wait_for_stream`]: Self::wait_for_stream
246    pub fn wait_for<F: Fn(&Event) -> bool + Send + Sync + 'static>(
247        &self,
248        guild_id: Id<GuildMarker>,
249        check: impl Into<Box<F>>,
250    ) -> WaitForGuildEventFuture {
251        tracing::trace!(%guild_id, "waiting for event in guild");
252
253        WaitForGuildEventFuture {
254            rx: Self::insert_future(&self.guilds, guild_id, check),
255        }
256    }
257
258    /// Wait for a stream of events in a certain guild.
259    ///
260    /// To wait for only one guild event matching the given predicate use
261    /// [`wait_for`].
262    ///
263    /// # Examples
264    ///
265    /// Wait for multiple [`BanAdd`] events in guild 123:
266    ///
267    /// ```no_run
268    /// # #[tokio::main]
269    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
270    /// use tokio_stream::StreamExt;
271    /// use twilight_model::{
272    ///     gateway::event::{Event, EventType},
273    ///     id::Id,
274    /// };
275    /// use twilight_standby::Standby;
276    ///
277    /// let standby = Standby::new();
278    ///
279    /// let guild_id = Id::new(123);
280    ///
281    /// let mut stream =
282    ///     standby.wait_for_stream(guild_id, |event: &Event| event.kind() == EventType::BanAdd);
283    ///
284    /// while let Some(event) = stream.next().await {
285    ///     if let Event::BanAdd(ban) = event {
286    ///         println!("user {} was banned in guild {}", ban.user.id, ban.guild_id);
287    ///     }
288    /// }
289    /// # Ok(()) }
290    /// ```
291    ///
292    /// # Errors
293    ///
294    /// The returned stream ends when the associated [`Standby`] instance is
295    /// dropped.
296    ///
297    /// [`BanAdd`]: twilight_model::gateway::payload::incoming::BanAdd
298    /// [`wait_for`]: Self::wait_for
299    pub fn wait_for_stream<F: Fn(&Event) -> bool + Send + Sync + 'static>(
300        &self,
301        guild_id: Id<GuildMarker>,
302        check: impl Into<Box<F>>,
303    ) -> WaitForGuildEventStream {
304        tracing::trace!(%guild_id, "waiting for event in guild");
305
306        WaitForGuildEventStream {
307            rx: Self::insert_stream(&self.guilds, guild_id, check),
308        }
309    }
310
311    /// Wait for an event not in a certain guild. This must be filtered by an
312    /// event type.
313    ///
314    /// To wait for multiple events matching the given predicate use
315    /// [`wait_for_event_stream`].
316    ///
317    /// # Examples
318    ///
319    /// Wait for a [`Ready`] event for shard 5:
320    ///
321    /// ```no_run
322    /// # #[tokio::main]
323    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
324    /// use twilight_model::gateway::event::{Event, EventType};
325    /// use twilight_standby::Standby;
326    ///
327    /// let standby = Standby::new();
328    ///
329    /// let ready = standby
330    ///     .wait_for_event(|event: &Event| {
331    ///         if let Event::Ready(ready) = event {
332    ///             ready.shard.map_or(false, |id| id.number() == 5)
333    ///         } else {
334    ///             false
335    ///         }
336    ///     })
337    ///     .await?;
338    /// # Ok(()) }
339    /// ```
340    ///
341    /// # Errors
342    ///
343    /// The returned future resolves to a [`Canceled`] error if the associated
344    /// [`Standby`] instance is dropped.
345    ///
346    /// [`Canceled`]: future::Canceled
347    /// [`Ready`]: twilight_model::gateway::payload::incoming::Ready
348    /// [`wait_for_event_stream`]: Self::wait_for_event_stream
349    pub fn wait_for_event<F: Fn(&Event) -> bool + Send + Sync + 'static>(
350        &self,
351        check: impl Into<Box<F>>,
352    ) -> WaitForEventFuture {
353        tracing::trace!("waiting for event");
354
355        let (tx, rx) = oneshot::channel();
356
357        self.events.insert(
358            self.next_event_id(),
359            Bystander {
360                func: check.into(),
361                sender: Some(Sender::Future(tx)),
362            },
363        );
364
365        WaitForEventFuture { rx }
366    }
367
368    /// Wait for a stream of events not in a certain guild. This must be
369    /// filtered by an event type.
370    ///
371    /// To wait for only one event matching the given predicate use
372    /// [`wait_for_event`].
373    ///
374    /// # Examples
375    ///
376    /// Wait for multiple [`Ready`] events on shard 5:
377    ///
378    /// ```no_run
379    /// # #[tokio::main]
380    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
381    /// use tokio_stream::StreamExt;
382    /// use twilight_model::gateway::event::{Event, EventType};
383    /// use twilight_standby::Standby;
384    ///
385    /// let standby = Standby::new();
386    ///
387    /// let mut events = standby.wait_for_event_stream(|event: &Event| {
388    ///     if let Event::Ready(ready) = event {
389    ///         ready.shard.map_or(false, |id| id.number() == 5)
390    ///     } else {
391    ///         false
392    ///     }
393    /// });
394    ///
395    /// while let Some(event) = events.next().await {
396    ///     println!("got event with type {:?}", event.kind());
397    /// }
398    /// # Ok(()) }
399    /// ```
400    ///
401    /// # Errors
402    ///
403    /// The returned stream ends when the associated [`Standby`] instance is
404    /// dropped.
405    ///
406    /// [`Ready`]: twilight_model::gateway::payload::incoming::Ready
407    /// [`wait_for_event`]: Self::wait_for_event
408    pub fn wait_for_event_stream<F: Fn(&Event) -> bool + Send + Sync + 'static>(
409        &self,
410        check: impl Into<Box<F>>,
411    ) -> WaitForEventStream {
412        tracing::trace!("waiting for event");
413
414        let (tx, rx) = mpsc::unbounded_channel();
415
416        self.events.insert(
417            self.next_event_id(),
418            Bystander {
419                func: check.into(),
420                sender: Some(Sender::Stream(tx)),
421            },
422        );
423
424        WaitForEventStream { rx }
425    }
426
427    /// Wait for a message in a certain channel.
428    ///
429    /// To wait for multiple messages matching the given predicate use
430    /// [`wait_for_message_stream`].
431    ///
432    /// # Examples
433    ///
434    /// Wait for a message in channel 123 by user 456 with the content "test":
435    ///
436    /// ```no_run
437    /// # #[tokio::main]
438    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
439    /// use twilight_model::{gateway::payload::incoming::MessageCreate, id::Id};
440    /// use twilight_standby::Standby;
441    ///
442    /// let standby = Standby::new();
443    ///
444    /// let author_id = Id::new(456);
445    /// let channel_id = Id::new(123);
446    ///
447    /// let message = standby
448    ///     .wait_for_message(channel_id, move |event: &MessageCreate| {
449    ///         event.author.id == author_id && event.content == "test"
450    ///     })
451    ///     .await?;
452    /// # Ok(()) }
453    /// ```
454    ///
455    /// # Errors
456    ///
457    /// The returned future resolves to a [`Canceled`] error if the associated
458    /// [`Standby`] instance is dropped.
459    ///
460    /// [`Canceled`]: future::Canceled
461    /// [`wait_for_message_stream`]: Self::wait_for_message_stream
462    pub fn wait_for_message<F: Fn(&MessageCreate) -> bool + Send + Sync + 'static>(
463        &self,
464        channel_id: Id<ChannelMarker>,
465        check: impl Into<Box<F>>,
466    ) -> WaitForMessageFuture {
467        tracing::trace!(%channel_id, "waiting for message in channel");
468
469        WaitForMessageFuture {
470            rx: Self::insert_future(&self.messages, channel_id, check),
471        }
472    }
473
    /// Wait for a stream of messages in a certain channel.
475    ///
476    /// To wait for only one message matching the given predicate use
477    /// [`wait_for_message`].
478    ///
479    /// # Examples
480    ///
481    /// Wait for multiple messages in channel 123 by user 456 with the content
482    /// "test":
483    ///
484    /// ```no_run
485    /// # #[tokio::main]
486    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
487    /// use tokio_stream::StreamExt;
488    /// use twilight_model::{gateway::payload::incoming::MessageCreate, id::Id};
489    /// use twilight_standby::Standby;
490    ///
491    /// let standby = Standby::new();
492    ///
493    /// let author_id = Id::new(456);
494    /// let channel_id = Id::new(123);
495    ///
496    /// let mut messages = standby.wait_for_message_stream(channel_id, move |event: &MessageCreate| {
497    ///     event.author.id == author_id && event.content == "test"
498    /// });
499    ///
500    /// while let Some(message) = messages.next().await {
501    ///     println!("got message by {}", message.author.id);
502    /// }
503    /// # Ok(()) }
504    /// ```
505    ///
506    /// # Errors
507    ///
508    /// The returned stream ends when the associated [`Standby`] instance is
509    /// dropped.
510    ///
511    /// [`wait_for_message`]: Self::wait_for_message
512    pub fn wait_for_message_stream<F: Fn(&MessageCreate) -> bool + Send + Sync + 'static>(
513        &self,
514        channel_id: Id<ChannelMarker>,
515        check: impl Into<Box<F>>,
516    ) -> WaitForMessageStream {
517        tracing::trace!(%channel_id, "waiting for message in channel");
518
519        WaitForMessageStream {
520            rx: Self::insert_stream(&self.messages, channel_id, check),
521        }
522    }
523
524    /// Wait for a reaction on a certain message.
525    ///
526    /// To wait for multiple reactions matching the given predicate use
527    /// [`wait_for_reaction_stream`].
528    ///
529    /// # Examples
530    ///
531    /// Wait for a reaction on message 123 by user 456:
532    ///
533    /// ```no_run
534    /// # #[tokio::main]
535    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
536    /// use twilight_model::{gateway::payload::incoming::ReactionAdd, id::Id};
537    /// use twilight_standby::Standby;
538    ///
539    /// let standby = Standby::new();
540    ///
541    /// let message_id = Id::new(123);
542    /// let user_id = Id::new(456);
543    ///
544    /// let reaction = standby
545    ///     .wait_for_reaction(message_id, move |event: &ReactionAdd| {
546    ///         event.user_id == user_id
547    ///     })
548    ///     .await?;
549    /// # Ok(()) }
550    /// ```
551    ///
552    /// # Errors
553    ///
554    /// The returned future resolves to a [`Canceled`] error if the associated
555    /// [`Standby`] instance is dropped.
556    ///
557    /// [`Canceled`]: future::Canceled
558    /// [`wait_for_reaction_stream`]: Self::wait_for_reaction_stream
559    pub fn wait_for_reaction<F: Fn(&ReactionAdd) -> bool + Send + Sync + 'static>(
560        &self,
561        message_id: Id<MessageMarker>,
562        check: impl Into<Box<F>>,
563    ) -> WaitForReactionFuture {
564        tracing::trace!(%message_id, "waiting for reaction on message");
565
566        WaitForReactionFuture {
567            rx: Self::insert_future(&self.reactions, message_id, check),
568        }
569    }
570
571    /// Wait for a stream of reactions on a certain message.
572    ///
573    /// To wait for only one reaction matching the given predicate use
574    /// [`wait_for_reaction`].
575    ///
576    /// # Examples
577    ///
578    /// Wait for multiple reactions on message 123 with unicode reaction "🤠":
579    ///
580    /// ```no_run
581    /// # #[tokio::main]
582    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
583    /// use tokio_stream::StreamExt;
584    /// use twilight_model::{
585    ///     channel::message::EmojiReactionType,
586    ///     gateway::payload::incoming::ReactionAdd,
587    ///     id::Id,
588    /// };
589    /// use twilight_standby::Standby;
590    ///
591    /// let standby = Standby::new();
592    ///
593    /// let message_id = Id::new(123);
594    ///
595    /// let mut reactions = standby.wait_for_reaction_stream(message_id, |event: &ReactionAdd| {
596    ///     matches!(&event.emoji, EmojiReactionType::Unicode { name } if name == "🤠")
597    /// });
598    ///
599    /// while let Some(reaction) = reactions.next().await {
600    ///     println!("got a reaction by {}", reaction.user_id);
601    /// }
602    /// # Ok(()) }
603    /// ```
604    ///
605    /// # Errors
606    ///
607    /// The returned stream ends when the associated [`Standby`] instance is
608    /// dropped.
609    ///
610    /// [`wait_for_reaction`]: Self::wait_for_reaction
611    pub fn wait_for_reaction_stream<F: Fn(&ReactionAdd) -> bool + Send + Sync + 'static>(
612        &self,
613        message_id: Id<MessageMarker>,
614        check: impl Into<Box<F>>,
615    ) -> WaitForReactionStream {
616        tracing::trace!(%message_id, "waiting for reaction on message");
617
618        WaitForReactionStream {
619            rx: Self::insert_stream(&self.reactions, message_id, check),
620        }
621    }
622
623    /// Wait for a component on a certain message.
624    ///
625    /// Returns a `Canceled` error if the `Standby` struct was dropped.
626    ///
627    /// If you need to wait for multiple components matching the given predicate,
628    /// use [`wait_for_component_stream`].
629    ///
630    /// # Examples
631    ///
632    /// Wait for a component on message 123 by user 456:
633    ///
634    /// ```no_run
635    /// # #[tokio::main] async fn main() -> Result<(), Box<dyn std::error::Error>> {
636    /// use twilight_model::{application::interaction::Interaction, id::Id};
637    /// use twilight_standby::Standby;
638    ///
639    /// let standby = Standby::new();
640    /// let message_id = Id::new(123);
641    ///
642    /// let component = standby
643    ///     .wait_for_component(message_id, |event: &Interaction| {
644    ///         event.author_id() == Some(Id::new(456))
645    ///     })
646    ///     .await?;
647    /// # Ok(()) }
648    /// ```
649    ///
650    /// [`wait_for_component_stream`]: Self::wait_for_component_stream
651    pub fn wait_for_component<F: Fn(&Interaction) -> bool + Send + Sync + 'static>(
652        &self,
653        message_id: Id<MessageMarker>,
654        check: impl Into<Box<F>>,
655    ) -> WaitForComponentFuture {
656        tracing::trace!(%message_id, "waiting for component on message");
657
658        WaitForComponentFuture {
659            rx: Self::insert_future(&self.components, message_id, check),
660        }
661    }
662
663    /// Wait for a stream of components on a certain message.
664    ///
    /// The returned stream ends when the associated [`Standby`] instance is
    /// dropped.
666    ///
667    /// If you need to wait for only one component matching the given predicate,
668    /// use [`wait_for_component`].
669    ///
670    /// # Examples
671    ///
672    /// Wait for multiple button components on message 123 with a `custom_id` of
673    /// "Click":
674    ///
675    /// ```no_run
676    /// # #[tokio::main] async fn main() -> Result<(), Box<dyn std::error::Error>> {
677    /// use tokio_stream::StreamExt;
678    /// use twilight_model::{
679    ///     application::interaction::{Interaction, InteractionData},
680    ///     id::Id,
681    /// };
682    /// use twilight_standby::Standby;
683    ///
684    /// let standby = Standby::new();
685    /// let message_id = Id::new(123);
686    ///
687    /// let mut components = standby.wait_for_component_stream(message_id, |event: &Interaction| {
688    ///     if let Some(InteractionData::MessageComponent(data)) = &event.data {
689    ///         data.custom_id == "Click".to_string()
690    ///     } else {
691    ///         false
692    ///     }
693    /// });
694    ///
695    /// while let Some(component) = components.next().await {
696    ///     println!("got a component by {}", component.author_id().unwrap());
697    /// }
698    /// # Ok(()) }
699    /// ```
700    ///
701    /// [`wait_for_component`]: Self::wait_for_component
702    pub fn wait_for_component_stream<F: Fn(&Interaction) -> bool + Send + Sync + 'static>(
703        &self,
704        message_id: Id<MessageMarker>,
705        check: impl Into<Box<F>>,
706    ) -> WaitForComponentStream {
707        tracing::trace!(%message_id, "waiting for component on message");
708
709        WaitForComponentStream {
710            rx: Self::insert_stream(&self.components, message_id, check),
711        }
712    }
713
714    /// Next event ID in [`Standby::event_counter`].
715    fn next_event_id(&self) -> u64 {
716        self.event_counter.fetch_add(1, Ordering::SeqCst)
717    }
718
719    /// Append a new future bystander into a map according to the ID.
720    fn insert_future<F: Fn(&V) -> bool + Send + Sync + 'static, K: Eq + Hash, V>(
721        map: &BystanderMap<K, V>,
722        id: K,
723        check: impl Into<Box<F>>,
724    ) -> Receiver<V> {
725        let (tx, rx) = oneshot::channel();
726
727        let mut entry = map.entry(id).or_default();
728        entry.push(Bystander {
729            func: check.into(),
730            sender: Some(Sender::Future(tx)),
731        });
732
733        rx
734    }
735
736    /// Append a new stream bystander into a map according to the ID.
737    fn insert_stream<F: Fn(&V) -> bool + Send + Sync + 'static, K: Eq + Hash, V>(
738        map: &BystanderMap<K, V>,
739        id: K,
740        check: impl Into<Box<F>>,
741    ) -> UnboundedReceiver<V> {
742        let (tx, rx) = mpsc::unbounded_channel();
743
744        let mut entry = map.entry(id).or_default();
745        entry.push(Bystander {
746            func: check.into(),
747            sender: Some(Sender::Stream(tx)),
748        });
749
750        rx
751    }
752
753    /// Process a general event that is not of any particular type or in any
754    /// particular guild.
755    #[tracing::instrument(level = "trace")]
756    fn process_event<K: Debug + Display + Eq + Hash + PartialEq + 'static, V: Clone + Debug>(
757        map: &DashMap<K, Bystander<V>>,
758        event: &V,
759    ) -> ProcessResults {
760        tracing::trace!(?event, "processing event");
761
762        let mut results = ProcessResults::new();
763
764        map.retain(|id, bystander| {
765            let result = Self::bystander_process(bystander, event);
766            results.handle(result);
767
768            tracing::trace!(bystander_id = %id, ?result, "event bystander processed");
769
770            // We want to retain bystanders that are *incomplete* and remove
771            // bystanders that are *complete*.
772            !result.is_complete()
773        });
774
775        results
776    }
777
778    /// Process a general event that is either of a particular type or in a
779    /// particular guild.
780    #[allow(clippy::needless_pass_by_value)]
781    #[tracing::instrument(level = "trace")]
782    fn process_specific_event<
783        K: Debug + Display + Eq + Hash + PartialEq + 'static,
784        V: Clone + Debug,
785    >(
786        map: &DashMap<K, Vec<Bystander<V>>>,
787        guild_id: K,
788        event: &V,
789    ) -> ProcessResults {
790        // Iterate over a guild's bystanders and mark it for removal if there
791        // are no bystanders remaining.
792        let (remove_guild, results) = if let Some(mut bystanders) = map.get_mut(&guild_id) {
793            let results = Self::bystander_iter(&mut bystanders, event);
794
795            (bystanders.is_empty(), results)
796        } else {
797            tracing::trace!(%guild_id, "guild has no event bystanders");
798
799            return ProcessResults::new();
800        };
801
802        if remove_guild {
803            tracing::trace!(%guild_id, "removing guild from map");
804
805            map.remove(&guild_id);
806        }
807
808        results
809    }
810
811    /// Iterate over bystanders and remove the ones that match the predicate.
812    #[tracing::instrument(level = "trace")]
813    fn bystander_iter<E: Clone + Debug>(
814        bystanders: &mut Vec<Bystander<E>>,
815        event: &E,
816    ) -> ProcessResults {
817        tracing::trace!(?bystanders, "iterating over bystanders");
818
819        // Iterate over the list of bystanders by using an index and manually
820        // indexing in to the list.
821        //
822        // # Logic
823        //
824        // In each iteration we decide whether to retain a bystander: if we do
825        // then we can increment our index and move on, but if we opt to instead
826        // remove it then we do so and don't increment the index. The reason we
827        // don't increment the index is because when we remove an element the
828        // index does not become empty and instead everything to the right is
829        // shifted to the left, illustrated as such:
830        //
831        //     |---|
832        //     v   |
833        // A - B - C - D
834        //     |   ^   |
835        //     |   |---|
836        //     |
837        //  Remove B
838        //
839        // After: A - C - D
840        //
841        // # Reasons not to use alternatives
842        //
843        // **`Vec::retain`** we need to mutate the entries in order to take out
844        // the sender and `Vec::retain` only gives us immutable references.
845        //
846        // A form of enumeration can't be used because sometimes the index
847        // doesn't advance; iterators would continue to provide incrementing
848        // enumeration indexes while we sometimes want to reuse an index.
849        let mut index = 0;
850        let mut results = ProcessResults::new();
851
852        while index < bystanders.len() {
853            tracing::trace!(%index, "checking bystander");
854
855            let status = Self::bystander_process(&mut bystanders[index], event);
856            results.handle(status);
857
858            tracing::trace!(%index, ?status, "checked bystander");
859
860            if status.is_complete() {
861                bystanders.remove(index);
862            } else {
863                index += 1;
864            }
865        }
866
867        results
868    }
869
870    /// Process a bystander, sending the event if the sender is active and the
871    /// predicate matches. Returns whether the bystander has fulfilled.
872    ///
873    /// Returns whether the bystander is fulfilled; if the bystander has been
874    /// fulfilled then the channel is now closed.
875    #[tracing::instrument(level = "trace")]
876    fn bystander_process<T: Clone + Debug>(
877        bystander: &mut Bystander<T>,
878        event: &T,
879    ) -> ProcessStatus {
880        // We need to take the sender out because `OneshotSender`s consume
881        // themselves when calling `OneshotSender::send`.
882        let Some(sender) = bystander.sender.take() else {
883            tracing::trace!("bystander has no sender, indicating for removal");
884
885            return ProcessStatus::AlreadyComplete;
886        };
887
888        // The channel may have closed due to the receiver dropping their end,
889        // in which case we can say we're done.
890        if sender.is_closed() {
891            tracing::trace!("bystander's rx dropped, indicating for removal");
892
893            return ProcessStatus::Dropped;
894        }
895
896        // Lastly check to see if the predicate matches the event. If it doesn't
897        // then we can short-circuit.
898        if !(bystander.func)(event) {
899            tracing::trace!("bystander check doesn't match, not removing");
900
901            // Put the sender back into its bystander since we'll still need it
902            // next time around.
903            bystander.sender.replace(sender);
904
905            return ProcessStatus::Skip;
906        }
907
908        match sender {
909            Sender::Future(tx) => {
910                // We don't care if the event successfully sends or not since
911                // we're going to be tossing out the bystander anyway.
912                drop(tx.send(event.clone()));
913
914                tracing::trace!("bystander matched event, indicating for removal");
915
916                ProcessStatus::SentFuture
917            }
918            Sender::Stream(tx) => {
919                // If we can send an event to the receiver and the channel is
920                // still open then we need to retain the bystander, otherwise we
921                // need to mark it for removal.
922                if tx.send(event.clone()).is_ok() {
923                    tracing::trace!("bystander is a stream, retaining in map");
924
925                    bystander.sender.replace(Sender::Stream(tx));
926
927                    ProcessStatus::SentStream
928                } else {
929                    ProcessStatus::Dropped
930                }
931            }
932        }
933    }
934}
935/// Number of [`Standby`] calls that were completed.
936#[derive(Clone, Debug, Eq, Hash, PartialEq)]
937pub struct ProcessResults {
938    /// Number of bystanders that were dropped due to the receiving end
939    /// dropping.
940    dropped: usize,
941    /// Number of future bystanders that were open and were sent an event.
942    fulfilled: usize,
943    /// Number of stream bystanders that were open and were sent an event.
944    sent: usize,
945}
946
947impl ProcessResults {
948    /// Create a new set of zeroed out results.
949    const fn new() -> Self {
950        Self {
951            dropped: 0,
952            fulfilled: 0,
953            sent: 0,
954        }
955    }
956
957    /// Number of [`Standby`] calls where the receiver had already dropped their
958    /// end.
959    ///
960    /// This may happen when a caller calls into [`Standby`] but then times out
961    /// or otherwise cancels their request.
962    pub const fn dropped(&self) -> usize {
963        self.dropped
964    }
965
966    /// Number of [`Standby`] calls that were fulfilled.
967    ///
968    /// Calls for futures via methods such as [`Standby::wait_for`] will be
969    /// marked as fulfilled once matched and an event is sent over the channel.
970    ///
971    /// **Caveat**: although an event has been sent over the channel to the
972    /// receiver it is not guaranteed whether the receiver end actually received
973    /// it; the receiver end may drop shortly after an event is sent. In this
974    /// case the call is considered to be fulfilled.
975    pub const fn fulfilled(&self) -> usize {
976        self.fulfilled
977    }
978
979    /// Number of calls that were matched and were sent an event.
980    ///
981    /// This is the sum of [`fulfilled`] and [`sent`].
982    ///
983    /// See the caveats for both methods.
984    ///
985    /// [`fulfilled`]: Self::fulfilled
986    /// [`sent`]: Self::sent
987    pub const fn matched(&self) -> usize {
988        self.fulfilled() + self.sent()
989    }
990
991    /// Number of [`Standby`] streaming calls that were matched and had an event
992    /// sent to them.
993    ///
994    /// **Caveat**: although an event has been sent over the channel to the
995    /// receiver it is not guaranteed whether the receiver end actually received
996    /// it; the receiver end may drop shortly after an event is sent. In this
997    /// case the call is considered to be sent. Checks over this call will in
998    /// the future be considered [`dropped`].
999    ///
1000    /// [`dropped`]: Self::dropped
1001    pub const fn sent(&self) -> usize {
1002        self.sent
1003    }
1004
1005    /// Add another set of results to this set.
1006    fn add_with(&mut self, other: &Self) {
1007        self.dropped = self.dropped.saturating_add(other.dropped);
1008        self.fulfilled = self.fulfilled.saturating_add(other.fulfilled);
1009        self.sent = self.sent.saturating_add(other.sent);
1010    }
1011
1012    /// Handle a process status.
1013    fn handle(&mut self, status: ProcessStatus) {
1014        match status {
1015            ProcessStatus::Dropped => {
1016                self.dropped += 1;
1017            }
1018            ProcessStatus::SentFuture => {
1019                self.fulfilled += 1;
1020            }
1021            ProcessStatus::SentStream => {
1022                self.sent += 1;
1023            }
1024            ProcessStatus::AlreadyComplete | ProcessStatus::Skip => {}
1025        }
1026    }
1027}
1028
/// Outcome of running one bystander through [`Standby::bystander_process`].
#[derive(Clone, Copy, Debug)]
enum ProcessStatus {
    /// Bystander no longer held a sender when it was checked, so it must be
    /// removed and is not counted towards results.
    AlreadyComplete,
    /// Receiving end of the bystander's channel was gone, either before the
    /// predicate was checked or when sending to a stream failed.
    Dropped,
    /// Event matched and was sent to a oneshot (future) bystander.
    SentFuture,
    /// Event matched and was sent to a stream bystander.
    SentStream,
    /// Predicate did not match the event.
    Skip,
}

impl ProcessStatus {
    /// Whether the bystander is finished and should be removed.
    const fn is_complete(self) -> bool {
        match self {
            Self::AlreadyComplete | Self::Dropped | Self::SentFuture => true,
            Self::SentStream | Self::Skip => false,
        }
    }
}
1054
1055#[cfg(test)]
1056mod tests {
1057    #![allow(clippy::non_ascii_literal)]
1058
1059    use crate::Standby;
1060    use static_assertions::assert_impl_all;
1061    use std::fmt::Debug;
1062    use tokio_stream::StreamExt;
1063    use twilight_gateway::{Event, EventType};
1064    use twilight_model::{
1065        application::interaction::{
1066            message_component::MessageComponentInteractionData, Interaction, InteractionData,
1067            InteractionType,
1068        },
1069        channel::{
1070            message::{component::ComponentType, EmojiReactionType, Message, MessageType},
1071            Channel, ChannelType,
1072        },
1073        gateway::{
1074            payload::incoming::{InteractionCreate, MessageCreate, ReactionAdd, Ready, RoleDelete},
1075            GatewayReaction, ShardId,
1076        },
1077        guild::Permissions,
1078        id::{marker::GuildMarker, Id},
1079        oauth::{ApplicationFlags, ApplicationIntegrationMap, PartialApplication},
1080        user::{CurrentUser, User},
1081        util::Timestamp,
1082    };
1083
1084    assert_impl_all!(Standby: Debug, Default, Send, Sync);
1085
1086    #[allow(deprecated)]
1087    fn message() -> Message {
1088        Message {
1089            activity: None,
1090            application: None,
1091            application_id: None,
1092            attachments: Vec::new(),
1093            author: User {
1094                accent_color: None,
1095                avatar: None,
1096                avatar_decoration: None,
1097                avatar_decoration_data: None,
1098                banner: None,
1099                bot: false,
1100                discriminator: 1,
1101                email: None,
1102                flags: None,
1103                global_name: Some("test".to_owned()),
1104                id: Id::new(2),
1105                locale: None,
1106                mfa_enabled: None,
1107                name: "twilight".to_owned(),
1108                premium_type: None,
1109                public_flags: None,
1110                system: None,
1111                verified: None,
1112            },
1113            call: None,
1114            channel_id: Id::new(1),
1115            components: Vec::new(),
1116            content: "test".to_owned(),
1117            edited_timestamp: None,
1118            embeds: Vec::new(),
1119            flags: None,
1120            guild_id: Some(Id::new(4)),
1121            id: Id::new(3),
1122            interaction: None,
1123            interaction_metadata: None,
1124            kind: MessageType::Regular,
1125            member: None,
1126            mention_channels: Vec::new(),
1127            mention_everyone: false,
1128            mention_roles: Vec::new(),
1129            mentions: Vec::new(),
1130            message_snapshots: Vec::new(),
1131            pinned: false,
1132            poll: None,
1133            reactions: Vec::new(),
1134            reference: None,
1135            referenced_message: None,
1136            role_subscription_data: None,
1137            sticker_items: Vec::new(),
1138            timestamp: Timestamp::from_secs(1_632_072_645).expect("non zero"),
1139            thread: None,
1140            tts: false,
1141            webhook_id: None,
1142        }
1143    }
1144
1145    fn reaction() -> GatewayReaction {
1146        GatewayReaction {
1147            burst: false,
1148            burst_colors: Vec::new(),
1149            channel_id: Id::new(2),
1150            emoji: EmojiReactionType::Unicode {
1151                name: "🍎".to_owned(),
1152            },
1153            guild_id: Some(Id::new(1)),
1154            member: None,
1155            message_author_id: None,
1156            message_id: Id::new(4),
1157            user_id: Id::new(3),
1158        }
1159    }
1160
1161    #[allow(deprecated)]
1162    fn button() -> Interaction {
1163        Interaction {
1164            app_permissions: Some(Permissions::SEND_MESSAGES),
1165            application_id: Id::new(1),
1166            authorizing_integration_owners: ApplicationIntegrationMap {
1167                guild: None,
1168                user: None,
1169            },
1170            channel: Some(Channel {
1171                bitrate: None,
1172                guild_id: None,
1173                id: Id::new(400),
1174                kind: ChannelType::GuildText,
1175                last_message_id: None,
1176                last_pin_timestamp: None,
1177                name: None,
1178                nsfw: None,
1179                owner_id: None,
1180                parent_id: None,
1181                permission_overwrites: None,
1182                position: None,
1183                rate_limit_per_user: None,
1184                recipients: None,
1185                rtc_region: None,
1186                topic: None,
1187                user_limit: None,
1188                application_id: None,
1189                applied_tags: None,
1190                available_tags: None,
1191                default_auto_archive_duration: None,
1192                default_forum_layout: None,
1193                default_reaction_emoji: None,
1194                default_sort_order: None,
1195                default_thread_rate_limit_per_user: None,
1196                flags: None,
1197                icon: None,
1198                invitable: None,
1199                managed: None,
1200                member: None,
1201                member_count: None,
1202                message_count: None,
1203                newly_created: None,
1204                thread_metadata: None,
1205                video_quality_mode: None,
1206            }),
1207            channel_id: None,
1208            context: None,
1209            data: Some(InteractionData::MessageComponent(Box::new(
1210                MessageComponentInteractionData {
1211                    custom_id: String::from("Click"),
1212                    component_type: ComponentType::Button,
1213                    resolved: None,
1214                    values: Vec::new(),
1215                },
1216            ))),
1217            entitlements: Vec::new(),
1218            guild: None,
1219            guild_id: Some(Id::new(3)),
1220            guild_locale: None,
1221            id: Id::new(4),
1222            kind: InteractionType::MessageComponent,
1223            locale: Some("en-GB".to_owned()),
1224            member: None,
1225            message: Some(message()),
1226            token: String::from("token"),
1227            user: Some(User {
1228                accent_color: None,
1229                avatar: None,
1230                avatar_decoration: None,
1231                avatar_decoration_data: None,
1232                banner: None,
1233                bot: false,
1234                discriminator: 1,
1235                email: None,
1236                flags: None,
1237                global_name: Some("test".to_owned()),
1238                id: Id::new(2),
1239                locale: None,
1240                mfa_enabled: None,
1241                name: "twilight".to_owned(),
1242                premium_type: None,
1243                public_flags: None,
1244                system: None,
1245                verified: None,
1246            }),
1247        }
1248    }
1249
1250    /// Test that if a receiver drops their end, the result properly counts the
1251    /// statistic.
1252    #[tokio::test]
1253    async fn test_dropped() {
1254        let standby = Standby::new();
1255        let guild_id = Id::new(1);
1256
1257        {
1258            let _rx = standby.wait_for(guild_id, move |_: &Event| false);
1259        }
1260
1261        let results = standby.process(&Event::RoleDelete(RoleDelete {
1262            guild_id,
1263            role_id: Id::new(2),
1264        }));
1265
1266        assert_eq!(1, results.dropped());
1267        assert_eq!(0, results.fulfilled());
1268        assert_eq!(0, results.sent());
1269    }
1270
1271    /// Test that both events in guild 1 is matched but the event in guild 2 is
1272    /// not matched by testing the returned matched amount.
1273    #[tokio::test]
1274    async fn test_matched() {
1275        fn check(event: &Event, guild_id: Id<GuildMarker>) -> bool {
1276            matches!(event, Event::RoleDelete(e) if e.guild_id == guild_id)
1277        }
1278
1279        let standby = Standby::new();
1280        let guild_id_one = Id::new(1);
1281        let guild_id_two = Id::new(2);
1282        let _one = standby.wait_for(guild_id_one, move |event: &Event| {
1283            check(event, guild_id_one)
1284        });
1285        let _two = standby.wait_for(guild_id_one, move |event: &Event| {
1286            check(event, guild_id_one)
1287        });
1288        let _three = standby.wait_for(guild_id_two, move |event: &Event| {
1289            check(event, guild_id_two)
1290        });
1291
1292        let results = standby.process(&Event::RoleDelete(RoleDelete {
1293            guild_id: Id::new(1),
1294            role_id: Id::new(2),
1295        }));
1296
1297        assert_eq!(0, results.dropped());
1298        assert_eq!(2, results.fulfilled());
1299        assert_eq!(0, results.sent());
1300    }
1301
1302    /// Test that the [`ProcessResults::sent`] counter increments if a match is
1303    /// sent to it.
1304    #[tokio::test]
1305    async fn test_sent() {
1306        let standby = Standby::new();
1307        let guild_id = Id::new(1);
1308
1309        let _rx = standby.wait_for_stream(guild_id, move |_: &Event| true);
1310
1311        let results = standby.process(&Event::RoleDelete(RoleDelete {
1312            guild_id,
1313            role_id: Id::new(2),
1314        }));
1315
1316        assert_eq!(0, results.dropped());
1317        assert_eq!(0, results.fulfilled());
1318        assert_eq!(1, results.sent());
1319    }
1320
1321    /// Test basic functionality of the [`Standby::wait_for`] method.
1322    #[tokio::test]
1323    async fn test_wait_for() {
1324        let standby = Standby::new();
1325        let wait = standby.wait_for(
1326            Id::new(1),
1327            |event: &Event| matches!(event, Event::RoleDelete(e) if e.guild_id.get() == 1),
1328        );
1329        standby.process(&Event::RoleDelete(RoleDelete {
1330            guild_id: Id::new(1),
1331            role_id: Id::new(2),
1332        }));
1333
1334        assert_eq!(
1335            wait.await.unwrap(),
1336            Event::RoleDelete(RoleDelete {
1337                guild_id: Id::new(1),
1338                role_id: Id::new(2),
1339            })
1340        );
1341        assert!(standby.guilds.is_empty());
1342    }
1343
1344    /// Test basic functionality of the [`Standby::wait_for_stream`] method.
1345    #[tokio::test]
1346    async fn test_wait_for_stream() {
1347        let standby = Standby::new();
1348        let mut stream = standby.wait_for_stream(
1349            Id::new(1),
1350            |event: &Event| matches!(event, Event::RoleDelete(e) if e.guild_id.get() == 1),
1351        );
1352        standby.process(&Event::RoleDelete(RoleDelete {
1353            guild_id: Id::new(1),
1354            role_id: Id::new(2),
1355        }));
1356        standby.process(&Event::RoleDelete(RoleDelete {
1357            guild_id: Id::new(1),
1358            role_id: Id::new(3),
1359        }));
1360
1361        assert_eq!(
1362            stream.next().await,
1363            Some(Event::RoleDelete(RoleDelete {
1364                guild_id: Id::new(1),
1365                role_id: Id::new(2)
1366            }))
1367        );
1368        assert_eq!(
1369            stream.next().await,
1370            Some(Event::RoleDelete(RoleDelete {
1371                guild_id: Id::new(1),
1372                role_id: Id::new(3)
1373            }))
1374        );
1375        assert!(!standby.guilds.is_empty());
1376        drop(stream);
1377        standby.process(&Event::RoleDelete(RoleDelete {
1378            guild_id: Id::new(1),
1379            role_id: Id::new(4),
1380        }));
1381        assert!(standby.guilds.is_empty());
1382    }
1383
1384    /// Test basic functionality of the [`Standby::wait_for_event`] method.
1385    #[tokio::test]
1386    async fn test_wait_for_event() {
1387        let ready = Ready {
1388            application: PartialApplication {
1389                flags: ApplicationFlags::empty(),
1390                id: Id::new(1),
1391            },
1392            guilds: Vec::new(),
1393            resume_gateway_url: "wss://gateway.discord.gg".into(),
1394            session_id: String::new(),
1395            shard: Some(ShardId::new(5, 7)),
1396            user: CurrentUser {
1397                accent_color: None,
1398                avatar: None,
1399                banner: None,
1400                bot: false,
1401                discriminator: 1,
1402                email: None,
1403                id: Id::new(1),
1404                mfa_enabled: true,
1405                name: "twilight".to_owned(),
1406                verified: Some(false),
1407                premium_type: None,
1408                public_flags: None,
1409                flags: None,
1410                locale: None,
1411                global_name: None,
1412            },
1413            version: 6,
1414        };
1415        let event = Event::Ready(Box::new(ready));
1416
1417        let standby = Standby::new();
1418        let wait = standby.wait_for_event(|event: &Event| match event {
1419            Event::Ready(ready) => ready.shard.is_some_and(|id| id.number() == 5),
1420            _ => false,
1421        });
1422        assert!(!standby.events.is_empty());
1423        standby.process(&event);
1424
1425        assert_eq!(event, wait.await.unwrap());
1426        assert!(standby.events.is_empty());
1427    }
1428
1429    /// Test basic functionality of the [`Standby::wait_for_event_stream`]
1430    /// method.
1431    #[tokio::test]
1432    async fn test_wait_for_event_stream() {
1433        let standby = Standby::new();
1434        let mut stream =
1435            standby.wait_for_event_stream(|event: &Event| event.kind() == EventType::Resumed);
1436        standby.process(&Event::Resumed);
1437        assert_eq!(stream.next().await, Some(Event::Resumed));
1438        assert!(!standby.events.is_empty());
1439        drop(stream);
1440        standby.process(&Event::Resumed);
1441        assert!(standby.events.is_empty());
1442    }
1443
1444    /// Test basic functionality of the [`Standby::wait_for_message`] method.
1445    #[tokio::test]
1446    async fn test_wait_for_message() {
1447        let message = message();
1448        let event = Event::MessageCreate(Box::new(MessageCreate(message)));
1449
1450        let standby = Standby::new();
1451        let wait = standby.wait_for_message(Id::new(1), |message: &MessageCreate| {
1452            message.author.id.get() == 2
1453        });
1454        standby.process(&event);
1455
1456        assert_eq!(3, wait.await.map(|msg| msg.id.get()).unwrap());
1457        assert!(standby.messages.is_empty());
1458    }
1459
1460    /// Test basic functionality of the [`Standby::wait_for_message_stream`]
1461    /// method.
1462    #[tokio::test]
1463    async fn test_wait_for_message_stream() {
1464        let standby = Standby::new();
1465        let mut stream = standby.wait_for_message_stream(Id::new(1), |_: &MessageCreate| true);
1466        standby.process(&Event::MessageCreate(Box::new(MessageCreate(message()))));
1467        standby.process(&Event::MessageCreate(Box::new(MessageCreate(message()))));
1468
1469        assert!(stream.next().await.is_some());
1470        assert!(stream.next().await.is_some());
1471        drop(stream);
1472        assert_eq!(1, standby.messages.len());
1473        standby.process(&Event::MessageCreate(Box::new(MessageCreate(message()))));
1474        assert!(standby.messages.is_empty());
1475    }
1476
1477    /// Test basic functionality of the [`Standby::wait_for_reaction`] method.
1478    #[tokio::test]
1479    async fn test_wait_for_reaction() {
1480        let event = Event::ReactionAdd(Box::new(ReactionAdd(reaction())));
1481
1482        let standby = Standby::new();
1483        let wait = standby.wait_for_reaction(Id::new(4), |reaction: &ReactionAdd| {
1484            reaction.user_id.get() == 3
1485        });
1486
1487        standby.process(&event);
1488
1489        assert_eq!(
1490            Id::new(3),
1491            wait.await.map(|reaction| reaction.user_id).unwrap()
1492        );
1493        assert!(standby.reactions.is_empty());
1494    }
1495
1496    /// Test basic functionality of the [`Standby::wait_for_reaction_stream`]
1497    /// method.
1498    #[tokio::test]
1499    async fn test_wait_for_reaction_stream() {
1500        let standby = Standby::new();
1501        let mut stream = standby.wait_for_reaction_stream(Id::new(4), |_: &ReactionAdd| true);
1502        standby.process(&Event::ReactionAdd(Box::new(ReactionAdd(reaction()))));
1503        standby.process(&Event::ReactionAdd(Box::new(ReactionAdd(reaction()))));
1504
1505        assert!(stream.next().await.is_some());
1506        assert!(stream.next().await.is_some());
1507        drop(stream);
1508        assert_eq!(1, standby.reactions.len());
1509        standby.process(&Event::ReactionAdd(Box::new(ReactionAdd(reaction()))));
1510        assert!(standby.reactions.is_empty());
1511    }
1512
1513    /// Assert that Standby processing some non-matching events will not affect
1514    /// the matching of a later event.
1515    #[tokio::test]
1516    async fn test_wait_for_component() {
1517        let event = Event::InteractionCreate(Box::new(InteractionCreate(button())));
1518
1519        let standby = Standby::new();
1520        let wait = standby.wait_for_component(Id::new(3), |button: &Interaction| {
1521            button.author_id() == Some(Id::new(2))
1522        });
1523
1524        standby.process(&event);
1525
1526        assert_eq!(
1527            Some(Id::new(2)),
1528            wait.await.map(|button| button.author_id()).unwrap()
1529        );
1530        assert!(standby.components.is_empty());
1531    }
1532
1533    #[tokio::test]
1534    async fn test_wait_for_component_stream() {
1535        let standby = Standby::new();
1536        let mut stream = standby.wait_for_component_stream(Id::new(3), |_: &Interaction| true);
1537        standby.process(&Event::InteractionCreate(Box::new(InteractionCreate(
1538            button(),
1539        ))));
1540        standby.process(&Event::InteractionCreate(Box::new(InteractionCreate(
1541            button(),
1542        ))));
1543
1544        assert!(stream.next().await.is_some());
1545        assert!(stream.next().await.is_some());
1546        drop(stream);
1547        assert_eq!(1, standby.components.len());
1548        standby.process(&Event::InteractionCreate(Box::new(InteractionCreate(
1549            button(),
1550        ))));
1551        assert!(standby.components.is_empty());
1552    }
1553
1554    #[tokio::test]
1555    async fn test_handles_wrong_events() {
1556        let standby = Standby::new();
1557        let wait = standby.wait_for_event(|event: &Event| event.kind() == EventType::Resumed);
1558
1559        standby.process(&Event::GatewayHeartbeatAck);
1560        standby.process(&Event::GatewayHeartbeatAck);
1561        standby.process(&Event::Resumed);
1562
1563        assert_eq!(Event::Resumed, wait.await.unwrap());
1564    }
1565
1566    /// Test that generic event handlers will be given the opportunity to
1567    /// process events with specific handlers (message creates, reaction adds)
1568    /// and guild events. Similarly, guild handlers should be able to process
1569    /// specific handler events as well.
1570    #[tokio::test]
1571    async fn test_process_nonspecific_handling() {
1572        let standby = Standby::new();
1573
1574        // generic event handler gets message creates
1575        let wait = standby.wait_for_event(|event: &Event| event.kind() == EventType::MessageCreate);
1576        standby.process(&Event::MessageCreate(Box::new(MessageCreate(message()))));
1577        assert!(matches!(wait.await, Ok(Event::MessageCreate(_))));
1578
1579        // generic event handler gets reaction adds
1580        let wait = standby.wait_for_event(|event: &Event| event.kind() == EventType::ReactionAdd);
1581        standby.process(&Event::ReactionAdd(Box::new(ReactionAdd(reaction()))));
1582        assert!(matches!(wait.await, Ok(Event::ReactionAdd(_))));
1583
1584        // generic event handler gets other guild events
1585        let wait = standby.wait_for_event(|event: &Event| event.kind() == EventType::RoleDelete);
1586        standby.process(&Event::RoleDelete(RoleDelete {
1587            guild_id: Id::new(1),
1588            role_id: Id::new(2),
1589        }));
1590        assert!(matches!(wait.await, Ok(Event::RoleDelete(_))));
1591
1592        // guild event handler gets message creates or reaction events
1593        let wait = standby.wait_for(Id::new(1), |event: &Event| {
1594            event.kind() == EventType::ReactionAdd
1595        });
1596        standby.process(&Event::ReactionAdd(Box::new(ReactionAdd(reaction()))));
1597        assert!(matches!(wait.await, Ok(Event::ReactionAdd(_))));
1598    }
1599}