twilight_gateway/
stream.rs

1//! Convenient `Stream` extension trait for message deserialization.
2
3use crate::{error::ReceiveMessageError, EventTypeFlags, Message};
4use futures_core::Stream;
5
6/// An extension trait for the [`Stream`] trait.
7///
8/// If you need utilities from multiple `StreamExt` traits, [underscore import]
9/// this one.
10///
11/// [underscore import]: https://doc.rust-lang.org/reference/items/use-declarations.html#underscore-imports
12pub trait StreamExt: Stream {
13    /// Consumes and returns the next wanted [`Event`] in the stream or `None`
14    /// if the stream is finished.
15    ///
16    /// `next_event()` takes a `EventTypeFlags` which is then passed along to
17    /// [`parse`]. Unwanted event types are skipped.
18    ///
19    /// Close messages are always considered wanted and map onto
20    /// [`Event::GatewayClose`].
21    ///
22    /// Equivalent to:
23    ///
24    /// ```ignore
25    /// async fn next_event(&mut self, wanted_event_types: EventTypeFlags) -> Option<Result<Event, ReceiveMessageError>>
26    /// ```
27    ///
28    /// Note that because `next_event` doesn’t take ownership over the stream,
29    /// the [`Stream`] type must be [`Unpin`]. If you want to use `next` with a
30    /// [`!Unpin`](Unpin) stream, you’ll first have to pin the stream. This
31    /// can be done by boxing the stream using [`Box::pin`] or pinning it to
32    /// the stack using [`pin!`].
33    ///
34    /// # Cancel safety
35    ///
36    /// This method is cancel safe. The returned future only holds onto a
37    /// reference to the underlying stream, so dropping it will never lose a
38    /// value.
39    ///
40    /// # Example
41    ///
42    /// ```no_run
43    /// # use twilight_gateway::{Intents, Shard, ShardId};
44    /// # #[tokio::main] async fn main() {
45    /// # let mut shard = Shard::new(ShardId::ONE, String::new(), Intents::empty());
46    /// use twilight_gateway::{Event, EventTypeFlags, StreamExt as _};
47    ///
48    /// while let Some(item) = shard.next_event(EventTypeFlags::all()).await {
49    ///     let Ok(event) = item else {
50    ///         tracing::warn!(source = ?item.unwrap_err(), "error receiving event");
51    ///
52    ///         continue;
53    ///     };
54    ///
55    ///     match event {
56    ///         Event::Ready(_) => tracing::info!("ready!"),
57    ///         _ => {}
58    ///     }
59    /// }
60    /// # }
61    /// ```
62    ///
63    /// [`Event`]: crate::Event
64    /// [`Event::GatewayClose`]: crate::Event::GatewayClose
65    /// [`parse`]: crate::parse
66    /// [`pin!`]: std::pin::pin
67    fn next_event(&mut self, wanted_event_types: EventTypeFlags) -> private::NextEvent<Self>
68    where
69        Self: Unpin,
70    {
71        private::NextEvent::new(self, wanted_event_types)
72    }
73}
74
75impl<St: ?Sized> StreamExt for St where St: Stream<Item = Result<Message, ReceiveMessageError>> {}
76
77mod private {
78    //! Private module to hide the returned type from the [`next_event`](super::StreamExt::next_event)
79    //! method.
80    //!
81    //! Effectively disallows consumers from implementing the trait.
82
83    use crate::{error::ReceiveMessageError, json::parse, EventTypeFlags, Message};
84    use futures_core::Stream;
85    use std::{
86        future::Future,
87        pin::Pin,
88        task::{ready, Context, Poll},
89    };
90    use twilight_model::gateway::event::Event;
91
92    /// Future for the [`next_event`](super::StreamExt::next_event) method.
93    pub struct NextEvent<'a, St: ?Sized> {
94        /// Gateway event types to deserialize.
95        events: EventTypeFlags,
96        /// Inner wrapped stream.
97        stream: &'a mut St,
98    }
99
100    impl<'a, St: ?Sized> NextEvent<'a, St> {
101        /// Create a new future.
102        pub fn new(stream: &'a mut St, events: EventTypeFlags) -> Self {
103            Self { events, stream }
104        }
105    }
106
107    impl<St: ?Sized + Stream<Item = Result<Message, ReceiveMessageError>> + Unpin> Future
108        for NextEvent<'_, St>
109    {
110        type Output = Option<Result<Event, ReceiveMessageError>>;
111
112        fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
113            let events = self.events;
114            let try_from_message = |message| match message {
115                Message::Text(json) => parse(json, events).map(|opt| opt.map(Into::into)),
116                Message::Close(frame) => Ok(Some(Event::GatewayClose(frame))),
117            };
118
119            loop {
120                match ready!(Pin::new(&mut self.stream).poll_next(cx)) {
121                    Some(item) => {
122                        if let Some(event) = item.and_then(try_from_message).transpose() {
123                            return Poll::Ready(Some(event));
124                        }
125                    }
126                    None => return Poll::Ready(None),
127                }
128            }
129        }
130    }
131}