twilight_standby/
lib.rs

1#![doc = include_str!("../README.md")]
2#![warn(
3    clippy::missing_const_for_fn,
4    clippy::missing_docs_in_private_items,
5    clippy::pedantic,
6    missing_docs,
7    unsafe_code
8)]
9#![allow(
10    clippy::module_name_repetitions,
11    clippy::must_use_candidate,
12    clippy::unnecessary_wraps
13)]
14
15pub mod future;
16
17use self::future::{
18    WaitForComponentFuture, WaitForComponentStream, WaitForEventFuture, WaitForEventStream,
19    WaitForGuildEventFuture, WaitForGuildEventStream, WaitForMessageFuture, WaitForMessageStream,
20    WaitForReactionFuture, WaitForReactionStream,
21};
22use dashmap::DashMap;
23use std::{
24    fmt::{Debug, Display, Formatter, Result as FmtResult},
25    hash::Hash,
26    sync::atomic::{AtomicU64, Ordering},
27};
28use tokio::sync::{
29    mpsc::{self, UnboundedReceiver, UnboundedSender as MpscSender},
30    oneshot::{self, Receiver, Sender as OneshotSender},
31};
32use twilight_model::{
33    application::interaction::{Interaction, InteractionType},
34    gateway::{
35        event::Event,
36        payload::incoming::{MessageCreate, ReactionAdd},
37    },
38    id::{
39        marker::{ChannelMarker, GuildMarker, MessageMarker},
40        Id,
41    },
42};
43
/// Map keyed by an ID - such as a channel ID or message ID - storing a list of
/// bystanders.
///
/// `K` is the ID type used as the key and `V` is the event type that the
/// bystanders registered under that key are waiting for.
type BystanderMap<K, V> = DashMap<K, Vec<Bystander<V>>>;
47
/// Sender to a caller that may be for a future bystander or a stream bystander.
///
/// The variant decides how an event is delivered: a future bystander owns a
/// oneshot sender while a stream bystander owns an unbounded MPSC sender.
#[derive(Debug)]
enum Sender<E> {
    /// Bystander is a future and the sender is a oneshot.
    Future(OneshotSender<E>),
    /// Bystander is a stream and the sender is an unbounded MPSC.
    Stream(MpscSender<E>),
}
56
57impl<E> Sender<E> {
58    /// Whether the channel is closed.
59    fn is_closed(&self) -> bool {
60        match self {
61            Self::Future(sender) => sender.is_closed(),
62            Self::Stream(sender) => sender.is_closed(),
63        }
64    }
65}
66
/// Registration for a caller to wait for an event based on a predicate
/// function.
///
/// `T` is the type of event the caller is waiting for.
struct Bystander<T> {
    /// Predicate check to perform on an event.
    ///
    /// Returns `true` when the event is one the caller wants to receive.
    func: Box<dyn Fn(&T) -> bool + Send + Sync>,
    /// [`Sender::Future`]s consume themselves once upon sending so the sender
    /// needs to be able to be taken out separately.
    ///
    /// A value of `None` means the sender was taken out and never put back.
    sender: Option<Sender<T>>,
}
76
77impl<T: Debug> Debug for Bystander<T> {
78    fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
79        f.debug_struct("Bystander")
80            .field("func", &"<dyn Fn(&T) -> bool>")
81            .field("sender", &self.sender)
82            .finish()
83    }
84}
85
86/// The `Standby` struct, used by the main event loop to process events and by
87/// tasks to wait for an event.
88///
89/// Refer to the crate-level documentation for more information.
90///
91/// # Using Standby in multiple tasks
92///
93/// To use a Standby instance in multiple tasks, consider wrapping it in an
94/// [`std::sync::Arc`] or [`std::rc::Rc`].
95///
96/// # Examples
97///
98/// ## Timeouts
99///
100/// Futures can be timed out by passing the future returned by Standby to
101/// functions such as [`tokio::time::timeout`]:
102///
103/// ```rust,no_run
104/// # #[tokio::main] async fn main() -> Result<(), Box<dyn std::error::Error>> {
105/// use std::time::Duration;
106/// use twilight_model::gateway::event::{Event, EventType};
107/// use twilight_standby::Standby;
108///
109/// let standby = Standby::new();
110/// let future = standby.wait_for_event(|event: &Event| event.kind() == EventType::Ready);
111/// let event = tokio::time::timeout(Duration::from_secs(1), future).await?;
112/// # Ok(()) }
113/// ```
114///
115/// [`tokio::time::timeout`]: https://docs.rs/tokio/latest/tokio/time/fn.timeout.html
116#[derive(Debug, Default)]
117pub struct Standby {
118    /// List of component bystanders where the ID of the message is known
119    /// beforehand.
120    components: DashMap<Id<MessageMarker>, Vec<Bystander<Interaction>>>,
121    /// Bystanders for any event that may not be in any particular guild.
122    ///
123    /// The key is generated via [`event_counter`].
124    ///
125    /// [`event_counter`]: Self::event_counter
126    events: DashMap<u64, Bystander<Event>>,
127    /// Event counter to be used as the key of [`events`].
128    ///
129    /// [`events`]: Self::events
130    event_counter: AtomicU64,
131    /// List of bystanders where the ID of the guild is known beforehand.
132    guilds: DashMap<Id<GuildMarker>, Vec<Bystander<Event>>>,
133    /// List of message bystanders where the ID of the channel is known
134    /// beforehand.
135    messages: DashMap<Id<ChannelMarker>, Vec<Bystander<MessageCreate>>>,
136    /// List of reaction bystanders where the ID of the message is known
137    /// beforehand.
138    reactions: DashMap<Id<MessageMarker>, Vec<Bystander<ReactionAdd>>>,
139}
140
141impl Standby {
    /// Create a new instance of `Standby`.
    ///
    /// The new instance has no registered bystanders; it is equivalent to
    /// [`Standby::default`].
    ///
    /// Once a `Standby` has been created it must process gateway events via
    /// [`process`]. Awaiting an event can start via methods such as
    /// [`wait_for`] and [`wait_for_message_stream`].
    ///
    /// [`process`]: Self::process
    /// [`wait_for`]: Self::wait_for
    /// [`wait_for_message_stream`]: Self::wait_for_message_stream
    #[must_use = "must process events to be useful"]
    pub fn new() -> Self {
        Self::default()
    }
155
156    /// Process an event, calling any bystanders that might be waiting on it.
157    ///
158    /// Returns statistics about matched [`Standby`] calls and how they were
159    /// processed. For example, by using [`ProcessResults::matched`] you can
160    /// determine how many calls were sent an event.
161    ///
162    /// When a bystander checks to see if an event is what it's waiting for, it
163    /// will receive the event by cloning it.
164    ///
165    /// This function must be called when events are received in order for
166    /// futures returned by methods to fulfill.
167    pub fn process(&self, event: &Event) -> ProcessResults {
168        tracing::trace!(event_type = ?event.kind(), ?event, "processing event");
169
170        let mut completions = ProcessResults::new();
171
172        match event {
173            Event::InteractionCreate(e) => {
174                if e.kind == InteractionType::MessageComponent {
175                    if let Some(message) = &e.message {
176                        completions.add_with(&Self::process_specific_event(
177                            &self.components,
178                            message.id,
179                            e,
180                        ));
181                    }
182                }
183            }
184            Event::MessageCreate(e) => {
185                completions.add_with(&Self::process_specific_event(
186                    &self.messages,
187                    e.0.channel_id,
188                    e,
189                ));
190            }
191            Event::ReactionAdd(e) => {
192                completions.add_with(&Self::process_specific_event(
193                    &self.reactions,
194                    e.0.message_id,
195                    e,
196                ));
197            }
198            _ => {}
199        }
200
201        if let Some(guild_id) = event.guild_id() {
202            completions.add_with(&Self::process_specific_event(&self.guilds, guild_id, event));
203        }
204
205        completions.add_with(&Self::process_event(&self.events, event));
206
207        completions
208    }
209
210    /// Wait for an event in a certain guild.
211    ///
212    /// To wait for multiple guild events matching the given predicate use
213    /// [`wait_for_stream`].
214    ///
215    /// # Examples
216    ///
217    /// Wait for a [`BanAdd`] event in guild 123:
218    ///
219    /// ```no_run
220    /// # #[tokio::main]
221    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
222    /// use twilight_model::{
223    ///     gateway::event::{Event, EventType},
224    ///     id::Id,
225    /// };
226    /// use twilight_standby::Standby;
227    ///
228    /// let standby = Standby::new();
229    ///
230    /// let guild_id = Id::new(123);
231    ///
232    /// let reaction = standby
233    ///     .wait_for(guild_id, |event: &Event| event.kind() == EventType::BanAdd)
234    ///     .await?;
235    /// # Ok(()) }
236    /// ```
237    ///
238    /// # Errors
239    ///
240    /// The returned future resolves to a [`Canceled`] error if the associated
241    /// [`Standby`] instance is dropped.
242    ///
243    /// [`BanAdd`]: twilight_model::gateway::payload::incoming::BanAdd
244    /// [`Canceled`]: future::Canceled
245    /// [`wait_for_stream`]: Self::wait_for_stream
246    pub fn wait_for<F: Fn(&Event) -> bool + Send + Sync + 'static>(
247        &self,
248        guild_id: Id<GuildMarker>,
249        check: impl Into<Box<F>>,
250    ) -> WaitForGuildEventFuture {
251        tracing::trace!(%guild_id, "waiting for event in guild");
252
253        WaitForGuildEventFuture {
254            rx: Self::insert_future(&self.guilds, guild_id, check),
255        }
256    }
257
258    /// Wait for a stream of events in a certain guild.
259    ///
260    /// To wait for only one guild event matching the given predicate use
261    /// [`wait_for`].
262    ///
263    /// # Examples
264    ///
265    /// Wait for multiple [`BanAdd`] events in guild 123:
266    ///
267    /// ```no_run
268    /// # #[tokio::main]
269    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
270    /// use tokio_stream::StreamExt;
271    /// use twilight_model::{
272    ///     gateway::event::{Event, EventType},
273    ///     id::Id,
274    /// };
275    /// use twilight_standby::Standby;
276    ///
277    /// let standby = Standby::new();
278    ///
279    /// let guild_id = Id::new(123);
280    ///
281    /// let mut stream =
282    ///     standby.wait_for_stream(guild_id, |event: &Event| event.kind() == EventType::BanAdd);
283    ///
284    /// while let Some(event) = stream.next().await {
285    ///     if let Event::BanAdd(ban) = event {
286    ///         println!("user {} was banned in guild {}", ban.user.id, ban.guild_id);
287    ///     }
288    /// }
289    /// # Ok(()) }
290    /// ```
291    ///
292    /// # Errors
293    ///
294    /// The returned stream ends when the associated [`Standby`] instance is
295    /// dropped.
296    ///
297    /// [`BanAdd`]: twilight_model::gateway::payload::incoming::BanAdd
298    /// [`wait_for`]: Self::wait_for
299    pub fn wait_for_stream<F: Fn(&Event) -> bool + Send + Sync + 'static>(
300        &self,
301        guild_id: Id<GuildMarker>,
302        check: impl Into<Box<F>>,
303    ) -> WaitForGuildEventStream {
304        tracing::trace!(%guild_id, "waiting for event in guild");
305
306        WaitForGuildEventStream {
307            rx: Self::insert_stream(&self.guilds, guild_id, check),
308        }
309    }
310
311    /// Wait for an event not in a certain guild. This must be filtered by an
312    /// event type.
313    ///
314    /// To wait for multiple events matching the given predicate use
315    /// [`wait_for_event_stream`].
316    ///
317    /// # Examples
318    ///
319    /// Wait for a [`Ready`] event for shard 5:
320    ///
321    /// ```no_run
322    /// # #[tokio::main]
323    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
324    /// use twilight_model::gateway::event::{Event, EventType};
325    /// use twilight_standby::Standby;
326    ///
327    /// let standby = Standby::new();
328    ///
329    /// let ready = standby
330    ///     .wait_for_event(|event: &Event| {
331    ///         if let Event::Ready(ready) = event {
332    ///             ready.shard.map_or(false, |id| id.number() == 5)
333    ///         } else {
334    ///             false
335    ///         }
336    ///     })
337    ///     .await?;
338    /// # Ok(()) }
339    /// ```
340    ///
341    /// # Errors
342    ///
343    /// The returned future resolves to a [`Canceled`] error if the associated
344    /// [`Standby`] instance is dropped.
345    ///
346    /// [`Canceled`]: future::Canceled
347    /// [`Ready`]: twilight_model::gateway::payload::incoming::Ready
348    /// [`wait_for_event_stream`]: Self::wait_for_event_stream
349    pub fn wait_for_event<F: Fn(&Event) -> bool + Send + Sync + 'static>(
350        &self,
351        check: impl Into<Box<F>>,
352    ) -> WaitForEventFuture {
353        tracing::trace!("waiting for event");
354
355        let (tx, rx) = oneshot::channel();
356
357        self.events.insert(
358            self.next_event_id(),
359            Bystander {
360                func: check.into(),
361                sender: Some(Sender::Future(tx)),
362            },
363        );
364
365        WaitForEventFuture { rx }
366    }
367
368    /// Wait for a stream of events not in a certain guild. This must be
369    /// filtered by an event type.
370    ///
371    /// To wait for only one event matching the given predicate use
372    /// [`wait_for_event`].
373    ///
374    /// # Examples
375    ///
376    /// Wait for multiple [`Ready`] events on shard 5:
377    ///
378    /// ```no_run
379    /// # #[tokio::main]
380    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
381    /// use tokio_stream::StreamExt;
382    /// use twilight_model::gateway::event::{Event, EventType};
383    /// use twilight_standby::Standby;
384    ///
385    /// let standby = Standby::new();
386    ///
387    /// let mut events = standby.wait_for_event_stream(|event: &Event| {
388    ///     if let Event::Ready(ready) = event {
389    ///         ready.shard.map_or(false, |id| id.number() == 5)
390    ///     } else {
391    ///         false
392    ///     }
393    /// });
394    ///
395    /// while let Some(event) = events.next().await {
396    ///     println!("got event with type {:?}", event.kind());
397    /// }
398    /// # Ok(()) }
399    /// ```
400    ///
401    /// # Errors
402    ///
403    /// The returned stream ends when the associated [`Standby`] instance is
404    /// dropped.
405    ///
406    /// [`Ready`]: twilight_model::gateway::payload::incoming::Ready
407    /// [`wait_for_event`]: Self::wait_for_event
408    pub fn wait_for_event_stream<F: Fn(&Event) -> bool + Send + Sync + 'static>(
409        &self,
410        check: impl Into<Box<F>>,
411    ) -> WaitForEventStream {
412        tracing::trace!("waiting for event");
413
414        let (tx, rx) = mpsc::unbounded_channel();
415
416        self.events.insert(
417            self.next_event_id(),
418            Bystander {
419                func: check.into(),
420                sender: Some(Sender::Stream(tx)),
421            },
422        );
423
424        WaitForEventStream { rx }
425    }
426
427    /// Wait for a message in a certain channel.
428    ///
429    /// To wait for multiple messages matching the given predicate use
430    /// [`wait_for_message_stream`].
431    ///
432    /// # Examples
433    ///
434    /// Wait for a message in channel 123 by user 456 with the content "test":
435    ///
436    /// ```no_run
437    /// # #[tokio::main]
438    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
439    /// use twilight_model::{gateway::payload::incoming::MessageCreate, id::Id};
440    /// use twilight_standby::Standby;
441    ///
442    /// let standby = Standby::new();
443    ///
444    /// let author_id = Id::new(456);
445    /// let channel_id = Id::new(123);
446    ///
447    /// let message = standby
448    ///     .wait_for_message(channel_id, move |event: &MessageCreate| {
449    ///         event.author.id == author_id && event.content == "test"
450    ///     })
451    ///     .await?;
452    /// # Ok(()) }
453    /// ```
454    ///
455    /// # Errors
456    ///
457    /// The returned future resolves to a [`Canceled`] error if the associated
458    /// [`Standby`] instance is dropped.
459    ///
460    /// [`Canceled`]: future::Canceled
461    /// [`wait_for_message_stream`]: Self::wait_for_message_stream
462    pub fn wait_for_message<F: Fn(&MessageCreate) -> bool + Send + Sync + 'static>(
463        &self,
464        channel_id: Id<ChannelMarker>,
465        check: impl Into<Box<F>>,
466    ) -> WaitForMessageFuture {
467        tracing::trace!(%channel_id, "waiting for message in channel");
468
469        WaitForMessageFuture {
470            rx: Self::insert_future(&self.messages, channel_id, check),
471        }
472    }
473
474    /// Wait for a stream of message in a certain channel.
475    ///
476    /// To wait for only one message matching the given predicate use
477    /// [`wait_for_message`].
478    ///
479    /// # Examples
480    ///
481    /// Wait for multiple messages in channel 123 by user 456 with the content
482    /// "test":
483    ///
484    /// ```no_run
485    /// # #[tokio::main]
486    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
487    /// use tokio_stream::StreamExt;
488    /// use twilight_model::{gateway::payload::incoming::MessageCreate, id::Id};
489    /// use twilight_standby::Standby;
490    ///
491    /// let standby = Standby::new();
492    ///
493    /// let author_id = Id::new(456);
494    /// let channel_id = Id::new(123);
495    ///
496    /// let mut messages = standby.wait_for_message_stream(channel_id, move |event: &MessageCreate| {
497    ///     event.author.id == author_id && event.content == "test"
498    /// });
499    ///
500    /// while let Some(message) = messages.next().await {
501    ///     println!("got message by {}", message.author.id);
502    /// }
503    /// # Ok(()) }
504    /// ```
505    ///
506    /// # Errors
507    ///
508    /// The returned stream ends when the associated [`Standby`] instance is
509    /// dropped.
510    ///
511    /// [`wait_for_message`]: Self::wait_for_message
512    pub fn wait_for_message_stream<F: Fn(&MessageCreate) -> bool + Send + Sync + 'static>(
513        &self,
514        channel_id: Id<ChannelMarker>,
515        check: impl Into<Box<F>>,
516    ) -> WaitForMessageStream {
517        tracing::trace!(%channel_id, "waiting for message in channel");
518
519        WaitForMessageStream {
520            rx: Self::insert_stream(&self.messages, channel_id, check),
521        }
522    }
523
524    /// Wait for a reaction on a certain message.
525    ///
526    /// To wait for multiple reactions matching the given predicate use
527    /// [`wait_for_reaction_stream`].
528    ///
529    /// # Examples
530    ///
531    /// Wait for a reaction on message 123 by user 456:
532    ///
533    /// ```no_run
534    /// # #[tokio::main]
535    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
536    /// use twilight_model::{gateway::payload::incoming::ReactionAdd, id::Id};
537    /// use twilight_standby::Standby;
538    ///
539    /// let standby = Standby::new();
540    ///
541    /// let message_id = Id::new(123);
542    /// let user_id = Id::new(456);
543    ///
544    /// let reaction = standby
545    ///     .wait_for_reaction(message_id, move |event: &ReactionAdd| {
546    ///         event.user_id == user_id
547    ///     })
548    ///     .await?;
549    /// # Ok(()) }
550    /// ```
551    ///
552    /// # Errors
553    ///
554    /// The returned future resolves to a [`Canceled`] error if the associated
555    /// [`Standby`] instance is dropped.
556    ///
557    /// [`Canceled`]: future::Canceled
558    /// [`wait_for_reaction_stream`]: Self::wait_for_reaction_stream
559    pub fn wait_for_reaction<F: Fn(&ReactionAdd) -> bool + Send + Sync + 'static>(
560        &self,
561        message_id: Id<MessageMarker>,
562        check: impl Into<Box<F>>,
563    ) -> WaitForReactionFuture {
564        tracing::trace!(%message_id, "waiting for reaction on message");
565
566        WaitForReactionFuture {
567            rx: Self::insert_future(&self.reactions, message_id, check),
568        }
569    }
570
571    /// Wait for a stream of reactions on a certain message.
572    ///
573    /// To wait for only one reaction matching the given predicate use
574    /// [`wait_for_reaction`].
575    ///
576    /// # Examples
577    ///
578    /// Wait for multiple reactions on message 123 with unicode reaction "🤠":
579    ///
580    /// ```no_run
581    /// # #[tokio::main]
582    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
583    /// use tokio_stream::StreamExt;
584    /// use twilight_model::{
585    ///     channel::message::EmojiReactionType,
586    ///     gateway::payload::incoming::ReactionAdd,
587    ///     id::Id,
588    /// };
589    /// use twilight_standby::Standby;
590    ///
591    /// let standby = Standby::new();
592    ///
593    /// let message_id = Id::new(123);
594    ///
595    /// let mut reactions = standby.wait_for_reaction_stream(message_id, |event: &ReactionAdd| {
596    ///     matches!(&event.emoji, EmojiReactionType::Unicode { name } if name == "🤠")
597    /// });
598    ///
599    /// while let Some(reaction) = reactions.next().await {
600    ///     println!("got a reaction by {}", reaction.user_id);
601    /// }
602    /// # Ok(()) }
603    /// ```
604    ///
605    /// # Errors
606    ///
607    /// The returned stream ends when the associated [`Standby`] instance is
608    /// dropped.
609    ///
610    /// [`wait_for_reaction`]: Self::wait_for_reaction
611    pub fn wait_for_reaction_stream<F: Fn(&ReactionAdd) -> bool + Send + Sync + 'static>(
612        &self,
613        message_id: Id<MessageMarker>,
614        check: impl Into<Box<F>>,
615    ) -> WaitForReactionStream {
616        tracing::trace!(%message_id, "waiting for reaction on message");
617
618        WaitForReactionStream {
619            rx: Self::insert_stream(&self.reactions, message_id, check),
620        }
621    }
622
623    /// Wait for a component on a certain message.
624    ///
625    /// Returns a `Canceled` error if the `Standby` struct was dropped.
626    ///
627    /// If you need to wait for multiple components matching the given predicate,
628    /// use [`wait_for_component_stream`].
629    ///
630    /// # Examples
631    ///
632    /// Wait for a component on message 123 by user 456:
633    ///
634    /// ```no_run
635    /// # #[tokio::main] async fn main() -> Result<(), Box<dyn std::error::Error>> {
636    /// use twilight_model::{application::interaction::Interaction, id::Id};
637    /// use twilight_standby::Standby;
638    ///
639    /// let standby = Standby::new();
640    /// let message_id = Id::new(123);
641    ///
642    /// let component = standby
643    ///     .wait_for_component(message_id, |event: &Interaction| {
644    ///         event.author_id() == Some(Id::new(456))
645    ///     })
646    ///     .await?;
647    /// # Ok(()) }
648    /// ```
649    ///
650    /// [`wait_for_component_stream`]: Self::wait_for_component_stream
651    pub fn wait_for_component<F: Fn(&Interaction) -> bool + Send + Sync + 'static>(
652        &self,
653        message_id: Id<MessageMarker>,
654        check: impl Into<Box<F>>,
655    ) -> WaitForComponentFuture {
656        tracing::trace!(%message_id, "waiting for component on message");
657
658        WaitForComponentFuture {
659            rx: Self::insert_future(&self.components, message_id, check),
660        }
661    }
662
663    /// Wait for a stream of components on a certain message.
664    ///
665    /// Returns a `Canceled` error if the `Standby` struct was dropped.
666    ///
667    /// If you need to wait for only one component matching the given predicate,
668    /// use [`wait_for_component`].
669    ///
670    /// # Examples
671    ///
672    /// Wait for multiple button components on message 123 with a `custom_id` of
673    /// "Click":
674    ///
675    /// ```no_run
676    /// # #[tokio::main] async fn main() -> Result<(), Box<dyn std::error::Error>> {
677    /// use tokio_stream::StreamExt;
678    /// use twilight_model::{
679    ///     application::interaction::{Interaction, InteractionData},
680    ///     id::Id,
681    /// };
682    /// use twilight_standby::Standby;
683    ///
684    /// let standby = Standby::new();
685    /// let message_id = Id::new(123);
686    ///
687    /// let mut components = standby.wait_for_component_stream(message_id, |event: &Interaction| {
688    ///     if let Some(InteractionData::MessageComponent(data)) = &event.data {
689    ///         data.custom_id == "Click".to_string()
690    ///     } else {
691    ///         false
692    ///     }
693    /// });
694    ///
695    /// while let Some(component) = components.next().await {
696    ///     println!("got a component by {}", component.author_id().unwrap());
697    /// }
698    /// # Ok(()) }
699    /// ```
700    ///
701    /// [`wait_for_component`]: Self::wait_for_component
702    pub fn wait_for_component_stream<F: Fn(&Interaction) -> bool + Send + Sync + 'static>(
703        &self,
704        message_id: Id<MessageMarker>,
705        check: impl Into<Box<F>>,
706    ) -> WaitForComponentStream {
707        tracing::trace!(%message_id, "waiting for component on message");
708
709        WaitForComponentStream {
710            rx: Self::insert_stream(&self.components, message_id, check),
711        }
712    }
713
    /// Next event ID in [`Standby::event_counter`].
    ///
    /// Atomically increments the counter and returns the value it held before
    /// the increment, so every call yields a different key for
    /// [`Standby::events`] (until the counter wraps around).
    fn next_event_id(&self) -> u64 {
        self.event_counter.fetch_add(1, Ordering::SeqCst)
    }
