tokio_xmpp/stanzastream/mod.rs
1// Copyright (c) 2019 Emmanuel Gil Peyrot <linkmauve@linkmauve.fr>
2//
3// This Source Code Form is subject to the terms of the Mozilla Public
4// License, v. 2.0. If a copy of the MPL was not distributed with this
5// file, You can obtain one at http://mozilla.org/MPL/2.0/.
6
7//! # Resilient stanza stream
8//!
9//! This module provides the [`StanzaStream`], which is the next level up from
10//! the low-level [`XmlStream`][`crate::xmlstream::XmlStream`].
11//!
12//! The stanza stream knows about XMPP and it most importantly knows how to
13//! fix a broken connection with a reconnect and how to do this smoothly using
14//! [XEP-0198 (Stream Management)](https://xmpp.org/extensions/xep-0198.html).
//! XEP-0198 is only used if the peer supports it. If the peer does not
//! support XEP-0198, automatic reconnects are still performed, but data
//! loss is more likely to go undetected.
18//!
19//! The main API entrypoint for the stanza stream is, unsurprisingly,
20//! [`StanzaStream`].
21
22use core::pin::Pin;
23use core::task::{Context, Poll};
24use core::time::Duration;
25
26// TODO: ensure that IDs are always set on stanzas.
27
28// TODO: figure out what to do with the mpsc::Sender<QueueEntry> on lossy
29// stream reconnects. Keeping it may cause stanzas to be sent which weren't
30// meant for that stream, replacing it is racy.
31
32use futures::{SinkExt, Stream};
33
34use tokio::sync::{mpsc, oneshot};
35
36use xmpp_parsers::{jid::Jid, stream_features::StreamFeatures};
37
38use crate::connect::ServerConnector;
39use crate::xmlstream::Timeouts;
40use crate::Stanza;
41
42mod connected;
43mod error;
44mod negotiation;
45mod queue;
46mod stream_management;
47mod worker;
48
49use self::queue::QueueEntry;
50pub use self::queue::{StanzaStage, StanzaState, StanzaToken};
51pub use self::worker::{Connection, XmppStream};
52use self::worker::{StanzaStreamWorker, LOCAL_SHUTDOWN_TIMEOUT};
53
54/// Event informing about the change of the [`StanzaStream`]'s status.
55#[derive(Debug)]
56pub enum StreamEvent {
57 /// The stream was (re-)established **with** loss of state.
58 Reset {
59 /// The new JID to which the stream is bound.
60 bound_jid: Jid,
61
62 /// The features reported by the stream.
63 features: StreamFeatures,
64 },
65
66 /// The stream is currently inactive because a connection was lost.
67 ///
68 /// Resumption without loss of state is still possible. This event is
69 /// merely informative and may be used to prolong timeouts or inform the
70 /// user that the connection is currently unstable.
71 Suspended,
72
73 /// The stream was reestablished **without** loss of state.
74 ///
75 /// This is merely informative. Potentially useful to prolong timeouts.
76 Resumed,
77}
78
79/// Event emitted by the [`StanzaStream`].
80///
81/// Note that stream closure is not an explicit event, but the end of the
82/// event stream itself.
83#[derive(Debug)]
84pub enum Event {
85 /// The streams connectivity status has changed.
86 Stream(StreamEvent),
87
88 /// A stanza was received over the stream.
89 Stanza(Stanza),
90}
91
92/// Frontend interface to a reliable, always-online stanza stream.
93#[derive(Debug)]
94pub struct StanzaStream {
95 rx: mpsc::Receiver<Event>,
96 tx: mpsc::Sender<QueueEntry>,
97}
98
99impl StanzaStream {
100 /// Establish a new client-to-server stream using the given
101 /// [`ServerConnector`].
102 ///
103 /// `jid` and `password` must be the user account's credentials. `jid` may
104 /// either be a bare JID (to let the server choose a resource) or a full
105 /// JID (to request a specific resource from the server, with no guarantee
106 /// of succcess).
107 ///
108 /// `timeouts` controls the responsiveness to connection interruptions
109 /// on the underlying transports. Please see the [`Timeouts`] struct's
110 /// documentation for hints on how to correctly configure this.
111 ///
112 /// The `queue_depth` controls the sizes for the incoming and outgoing
113 /// stanza queues. If the size is exceeded, the corresponding direction
114 /// will block until the queues can be flushed. Note that the respective
115 /// reverse direction is not affected (i.e. if your outgoing queue is
116 /// full for example because of a slow server, you can still receive
117 /// data).
118 pub fn new_c2s<C: ServerConnector>(
119 server: C,
120 jid: Jid,
121 password: String,
122 timeouts: Timeouts,
123 queue_depth: usize,
124 ) -> Self {
125 let reconnector = Box::new(
126 move |_preferred_location: Option<String>, slot: oneshot::Sender<Connection>| {
127 let jid = jid.clone();
128 let server = server.clone();
129 let password = password.clone();
130 tokio::spawn(async move {
131 const MAX_DELAY: Duration = Duration::new(30, 0);
132 let mut delay = Duration::new(1, 0);
133 loop {
134 log::debug!("Starting new connection as {}", jid);
135 match crate::client::login::client_auth(
136 server.clone(),
137 jid.clone(),
138 password.clone(),
139 timeouts,
140 )
141 .await
142 {
143 Ok((features, stream)) => {
144 log::debug!("Connection as {} established", jid);
145 let stream = stream.box_stream();
146 let Err(mut conn) = slot.send(Connection {
147 stream,
148 features,
149 identity: jid,
150 }) else {
151 // Send succeeded, we're done here.
152 return;
153 };
154
155 log::debug!("StanzaStream dropped, attempting graceful termination of fresh stream.");
156 // Send failed, i.e. the stanzastream is dead. Let's
157 // be polite and close this stream cleanly.
158 // We don't care whether that works, though, we
159 // just want to release the resources after a
160 // defined amount of time.
161 let _: Result<_, _> = tokio::time::timeout(
162 LOCAL_SHUTDOWN_TIMEOUT,
163 <XmppStream as SinkExt<&Stanza>>::close(&mut conn.stream),
164 )
165 .await;
166 return;
167 }
168 Err(e) => {
169 // TODO: auth errors should probably be fatal??
170 log::error!("Failed to connect: {}. Retrying in {:?}.", e, delay);
171 tokio::time::sleep(delay).await;
172 delay = delay * 2;
173 if delay > MAX_DELAY {
174 delay = MAX_DELAY;
175 }
176 }
177 }
178 }
179 });
180 },
181 );
182 Self::new(reconnector, queue_depth)
183 }
184
185 /// Create a new stanza stream.
186 ///
187 /// Stanza streams operate using a `connector` which is responsible for
188 /// producing a new stream whenever necessary. It is the connector's
189 /// responsibility that:
190 ///
191 /// - It never fails to send to the channel it is given. If the connector
192 /// drops the channel, the `StanzaStream` will consider this fatal and
193 /// fail the stream.
194 ///
195 /// - All streams are authenticated and secured as necessary.
196 ///
197 /// - All streams are authenticated for the same entity. If the connector
198 /// were to provide streams for different identities, information leaks
199 /// could occur as queues from previous sessions are being flushed on
200 /// the new stream on a reconnect.
201 ///
202 /// Most notably, the `connector` is **not** responsible for performing
203 /// resource binding: Resource binding is handled by the `StanzaStream`.
204 ///
205 /// `connector` will be called soon after `new()` was called to establish
206 /// the first underlying stream for the `StanzaStream`.
207 ///
208 /// The `queue_depth` controls the sizes for the incoming and outgoing
209 /// stanza queues. If the size is exceeded, the corresponding direction
210 /// will block until the queues can be flushed. Note that the respective
211 /// reverse direction is not affected (i.e. if your outgoing queue is
212 /// full for example because of a slow server, you can still receive
213 /// data).
214 pub fn new(
215 connector: Box<dyn FnMut(Option<String>, oneshot::Sender<Connection>) + Send + 'static>,
216 queue_depth: usize,
217 ) -> Self {
218 // c2f = core to frontend, f2c = frontend to core
219 let (f2c_tx, c2f_rx) = StanzaStreamWorker::spawn(connector, queue_depth);
220 Self {
221 tx: f2c_tx,
222 rx: c2f_rx,
223 }
224 }
225
226 async fn assert_send(&self, cmd: QueueEntry) {
227 match self.tx.send(cmd).await {
228 Ok(()) => (),
229 Err(_) => panic!("Stream closed or the stream's background workers have crashed."),
230 }
231 }
232
233 /// Close the stream.
234 ///
235 /// This will initiate a clean shutdown of the stream and will prevent and
236 /// cancel any more reconnection attempts.
237 pub async fn close(mut self) {
238 drop(self.tx); // closes stream.
239 while let Some(ev) = self.rx.recv().await {
240 log::trace!("discarding event {:?} after stream closure", ev);
241 }
242 }
243
244 /// Send a stanza via the stream.
245 ///
246 /// Note that completion of this function merely signals that the stanza
247 /// has been enqueued successfully: it may be stuck in the transmission
248 /// queue for quite a while if the stream is currently disconnected. The
249 /// transmission progress can be observed via the returned
250 /// [`StanzaToken`].
251 ///
252 /// # Panics
253 ///
254 /// If the stream has failed catastrophically (i.e. due to a software
255 /// bug), this function may panic.
256 pub async fn send(&self, stanza: Box<Stanza>) -> StanzaToken {
257 let (queue_entry, token) = QueueEntry::tracked(stanza);
258 self.assert_send(queue_entry).await;
259 token
260 }
261}
262
263impl Stream for StanzaStream {
264 type Item = Event;
265
266 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
267 self.rx.poll_recv(cx)
268 }
269}