tokio_xmpp/stanzastream/mod.rs
// Copyright (c) 2019 Emmanuel Gil Peyrot <linkmauve@linkmauve.fr>
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.

//! # Resilient stanza stream
//!
//! This module provides the [`StanzaStream`], which is the next level up from
//! the low-level [`XmlStream`][`crate::xmlstream::XmlStream`].
//!
//! The stanza stream is aware of XMPP semantics. Most importantly, it knows
//! how to recover from a broken connection by reconnecting, and how to do so
//! smoothly using
//! [XEP-0198 (Stream Management)](https://xmpp.org/extensions/xep-0198.html).
//! XEP-0198 is only used if the peer supports it. If the peer does not
//! support XEP-0198, automatic reconnects are still performed, but data lost
//! during the disruption can neither be detected nor recovered.
//!
//! The main API entrypoint for the stanza stream is, unsurprisingly,
//! [`StanzaStream`].
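//!
//! ## Example
//!
//! A minimal usage sketch. The `connector` value is a placeholder for any
//! [`ServerConnector`][`crate::connect::ServerConnector`] implementation,
//! and the `Timeouts::default()` and queue-depth values are arbitrary
//! assumptions:
//!
//! ```rust,ignore
//! use futures::StreamExt;
//! use tokio_xmpp::stanzastream::{Event, StanzaStream, StreamEvent};
//! use tokio_xmpp::xmlstream::Timeouts;
//! use xmpp_parsers::jid::Jid;
//!
//! let jid: Jid = "user@example.org".parse().unwrap();
//! let mut stream = StanzaStream::new_c2s(
//!     connector,
//!     jid,
//!     "secret".to_owned(),
//!     Timeouts::default(),
//!     16, // queue depth
//! );
//! while let Some(event) = stream.next().await {
//!     match event {
//!         Event::Stream(StreamEvent::Reset { bound_jid, .. }) => {
//!             // All stream state was lost: re-send presence, re-join
//!             // rooms, and so on.
//!             log::info!("(re-)bound as {}", bound_jid);
//!         }
//!         Event::Stream(_) => (),
//!         Event::Stanza(stanza) => {
//!             // Process an incoming stanza.
//!         }
//!     }
//! }
//! ```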
use core::pin::Pin;
use core::task::{Context, Poll};
use core::time::Duration;

// TODO: ensure that IDs are always set on stanzas.
// TODO: figure out what to do with the mpsc::Sender<QueueEntry> on lossy
// stream reconnects. Keeping it may cause stanzas to be sent which weren't
// meant for that stream; replacing it is racy.

use futures::{SinkExt, Stream};
use tokio::sync::{mpsc, oneshot};
use xmpp_parsers::{jid::Jid, stream_features::StreamFeatures};

use crate::connect::ServerConnector;
use crate::xmlstream::Timeouts;
use crate::Stanza;

mod connected;
mod error;
mod negotiation;
mod queue;
mod stream_management;
mod worker;

use self::queue::QueueEntry;
pub use self::queue::{StanzaStage, StanzaState, StanzaToken};
pub use self::worker::{Connection, XmppStream};
use self::worker::{StanzaStreamWorker, LOCAL_SHUTDOWN_TIMEOUT};

/// Event informing about a change of the [`StanzaStream`]'s status.
#[derive(Debug)]
pub enum StreamEvent {
    /// The stream was (re-)established **with** loss of state.
    Reset {
        /// The new JID to which the stream is bound.
        bound_jid: Jid,

        /// The features reported by the stream.
        features: StreamFeatures,
    },

    /// The stream is currently inactive because a connection was lost.
    ///
    /// Resumption without loss of state is still possible. This event is
    /// merely informative and may be used to prolong timeouts or to inform
    /// the user that the connection is currently unstable.
    Suspended,

    /// The stream was re-established **without** loss of state.
    ///
    /// This is merely informative. Potentially useful to prolong timeouts.
    Resumed,
}

/// Event emitted by the [`StanzaStream`].
///
/// Note that stream closure is not an explicit event, but the end of the
/// event stream itself.
#[derive(Debug)]
pub enum Event {
    /// The stream's connectivity status has changed.
    Stream(StreamEvent),

    /// A stanza was received over the stream.
    Stanza(Stanza),
}

/// Frontend interface to a reliable, always-online stanza stream.
pub struct StanzaStream {
    rx: mpsc::Receiver<Event>,
    tx: mpsc::Sender<QueueEntry>,
}

impl StanzaStream {
    /// Establish a new client-to-server stream using the given
    /// [`ServerConnector`].
    ///
    /// `jid` and `password` must be the user account's credentials. `jid`
    /// may either be a bare JID (to let the server choose a resource) or a
    /// full JID (to request a specific resource from the server, with no
    /// guarantee of success).
    ///
    /// `timeouts` controls the responsiveness to connection interruptions
    /// on the underlying transports. Please see the [`Timeouts`] struct's
    /// documentation for hints on how to configure this correctly.
    ///
    /// `queue_depth` controls the sizes of the incoming and outgoing stanza
    /// queues. If a queue's size is exceeded, the corresponding direction
    /// blocks until the queue can be flushed. Note that the reverse
    /// direction is not affected (i.e. if your outgoing queue is full, for
    /// example because of a slow server, you can still receive data).
    pub fn new_c2s<C: ServerConnector>(
        server: C,
        jid: Jid,
        password: String,
        timeouts: Timeouts,
        queue_depth: usize,
    ) -> Self {
        let reconnector = Box::new(
            move |_preferred_location: Option<String>, slot: oneshot::Sender<Connection>| {
                let jid = jid.clone();
                let server = server.clone();
                let password = password.clone();
                tokio::spawn(async move {
                    const MAX_DELAY: Duration = Duration::new(30, 0);
                    let mut delay = Duration::new(1, 0);
                    loop {
                        log::debug!("Starting new connection as {}", jid);
                        match crate::client::login::client_auth(
                            server.clone(),
                            jid.clone(),
                            password.clone(),
                            timeouts,
                        )
                        .await
                        {
                            Ok((features, stream)) => {
                                log::debug!("Connection as {} established", jid);
                                let stream = stream.box_stream();
                                let Err(mut conn) = slot.send(Connection {
                                    stream,
                                    features,
                                    identity: jid,
                                }) else {
                                    // Send succeeded, we're done here.
                                    return;
                                };
                                log::debug!("StanzaStream dropped, attempting graceful termination of fresh stream.");
                                // Send failed, i.e. the StanzaStream is dead.
                                // Let's be polite and close this stream
                                // cleanly. We don't care whether that works,
                                // though; we just want to release the
                                // resources after a defined amount of time.
                                let _: Result<_, _> = tokio::time::timeout(
                                    LOCAL_SHUTDOWN_TIMEOUT,
                                    <XmppStream as SinkExt<&Stanza>>::close(&mut conn.stream),
                                )
                                .await;
                                return;
                            }
                            Err(e) => {
                                // TODO: auth errors should probably be fatal??
                                log::error!("Failed to connect: {}. Retrying in {:?}.", e, delay);
                                tokio::time::sleep(delay).await;
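                                // Exponential backoff, capped at MAX_DELAY.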
                                delay = delay * 2;
                                if delay > MAX_DELAY {
                                    delay = MAX_DELAY;
                                }
                            }
                        }
                    }
                });
            },
        );

        Self::new(reconnector, queue_depth)
    }

    /// Create a new stanza stream.
    ///
    /// Stanza streams operate using a `connector`, which is responsible for
    /// producing a new stream whenever necessary. The connector must ensure
    /// that:
    ///
    /// - It never fails to send to the channel it is given. If the connector
    ///   drops the channel, the `StanzaStream` considers this fatal and
    ///   fails the stream.
    ///
    /// - All streams are authenticated and secured as necessary.
    ///
    /// - All streams are authenticated for the same entity. If the connector
    ///   were to provide streams for different identities, information leaks
    ///   could occur, because queues from previous sessions are flushed on
    ///   the new stream after a reconnect.
    ///
    /// Most notably, the `connector` is **not** responsible for performing
    /// resource binding: resource binding is handled by the `StanzaStream`.
    /// A sketch of a minimal connector is shown below.
    ///
    /// `connector` will be called soon after `new()` to establish the first
    /// underlying stream for the `StanzaStream`.
    ///
    /// `queue_depth` controls the sizes of the incoming and outgoing stanza
    /// queues. If a queue's size is exceeded, the corresponding direction
    /// blocks until the queue can be flushed. Note that the reverse
    /// direction is not affected (i.e. if your outgoing queue is full, for
    /// example because of a slow server, you can still receive data).
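    ///
    /// # Example
    ///
    /// A sketch of a minimal connector. The `establish_stream` helper is
    /// hypothetical; it stands for whatever secures and authenticates the
    /// underlying connection and yields a [`Connection`]:
    ///
    /// ```rust,ignore
    /// use tokio::sync::oneshot;
    /// use tokio_xmpp::stanzastream::{Connection, StanzaStream};
    ///
    /// let connector = Box::new(
    ///     move |_preferred_location: Option<String>, slot: oneshot::Sender<Connection>| {
    ///         tokio::spawn(async move {
    ///             // `establish_stream()` is a hypothetical stand-in for the
    ///             // actual connection setup (TCP/TLS + SASL, for example).
    ///             let conn: Connection = establish_stream().await;
    ///             // If this send fails, the StanzaStream was dropped in the
    ///             // meantime; there is nobody left to hand the stream to.
    ///             let _ = slot.send(conn);
    ///         });
    ///     },
    /// );
    /// let stream = StanzaStream::new(connector, 16);
    /// ```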
    pub fn new(
        connector: Box<dyn FnMut(Option<String>, oneshot::Sender<Connection>) + Send + 'static>,
        queue_depth: usize,
    ) -> Self {
        // c2f = core to frontend, f2c = frontend to core
        let (f2c_tx, c2f_rx) = StanzaStreamWorker::spawn(connector, queue_depth);
        Self {
            tx: f2c_tx,
            rx: c2f_rx,
        }
    }

    /// Send `cmd` to the worker, panicking if the worker has gone away.
    async fn assert_send(&self, cmd: QueueEntry) {
        match self.tx.send(cmd).await {
            Ok(()) => (),
            Err(_) => panic!("Stream closed or the stream's background workers have crashed."),
        }
    }

    /// Close the stream.
    ///
    /// This initiates a clean shutdown of the stream, prevents any further
    /// reconnection attempts, and cancels attempts that are already in
    /// progress.
    pub async fn close(mut self) {
        drop(self.tx); // closes the stream
        while let Some(ev) = self.rx.recv().await {
            log::trace!("discarding event {:?} after stream closure", ev);
        }
    }

    /// Send a stanza via the stream.
    ///
    /// Note that completion of this function merely signals that the stanza
    /// has been enqueued successfully: it may be stuck in the transmission
    /// queue for quite a while if the stream is currently disconnected. The
    /// transmission progress can be observed via the returned
    /// [`StanzaToken`].
    ///
    /// # Panics
    ///
    /// If the stream has failed catastrophically (i.e. due to a software
    /// bug), this function may panic.
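    ///
    /// # Example
    ///
    /// A sketch of observing transmission progress. `stream` is a connected
    /// [`StanzaStream`] and `stanza` a [`Stanza`]; the `wait_for` accessor
    /// on [`StanzaToken`] is an assumption, see [`StanzaStage`] and
    /// [`StanzaState`] for the stages a stanza moves through:
    ///
    /// ```rust,ignore
    /// let mut token = stream.send(Box::new(stanza)).await;
    /// // Hypothetical: block until the stanza has at least been written
    /// // to the underlying stream.
    /// let state = token.wait_for(StanzaStage::Sent).await;
    /// ```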
    pub async fn send(&self, stanza: Box<Stanza>) -> StanzaToken {
        let (queue_entry, token) = QueueEntry::tracked(stanza);
        self.assert_send(queue_entry).await;
        token
    }
}

impl Stream for StanzaStream {
    type Item = Event;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
        self.rx.poll_recv(cx)
    }
}