twilight_gateway/channel.rs
1//! Channel for users to send messages when calling [`Shard::send`] isn't
2//! possible.
3//!
4//! [`Shard::send`]: crate::Shard::send
5
6use crate::{
7 command::Command,
8 error::{ChannelError, ChannelErrorType},
9 json, CloseFrame,
10};
11use tokio::sync::mpsc;
12
13/// Channel between a user and shard for sending outgoing gateway messages.
14#[derive(Debug)]
15pub struct MessageChannel {
16 /// Receiving half for shards to receive users' close frames.
17 pub close_rx: mpsc::Receiver<CloseFrame<'static>>,
18 /// Sending half for users to send close frames via shards.
19 pub close_tx: mpsc::Sender<CloseFrame<'static>>,
20 /// Receiving half for shards to receive users' commands.
21 pub command_rx: mpsc::UnboundedReceiver<String>,
22 /// Sending half for users to send commands via shards.
23 pub command_tx: mpsc::UnboundedSender<String>,
24}
25
26impl MessageChannel {
27 /// Initialize a new message channel.
28 pub fn new() -> Self {
29 let (command_tx, command_rx) = mpsc::unbounded_channel();
30 let (close_tx, close_rx) = mpsc::channel(1);
31
32 Self {
33 close_rx,
34 close_tx,
35 command_rx,
36 command_tx,
37 }
38 }
39
40 /// Clone of the senders.
41 pub fn sender(&self) -> MessageSender {
42 MessageSender {
43 close: self.close_tx.clone(),
44 command: self.command_tx.clone(),
45 }
46 }
47}
48
49/// Channel to send messages over a [`Shard`] to the Discord gateway.
50///
51/// [`Shard`]: crate::Shard
52#[derive(Clone, Debug)]
53pub struct MessageSender {
54 /// Sending half of the close channel.
55 close: mpsc::Sender<CloseFrame<'static>>,
56 /// Sending half of the command channel.
57 command: mpsc::UnboundedSender<String>,
58}
59
60impl MessageSender {
61 /// Whether the channel is closed.
62 ///
63 /// The channel will only be closed if the associated shard has been
64 /// dropped.
65 pub fn is_closed(&self) -> bool {
66 self.command.is_closed()
67 }
68
69 /// Send a command to the associated shard.
70 ///
71 /// # Errors
72 ///
73 /// Returns a [`ChannelErrorType::Closed`] error type if the channel is
74 /// closed.
75 #[allow(clippy::missing_panics_doc)]
76 pub fn command(&self, command: &impl Command) -> Result<(), ChannelError> {
77 self.send(json::to_string(command).expect("serialization cannot fail"))
78 }
79
80 /// Send a JSON encoded gateway event to the associated shard.
81 ///
82 /// # Errors
83 ///
84 /// Returns a [`ChannelErrorType::Closed`] error type if the channel is
85 /// closed.
86 pub fn send(&self, json: String) -> Result<(), ChannelError> {
87 self.command.send(json).map_err(|source| ChannelError {
88 kind: ChannelErrorType::Closed,
89 source: Some(Box::new(source)),
90 })
91 }
92
93 /// Send a Websocket close frame to the associated shard.
94 ///
95 /// Subsequent calls may be queued up to be sent once the shard's
96 /// reestablished a Websocket connection or ignored if the queue is full.
97 /// The internal queue capacity is currently `1`.
98 ///
99 /// See the [`Shard::close`] docs for further information.
100 ///
101 /// # Errors
102 ///
103 /// Returns a [`ChannelErrorType::Closed`] error type if the channel is
104 /// closed.
105 ///
106 /// [`Shard::close`]: crate::Shard::close
107 pub fn close(&self, close_frame: CloseFrame<'static>) -> Result<(), ChannelError> {
108 if let Err(source @ mpsc::error::TrySendError::Closed(_)) = self.close.try_send(close_frame)
109 {
110 Err(ChannelError {
111 kind: ChannelErrorType::Closed,
112 source: Some(Box::new(source)),
113 })
114 } else {
115 Ok(())
116 }
117 }
118}
119
#[cfg(test)]
mod tests {
    use super::{MessageChannel, MessageSender};
    use static_assertions::assert_impl_all;
    use std::fmt::Debug;

    // Compile-time guarantees: both types are debuggable and safe to move or
    // share across threads, and the sender handle is additionally cloneable.
    assert_impl_all!(MessageChannel: Send, Sync, Debug);
    assert_impl_all!(MessageSender: Send, Sync, Clone, Debug);
}
128}