Skip to main content

tokio_xmpp/stanzastream/
mod.rs

1// Copyright (c) 2019 Emmanuel Gil Peyrot <linkmauve@linkmauve.fr>
2//
3// This Source Code Form is subject to the terms of the Mozilla Public
4// License, v. 2.0. If a copy of the MPL was not distributed with this
5// file, You can obtain one at http://mozilla.org/MPL/2.0/.
6
7//! # Resilient stanza stream
8//!
9//! This module provides the [`StanzaStream`], which is the next level up from
10//! the low-level [`XmlStream`][`crate::xmlstream::XmlStream`].
11//!
12//! The stanza stream knows about XMPP and it most importantly knows how to
13//! fix a broken connection with a reconnect and how to do this smoothly using
14//! [XEP-0198 (Stream Management)](https://xmpp.org/extensions/xep-0198.html).
15//! XEP-0198 is only used if the peer supports it. If the peer does not
16//! support XEP-0198, automatic reconnects are still done, but with more
17//! undetectable data loss.
18//!
19//! The main API entrypoint for the stanza stream is, unsurprisingly,
20//! [`StanzaStream`].
21
22use core::pin::Pin;
23use core::task::{Context, Poll};
24use core::time::Duration;
25
26// TODO: ensure that IDs are always set on stanzas.
27
28// TODO: figure out what to do with the mpsc::Sender<QueueEntry> on lossy
29// stream reconnects. Keeping it may cause stanzas to be sent which weren't
30// meant for that stream, replacing it is racy.
31
32use futures::{SinkExt, Stream};
33use std::sync::Arc;
34use tokio::sync::Mutex;
35use tokio::sync::{mpsc, oneshot};
36use xmpp_parsers::{jid::Jid, stream_features::StreamFeatures};
37
38use crate::connect::ServerConnector;
39use crate::xmlstream::Timeouts;
40use crate::Stanza;
41
42mod connected;
43mod error;
44mod negotiation;
45mod queue;
46mod stream_management;
47#[cfg(test)]
48mod tests;
49mod worker;
50
51use self::queue::QueueEntry;
52pub use self::queue::{StanzaStage, StanzaState, StanzaToken};
53pub use self::worker::{Connection, XmppStream};
54use self::worker::{StanzaStreamWorker, LOCAL_SHUTDOWN_TIMEOUT};
55
56/// Event informing about the change of the [`StanzaStream`]'s status.
57#[derive(Debug)]
58pub enum StreamEvent {
59    /// The stream was (re-)established **with** loss of state.
60    Reset {
61        /// The new JID to which the stream is bound.
62        bound_jid: Jid,
63
64        /// The features reported by the stream.
65        features: StreamFeatures,
66    },
67
68    /// The stream is currently inactive because a connection was lost.
69    ///
70    /// Resumption without loss of state is still possible. This event is
71    /// merely informative and may be used to prolong timeouts or inform the
72    /// user that the connection is currently unstable.
73    Suspended,
74
75    /// The stream was reestablished **without** loss of state.
76    ///
77    /// This is merely informative. Potentially useful to prolong timeouts.
78    Resumed,
79}
80
81/// Event emitted by the [`StanzaStream`].
82///
83/// Note that stream closure is not an explicit event, but the end of the
84/// event stream itself.
85#[derive(Debug)]
86pub enum Event {
87    /// The streams connectivity status has changed.
88    Stream(StreamEvent),
89
90    /// A stanza was received over the stream.
91    Stanza(Stanza),
92}
93
94/// Frontend interface to a reliable, always-online stanza stream.
95#[derive(Debug)]
96pub struct StanzaStream {
97    rx: mpsc::Receiver<Event>,
98    tx: mpsc::Sender<QueueEntry>,
99}
100
101impl StanzaStream {
102    /// Establish a new client-to-server stream using the given
103    /// [`ServerConnector`].
104    ///
105    /// `jid` and `password` must be the user account's credentials. `jid` may
106    /// either be a bare JID (to let the server choose a resource) or a full
107    /// JID (to request a specific resource from the server, with no guarantee
108    /// of success).
109    ///
110    /// `timeouts` controls the responsiveness to connection interruptions
111    /// on the underlying transports. Please see the [`Timeouts`] struct's
112    /// documentation for hints on how to correctly configure this.
113    ///
114    /// The `queue_depth` controls the sizes for the incoming and outgoing
115    /// stanza queues. If the size is exceeded, the corresponding direction
116    /// will block until the queues can be flushed. Note that the respective
117    /// reverse direction is not affected (i.e. if your outgoing queue is
118    /// full for example because of a slow server, you can still receive
119    /// data).
120    pub fn new_c2s<C: ServerConnector>(
121        server: C,
122        jid: Jid,
123        password: String,
124        timeouts: Timeouts,
125        queue_depth: usize,
126    ) -> Self {
127        let reconnector = Box::new(
128            move |_preferred_location: Option<String>, slot: oneshot::Sender<Connection>| {
129                let jid = jid.clone();
130                let server = server.clone();
131                let password = password.clone();
132                tokio::spawn(async move {
133                    const MAX_DELAY: Duration = Duration::new(30, 0);
134                    let mut delay = Duration::new(1, 0);
135                    loop {
136                        log::debug!("Starting new connection as {}", jid);
137                        match crate::client::login::client_auth(
138                            server.clone(),
139                            jid.clone(),
140                            password.clone(),
141                            timeouts,
142                        )
143                        .await
144                        {
145                            Ok((features, stream)) => {
146                                log::debug!("Connection as {} established", jid);
147                                let stream = stream.box_stream();
148                                let Err(mut conn) = slot.send(Connection {
149                                    stream,
150                                    features,
151                                    identity: jid,
152                                }) else {
153                                    // Send succeeded, we're done here.
154                                    return;
155                                };
156
157                                log::debug!("StanzaStream dropped, attempting graceful termination of fresh stream.");
158                                // Send failed, i.e. the stanzastream is dead. Let's
159                                // be polite and close this stream cleanly.
160                                // We don't care whether that works, though, we
161                                // just want to release the resources after a
162                                // defined amount of time.
163                                let _: Result<_, _> = tokio::time::timeout(
164                                    LOCAL_SHUTDOWN_TIMEOUT,
165                                    <XmppStream as SinkExt<&Stanza>>::close(&mut conn.stream),
166                                )
167                                .await;
168                                return;
169                            }
170                            Err(e) => {
171                                // TODO: auth errors should probably be fatal??
172                                log::error!("Failed to connect: {}. Retrying in {:?}.", e, delay);
173                                tokio::time::sleep(delay).await;
174                                delay *= 2;
175                                if delay > MAX_DELAY {
176                                    delay = MAX_DELAY;
177                                }
178                            }
179                        }
180                    }
181                });
182            },
183        );
184        Self::new(reconnector, queue_depth)
185    }
186
187    /// Create a new stanza stream.
188    ///
189    /// Stanza streams operate using a `connector` which is responsible for
190    /// producing a new stream whenever necessary. It is the connector's
191    /// responsibility that:
192    ///
193    /// - It never fails to send to the channel it is given. If the connector
194    ///   drops the channel, the `StanzaStream` will consider this fatal and
195    ///   fail the stream.
196    ///
197    /// - All streams are authenticated and secured as necessary.
198    ///
199    /// - All streams are authenticated for the same entity. If the connector
200    ///   were to provide streams for different identities, information leaks
201    ///   could occur as queues from previous sessions are being flushed on
202    ///   the new stream on a reconnect.
203    ///
204    /// Most notably, the `connector` is **not** responsible for performing
205    /// resource binding: Resource binding is handled by the `StanzaStream`.
206    ///
207    /// `connector` will be called soon after `new()` was called to establish
208    /// the first underlying stream for the `StanzaStream`.
209    ///
210    /// The `queue_depth` controls the sizes for the incoming and outgoing
211    /// stanza queues. If the size is exceeded, the corresponding direction
212    /// will block until the queues can be flushed. Note that the respective
213    /// reverse direction is not affected (i.e. if your outgoing queue is
214    /// full for example because of a slow server, you can still receive
215    /// data).
216    pub fn new(
217        connector: Box<dyn FnMut(Option<String>, oneshot::Sender<Connection>) + Send + 'static>,
218        queue_depth: usize,
219    ) -> Self {
220        // c2f = core to frontend, f2c = frontend to core
221        let (f2c_tx, c2f_rx) = StanzaStreamWorker::spawn(connector, queue_depth);
222        Self {
223            tx: f2c_tx,
224            rx: c2f_rx,
225        }
226    }
227
228    async fn assert_send(&self, cmd: QueueEntry) {
229        match self.tx.send(cmd).await {
230            Ok(()) => (),
231            Err(_) => panic!("Stream closed or the stream's background workers have crashed."),
232        }
233    }
234
235    /// Close the stream.
236    ///
237    /// This will initiate a clean shutdown of the stream and will prevent and
238    /// cancel any more reconnection attempts.
239    pub async fn close(mut self) {
240        drop(self.tx); // closes stream.
241        while let Some(ev) = self.rx.recv().await {
242            log::trace!("discarding event {:?} after stream closure", ev);
243        }
244    }
245
246    /// Send a stanza via the stream.
247    ///
248    /// Note that completion of this function merely signals that the stanza
249    /// has been enqueued successfully: it may be stuck in the transmission
250    /// queue for quite a while if the stream is currently disconnected. The
251    /// transmission progress can be observed via the returned
252    /// [`StanzaToken`].
253    ///
254    /// # Panics
255    ///
256    /// If the stream has failed catastrophically (i.e. due to a software
257    /// bug), this function may panic.
258    pub async fn send(&self, stanza: Box<Stanza>) -> StanzaToken {
259        let (queue_entry, token) = QueueEntry::tracked(stanza);
260        self.assert_send(queue_entry).await;
261        token
262    }
263
264    /// Split the stream into the [`StanzaSender`] and [`StanzaReceiver`].
265    pub fn split(self) -> (StanzaSender, StanzaReceiver) {
266        let stream = Arc::new(Mutex::new(self));
267
268        let tx = StanzaSender(stream.clone());
269        let rx = StanzaReceiver(stream);
270
271        (tx, rx)
272    }
273
274    /// Reunite the [`StanzaSender`] and [`StanzaReceiver`] back into a single stream.
275    ///
276    /// # Panics
277    ///
278    /// This function panics if the `Sender` and `Receiver` don't come from
279    /// the same [`Stream`].
280    pub fn reunite(tx: StanzaSender, rx: StanzaReceiver) -> Self {
281        assert!(
282            Arc::ptr_eq(&tx.0, &rx.0),
283            "Unrelated Sender and Receiver passed to reunite."
284        );
285
286        drop(tx);
287
288        let inner = Arc::try_unwrap(rx.0).expect("Failed to unwrap Receiver Arc");
289        inner.into_inner()
290    }
291}
292
293impl Stream for StanzaStream {
294    type Item = Event;
295
296    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
297        self.rx.poll_recv(cx)
298    }
299}
300
301/// Send half of the [`StanzaStream`]
302#[derive(Debug)]
303pub struct StanzaSender(pub(super) Arc<Mutex<StanzaStream>>);
304
305impl StanzaSender {
306    /// Send a stanza via the stream.
307    ///
308    /// See the documentation of [`StanzaStream::send()`].
309    pub async fn send(&self, stanza: Box<Stanza>) -> StanzaToken {
310        self.0.lock().await.send(stanza).await
311    }
312}
313
314/// Receive half of the [`StanzaStream`]
315#[derive(Debug)]
316pub struct StanzaReceiver(pub(super) Arc<Mutex<StanzaStream>>);
317
318impl Stream for StanzaReceiver {
319    type Item = Event;
320
321    fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
322        let Ok(mut stream) = self.0.try_lock() else {
323            return Poll::Pending;
324        };
325
326        stream.rx.poll_recv(cx)
327    }
328}