twilight_gateway/
channel.rs

1//! Channel for users to send messages when calling [`Shard::send`] isn't
2//! possible.
3//!
4//! [`Shard::send`]: crate::Shard::send
5
6use crate::{
7    CloseFrame,
8    command::Command,
9    error::{ChannelError, ChannelErrorType},
10    json,
11};
12use tokio::sync::mpsc;
13
14/// Channel between a user and shard for sending outgoing gateway messages.
15#[derive(Debug)]
16pub struct MessageChannel {
17    /// Receiving half for shards to receive users' close frames.
18    pub close_rx: mpsc::Receiver<CloseFrame<'static>>,
19    /// Sending half for users to send close frames via shards.
20    pub close_tx: mpsc::Sender<CloseFrame<'static>>,
21    /// Receiving half for shards to receive users' commands.
22    pub command_rx: mpsc::UnboundedReceiver<String>,
23    /// Sending half for users to send commands via shards.
24    pub command_tx: mpsc::UnboundedSender<String>,
25}
26
27impl MessageChannel {
28    /// Initialize a new message channel.
29    pub fn new() -> Self {
30        let (command_tx, command_rx) = mpsc::unbounded_channel();
31        let (close_tx, close_rx) = mpsc::channel(1);
32
33        Self {
34            close_rx,
35            close_tx,
36            command_rx,
37            command_tx,
38        }
39    }
40
41    /// Clone of the senders.
42    pub fn sender(&self) -> MessageSender {
43        MessageSender {
44            close: self.close_tx.clone(),
45            command: self.command_tx.clone(),
46        }
47    }
48}
49
50/// Channel to send messages over a [`Shard`] to the Discord gateway.
51///
52/// [`Shard`]: crate::Shard
53#[derive(Clone, Debug)]
54pub struct MessageSender {
55    /// Sending half of the close channel.
56    close: mpsc::Sender<CloseFrame<'static>>,
57    /// Sending half of the command channel.
58    command: mpsc::UnboundedSender<String>,
59}
60
61impl MessageSender {
62    /// Whether the channel is closed.
63    ///
64    /// The channel will only be closed if the associated shard has been
65    /// dropped.
66    pub fn is_closed(&self) -> bool {
67        self.command.is_closed()
68    }
69
70    /// Send a command to the associated shard.
71    ///
72    /// # Errors
73    ///
74    /// Returns a [`ChannelErrorType::Closed`] error type if the channel is
75    /// closed.
76    #[allow(clippy::missing_panics_doc)]
77    pub fn command(&self, command: &impl Command) -> Result<(), ChannelError> {
78        self.send(json::to_string(command).expect("serialization cannot fail"))
79    }
80
81    /// Send a JSON encoded gateway event to the associated shard.
82    ///
83    /// # Errors
84    ///
85    /// Returns a [`ChannelErrorType::Closed`] error type if the channel is
86    /// closed.
87    pub fn send(&self, json: String) -> Result<(), ChannelError> {
88        self.command.send(json).map_err(|source| ChannelError {
89            kind: ChannelErrorType::Closed,
90            source: Some(Box::new(source)),
91        })
92    }
93
94    /// Send a Websocket close frame to the associated shard.
95    ///
96    /// Subsequent calls may be queued up to be sent once the shard's
97    /// reestablished a Websocket connection or ignored if the queue is full.
98    /// The internal queue capacity is currently `1`.
99    ///
100    /// See the [`Shard::close`] docs for further information.
101    ///
102    /// # Errors
103    ///
104    /// Returns a [`ChannelErrorType::Closed`] error type if the channel is
105    /// closed.
106    ///
107    /// [`Shard::close`]: crate::Shard::close
108    pub fn close(&self, close_frame: CloseFrame<'static>) -> Result<(), ChannelError> {
109        if let Err(source @ mpsc::error::TrySendError::Closed(_)) = self.close.try_send(close_frame)
110        {
111            Err(ChannelError {
112                kind: ChannelErrorType::Closed,
113                source: Some(Box::new(source)),
114            })
115        } else {
116            Ok(())
117        }
118    }
119}
120
#[cfg(test)]
mod tests {
    use super::{MessageChannel, MessageSender};
    use static_assertions::assert_impl_all;
    use std::fmt::Debug;

    // Compile-time checks that the channel types implement the listed traits.
    assert_impl_all!(MessageSender: Clone, Debug, Send, Sync);
    assert_impl_all!(MessageChannel: Debug, Send, Sync);
}