twilight_gateway/stream.rs
1//! Convenient `Stream` extension trait for message deserialization.
2
3use crate::{error::ReceiveMessageError, EventTypeFlags, Message};
4use futures_core::Stream;
5
6/// An extension trait for the [`Stream`] trait.
7///
8/// If you need utilities from multiple `StreamExt` traits, [underscore import]
9/// this one.
10///
11/// [underscore import]: https://doc.rust-lang.org/reference/items/use-declarations.html#underscore-imports
12pub trait StreamExt: Stream {
13 /// Consumes and returns the next wanted [`Event`] in the stream or `None`
14 /// if the stream is finished.
15 ///
16 /// `next_event()` takes a `EventTypeFlags` which is then passed along to
17 /// [`parse`]. Unwanted event types are skipped.
18 ///
19 /// Close messages are always considered wanted and map onto
20 /// [`Event::GatewayClose`].
21 ///
22 /// Equivalent to:
23 ///
24 /// ```ignore
25 /// async fn next_event(&mut self, wanted_event_types: EventTypeFlags) -> Option<Result<Event, ReceiveMessageError>>
26 /// ```
27 ///
28 /// Note that because `next_event` doesn’t take ownership over the stream,
29 /// the [`Stream`] type must be [`Unpin`]. If you want to use `next` with a
30 /// [`!Unpin`](Unpin) stream, you’ll first have to pin the stream. This
31 /// can be done by boxing the stream using [`Box::pin`] or pinning it to
32 /// the stack using [`pin!`].
33 ///
34 /// # Cancel safety
35 ///
36 /// This method is cancel safe. The returned future only holds onto a
37 /// reference to the underlying stream, so dropping it will never lose a
38 /// value.
39 ///
40 /// # Example
41 ///
42 /// ```no_run
43 /// # use twilight_gateway::{Intents, Shard, ShardId};
44 /// # #[tokio::main] async fn main() {
45 /// # let mut shard = Shard::new(ShardId::ONE, String::new(), Intents::empty());
46 /// use twilight_gateway::{Event, EventTypeFlags, StreamExt as _};
47 ///
48 /// while let Some(item) = shard.next_event(EventTypeFlags::all()).await {
49 /// let Ok(event) = item else {
50 /// tracing::warn!(source = ?item.unwrap_err(), "error receiving event");
51 ///
52 /// continue;
53 /// };
54 ///
55 /// match event {
56 /// Event::Ready(_) => tracing::info!("ready!"),
57 /// _ => {}
58 /// }
59 /// }
60 /// # }
61 /// ```
62 ///
63 /// [`Event`]: crate::Event
64 /// [`Event::GatewayClose`]: crate::Event::GatewayClose
65 /// [`parse`]: crate::parse
66 /// [`pin!`]: std::pin::pin
67 fn next_event(&mut self, wanted_event_types: EventTypeFlags) -> private::NextEvent<Self>
68 where
69 Self: Unpin,
70 {
71 private::NextEvent::new(self, wanted_event_types)
72 }
73}
74
75impl<St: ?Sized> StreamExt for St where St: Stream<Item = Result<Message, ReceiveMessageError>> {}
76
77mod private {
78 //! Private module to hide the returned type from the [`next_event`](super::StreamExt::next_event)
79 //! method.
80 //!
81 //! Effectively disallows consumers from implementing the trait.
82
83 use crate::{error::ReceiveMessageError, json::parse, EventTypeFlags, Message};
84 use futures_core::Stream;
85 use std::{
86 future::Future,
87 pin::Pin,
88 task::{ready, Context, Poll},
89 };
90 use twilight_model::gateway::event::Event;
91
92 /// Future for the [`next_event`](super::StreamExt::next_event) method.
93 pub struct NextEvent<'a, St: ?Sized> {
94 /// Gateway event types to deserialize.
95 events: EventTypeFlags,
96 /// Inner wrapped stream.
97 stream: &'a mut St,
98 }
99
100 impl<'a, St: ?Sized> NextEvent<'a, St> {
101 /// Create a new future.
102 pub fn new(stream: &'a mut St, events: EventTypeFlags) -> Self {
103 Self { events, stream }
104 }
105 }
106
107 impl<St: ?Sized + Stream<Item = Result<Message, ReceiveMessageError>> + Unpin> Future
108 for NextEvent<'_, St>
109 {
110 type Output = Option<Result<Event, ReceiveMessageError>>;
111
112 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
113 let events = self.events;
114 let try_from_message = |message| match message {
115 Message::Text(json) => parse(json, events).map(|opt| opt.map(Into::into)),
116 Message::Close(frame) => Ok(Some(Event::GatewayClose(frame))),
117 };
118
119 loop {
120 match ready!(Pin::new(&mut self.stream).poll_next(cx)) {
121 Some(item) => {
122 if let Some(event) = item.and_then(try_from_message).transpose() {
123 return Poll::Ready(Some(event));
124 }
125 }
126 None => return Poll::Ready(None),
127 }
128 }
129 }
130 }
131}