718
719    /// Append a new future bystander into a map according to the ID.
720    fn insert_future<F: Fn(&V) -> bool + Send + Sync + 'static, K: Eq + Hash, V>(
721        map: &BystanderMap<K, V>,
722        id: K,
723        check: impl Into<Box<F>>,
724    ) -> Receiver<V> {
725        let (tx, rx) = oneshot::channel();
726
727        let mut entry = map.entry(id).or_default();
728        entry.push(Bystander {
729            func: check.into(),
730            sender: Some(Sender::Future(tx)),
731        });
732
733        rx
734    }
735
736    /// Append a new stream bystander into a map according to the ID.
737    fn insert_stream<F: Fn(&V) -> bool + Send + Sync + 'static, K: Eq + Hash, V>(
738        map: &BystanderMap<K, V>,
739        id: K,
740        check: impl Into<Box<F>>,
741    ) -> UnboundedReceiver<V> {
742        let (tx, rx) = mpsc::unbounded_channel();
743
744        let mut entry = map.entry(id).or_default();
745        entry.push(Bystander {
746            func: check.into(),
747            sender: Some(Sender::Stream(tx)),
748        });
749
750        rx
751    }
752
753    /// Process a general event that is not of any particular type or in any
754    /// particular guild.
755    #[tracing::instrument(level = "trace")]
756    fn process_event<K: Debug + Display + Eq + Hash + PartialEq + 'static, V: Clone + Debug>(
757        map: &DashMap<K, Bystander<V>>,
758        event: &V,
759    ) -> ProcessResults {
760        tracing::trace!(?event, "processing event");
761
762        let mut results = ProcessResults::new();
763
764        map.retain(|id, bystander| {
765            let result = Self::bystander_process(bystander, event);
766            results.handle(result);
767
768            tracing::trace!(bystander_id = %id, ?result, "event bystander processed");
769
770            // We want to retain bystanders that are *incomplete* and remove
771            // bystanders that are *complete*.
772            !result.is_complete()
773        });
774
775        results
776    }
777
778    /// Process a general event that is either of a particular type or in a
779    /// particular guild.
780    #[tracing::instrument(level = "trace")]
781    fn process_specific_event<
782        K: Debug + Display + Eq + Hash + PartialEq + 'static,
783        V: Clone + Debug,
784    >(
785        map: &DashMap<K, Vec<Bystander<V>>>,
786        guild_id: K,
787        event: &V,
788    ) -> ProcessResults {
789        // Iterate over a guild's bystanders and mark it for removal if there
790        // are no bystanders remaining.
791        let (remove_guild, results) = if let Some(mut bystanders) = map.get_mut(&guild_id) {
792            let results = Self::bystander_iter(&mut bystanders, event);
793
794            (bystanders.is_empty(), results)
795        } else {
796            tracing::trace!(%guild_id, "guild has no event bystanders");
797
798            return ProcessResults::new();
799        };
800
801        if remove_guild {
802            tracing::trace!(%guild_id, "removing guild from map");
803
804            map.remove(&guild_id);
805        }
806
807        results
808    }
809
810    /// Iterate over bystanders and remove the ones that match the predicate.
811    #[tracing::instrument(level = "trace")]
812    fn bystander_iter<E: Clone + Debug>(
813        bystanders: &mut Vec<Bystander<E>>,
814        event: &E,
815    ) -> ProcessResults {
816        tracing::trace!(?bystanders, "iterating over bystanders");
817
818        // Iterate over the list of bystanders by using an index and manually
819        // indexing in to the list.
820        //
821        // # Logic
822        //
823        // In each iteration we decide whether to retain a bystander: if we do
824        // then we can increment our index and move on, but if we opt to instead
825        // remove it then we do so and don't increment the index. The reason we
826        // don't increment the index is because when we remove an element the
827        // index does not become empty and instead everything to the right is
828        // shifted to the left, illustrated as such:
829        //
830        //     |---|
831        //     v   |
832        // A - B - C - D
833        //     |   ^   |
834        //     |   |---|
835        //     |
836        //  Remove B
837        //
838        // After: A - C - D
839        //
840        // # Reasons not to use alternatives
841        //
842        // **`Vec::retain`** we need to mutate the entries in order to take out
843        // the sender and `Vec::retain` only gives us immutable references.
844        //
845        // A form of enumeration can't be used because sometimes the index
846        // doesn't advance; iterators would continue to provide incrementing
847        // enumeration indexes while we sometimes want to reuse an index.
848        let mut index = 0;
849        let mut results = ProcessResults::new();
850
851        while index < bystanders.len() {
852            tracing::trace!(%index, "checking bystander");
853
854            let status = Self::bystander_process(&mut bystanders[index], event);
855            results.handle(status);
856
857            tracing::trace!(%index, ?status, "checked bystander");
858
859            if status.is_complete() {
860                bystanders.remove(index);
861            } else {
862                index += 1;
863            }
864        }
865
866        results
867    }
868
869    /// Process a bystander, sending the event if the sender is active and the
870    /// predicate matches. Returns whether the bystander has fulfilled.
871    ///
872    /// Returns whether the bystander is fulfilled; if the bystander has been
873    /// fulfilled then the channel is now closed.
874    #[tracing::instrument(level = "trace")]
875    fn bystander_process<T: Clone + Debug>(
876        bystander: &mut Bystander<T>,
877        event: &T,
878    ) -> ProcessStatus {
879        // We need to take the sender out because `OneshotSender`s consume
880        // themselves when calling `OneshotSender::send`.
881        let Some(sender) = bystander.sender.take() else {
882            tracing::trace!("bystander has no sender, indicating for removal");
883
884            return ProcessStatus::AlreadyComplete;
885        };
886
887        // The channel may have closed due to the receiver dropping their end,
888        // in which case we can say we're done.
889        if sender.is_closed() {
890            tracing::trace!("bystander's rx dropped, indicating for removal");
891
892            return ProcessStatus::Dropped;
893        }
894
895        // Lastly check to see if the predicate matches the event. If it doesn't
896        // then we can short-circuit.
897        if !(bystander.func)(event) {
898            tracing::trace!("bystander check doesn't match, not removing");
899
900            // Put the sender back into its bystander since we'll still need it
901            // next time around.
902            bystander.sender.replace(sender);
903
904            return ProcessStatus::Skip;
905        }
906
907        match sender {
908            Sender::Future(tx) => {
909                // We don't care if the event successfully sends or not since
910                // we're going to be tossing out the bystander anyway.
911                drop(tx.send(event.clone()));
912
913                tracing::trace!("bystander matched event, indicating for removal");
914
915                ProcessStatus::SentFuture
916            }
917            Sender::Stream(tx) => {
918                // If we can send an event to the receiver and the channel is
919                // still open then we need to retain the bystander, otherwise we
920                // need to mark it for removal.
921                if tx.send(event.clone()).is_ok() {
922                    tracing::trace!("bystander is a stream, retaining in map");
923
924                    bystander.sender.replace(Sender::Stream(tx));
925
926                    ProcessStatus::SentStream
927                } else {
928                    ProcessStatus::Dropped
929                }
930            }
931        }
932    }
933}
934/// Number of [`Standby`] calls that were completed.
935#[derive(Clone, Debug, Eq, Hash, PartialEq)]
936pub struct ProcessResults {
937    /// Number of bystanders that were dropped due to the receiving end
938    /// dropping.
939    dropped: usize,
940    /// Number of future bystanders that were open and were sent an event.
941    fulfilled: usize,
942    /// Number of stream bystanders that were open and were sent an event.
943    sent: usize,
944}
945
946impl ProcessResults {
947    /// Create a new set of zeroed out results.
948    const fn new() -> Self {
949        Self {
950            dropped: 0,
951            fulfilled: 0,
952            sent: 0,
953        }
954    }
955
956    /// Number of [`Standby`] calls where the receiver had already dropped their
957    /// end.
958    ///
959    /// This may happen when a caller calls into [`Standby`] but then times out
960    /// or otherwise cancels their request.
961    pub const fn dropped(&self) -> usize {
962        self.dropped
963    }
964
965    /// Number of [`Standby`] calls that were fulfilled.
966    ///
967    /// Calls for futures via methods such as [`Standby::wait_for`] will be
968    /// marked as fulfilled once matched and an event is sent over the channel.
969    ///
970    /// **Caveat**: although an event has been sent over the channel to the
971    /// receiver it is not guaranteed whether the receiver end actually received
972    /// it; the receiver end may drop shortly after an event is sent. In this
973    /// case the call is considered to be fulfilled.
974    pub const fn fulfilled(&self) -> usize {
975        self.fulfilled
976    }
977
978    /// Number of calls that were matched and were sent an event.
979    ///
980    /// This is the sum of [`fulfilled`] and [`sent`].
981    ///
982    /// See the caveats for both methods.
983    ///
984    /// [`fulfilled`]: Self::fulfilled
985    /// [`sent`]: Self::sent
986    pub const fn matched(&self) -> usize {
987        self.fulfilled() + self.sent()
988    }
989
990    /// Number of [`Standby`] streaming calls that were matched and had an event
991    /// sent to them.
992    ///
993    /// **Caveat**: although an event has been sent over the channel to the
994    /// receiver it is not guaranteed whether the receiver end actually received
995    /// it; the receiver end may drop shortly after an event is sent. In this
996    /// case the call is considered to be sent. Checks over this call will in
997    /// the future be considered [`dropped`].
998    ///
999    /// [`dropped`]: Self::dropped
1000    pub const fn sent(&self) -> usize {
1001        self.sent
1002    }
1003
1004    /// Add another set of results to this set.
1005    fn add_with(&mut self, other: &Self) {
1006        self.dropped = self.dropped.saturating_add(other.dropped);
1007        self.fulfilled = self.fulfilled.saturating_add(other.fulfilled);
1008        self.sent = self.sent.saturating_add(other.sent);
1009    }
1010
1011    /// Handle a process status.
1012    fn handle(&mut self, status: ProcessStatus) {
1013        match status {
1014            ProcessStatus::Dropped => {
1015                self.dropped += 1;
1016            }
1017            ProcessStatus::SentFuture => {
1018                self.fulfilled += 1;
1019            }
1020            ProcessStatus::SentStream => {
1021                self.sent += 1;
1022            }
1023            ProcessStatus::AlreadyComplete | ProcessStatus::Skip => {}
1024        }
1025    }
1026}
1027
/// Outcome of offering an event to one bystander via
/// [`Standby::bystander_process`].
#[derive(Clone, Copy, Debug)]
enum ProcessStatus {
    /// The bystander no longer had a sender when it was examined, so it must
    /// be removed and is not counted towards the results.
    AlreadyComplete,
    /// The receiving end is gone: either the channel was already closed or
    /// sending to a stream failed.
    Dropped,
    /// The predicate matched and the event was sent over a oneshot.
    SentFuture,
    /// The predicate matched and the event was sent over a stream.
    SentStream,
    /// The predicate did not match; the bystander keeps waiting.
    Skip,
}

