twilight_gateway/channel.rs
1//! Channel for users to send messages when calling [`Shard::send`] isn't
2//! possible.
3//!
4//! [`Shard::send`]: crate::Shard::send
5
6use crate::{
7 CloseFrame,
8 command::Command,
9 error::{ChannelError, ChannelErrorType},
10 json,
11};
12use tokio::sync::mpsc;
13
14/// Channel between a user and shard for sending outgoing gateway messages.
15#[derive(Debug)]
16pub struct MessageChannel {
17 /// Receiving half for shards to receive users' close frames.
18 pub close_rx: mpsc::Receiver<CloseFrame<'static>>,
19 /// Sending half for users to send close frames via shards.
20 pub close_tx: mpsc::Sender<CloseFrame<'static>>,
21 /// Receiving half for shards to receive users' commands.
22 pub command_rx: mpsc::UnboundedReceiver<String>,
23 /// Sending half for users to send commands via shards.
24 pub command_tx: mpsc::UnboundedSender<String>,
25}
26
27impl MessageChannel {
28 /// Initialize a new message channel.
29 pub fn new() -> Self {
30 let (command_tx, command_rx) = mpsc::unbounded_channel();
31 let (close_tx, close_rx) = mpsc::channel(1);
32
33 Self {
34 close_rx,
35 close_tx,
36 command_rx,
37 command_tx,
38 }
39 }
40
41 /// Clone of the senders.
42 pub fn sender(&self) -> MessageSender {
43 MessageSender {
44 close: self.close_tx.clone(),
45 command: self.command_tx.clone(),
46 }
47 }
48}
49
50/// Channel to send messages over a [`Shard`] to the Discord gateway.
51///
52/// [`Shard`]: crate::Shard
53#[derive(Clone, Debug)]
54pub struct MessageSender {
55 /// Sending half of the close channel.
56 close: mpsc::Sender<CloseFrame<'static>>,
57 /// Sending half of the command channel.
58 command: mpsc::UnboundedSender<String>,
59}
60
61impl MessageSender {
62 /// Whether the channel is closed.
63 ///
64 /// The channel will only be closed if the associated shard has been
65 /// dropped.
66 pub fn is_closed(&self) -> bool {
67 self.command.is_closed()
68 }
69
70 /// Send a command to the associated shard.
71 ///
72 /// # Errors
73 ///
74 /// Returns a [`ChannelErrorType::Closed`] error type if the channel is
75 /// closed.
76 #[allow(clippy::missing_panics_doc)]
77 pub fn command(&self, command: &impl Command) -> Result<(), ChannelError> {
78 self.send(json::to_string(command).expect("serialization cannot fail"))
79 }
80
81 /// Send a JSON encoded gateway event to the associated shard.
82 ///
83 /// # Errors
84 ///
85 /// Returns a [`ChannelErrorType::Closed`] error type if the channel is
86 /// closed.
87 pub fn send(&self, json: String) -> Result<(), ChannelError> {
88 self.command.send(json).map_err(|source| ChannelError {
89 kind: ChannelErrorType::Closed,
90 source: Some(Box::new(source)),
91 })
92 }
93
94 /// Send a Websocket close frame to the associated shard.
95 ///
96 /// Subsequent calls may be queued up to be sent once the shard's
97 /// reestablished a Websocket connection or ignored if the queue is full.
98 /// The internal queue capacity is currently `1`.
99 ///
100 /// See the [`Shard::close`] docs for further information.
101 ///
102 /// # Errors
103 ///
104 /// Returns a [`ChannelErrorType::Closed`] error type if the channel is
105 /// closed.
106 ///
107 /// [`Shard::close`]: crate::Shard::close
108 pub fn close(&self, close_frame: CloseFrame<'static>) -> Result<(), ChannelError> {
109 if let Err(source @ mpsc::error::TrySendError::Closed(_)) = self.close.try_send(close_frame)
110 {
111 Err(ChannelError {
112 kind: ChannelErrorType::Closed,
113 source: Some(Box::new(source)),
114 })
115 } else {
116 Ok(())
117 }
118 }
119}
120
121#[cfg(test)]
122mod tests {
123 use super::{MessageChannel, MessageSender};
124 use static_assertions::assert_impl_all;
125 use std::fmt::Debug;
126
127 assert_impl_all!(MessageChannel: Debug, Send, Sync);
128 assert_impl_all!(MessageSender: Clone, Debug, Send, Sync);
129}