twilight_standby/lib.rs
1#![doc = include_str!("../README.md")]
2#![warn(
3 clippy::missing_const_for_fn,
4 clippy::missing_docs_in_private_items,
5 clippy::pedantic,
6 missing_docs,
7 unsafe_code
8)]
9#![allow(
10 clippy::module_name_repetitions,
11 clippy::must_use_candidate,
12 clippy::unnecessary_wraps
13)]
14
15pub mod future;
16
17use self::future::{
18 WaitForComponentFuture, WaitForComponentStream, WaitForEventFuture, WaitForEventStream,
19 WaitForGuildEventFuture, WaitForGuildEventStream, WaitForMessageFuture, WaitForMessageStream,
20 WaitForReactionFuture, WaitForReactionStream,
21};
22use dashmap::DashMap;
23use std::{
24 fmt::{Debug, Display, Formatter, Result as FmtResult},
25 hash::Hash,
26 sync::atomic::{AtomicU64, Ordering},
27};
28use tokio::sync::{
29 mpsc::{self, UnboundedReceiver, UnboundedSender as MpscSender},
30 oneshot::{self, Receiver, Sender as OneshotSender},
31};
32use twilight_model::{
33 application::interaction::{Interaction, InteractionType},
34 gateway::{
35 event::Event,
36 payload::incoming::{MessageCreate, ReactionAdd},
37 },
38 id::{
39 Id,
40 marker::{ChannelMarker, GuildMarker, MessageMarker},
41 },
42};
43
44/// Map keyed by an ID - such as a channel ID or message ID - storing a list of
45/// bystanders.
46type BystanderMap<K, V> = DashMap<K, Vec<Bystander<V>>>;
47
48/// Sender to a caller that may be for a future bystander or a stream bystander.
49#[derive(Debug)]
50enum Sender<E> {
51 /// Bystander is a future and the sender is a oneshot.
52 Future(OneshotSender<E>),
53 /// Bystander is a stream and the sender is an MPSC.
54 Stream(MpscSender<E>),
55}
56
57impl<E> Sender<E> {
58 /// Whether the channel is closed.
59 fn is_closed(&self) -> bool {
60 match self {
61 Self::Future(sender) => sender.is_closed(),
62 Self::Stream(sender) => sender.is_closed(),
63 }
64 }
65}
66
67/// Registration for a caller to wait for an event based on a predicate
68/// function.
69struct Bystander<T> {
70 /// Predicate check to perform on an event.
71 func: Box<dyn Fn(&T) -> bool + Send + Sync>,
72 /// [`Sender::Future`]s consume themselves once upon sending so the sender
73 /// needs to be able to be taken out separately.
74 sender: Option<Sender<T>>,
75}
76
77impl<T: Debug> Debug for Bystander<T> {
78 fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
79 f.debug_struct("Bystander")
80 .field("func", &"<dyn Fn(&T) -> bool>")
81 .field("sender", &self.sender)
82 .finish()
83 }
84}
85
86/// The `Standby` struct, used by the main event loop to process events and by
87/// tasks to wait for an event.
88///
89/// Refer to the crate-level documentation for more information.
90///
91/// # Using Standby in multiple tasks
92///
93/// To use a Standby instance in multiple tasks, consider wrapping it in an
94/// [`std::sync::Arc`] or [`std::rc::Rc`].
95///
96/// # Examples
97///
98/// ## Timeouts
99///
100/// Futures can be timed out by passing the future returned by Standby to
101/// functions such as [`tokio::time::timeout`]:
102///
103/// ```rust,no_run
104/// # #[tokio::main] async fn main() -> Result<(), Box<dyn std::error::Error>> {
105/// use std::time::Duration;
106/// use twilight_model::gateway::event::{Event, EventType};
107/// use twilight_standby::Standby;
108///
109/// let standby = Standby::new();
110/// let future = standby.wait_for_event(|event: &Event| event.kind() == EventType::Ready);
111/// let event = tokio::time::timeout(Duration::from_secs(1), future).await?;
112/// # Ok(()) }
113/// ```
114///
115/// [`tokio::time::timeout`]: https://docs.rs/tokio/latest/tokio/time/fn.timeout.html
116#[derive(Debug, Default)]
117pub struct Standby {
118 /// List of component bystanders where the ID of the message is known
119 /// beforehand.
120 components: DashMap<Id<MessageMarker>, Vec<Bystander<Interaction>>>,
121 /// Bystanders for any event that may not be in any particular guild.
122 ///
123 /// The key is generated via [`event_counter`].
124 ///
125 /// [`event_counter`]: Self::event_counter
126 events: DashMap<u64, Bystander<Event>>,
127 /// Event counter to be used as the key of [`events`].
128 ///
129 /// [`events`]: Self::events
130 event_counter: AtomicU64,
131 /// List of bystanders where the ID of the guild is known beforehand.
132 guilds: DashMap<Id<GuildMarker>, Vec<Bystander<Event>>>,
133 /// List of message bystanders where the ID of the channel is known
134 /// beforehand.
135 messages: DashMap<Id<ChannelMarker>, Vec<Bystander<MessageCreate>>>,
136 /// List of reaction bystanders where the ID of the message is known
137 /// beforehand.
138 reactions: DashMap<Id<MessageMarker>, Vec<Bystander<ReactionAdd>>>,
139}
140
141impl Standby {
142 /// Create a new instance of `Standby`.
143 ///
144 /// Once a `Standby` has been created it must process gateway events via
145 /// [`process`]. Awaiting an event can start via methods such as
146 /// [`wait_for`] and [`wait_for_message_stream`].
147 ///
148 /// [`process`]: Self::process
149 /// [`wait_for`]: Self::wait_for
150 /// [`wait_for_message_stream`]: Self::wait_for_message_stream
151 #[must_use = "must process events to be useful"]
152 pub fn new() -> Self {
153 Self::default()
154 }
155
156 /// Process an event, calling any bystanders that might be waiting on it.
157 ///
158 /// Returns statistics about matched [`Standby`] calls and how they were
159 /// processed. For example, by using [`ProcessResults::matched`] you can
160 /// determine how many calls were sent an event.
161 ///
162 /// When a bystander checks to see if an event is what it's waiting for, it
163 /// will receive the event by cloning it.
164 ///
165 /// This function must be called when events are received in order for
166 /// futures returned by methods to fulfill.
167 pub fn process(&self, event: &Event) -> ProcessResults {
168 tracing::trace!(event_type = ?event.kind(), ?event, "processing event");
169
170 let mut completions = ProcessResults::new();
171
172 match event {
173 Event::InteractionCreate(e) => {
174 if e.kind == InteractionType::MessageComponent
175 && let Some(message) = &e.message
176 {
177 completions.add_with(&Self::process_specific_event(
178 &self.components,
179 message.id,
180 e,
181 ));
182 }
183 }
184 Event::MessageCreate(e) => {
185 completions.add_with(&Self::process_specific_event(
186 &self.messages,
187 e.0.channel_id,
188 e,
189 ));
190 }
191 Event::ReactionAdd(e) => {
192 completions.add_with(&Self::process_specific_event(
193 &self.reactions,
194 e.0.message_id,
195 e,
196 ));
197 }
198 _ => {}
199 }
200
201 if let Some(guild_id) = event.guild_id() {
202 completions.add_with(&Self::process_specific_event(&self.guilds, guild_id, event));
203 }
204
205 completions.add_with(&Self::process_event(&self.events, event));
206
207 completions
208 }
209
210 /// Wait for an event in a certain guild.
211 ///
212 /// To wait for multiple guild events matching the given predicate use
213 /// [`wait_for_stream`].
214 ///
215 /// # Examples
216 ///
217 /// Wait for a [`BanAdd`] event in guild 123:
218 ///
219 /// ```no_run
220 /// # #[tokio::main]
221 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
222 /// use twilight_model::{
223 /// gateway::event::{Event, EventType},
224 /// id::Id,
225 /// };
226 /// use twilight_standby::Standby;
227 ///
228 /// let standby = Standby::new();
229 ///
230 /// let guild_id = Id::new(123);
231 ///
232 /// let reaction = standby
233 /// .wait_for(guild_id, |event: &Event| event.kind() == EventType::BanAdd)
234 /// .await?;
235 /// # Ok(()) }
236 /// ```
237 ///
238 /// # Errors
239 ///
240 /// The returned future resolves to a [`Canceled`] error if the associated
241 /// [`Standby`] instance is dropped.
242 ///
243 /// [`BanAdd`]: twilight_model::gateway::payload::incoming::BanAdd
244 /// [`Canceled`]: future::Canceled
245 /// [`wait_for_stream`]: Self::wait_for_stream
246 pub fn wait_for<F: Fn(&Event) -> bool + Send + Sync + 'static>(
247 &self,
248 guild_id: Id<GuildMarker>,
249 check: impl Into<Box<F>>,
250 ) -> WaitForGuildEventFuture {
251 tracing::trace!(%guild_id, "waiting for event in guild");
252
253 WaitForGuildEventFuture {
254 rx: Self::insert_future(&self.guilds, guild_id, check),
255 }
256 }
257
258 /// Wait for a stream of events in a certain guild.
259 ///
260 /// To wait for only one guild event matching the given predicate use
261 /// [`wait_for`].
262 ///
263 /// # Examples
264 ///
265 /// Wait for multiple [`BanAdd`] events in guild 123:
266 ///
267 /// ```no_run
268 /// # #[tokio::main]
269 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
270 /// use tokio_stream::StreamExt;
271 /// use twilight_model::{
272 /// gateway::event::{Event, EventType},
273 /// id::Id,
274 /// };
275 /// use twilight_standby::Standby;
276 ///
277 /// let standby = Standby::new();
278 ///
279 /// let guild_id = Id::new(123);
280 ///
281 /// let mut stream =
282 /// standby.wait_for_stream(guild_id, |event: &Event| event.kind() == EventType::BanAdd);
283 ///
284 /// while let Some(event) = stream.next().await {
285 /// if let Event::BanAdd(ban) = event {
286 /// println!("user {} was banned in guild {}", ban.user.id, ban.guild_id);
287 /// }
288 /// }
289 /// # Ok(()) }
290 /// ```
291 ///
292 /// # Errors
293 ///
294 /// The returned stream ends when the associated [`Standby`] instance is
295 /// dropped.
296 ///
297 /// [`BanAdd`]: twilight_model::gateway::payload::incoming::BanAdd
298 /// [`wait_for`]: Self::wait_for
299 pub fn wait_for_stream<F: Fn(&Event) -> bool + Send + Sync + 'static>(
300 &self,
301 guild_id: Id<GuildMarker>,
302 check: impl Into<Box<F>>,
303 ) -> WaitForGuildEventStream {
304 tracing::trace!(%guild_id, "waiting for event in guild");
305
306 WaitForGuildEventStream {
307 rx: Self::insert_stream(&self.guilds, guild_id, check),
308 }
309 }
310
311 /// Wait for an event not in a certain guild. This must be filtered by an
312 /// event type.
313 ///
314 /// To wait for multiple events matching the given predicate use
315 /// [`wait_for_event_stream`].
316 ///
317 /// # Examples
318 ///
319 /// Wait for a [`Ready`] event for shard 5:
320 ///
321 /// ```no_run
322 /// # #[tokio::main]
323 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
324 /// use twilight_model::gateway::event::{Event, EventType};
325 /// use twilight_standby::Standby;
326 ///
327 /// let standby = Standby::new();
328 ///
329 /// let ready = standby
330 /// .wait_for_event(|event: &Event| {
331 /// if let Event::Ready(ready) = event {
332 /// ready.shard.map_or(false, |id| id.number() == 5)
333 /// } else {
334 /// false
335 /// }
336 /// })
337 /// .await?;
338 /// # Ok(()) }
339 /// ```
340 ///
341 /// # Errors
342 ///
343 /// The returned future resolves to a [`Canceled`] error if the associated
344 /// [`Standby`] instance is dropped.
345 ///
346 /// [`Canceled`]: future::Canceled
347 /// [`Ready`]: twilight_model::gateway::payload::incoming::Ready
348 /// [`wait_for_event_stream`]: Self::wait_for_event_stream
349 pub fn wait_for_event<F: Fn(&Event) -> bool + Send + Sync + 'static>(
350 &self,
351 check: impl Into<Box<F>>,
352 ) -> WaitForEventFuture {
353 tracing::trace!("waiting for event");
354
355 let (tx, rx) = oneshot::channel();
356
357 self.events.insert(
358 self.next_event_id(),
359 Bystander {
360 func: check.into(),
361 sender: Some(Sender::Future(tx)),
362 },
363 );
364
365 WaitForEventFuture { rx }
366 }
367
368 /// Wait for a stream of events not in a certain guild. This must be
369 /// filtered by an event type.
370 ///
371 /// To wait for only one event matching the given predicate use
372 /// [`wait_for_event`].
373 ///
374 /// # Examples
375 ///
376 /// Wait for multiple [`Ready`] events on shard 5:
377 ///
378 /// ```no_run
379 /// # #[tokio::main]
380 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
381 /// use tokio_stream::StreamExt;
382 /// use twilight_model::gateway::event::{Event, EventType};
383 /// use twilight_standby::Standby;
384 ///
385 /// let standby = Standby::new();
386 ///
387 /// let mut events = standby.wait_for_event_stream(|event: &Event| {
388 /// if let Event::Ready(ready) = event {
389 /// ready.shard.map_or(false, |id| id.number() == 5)
390 /// } else {
391 /// false
392 /// }
393 /// });
394 ///
395 /// while let Some(event) = events.next().await {
396 /// println!("got event with type {:?}", event.kind());
397 /// }
398 /// # Ok(()) }
399 /// ```
400 ///
401 /// # Errors
402 ///
403 /// The returned stream ends when the associated [`Standby`] instance is
404 /// dropped.
405 ///
406 /// [`Ready`]: twilight_model::gateway::payload::incoming::Ready
407 /// [`wait_for_event`]: Self::wait_for_event
408 pub fn wait_for_event_stream<F: Fn(&Event) -> bool + Send + Sync + 'static>(
409 &self,
410 check: impl Into<Box<F>>,
411 ) -> WaitForEventStream {
412 tracing::trace!("waiting for event");
413
414 let (tx, rx) = mpsc::unbounded_channel();
415
416 self.events.insert(
417 self.next_event_id(),
418 Bystander {
419 func: check.into(),
420 sender: Some(Sender::Stream(tx)),
421 },
422 );
423
424 WaitForEventStream { rx }
425 }
426
427 /// Wait for a message in a certain channel.
428 ///
429 /// To wait for multiple messages matching the given predicate use
430 /// [`wait_for_message_stream`].
431 ///
432 /// # Examples
433 ///
434 /// Wait for a message in channel 123 by user 456 with the content "test":
435 ///
436 /// ```no_run
437 /// # #[tokio::main]
438 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
439 /// use twilight_model::{gateway::payload::incoming::MessageCreate, id::Id};
440 /// use twilight_standby::Standby;
441 ///
442 /// let standby = Standby::new();
443 ///
444 /// let author_id = Id::new(456);
445 /// let channel_id = Id::new(123);
446 ///
447 /// let message = standby
448 /// .wait_for_message(channel_id, move |event: &MessageCreate| {
449 /// event.author.id == author_id && event.content == "test"
450 /// })
451 /// .await?;
452 /// # Ok(()) }
453 /// ```
454 ///
455 /// # Errors
456 ///
457 /// The returned future resolves to a [`Canceled`] error if the associated
458 /// [`Standby`] instance is dropped.
459 ///
460 /// [`Canceled`]: future::Canceled
461 /// [`wait_for_message_stream`]: Self::wait_for_message_stream
462 pub fn wait_for_message<F: Fn(&MessageCreate) -> bool + Send + Sync + 'static>(
463 &self,
464 channel_id: Id<ChannelMarker>,
465 check: impl Into<Box<F>>,
466 ) -> WaitForMessageFuture {
467 tracing::trace!(%channel_id, "waiting for message in channel");
468
469 WaitForMessageFuture {
470 rx: Self::insert_future(&self.messages, channel_id, check),
471 }
472 }
473
474 /// Wait for a stream of message in a certain channel.
475 ///
476 /// To wait for only one message matching the given predicate use
477 /// [`wait_for_message`].
478 ///
479 /// # Examples
480 ///
481 /// Wait for multiple messages in channel 123 by user 456 with the content
482 /// "test":
483 ///
484 /// ```no_run
485 /// # #[tokio::main]
486 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
487 /// use tokio_stream::StreamExt;
488 /// use twilight_model::{gateway::payload::incoming::MessageCreate, id::Id};
489 /// use twilight_standby::Standby;
490 ///
491 /// let standby = Standby::new();
492 ///
493 /// let author_id = Id::new(456);
494 /// let channel_id = Id::new(123);
495 ///
496 /// let mut messages = standby.wait_for_message_stream(channel_id, move |event: &MessageCreate| {
497 /// event.author.id == author_id && event.content == "test"
498 /// });
499 ///
500 /// while let Some(message) = messages.next().await {
501 /// println!("got message by {}", message.author.id);
502 /// }
503 /// # Ok(()) }
504 /// ```
505 ///
506 /// # Errors
507 ///
508 /// The returned stream ends when the associated [`Standby`] instance is
509 /// dropped.
510 ///
511 /// [`wait_for_message`]: Self::wait_for_message
512 pub fn wait_for_message_stream<F: Fn(&MessageCreate) -> bool + Send + Sync + 'static>(
513 &self,
514 channel_id: Id<ChannelMarker>,
515 check: impl Into<Box<F>>,
516 ) -> WaitForMessageStream {
517 tracing::trace!(%channel_id, "waiting for message in channel");
518
519 WaitForMessageStream {
520 rx: Self::insert_stream(&self.messages, channel_id, check),
521 }
522 }
523
524 /// Wait for a reaction on a certain message.
525 ///
526 /// To wait for multiple reactions matching the given predicate use
527 /// [`wait_for_reaction_stream`].
528 ///
529 /// # Examples
530 ///
531 /// Wait for a reaction on message 123 by user 456:
532 ///
533 /// ```no_run
534 /// # #[tokio::main]
535 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
536 /// use twilight_model::{gateway::payload::incoming::ReactionAdd, id::Id};
537 /// use twilight_standby::Standby;
538 ///
539 /// let standby = Standby::new();
540 ///
541 /// let message_id = Id::new(123);
542 /// let user_id = Id::new(456);
543 ///
544 /// let reaction = standby
545 /// .wait_for_reaction(message_id, move |event: &ReactionAdd| {
546 /// event.user_id == user_id
547 /// })
548 /// .await?;
549 /// # Ok(()) }
550 /// ```
551 ///
552 /// # Errors
553 ///
554 /// The returned future resolves to a [`Canceled`] error if the associated
555 /// [`Standby`] instance is dropped.
556 ///
557 /// [`Canceled`]: future::Canceled
558 /// [`wait_for_reaction_stream`]: Self::wait_for_reaction_stream
559 pub fn wait_for_reaction<F: Fn(&ReactionAdd) -> bool + Send + Sync + 'static>(
560 &self,
561 message_id: Id<MessageMarker>,
562 check: impl Into<Box<F>>,
563 ) -> WaitForReactionFuture {
564 tracing::trace!(%message_id, "waiting for reaction on message");
565
566 WaitForReactionFuture {
567 rx: Self::insert_future(&self.reactions, message_id, check),
568 }
569 }
570
571 /// Wait for a stream of reactions on a certain message.
572 ///
573 /// To wait for only one reaction matching the given predicate use
574 /// [`wait_for_reaction`].
575 ///
576 /// # Examples
577 ///
578 /// Wait for multiple reactions on message 123 with unicode reaction "🤠":
579 ///
580 /// ```no_run
581 /// # #[tokio::main]
582 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
583 /// use tokio_stream::StreamExt;
584 /// use twilight_model::{
585 /// channel::message::EmojiReactionType,
586 /// gateway::payload::incoming::ReactionAdd,
587 /// id::Id,
588 /// };
589 /// use twilight_standby::Standby;
590 ///
591 /// let standby = Standby::new();
592 ///
593 /// let message_id = Id::new(123);
594 ///
595 /// let mut reactions = standby.wait_for_reaction_stream(message_id, |event: &ReactionAdd| {
596 /// matches!(&event.emoji, EmojiReactionType::Unicode { name } if name == "🤠")
597 /// });
598 ///
599 /// while let Some(reaction) = reactions.next().await {
600 /// println!("got a reaction by {}", reaction.user_id);
601 /// }
602 /// # Ok(()) }
603 /// ```
604 ///
605 /// # Errors
606 ///
607 /// The returned stream ends when the associated [`Standby`] instance is
608 /// dropped.
609 ///
610 /// [`wait_for_reaction`]: Self::wait_for_reaction
611 pub fn wait_for_reaction_stream<F: Fn(&ReactionAdd) -> bool + Send + Sync + 'static>(
612 &self,
613 message_id: Id<MessageMarker>,
614 check: impl Into<Box<F>>,
615 ) -> WaitForReactionStream {
616 tracing::trace!(%message_id, "waiting for reaction on message");
617
618 WaitForReactionStream {
619 rx: Self::insert_stream(&self.reactions, message_id, check),
620 }
621 }
622
623 /// Wait for a component on a certain message.
624 ///
625 /// Returns a `Canceled` error if the `Standby` struct was dropped.
626 ///
627 /// If you need to wait for multiple components matching the given predicate,
628 /// use [`wait_for_component_stream`].
629 ///
630 /// # Examples
631 ///
632 /// Wait for a component on message 123 by user 456:
633 ///
634 /// ```no_run
635 /// # #[tokio::main] async fn main() -> Result<(), Box<dyn std::error::Error>> {
636 /// use twilight_model::{application::interaction::Interaction, id::Id};
637 /// use twilight_standby::Standby;
638 ///
639 /// let standby = Standby::new();
640 /// let message_id = Id::new(123);
641 ///
642 /// let component = standby
643 /// .wait_for_component(message_id, |event: &Interaction| {
644 /// event.author_id() == Some(Id::new(456))
645 /// })
646 /// .await?;
647 /// # Ok(()) }
648 /// ```
649 ///
650 /// [`wait_for_component_stream`]: Self::wait_for_component_stream
651 pub fn wait_for_component<F: Fn(&Interaction) -> bool + Send + Sync + 'static>(
652 &self,
653 message_id: Id<MessageMarker>,
654 check: impl Into<Box<F>>,
655 ) -> WaitForComponentFuture {
656 tracing::trace!(%message_id, "waiting for component on message");
657
658 WaitForComponentFuture {
659 rx: Self::insert_future(&self.components, message_id, check),
660 }
661 }
662
663 /// Wait for a stream of components on a certain message.
664 ///
665 /// Returns a `Canceled` error if the `Standby` struct was dropped.
666 ///
667 /// If you need to wait for only one component matching the given predicate,
668 /// use [`wait_for_component`].
669 ///
670 /// # Examples
671 ///
672 /// Wait for multiple button components on message 123 with a `custom_id` of
673 /// "Click":
674 ///
675 /// ```no_run
676 /// # #[tokio::main] async fn main() -> Result<(), Box<dyn std::error::Error>> {
677 /// use tokio_stream::StreamExt;
678 /// use twilight_model::{
679 /// application::interaction::{Interaction, InteractionData},
680 /// id::Id,
681 /// };
682 /// use twilight_standby::Standby;
683 ///
684 /// let standby = Standby::new();
685 /// let message_id = Id::new(123);
686 ///
687 /// let mut components = standby.wait_for_component_stream(message_id, |event: &Interaction| {
688 /// if let Some(InteractionData::MessageComponent(data)) = &event.data {
689 /// data.custom_id == "Click".to_string()
690 /// } else {
691 /// false
692 /// }
693 /// });
694 ///
695 /// while let Some(component) = components.next().await {
696 /// println!("got a component by {}", component.author_id().unwrap());
697 /// }
698 /// # Ok(()) }
699 /// ```
700 ///
701 /// [`wait_for_component`]: Self::wait_for_component
702 pub fn wait_for_component_stream<F: Fn(&Interaction) -> bool + Send + Sync + 'static>(
703 &self,
704 message_id: Id<MessageMarker>,
705 check: impl Into<Box<F>>,
706 ) -> WaitForComponentStream {
707 tracing::trace!(%message_id, "waiting for component on message");
708
709 WaitForComponentStream {
710 rx: Self::insert_stream(&self.components, message_id, check),
711 }
712 }
713
714 /// Next event ID in [`Standby::event_counter`].
715 fn next_event_id(&self) -> u64 {
716 self.event_counter.fetch_add(1, Ordering::SeqCst)
717 }
718
719 /// Append a new future bystander into a map according to the ID.
720 fn insert_future<F: Fn(&V) -> bool + Send + Sync + 'static, K: Eq + Hash, V>(
721 map: &BystanderMap<K, V>,
722 id: K,
723 check: impl Into<Box<F>>,
724 ) -> Receiver<V> {
725 let (tx, rx) = oneshot::channel();
726
727 let mut entry = map.entry(id).or_default();
728 entry.push(Bystander {
729 func: check.into(),
730 sender: Some(Sender::Future(tx)),
731 });
732
733 rx
734 }
735
736 /// Append a new stream bystander into a map according to the ID.
737 fn insert_stream<F: Fn(&V) -> bool + Send + Sync + 'static, K: Eq + Hash, V>(
738 map: &BystanderMap<K, V>,
739 id: K,
740 check: impl Into<Box<F>>,
741 ) -> UnboundedReceiver<V> {
742 let (tx, rx) = mpsc::unbounded_channel();
743
744 let mut entry = map.entry(id).or_default();
745 entry.push(Bystander {
746 func: check.into(),
747 sender: Some(Sender::Stream(tx)),
748 });
749
750 rx
751 }
752
753 /// Process a general event that is not of any particular type or in any
754 /// particular guild.
755 #[tracing::instrument(level = "trace")]
756 fn process_event<K: Debug + Display + Eq + Hash + PartialEq + 'static, V: Clone + Debug>(
757 map: &DashMap<K, Bystander<V>>,
758 event: &V,
759 ) -> ProcessResults {
760 tracing::trace!(?event, "processing event");
761
762 let mut results = ProcessResults::new();
763
764 map.retain(|id, bystander| {
765 let result = Self::bystander_process(bystander, event);
766 results.handle(result);
767
768 tracing::trace!(bystander_id = %id, ?result, "event bystander processed");
769
770 // We want to retain bystanders that are *incomplete* and remove
771 // bystanders that are *complete*.
772 !result.is_complete()
773 });
774
775 results
776 }
777
778 /// Process a general event that is either of a particular type or in a
779 /// particular guild.
780 #[allow(clippy::needless_pass_by_value)]
781 #[tracing::instrument(level = "trace")]
782 fn process_specific_event<
783 K: Debug + Display + Eq + Hash + PartialEq + 'static,
784 V: Clone + Debug,
785 >(
786 map: &DashMap<K, Vec<Bystander<V>>>,
787 guild_id: K,
788 event: &V,
789 ) -> ProcessResults {
790 // Iterate over a guild's bystanders and mark it for removal if there
791 // are no bystanders remaining.
792 let (remove_guild, results) = if let Some(mut bystanders) = map.get_mut(&guild_id) {
793 let results = Self::bystander_iter(&mut bystanders, event);
794
795 (bystanders.is_empty(), results)
796 } else {
797 tracing::trace!(%guild_id, "guild has no event bystanders");
798
799 return ProcessResults::new();
800 };
801
802 if remove_guild {
803 tracing::trace!(%guild_id, "removing guild from map");
804
805 map.remove(&guild_id);
806 }
807
808 results
809 }
810
811 /// Iterate over bystanders and remove the ones that match the predicate.
812 #[tracing::instrument(level = "trace")]
813 fn bystander_iter<E: Clone + Debug>(
814 bystanders: &mut Vec<Bystander<E>>,
815 event: &E,
816 ) -> ProcessResults {
817 tracing::trace!(?bystanders, "iterating over bystanders");
818
819 // Iterate over the list of bystanders by using an index and manually
820 // indexing in to the list.
821 //
822 // # Logic
823 //
824 // In each iteration we decide whether to retain a bystander: if we do
825 // then we can increment our index and move on, but if we opt to instead
826 // remove it then we do so and don't increment the index. The reason we
827 // don't increment the index is because when we remove an element the
828 // index does not become empty and instead everything to the right is
829 // shifted to the left, illustrated as such:
830 //
831 // |---|
832 // v |
833 // A - B - C - D
834 // | ^ |
835 // | |---|
836 // |
837 // Remove B
838 //
839 // After: A - C - D
840 //
841 // # Reasons not to use alternatives
842 //
843 // **`Vec::retain`** we need to mutate the entries in order to take out
844 // the sender and `Vec::retain` only gives us immutable references.
845 //
846 // A form of enumeration can't be used because sometimes the index
847 // doesn't advance; iterators would continue to provide incrementing
848 // enumeration indexes while we sometimes want to reuse an index.
849 let mut index = 0;
850 let mut results = ProcessResults::new();
851
852 while index < bystanders.len() {
853 tracing::trace!(%index, "checking bystander");
854
855 let status = Self::bystander_process(&mut bystanders[index], event);
856 results.handle(status);
857
858 tracing::trace!(%index, ?status, "checked bystander");
859
860 if status.is_complete() {
861 bystanders.remove(index);
862 } else {
863 index += 1;
864 }
865 }
866
867 results
868 }
869
870 /// Process a bystander, sending the event if the sender is active and the
871 /// predicate matches. Returns whether the bystander has fulfilled.
872 ///
873 /// Returns whether the bystander is fulfilled; if the bystander has been
874 /// fulfilled then the channel is now closed.
875 #[tracing::instrument(level = "trace")]
876 fn bystander_process<T: Clone + Debug>(
877 bystander: &mut Bystander<T>,
878 event: &T,
879 ) -> ProcessStatus {
880 // We need to take the sender out because `OneshotSender`s consume
881 // themselves when calling `OneshotSender::send`.
882 let Some(sender) = bystander.sender.take() else {
883 tracing::trace!("bystander has no sender, indicating for removal");
884
885 return ProcessStatus::AlreadyComplete;
886 };
887
888 // The channel may have closed due to the receiver dropping their end,
889 // in which case we can say we're done.
890 if sender.is_closed() {
891 tracing::trace!("bystander's rx dropped, indicating for removal");
892
893 return ProcessStatus::Dropped;
894 }
895
896 // Lastly check to see if the predicate matches the event. If it doesn't
897 // then we can short-circuit.
898 if !(bystander.func)(event) {
899 tracing::trace!("bystander check doesn't match, not removing");
900
901 // Put the sender back into its bystander since we'll still need it
902 // next time around.
903 bystander.sender.replace(sender);
904
905 return ProcessStatus::Skip;
906 }
907
908 match sender {
909 Sender::Future(tx) => {
910 // We don't care if the event successfully sends or not since
911 // we're going to be tossing out the bystander anyway.
912 drop(tx.send(event.clone()));
913
914 tracing::trace!("bystander matched event, indicating for removal");
915
916 ProcessStatus::SentFuture
917 }
918 Sender::Stream(tx) => {
919 // If we can send an event to the receiver and the channel is
920 // still open then we need to retain the bystander, otherwise we
921 // need to mark it for removal.
922 if tx.send(event.clone()).is_ok() {
923 tracing::trace!("bystander is a stream, retaining in map");
924
925 bystander.sender.replace(Sender::Stream(tx));
926
927 ProcessStatus::SentStream
928 } else {
929 ProcessStatus::Dropped
930 }
931 }
932 }
933 }
934}
935/// Number of [`Standby`] calls that were completed.
936#[derive(Clone, Debug, Eq, Hash, PartialEq)]
937pub struct ProcessResults {
938 /// Number of bystanders that were dropped due to the receiving end
939 /// dropping.
940 dropped: usize,
941 /// Number of future bystanders that were open and were sent an event.
942 fulfilled: usize,
943 /// Number of stream bystanders that were open and were sent an event.
944 sent: usize,
945}
946
947impl ProcessResults {
948 /// Create a new set of zeroed out results.
949 const fn new() -> Self {
950 Self {
951 dropped: 0,
952 fulfilled: 0,
953 sent: 0,
954 }
955 }
956
957 /// Number of [`Standby`] calls where the receiver had already dropped their
958 /// end.
959 ///
960 /// This may happen when a caller calls into [`Standby`] but then times out
961 /// or otherwise cancels their request.
962 pub const fn dropped(&self) -> usize {
963 self.dropped
964 }
965
966 /// Number of [`Standby`] calls that were fulfilled.
967 ///
968 /// Calls for futures via methods such as [`Standby::wait_for`] will be
969 /// marked as fulfilled once matched and an event is sent over the channel.
970 ///
971 /// **Caveat**: although an event has been sent over the channel to the
972 /// receiver it is not guaranteed whether the receiver end actually received
973 /// it; the receiver end may drop shortly after an event is sent. In this
974 /// case the call is considered to be fulfilled.
975 pub const fn fulfilled(&self) -> usize {
976 self.fulfilled
977 }
978
979 /// Number of calls that were matched and were sent an event.
980 ///
981 /// This is the sum of [`fulfilled`] and [`sent`].
982 ///
983 /// See the caveats for both methods.
984 ///
985 /// [`fulfilled`]: Self::fulfilled
986 /// [`sent`]: Self::sent
987 pub const fn matched(&self) -> usize {
988 self.fulfilled() + self.sent()
989 }
990
991 /// Number of [`Standby`] streaming calls that were matched and had an event
992 /// sent to them.
993 ///
994 /// **Caveat**: although an event has been sent over the channel to the
995 /// receiver it is not guaranteed whether the receiver end actually received
996 /// it; the receiver end may drop shortly after an event is sent. In this
997 /// case the call is considered to be sent. Checks over this call will in
998 /// the future be considered [`dropped`].
999 ///
1000 /// [`dropped`]: Self::dropped
1001 pub const fn sent(&self) -> usize {
1002 self.sent
1003 }
1004
1005 /// Add another set of results to this set.
1006 const fn add_with(&mut self, other: &Self) {
1007 self.dropped = self.dropped.saturating_add(other.dropped);
1008 self.fulfilled = self.fulfilled.saturating_add(other.fulfilled);
1009 self.sent = self.sent.saturating_add(other.sent);
1010 }
1011
1012 /// Handle a process status.
1013 const fn handle(&mut self, status: ProcessStatus) {
1014 match status {
1015 ProcessStatus::Dropped => {
1016 self.dropped += 1;
1017 }
1018 ProcessStatus::SentFuture => {
1019 self.fulfilled += 1;
1020 }
1021 ProcessStatus::SentStream => {
1022 self.sent += 1;
1023 }
1024 ProcessStatus::AlreadyComplete | ProcessStatus::Skip => {}
1025 }
1026 }
1027}
1028
1029/// Status result of processing a bystander via [`Standby::bystander_process`].
1030#[derive(Clone, Copy, Debug)]
1031enum ProcessStatus {
1032 /// Call matched but already matched previously and was not removed, so the
1033 /// subject must be removed and not counted towards results.
1034 AlreadyComplete,
1035 /// Call matched but the receiver dropped their end.
1036 Dropped,
1037 /// Call matched a oneshot.
1038 SentFuture,
1039 /// Call matched a stream.
1040 SentStream,
1041 /// Call was not matched.
1042 Skip,
1043}
1044
1045impl ProcessStatus {
1046 /// Whether the call is complete.
1047 const fn is_complete(self) -> bool {
1048 matches!(
1049 self,
1050 Self::AlreadyComplete | Self::Dropped | Self::SentFuture
1051 )
1052 }
1053}
1054
1055#[cfg(test)]
1056mod tests {
1057 #![allow(clippy::non_ascii_literal)]
1058
1059 use crate::Standby;
1060 use static_assertions::assert_impl_all;
1061 use std::fmt::Debug;
1062 use tokio_stream::StreamExt;
1063 use twilight_gateway::{Event, EventType};
1064 use twilight_model::{
1065 application::interaction::{
1066 Interaction, InteractionData, InteractionType,
1067 message_component::MessageComponentInteractionData,
1068 },
1069 channel::{
1070 Channel, ChannelType,
1071 message::{EmojiReactionType, Message, MessageType, component::ComponentType},
1072 },
1073 gateway::{
1074 GatewayReaction, ShardId,
1075 payload::incoming::{InteractionCreate, MessageCreate, ReactionAdd, Ready, RoleDelete},
1076 },
1077 guild::Permissions,
1078 id::{Id, marker::GuildMarker},
1079 oauth::{ApplicationFlags, ApplicationIntegrationMap, PartialApplication},
1080 user::{CurrentUser, User},
1081 util::Timestamp,
1082 };
1083
1084 assert_impl_all!(Standby: Debug, Default, Send, Sync);
1085
1086 #[allow(deprecated)]
1087 fn message() -> Message {
1088 Message {
1089 activity: None,
1090 application: None,
1091 application_id: None,
1092 attachments: Vec::new(),
1093 author: User {
1094 accent_color: None,
1095 avatar: None,
1096 avatar_decoration: None,
1097 avatar_decoration_data: None,
1098 banner: None,
1099 bot: false,
1100 discriminator: 1,
1101 email: None,
1102 flags: None,
1103 global_name: Some("test".to_owned()),
1104 id: Id::new(2),
1105 locale: None,
1106 mfa_enabled: None,
1107 name: "twilight".to_owned(),
1108 premium_type: None,
1109 primary_guild: None,
1110 public_flags: None,
1111 system: None,
1112 verified: None,
1113 },
1114 call: None,
1115 channel_id: Id::new(1),
1116 components: Vec::new(),
1117 content: "test".to_owned(),
1118 edited_timestamp: None,
1119 embeds: Vec::new(),
1120 flags: None,
1121 guild_id: Some(Id::new(4)),
1122 id: Id::new(3),
1123 interaction: None,
1124 interaction_metadata: None,
1125 kind: MessageType::Regular,
1126 member: None,
1127 mention_channels: Vec::new(),
1128 mention_everyone: false,
1129 mention_roles: Vec::new(),
1130 mentions: Vec::new(),
1131 message_snapshots: Vec::new(),
1132 pinned: false,
1133 poll: None,
1134 reactions: Vec::new(),
1135 reference: None,
1136 referenced_message: None,
1137 role_subscription_data: None,
1138 sticker_items: Vec::new(),
1139 timestamp: Timestamp::from_secs(1_632_072_645).expect("non zero"),
1140 thread: None,
1141 tts: false,
1142 webhook_id: None,
1143 }
1144 }
1145
1146 fn reaction() -> GatewayReaction {
1147 GatewayReaction {
1148 burst: false,
1149 burst_colors: Vec::new(),
1150 channel_id: Id::new(2),
1151 emoji: EmojiReactionType::Unicode {
1152 name: "🍎".to_owned(),
1153 },
1154 guild_id: Some(Id::new(1)),
1155 member: None,
1156 message_author_id: None,
1157 message_id: Id::new(4),
1158 user_id: Id::new(3),
1159 }
1160 }
1161
1162 #[allow(deprecated)]
1163 fn button() -> Interaction {
1164 Interaction {
1165 app_permissions: Some(Permissions::SEND_MESSAGES),
1166 application_id: Id::new(1),
1167 authorizing_integration_owners: ApplicationIntegrationMap {
1168 guild: None,
1169 user: None,
1170 },
1171 channel: Some(Channel {
1172 bitrate: None,
1173 guild_id: None,
1174 id: Id::new(400),
1175 kind: ChannelType::GuildText,
1176 last_message_id: None,
1177 last_pin_timestamp: None,
1178 name: None,
1179 nsfw: None,
1180 owner_id: None,
1181 parent_id: None,
1182 permission_overwrites: None,
1183 position: None,
1184 rate_limit_per_user: None,
1185 recipients: None,
1186 rtc_region: None,
1187 topic: None,
1188 user_limit: None,
1189 application_id: None,
1190 applied_tags: None,
1191 available_tags: None,
1192 default_auto_archive_duration: None,
1193 default_forum_layout: None,
1194 default_reaction_emoji: None,
1195 default_sort_order: None,
1196 default_thread_rate_limit_per_user: None,
1197 flags: None,
1198 icon: None,
1199 invitable: None,
1200 managed: None,
1201 member: None,
1202 member_count: None,
1203 message_count: None,
1204 newly_created: None,
1205 thread_metadata: None,
1206 video_quality_mode: None,
1207 }),
1208 channel_id: None,
1209 context: None,
1210 data: Some(InteractionData::MessageComponent(Box::new(
1211 MessageComponentInteractionData {
1212 custom_id: String::from("Click"),
1213 component_type: ComponentType::Button,
1214 resolved: None,
1215 values: Vec::new(),
1216 },
1217 ))),
1218 entitlements: Vec::new(),
1219 guild: None,
1220 guild_id: Some(Id::new(3)),
1221 guild_locale: None,
1222 id: Id::new(4),
1223 kind: InteractionType::MessageComponent,
1224 locale: Some("en-GB".to_owned()),
1225 member: None,
1226 message: Some(message()),
1227 token: String::from("token"),
1228 user: Some(User {
1229 accent_color: None,
1230 avatar: None,
1231 avatar_decoration: None,
1232 avatar_decoration_data: None,
1233 banner: None,
1234 bot: false,
1235 discriminator: 1,
1236 email: None,
1237 flags: None,
1238 global_name: Some("test".to_owned()),
1239 id: Id::new(2),
1240 locale: None,
1241 mfa_enabled: None,
1242 name: "twilight".to_owned(),
1243 premium_type: None,
1244 primary_guild: None,
1245 public_flags: None,
1246 system: None,
1247 verified: None,
1248 }),
1249 }
1250 }
1251
1252 /// Test that if a receiver drops their end, the result properly counts the
1253 /// statistic.
1254 #[tokio::test]
1255 async fn test_dropped() {
1256 let standby = Standby::new();
1257 let guild_id = Id::new(1);
1258
1259 {
1260 let _rx = standby.wait_for(guild_id, move |_: &Event| false);
1261 }
1262
1263 let results = standby.process(&Event::RoleDelete(RoleDelete {
1264 guild_id,
1265 role_id: Id::new(2),
1266 }));
1267
1268 assert_eq!(1, results.dropped());
1269 assert_eq!(0, results.fulfilled());
1270 assert_eq!(0, results.sent());
1271 }
1272
1273 /// Test that both events in guild 1 is matched but the event in guild 2 is
1274 /// not matched by testing the returned matched amount.
1275 #[tokio::test]
1276 async fn test_matched() {
1277 fn check(event: &Event, guild_id: Id<GuildMarker>) -> bool {
1278 matches!(event, Event::RoleDelete(e) if e.guild_id == guild_id)
1279 }
1280
1281 let standby = Standby::new();
1282 let guild_id_one = Id::new(1);
1283 let guild_id_two = Id::new(2);
1284 let _one = standby.wait_for(guild_id_one, move |event: &Event| {
1285 check(event, guild_id_one)
1286 });
1287 let _two = standby.wait_for(guild_id_one, move |event: &Event| {
1288 check(event, guild_id_one)
1289 });
1290 let _three = standby.wait_for(guild_id_two, move |event: &Event| {
1291 check(event, guild_id_two)
1292 });
1293
1294 let results = standby.process(&Event::RoleDelete(RoleDelete {
1295 guild_id: Id::new(1),
1296 role_id: Id::new(2),
1297 }));
1298
1299 assert_eq!(0, results.dropped());
1300 assert_eq!(2, results.fulfilled());
1301 assert_eq!(0, results.sent());
1302 }
1303
1304 /// Test that the [`ProcessResults::sent`] counter increments if a match is
1305 /// sent to it.
1306 #[tokio::test]
1307 async fn test_sent() {
1308 let standby = Standby::new();
1309 let guild_id = Id::new(1);
1310
1311 let _rx = standby.wait_for_stream(guild_id, move |_: &Event| true);
1312
1313 let results = standby.process(&Event::RoleDelete(RoleDelete {
1314 guild_id,
1315 role_id: Id::new(2),
1316 }));
1317
1318 assert_eq!(0, results.dropped());
1319 assert_eq!(0, results.fulfilled());
1320 assert_eq!(1, results.sent());
1321 }
1322
1323 /// Test basic functionality of the [`Standby::wait_for`] method.
1324 #[tokio::test]
1325 async fn test_wait_for() {
1326 let standby = Standby::new();
1327 let wait = standby.wait_for(
1328 Id::new(1),
1329 |event: &Event| matches!(event, Event::RoleDelete(e) if e.guild_id.get() == 1),
1330 );
1331 standby.process(&Event::RoleDelete(RoleDelete {
1332 guild_id: Id::new(1),
1333 role_id: Id::new(2),
1334 }));
1335
1336 assert_eq!(
1337 wait.await.unwrap(),
1338 Event::RoleDelete(RoleDelete {
1339 guild_id: Id::new(1),
1340 role_id: Id::new(2),
1341 })
1342 );
1343 assert!(standby.guilds.is_empty());
1344 }
1345
1346 /// Test basic functionality of the [`Standby::wait_for_stream`] method.
1347 #[tokio::test]
1348 async fn test_wait_for_stream() {
1349 let standby = Standby::new();
1350 let mut stream = standby.wait_for_stream(
1351 Id::new(1),
1352 |event: &Event| matches!(event, Event::RoleDelete(e) if e.guild_id.get() == 1),
1353 );
1354 standby.process(&Event::RoleDelete(RoleDelete {
1355 guild_id: Id::new(1),
1356 role_id: Id::new(2),
1357 }));
1358 standby.process(&Event::RoleDelete(RoleDelete {
1359 guild_id: Id::new(1),
1360 role_id: Id::new(3),
1361 }));
1362
1363 assert_eq!(
1364 stream.next().await,
1365 Some(Event::RoleDelete(RoleDelete {
1366 guild_id: Id::new(1),
1367 role_id: Id::new(2)
1368 }))
1369 );
1370 assert_eq!(
1371 stream.next().await,
1372 Some(Event::RoleDelete(RoleDelete {
1373 guild_id: Id::new(1),
1374 role_id: Id::new(3)
1375 }))
1376 );
1377 assert!(!standby.guilds.is_empty());
1378 drop(stream);
1379 standby.process(&Event::RoleDelete(RoleDelete {
1380 guild_id: Id::new(1),
1381 role_id: Id::new(4),
1382 }));
1383 assert!(standby.guilds.is_empty());
1384 }
1385
1386 /// Test basic functionality of the [`Standby::wait_for_event`] method.
1387 #[tokio::test]
1388 async fn test_wait_for_event() {
1389 let ready = Ready {
1390 application: PartialApplication {
1391 flags: ApplicationFlags::empty(),
1392 id: Id::new(1),
1393 },
1394 guilds: Vec::new(),
1395 resume_gateway_url: "wss://gateway.discord.gg".into(),
1396 session_id: String::new(),
1397 shard: Some(ShardId::new(5, 7)),
1398 user: CurrentUser {
1399 accent_color: None,
1400 avatar: None,
1401 banner: None,
1402 bot: false,
1403 discriminator: 1,
1404 email: None,
1405 id: Id::new(1),
1406 mfa_enabled: true,
1407 name: "twilight".to_owned(),
1408 verified: Some(false),
1409 premium_type: None,
1410 public_flags: None,
1411 flags: None,
1412 locale: None,
1413 global_name: None,
1414 },
1415 version: 6,
1416 };
1417 let event = Event::Ready(ready);
1418
1419 let standby = Standby::new();
1420 let wait = standby.wait_for_event(|event: &Event| match event {
1421 Event::Ready(ready) => ready.shard.is_some_and(|id| id.number() == 5),
1422 _ => false,
1423 });
1424 assert!(!standby.events.is_empty());
1425 standby.process(&event);
1426
1427 assert_eq!(event, wait.await.unwrap());
1428 assert!(standby.events.is_empty());
1429 }
1430
1431 /// Test basic functionality of the [`Standby::wait_for_event_stream`]
1432 /// method.
1433 #[tokio::test]
1434 async fn test_wait_for_event_stream() {
1435 let standby = Standby::new();
1436 let mut stream =
1437 standby.wait_for_event_stream(|event: &Event| event.kind() == EventType::Resumed);
1438 standby.process(&Event::Resumed);
1439 assert_eq!(stream.next().await, Some(Event::Resumed));
1440 assert!(!standby.events.is_empty());
1441 drop(stream);
1442 standby.process(&Event::Resumed);
1443 assert!(standby.events.is_empty());
1444 }
1445
1446 /// Test basic functionality of the [`Standby::wait_for_message`] method.
1447 #[tokio::test]
1448 async fn test_wait_for_message() {
1449 let message = message();
1450 let event = Event::MessageCreate(Box::new(MessageCreate(message)));
1451
1452 let standby = Standby::new();
1453 let wait = standby.wait_for_message(Id::new(1), |message: &MessageCreate| {
1454 message.author.id.get() == 2
1455 });
1456 standby.process(&event);
1457
1458 assert_eq!(3, wait.await.map(|msg| msg.id.get()).unwrap());
1459 assert!(standby.messages.is_empty());
1460 }
1461
1462 /// Test basic functionality of the [`Standby::wait_for_message_stream`]
1463 /// method.
1464 #[tokio::test]
1465 async fn test_wait_for_message_stream() {
1466 let standby = Standby::new();
1467 let mut stream = standby.wait_for_message_stream(Id::new(1), |_: &MessageCreate| true);
1468 standby.process(&Event::MessageCreate(Box::new(MessageCreate(message()))));
1469 standby.process(&Event::MessageCreate(Box::new(MessageCreate(message()))));
1470
1471 assert!(stream.next().await.is_some());
1472 assert!(stream.next().await.is_some());
1473 drop(stream);
1474 assert_eq!(1, standby.messages.len());
1475 standby.process(&Event::MessageCreate(Box::new(MessageCreate(message()))));
1476 assert!(standby.messages.is_empty());
1477 }
1478
1479 /// Test basic functionality of the [`Standby::wait_for_reaction`] method.
1480 #[tokio::test]
1481 async fn test_wait_for_reaction() {
1482 let event = Event::ReactionAdd(Box::new(ReactionAdd(reaction())));
1483
1484 let standby = Standby::new();
1485 let wait = standby.wait_for_reaction(Id::new(4), |reaction: &ReactionAdd| {
1486 reaction.user_id.get() == 3
1487 });
1488
1489 standby.process(&event);
1490
1491 assert_eq!(
1492 Id::new(3),
1493 wait.await.map(|reaction| reaction.user_id).unwrap()
1494 );
1495 assert!(standby.reactions.is_empty());
1496 }
1497
1498 /// Test basic functionality of the [`Standby::wait_for_reaction_stream`]
1499 /// method.
1500 #[tokio::test]
1501 async fn test_wait_for_reaction_stream() {
1502 let standby = Standby::new();
1503 let mut stream = standby.wait_for_reaction_stream(Id::new(4), |_: &ReactionAdd| true);
1504 standby.process(&Event::ReactionAdd(Box::new(ReactionAdd(reaction()))));
1505 standby.process(&Event::ReactionAdd(Box::new(ReactionAdd(reaction()))));
1506
1507 assert!(stream.next().await.is_some());
1508 assert!(stream.next().await.is_some());
1509 drop(stream);
1510 assert_eq!(1, standby.reactions.len());
1511 standby.process(&Event::ReactionAdd(Box::new(ReactionAdd(reaction()))));
1512 assert!(standby.reactions.is_empty());
1513 }
1514
1515 /// Assert that Standby processing some non-matching events will not affect
1516 /// the matching of a later event.
1517 #[tokio::test]
1518 async fn test_wait_for_component() {
1519 let event = Event::InteractionCreate(Box::new(InteractionCreate(button())));
1520
1521 let standby = Standby::new();
1522 let wait = standby.wait_for_component(Id::new(3), |button: &Interaction| {
1523 button.author_id() == Some(Id::new(2))
1524 });
1525
1526 standby.process(&event);
1527
1528 assert_eq!(
1529 Some(Id::new(2)),
1530 wait.await.map(|button| button.author_id()).unwrap()
1531 );
1532 assert!(standby.components.is_empty());
1533 }
1534
1535 #[tokio::test]
1536 async fn test_wait_for_component_stream() {
1537 let standby = Standby::new();
1538 let mut stream = standby.wait_for_component_stream(Id::new(3), |_: &Interaction| true);
1539 standby.process(&Event::InteractionCreate(Box::new(InteractionCreate(
1540 button(),
1541 ))));
1542 standby.process(&Event::InteractionCreate(Box::new(InteractionCreate(
1543 button(),
1544 ))));
1545
1546 assert!(stream.next().await.is_some());
1547 assert!(stream.next().await.is_some());
1548 drop(stream);
1549 assert_eq!(1, standby.components.len());
1550 standby.process(&Event::InteractionCreate(Box::new(InteractionCreate(
1551 button(),
1552 ))));
1553 assert!(standby.components.is_empty());
1554 }
1555
1556 #[tokio::test]
1557 async fn test_handles_wrong_events() {
1558 let standby = Standby::new();
1559 let wait = standby.wait_for_event(|event: &Event| event.kind() == EventType::Resumed);
1560
1561 standby.process(&Event::GatewayHeartbeatAck);
1562 standby.process(&Event::GatewayHeartbeatAck);
1563 standby.process(&Event::Resumed);
1564
1565 assert_eq!(Event::Resumed, wait.await.unwrap());
1566 }
1567
1568 /// Test that generic event handlers will be given the opportunity to
1569 /// process events with specific handlers (message creates, reaction adds)
1570 /// and guild events. Similarly, guild handlers should be able to process
1571 /// specific handler events as well.
1572 #[tokio::test]
1573 async fn test_process_nonspecific_handling() {
1574 let standby = Standby::new();
1575
1576 // generic event handler gets message creates
1577 let wait = standby.wait_for_event(|event: &Event| event.kind() == EventType::MessageCreate);
1578 standby.process(&Event::MessageCreate(Box::new(MessageCreate(message()))));
1579 assert!(matches!(wait.await, Ok(Event::MessageCreate(_))));
1580
1581 // generic event handler gets reaction adds
1582 let wait = standby.wait_for_event(|event: &Event| event.kind() == EventType::ReactionAdd);
1583 standby.process(&Event::ReactionAdd(Box::new(ReactionAdd(reaction()))));
1584 assert!(matches!(wait.await, Ok(Event::ReactionAdd(_))));
1585
1586 // generic event handler gets other guild events
1587 let wait = standby.wait_for_event(|event: &Event| event.kind() == EventType::RoleDelete);
1588 standby.process(&Event::RoleDelete(RoleDelete {
1589 guild_id: Id::new(1),
1590 role_id: Id::new(2),
1591 }));
1592 assert!(matches!(wait.await, Ok(Event::RoleDelete(_))));
1593
1594 // guild event handler gets message creates or reaction events
1595 let wait = standby.wait_for(Id::new(1), |event: &Event| {
1596 event.kind() == EventType::ReactionAdd
1597 });
1598 standby.process(&Event::ReactionAdd(Box::new(ReactionAdd(reaction()))));
1599 assert!(matches!(wait.await, Ok(Event::ReactionAdd(_))));
1600 }
1601}