impl ProcessStatus {
    /// Whether the bystander is finished and should be removed.
    ///
    /// Only a stream that was just sent an event and a bystander that was
    /// skipped remain registered.
    const fn is_complete(self) -> bool {
        match self {
            Self::AlreadyComplete | Self::Dropped | Self::SentFuture => true,
            Self::SentStream | Self::Skip => false,
        }
    }
}
1053
1054#[cfg(test)]
1055mod tests {
1056    #![allow(clippy::non_ascii_literal)]
1057
1058    use crate::Standby;
1059    use static_assertions::assert_impl_all;
1060    use std::fmt::Debug;
1061    use tokio_stream::StreamExt;
1062    use twilight_gateway::{Event, EventType};
1063    use twilight_model::{
1064        application::interaction::{
1065            message_component::MessageComponentInteractionData, Interaction, InteractionData,
1066            InteractionType,
1067        },
1068        channel::{
1069            message::{component::ComponentType, EmojiReactionType, Message, MessageType},
1070            Channel, ChannelType,
1071        },
1072        gateway::{
1073            payload::incoming::{InteractionCreate, MessageCreate, ReactionAdd, Ready, RoleDelete},
1074            GatewayReaction, ShardId,
1075        },
1076        guild::Permissions,
1077        id::{marker::GuildMarker, Id},
1078        oauth::{ApplicationFlags, ApplicationIntegrationMap, PartialApplication},
1079        user::{CurrentUser, User},
1080        util::Timestamp,
1081    };
1082
1083    assert_impl_all!(Standby: Debug, Default, Send, Sync);
1084
1085    #[allow(deprecated)]
1086    fn message() -> Message {
1087        Message {
1088            activity: None,
1089            application: None,
1090            application_id: None,
1091            attachments: Vec::new(),
1092            author: User {
1093                accent_color: None,
1094                avatar: None,
1095                avatar_decoration: None,
1096                avatar_decoration_data: None,
1097                banner: None,
1098                bot: false,
1099                discriminator: 1,
1100                email: None,
1101                flags: None,
1102                global_name: Some("test".to_owned()),
1103                id: Id::new(2),
1104                locale: None,
1105                mfa_enabled: None,
1106                name: "twilight".to_owned(),
1107                premium_type: None,
1108                public_flags: None,
1109                system: None,
1110                verified: None,
1111            },
1112            call: None,
1113            channel_id: Id::new(1),
1114            components: Vec::new(),
1115            content: "test".to_owned(),
1116            edited_timestamp: None,
1117            embeds: Vec::new(),
1118            flags: None,
1119            guild_id: Some(Id::new(4)),
1120            id: Id::new(3),
1121            interaction: None,
1122            interaction_metadata: None,
1123            kind: MessageType::Regular,
1124            member: None,
1125            mention_channels: Vec::new(),
1126            mention_everyone: false,
1127            mention_roles: Vec::new(),
1128            mentions: Vec::new(),
1129            message_snapshots: Vec::new(),
1130            pinned: false,
1131            poll: None,
1132            reactions: Vec::new(),
1133            reference: None,
1134            referenced_message: None,
1135            role_subscription_data: None,
1136            sticker_items: Vec::new(),
1137            timestamp: Timestamp::from_secs(1_632_072_645).expect("non zero"),
1138            thread: None,
1139            tts: false,
1140            webhook_id: None,
1141        }
1142    }
1143
1144    fn reaction() -> GatewayReaction {
1145        GatewayReaction {
1146            burst: false,
1147            burst_colors: Vec::new(),
1148            channel_id: Id::new(2),
1149            emoji: EmojiReactionType::Unicode {
1150                name: "🍎".to_owned(),
1151            },
1152            guild_id: Some(Id::new(1)),
1153            member: None,
1154            message_author_id: None,
1155            message_id: Id::new(4),
1156            user_id: Id::new(3),
1157        }
1158    }
1159
1160    #[allow(deprecated)]
1161    fn button() -> Interaction {
1162        Interaction {
1163            app_permissions: Some(Permissions::SEND_MESSAGES),
1164            application_id: Id::new(1),
1165            authorizing_integration_owners: ApplicationIntegrationMap {
1166                guild: None,
1167                user: None,
1168            },
1169            channel: Some(Channel {
1170                bitrate: None,
1171                guild_id: None,
1172                id: Id::new(400),
1173                kind: ChannelType::GuildText,
1174                last_message_id: None,
1175                last_pin_timestamp: None,
1176                name: None,
1177                nsfw: None,
1178                owner_id: None,
1179                parent_id: None,
1180                permission_overwrites: None,
1181                position: None,
1182                rate_limit_per_user: None,
1183                recipients: None,
1184                rtc_region: None,
1185                topic: None,
1186                user_limit: None,
1187                application_id: None,
1188                applied_tags: None,
1189                available_tags: None,
1190                default_auto_archive_duration: None,
1191                default_forum_layout: None,
1192                default_reaction_emoji: None,
1193                default_sort_order: None,
1194                default_thread_rate_limit_per_user: None,
1195                flags: None,
1196                icon: None,
1197                invitable: None,
1198                managed: None,
1199                member: None,
1200                member_count: None,
1201                message_count: None,
1202                newly_created: None,
1203                thread_metadata: None,
1204                video_quality_mode: None,
1205            }),
1206            channel_id: None,
1207            context: None,
1208            data: Some(InteractionData::MessageComponent(Box::new(
1209                MessageComponentInteractionData {
1210                    custom_id: String::from("Click"),
1211                    component_type: ComponentType::Button,
1212                    resolved: None,
1213                    values: Vec::new(),
1214                },
1215            ))),
1216            entitlements: Vec::new(),
1217            guild: None,
1218            guild_id: Some(Id::new(3)),
1219            guild_locale: None,
1220            id: Id::new(4),
1221            kind: InteractionType::MessageComponent,
1222            locale: Some("en-GB".to_owned()),
1223            member: None,
1224            message: Some(message()),
1225            token: String::from("token"),
1226            user: Some(User {
1227                accent_color: None,
1228                avatar: None,
1229                avatar_decoration: None,
1230                avatar_decoration_data: None,
1231                banner: None,
1232                bot: false,
1233                discriminator: 1,
1234                email: None,
1235                flags: None,
1236                global_name: Some("test".to_owned()),
1237                id: Id::new(2),
1238                locale: None,
1239                mfa_enabled: None,
1240                name: "twilight".to_owned(),
1241                premium_type: None,
1242                public_flags: None,
1243                system: None,
1244                verified: None,
1245            }),
1246        }
1247    }
1248
1249    /// Test that if a receiver drops their end, the result properly counts the
1250    /// statistic.
1251    #[tokio::test]
1252    async fn test_dropped() {
1253        let standby = Standby::new();
1254        let guild_id = Id::new(1);
1255
1256        {
1257            let _rx = standby.wait_for(guild_id, move |_: &Event| false);
1258        }
1259
1260        let results = standby.process(&Event::RoleDelete(RoleDelete {
1261            guild_id,
1262            role_id: Id::new(2),
1263        }));
1264
1265        assert_eq!(1, results.dropped());
1266        assert_eq!(0, results.fulfilled());
1267        assert_eq!(0, results.sent());
1268    }
1269
1270    /// Test that both events in guild 1 is matched but the event in guild 2 is
1271    /// not matched by testing the returned matched amount.
1272    #[tokio::test]
1273    async fn test_matched() {
1274        fn check(event: &Event, guild_id: Id<GuildMarker>) -> bool {
1275            matches!(event, Event::RoleDelete(e) if e.guild_id == guild_id)
1276        }
1277
1278        let standby = Standby::new();
1279        let guild_id_one = Id::new(1);
1280        let guild_id_two = Id::new(2);
1281        let _one = standby.wait_for(guild_id_one, move |event: &Event| {
1282            check(event, guild_id_one)
1283        });
1284        let _two = standby.wait_for(guild_id_one, move |event: &Event| {
1285            check(event, guild_id_one)
1286        });
1287        let _three = standby.wait_for(guild_id_two, move |event: &Event| {
1288            check(event, guild_id_two)
1289        });
1290
1291        let results = standby.process(&Event::RoleDelete(RoleDelete {
1292            guild_id: Id::new(1),
1293            role_id: Id::new(2),
1294        }));
1295
1296        assert_eq!(0, results.dropped());
1297        assert_eq!(2, results.fulfilled());
1298        assert_eq!(0, results.sent());
1299    }
1300
1301    /// Test that the [`ProcessResults::sent`] counter increments if a match is
1302    /// sent to it.
1303    #[tokio::test]
1304    async fn test_sent() {
1305        let standby = Standby::new();
1306        let guild_id = Id::new(1);
1307
1308        let _rx = standby.wait_for_stream(guild_id, move |_: &Event| true);
1309
1310        let results = standby.process(&Event::RoleDelete(RoleDelete {
1311            guild_id,
1312            role_id: Id::new(2),
1313        }));
1314
1315        assert_eq!(0, results.dropped());
1316        assert_eq!(0, results.fulfilled());
1317        assert_eq!(1, results.sent());
1318    }
1319
1320    /// Test basic functionality of the [`Standby::wait_for`] method.
1321    #[tokio::test]
1322    async fn test_wait_for() {
1323        let standby = Standby::new();
1324        let wait = standby.wait_for(
1325            Id::new(1),
1326            |event: &Event| matches!(event, Event::RoleDelete(e) if e.guild_id.get() == 1),
1327        );
1328        standby.process(&Event::RoleDelete(RoleDelete {
1329            guild_id: Id::new(1),
1330            role_id: Id::new(2),
1331        }));
1332
1333        assert_eq!(
1334            wait.await.unwrap(),
1335            Event::RoleDelete(RoleDelete {
1336                guild_id: Id::new(1),
1337                role_id: Id::new(2),
1338            })
1339        );
1340        assert!(standby.guilds.is_empty());
1341    }
1342
1343    /// Test basic functionality of the [`Standby::wait_for_stream`] method.
1344    #[tokio::test]
1345    async fn test_wait_for_stream() {
1346        let standby = Standby::new();
1347        let mut stream = standby.wait_for_stream(
1348            Id::new(1),
1349            |event: &Event| matches!(event, Event::RoleDelete(e) if e.guild_id.get() == 1),
1350        );
1351        standby.process(&Event::RoleDelete(RoleDelete {
1352            guild_id: Id::new(1),
1353            role_id: Id::new(2),
1354        }));
1355        standby.process(&Event::RoleDelete(RoleDelete {
1356            guild_id: Id::new(1),
1357            role_id: Id::new(3),
1358        }));
1359
1360        assert_eq!(
1361            stream.next().await,
1362            Some(Event::RoleDelete(RoleDelete {
1363                guild_id: Id::new(1),
1364                role_id: Id::new(2)
1365            }))
1366        );
1367        assert_eq!(
1368            stream.next().await,
1369            Some(Event::RoleDelete(RoleDelete {
1370                guild_id: Id::new(1),
1371                role_id: Id::new(3)
1372            }))
1373        );
1374        assert!(!standby.guilds.is_empty());
1375        drop(stream);
1376        standby.process(&Event::RoleDelete(RoleDelete {
1377            guild_id: Id::new(1),
1378            role_id: Id::new(4),
1379        }));
1380        assert!(standby.guilds.is_empty());
1381    }
1382
1383    /// Test basic functionality of the [`Standby::wait_for_event`] method.
1384    #[tokio::test]
1385    async fn test_wait_for_event() {
1386        let ready = Ready {
1387            application: PartialApplication {
1388                flags: ApplicationFlags::empty(),
1389                id: Id::new(1),
1390            },
1391            guilds: Vec::new(),
1392            resume_gateway_url: "wss://gateway.discord.gg".into(),
1393            session_id: String::new(),
1394            shard: Some(ShardId::new(5, 7)),
1395            user: CurrentUser {
1396                accent_color: None,
1397                avatar: None,
1398                banner: None,
1399                bot: false,
1400                discriminator: 1,
1401                email: None,
1402                id: Id::new(1),
1403                mfa_enabled: true,
1404                name: "twilight".to_owned(),
1405                verified: Some(false),
1406                premium_type: None,
1407                public_flags: None,
1408                flags: None,
1409                locale: None,
1410                global_name: None,
1411            },
1412            version: 6,
1413        };
1414        let event = Event::Ready(Box::new(ready));
1415
1416        let standby = Standby::new();
1417        let wait = standby.wait_for_event(|event: &Event| match event {
1418            Event::Ready(ready) => ready.shard.is_some_and(|id| id.number() == 5),
1419            _ => false,
1420        });
1421        assert!(!standby.events.is_empty());
1422        standby.process(&event);
1423
1424        assert_eq!(event, wait.await.unwrap());
1425        assert!(standby.events.is_empty());
1426    }
1427
1428    /// Test basic functionality of the [`Standby::wait_for_event_stream`]
1429    /// method.
1430    #[tokio::test]
1431    async fn test_wait_for_event_stream() {
1432        let standby = Standby::new();
1433        let mut stream =
1434            standby.wait_for_event_stream(|event: &Event| event.kind() == EventType::Resumed);
1435        standby.process(&Event::Resumed);
1436        assert_eq!(stream.next().await, Some(Event::Resumed));
1437        assert!(!standby.events.is_empty());
1438        drop(stream);
1439        standby.process(&Event::Resumed);
1440        assert!(standby.events.is_empty());
1441    }
1442
1443    /// Test basic functionality of the [`Standby::wait_for_message`] method.
1444    #[tokio::test]
1445    async fn test_wait_for_message() {
1446        let message = message();
1447        let event = Event::MessageCreate(Box::new(MessageCreate(message)));
1448
1449        let standby = Standby::new();
1450        let wait = standby.wait_for_message(Id::new(1), |message: &MessageCreate| {
1451            message.author.id.get() == 2
1452        });
1453        standby.process(&event);
1454
1455        assert_eq!(3, wait.await.map(|msg| msg.id.get()).unwrap());
1456        assert!(standby.messages.is_empty());
1457    }
1458
1459    /// Test basic functionality of the [`Standby::wait_for_message_stream`]
1460    /// method.
1461    #[tokio::test]
1462    async fn test_wait_for_message_stream() {
1463        let standby = Standby::new();
1464        let mut stream = standby.wait_for_message_stream(Id::new(1), |_: &MessageCreate| true);
1465        standby.process(&Event::MessageCreate(Box::new(MessageCreate(message()))));
1466        standby.process(&Event::MessageCreate(Box::new(MessageCreate(message()))));
1467
1468        assert!(stream.next().await.is_some());
1469        assert!(stream.next().await.is_some());
1470        drop(stream);
1471        assert_eq!(1, standby.messages.len());
1472        standby.process(&Event::MessageCreate(Box::new(MessageCreate(message()))));
1473        assert!(standby.messages.is_empty());
1474    }
1475
1476    /// Test basic functionality of the [`Standby::wait_for_reaction`] method.
1477    #[tokio::test]
1478    async fn test_wait_for_reaction() {
1479        let event = Event::ReactionAdd(Box::new(ReactionAdd(reaction())));
1480
1481        let standby = Standby::new();
1482        let wait = standby.wait_for_reaction(Id::new(4), |reaction: &ReactionAdd| {
1483            reaction.user_id.get() == 3
1484        });
1485
1486        standby.process(&event);
1487
1488        assert_eq!(
1489            Id::new(3),
1490            wait.await.map(|reaction| reaction.user_id).unwrap()
1491        );
1492        assert!(standby.reactions.is_empty());
1493    }
1494
1495    /// Test basic functionality of the [`Standby::wait_for_reaction_stream`]
1496    /// method.
1497    #[tokio::test]
1498    async fn test_wait_for_reaction_stream() {
1499        let standby = Standby::new();
1500        let mut stream = standby.wait_for_reaction_stream(Id::new(4), |_: &ReactionAdd| true);
1501        standby.process(&Event::ReactionAdd(Box::new(ReactionAdd(reaction()))));
1502        standby.process(&Event::ReactionAdd(Box::new(ReactionAdd(reaction()))));
1503
1504        assert!(stream.next().await.is_some());
1505        assert!(stream.next().await.is_some());
1506        drop(stream);
1507        assert_eq!(1, standby.reactions.len());
1508        standby.process(&Event::ReactionAdd(Box::new(ReactionAdd(reaction()))));
1509        assert!(standby.reactions.is_empty());
1510    }
1511
1512    /// Assert that Standby processing some non-matching events will not affect
1513    /// the matching of a later event.
1514    #[tokio::test]
1515    async fn test_wait_for_component() {
1516        let event = Event::InteractionCreate(Box::new(InteractionCreate(button())));
1517
1518        let standby = Standby::new();
1519        let wait = standby.wait_for_component(Id::new(3), |button: &Interaction| {
1520            button.author_id() == Some(Id::new(2))
1521        });
1522
1523        standby.process(&event);
1524
1525        assert_eq!(
1526            Some(Id::new(2)),
1527            wait.await.map(|button| button.author_id()).unwrap()
1528        );
1529        assert!(standby.components.is_empty());
1530    }
1531
1532    #[tokio::test]
1533    async fn test_wait_for_component_stream() {
1534        let standby = Standby::new();
1535        let mut stream = standby.wait_for_component_stream(Id::new(3), |_: &Interaction| true);
1536        standby.process(&Event::InteractionCreate(Box::new(InteractionCreate(
1537            button(),
1538        ))));
1539        standby.process(&Event::InteractionCreate(Box::new(InteractionCreate(
1540            button(),
1541        ))));
1542
1543        assert!(stream.next().await.is_some());
1544        assert!(stream.next().await.is_some());
1545        drop(stream);
1546        assert_eq!(1, standby.components.len());
1547        standby.process(&Event::InteractionCreate(Box::new(InteractionCreate(
1548            button(),
1549        ))));
1550        assert!(standby.components.is_empty());
1551    }
1552
1553    #[tokio::test]
1554    async fn test_handles_wrong_events() {
1555        let standby = Standby::new();
1556        let wait = standby.wait_for_event(|event: &Event| event.kind() == EventType::Resumed);
1557
1558        standby.process(&Event::GatewayHeartbeatAck);
1559        standby.process(&Event::GatewayHeartbeatAck);
1560        standby.process(&Event::Resumed);
1561
1562        assert_eq!(Event::Resumed, wait.await.unwrap());
1563    }
1564
1565    /// Test that generic event handlers will be given the opportunity to
1566    /// process events with specific handlers (message creates, reaction adds)
1567    /// and guild events. Similarly, guild handlers should be able to process
1568    /// specific handler events as well.
1569    #[tokio::test]
1570    async fn test_process_nonspecific_handling() {
1571        let standby = Standby::new();
1572
1573        // generic event handler gets message creates
1574        let wait = standby.wait_for_event(|event: &Event| event.kind() == EventType::MessageCreate);
1575        standby.process(&Event::MessageCreate(Box::new(MessageCreate(message()))));
1576        assert!(matches!(wait.await, Ok(Event::MessageCreate(_))));
1577
1578        // generic event handler gets reaction adds
1579        let wait = standby.wait_for_event(|event: &Event| event.kind() == EventType::ReactionAdd);
1580        standby.process(&Event::ReactionAdd(Box::new(ReactionAdd(reaction()))));
1581        assert!(matches!(wait.await, Ok(Event::ReactionAdd(_))));
1582
1583        // generic event handler gets other guild events
1584        let wait = standby.wait_for_event(|event: &Event| event.kind() == EventType::RoleDelete);
1585        standby.process(&Event::RoleDelete(RoleDelete {
1586            guild_id: Id::new(1),
1587            role_id: Id::new(2),
1588        }));
1589        assert!(matches!(wait.await, Ok(Event::RoleDelete(_))));
1590
1591        // guild event handler gets message creates or reaction events
1592        let wait = standby.wait_for(Id::new(1), |event: &Event| {
1593            event.kind() == EventType::ReactionAdd
1594        });
1595        standby.process(&Event::ReactionAdd(Box::new(ReactionAdd(reaction()))));
1596        assert!(matches!(wait.await, Ok(Event::ReactionAdd(_))));
1597    }
1598}