Skip to main content

tokio_xmpp/xmlstream/
mod.rs

1// Copyright (c) 2024 Jonas Schäfer <jonas@zombofant.net>
2//
3// This Source Code Form is subject to the terms of the Mozilla Public
4// License, v. 2.0. If a copy of the MPL was not distributed with this
5// file, You can obtain one at http://mozilla.org/MPL/2.0/.
6
7//! # RFC 6120 XML Streams
8//!
9//! **Note:** The XML stream is a low-level API which you should probably not
10//! use directly. You may be looking for
11//! [`StanzaStream`][`crate::stanzastream::StanzaStream`] instead.
12//!
13//! Establishing an XML stream is always a multi-stage process due to how
14//! stream negotiation works. Based on the values sent by the initiator in the
15//! stream header, the responder may choose to offer different features.
16//!
17//! In order to allow this, the following multi-step processes are defined.
18//!
19//! ## Initiating an XML stream
20//!
21//! To initiate an XML stream, you need to:
22//!
23//! 1. Call [`initiate_stream`] to obtain the [`PendingFeaturesRecv`] object.
24//!    That object holds the stream header sent by the peer for inspection.
25//! 2. Call [`PendingFeaturesRecv::recv_features`] if you are content with
26//!    the content of the stream header to obtain the [`XmlStream`] object and
27//!    the features sent by the peer.
28//!
29//! ## Accepting an XML stream connection
30//!
31//! To accept an XML stream, you need to:
32//!
33//! 1. Call [`accept_stream`] to obtain the [`AcceptedStream`] object.
34//!    That object holds the stream header sent by the peer for inspection.
35//! 2. Call [`AcceptedStream::send_header`] if you are content with
36//!    the content of the stream header to obtain the [`PendingFeaturesSend`]
37//!    object.
38//! 3. Call [`PendingFeaturesSend::send_features`] to send the stream features
39//!    to the peer and obtain the [`XmlStream`] object.
40//!
41//! ## Mid-stream resets
42//!
43//! RFC 6120 describes a couple of situations where stream resets are executed
44//! during stream negotiation. During a stream reset, both parties drop their
45//! parser state and the stream is started from the beginning, with a new
46//! stream header sent by the initiator and received by the responder.
47//!
48//! Stream resets are inherently prone to race conditions. If the responder
49//! executes a read from the underlying transport between sending the element
50//! which triggers the stream reset and discarding its parser state, it may
51//! accidentally read the initiator's stream header into the *old* parser
52//! state instead of the post-reset parser state.
53//!
54//! Stream resets are executed with the [`XmlStream::initiate_reset`] and
55//! [`XmlStream::accept_reset`] functions, for initiator and responder,
56//! respectively. In order to avoid the race condition,
57//! [`XmlStream::accept_reset`] handles sending the last pre-reset element and
58//! resetting the stream in a single step.
59
60use core::fmt;
61use core::future::Future;
62use core::pin::Pin;
63use core::task::{Context, Poll};
64use std::io;
65#[cfg(feature = "syntax-highlighting")]
66use std::sync::LazyLock;
67
68use futures::{ready, Sink, SinkExt, Stream};
69
70use tokio::io::{AsyncBufRead, AsyncWrite};
71
72use xso::{AsXml, FromXml, Item};
73
74use crate::connect::AsyncReadAndWrite;
75
76mod capture;
77mod common;
78mod initiator;
79mod responder;
80#[cfg(test)]
81mod tests;
82pub(crate) mod xmpp;
83
84use self::common::{RawError, RawXmlStream, ReadXsoError, ReadXsoState};
85pub use self::common::{StreamHeader, Timeouts};
86pub use self::initiator::{InitiatingStream, PendingFeaturesRecv, RecvFeaturesError};
87pub use self::responder::{AcceptedStream, PendingFeaturesSend};
88pub use self::xmpp::{
89    FallibleStreamElement, RawStanzaHeader, StreamElementError, XmppStreamElement,
90};
91
/// Syntax definitions used for log highlighting, loaded lazily on first use
/// via `SyntaxSet::load_defaults_newlines`.
#[cfg(feature = "syntax-highlighting")]
static PS: LazyLock<syntect::parsing::SyntaxSet> =
    LazyLock::new(syntect::parsing::SyntaxSet::load_defaults_newlines);

/// The syntax registered for the `xml` file extension inside [`PS`].
///
/// Initialisation panics if [`PS`] has no syntax for that extension.
#[cfg(feature = "syntax-highlighting")]
static SYNTAX: LazyLock<&syntect::parsing::SyntaxReference> = LazyLock::new(|| {
    let syntaxes: &'static syntect::parsing::SyntaxSet = &PS;
    syntaxes.find_syntax_by_extension("xml").unwrap()
});

/// Colour theme used for log highlighting: a copy of the
/// `Solarized (dark)` entry of syntect's default theme set.
#[cfg(feature = "syntax-highlighting")]
static THEME: LazyLock<syntect::highlighting::Theme> = LazyLock::new(|| {
    let defaults = syntect::highlighting::ThemeSet::load_defaults();
    defaults.themes["Solarized (dark)"].clone()
});
104
/// Render `xml` with 24-bit terminal colour escapes for log output.
///
/// The text is highlighted in a single `highlight_line` call using the
/// [`SYNTAX`], [`THEME`] and [`PS`] statics, and a trailing `ESC[0m` reset
/// sequence is appended so that the colours do not leak into whatever is
/// printed next.
///
/// # Panics
///
/// Panics if the highlighter reports an error for the given input.
#[cfg(feature = "syntax-highlighting")]
fn highlight_xml(xml: &str) -> String {
    let mut highlighter = syntect::easy::HighlightLines::new(&SYNTAX, &THEME);
    let styled: Vec<_> = highlighter.highlight_line(xml, &PS).unwrap();
    let mut out = syntect::util::as_24_bit_terminal_escaped(&styled, false);
    out.push_str("\x1b[0m");
    out
}
113
/// Display adapter which prints a buffer of serialised XML for logging.
///
/// With the `syntax-highlighting` feature enabled, the text is additionally
/// run through `highlight_xml` before being written.
struct LogXsoBuf<'x>(&'x [u8]);

impl fmt::Display for LogXsoBuf<'_> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // The buffer is expected to hold UTF-8 because we generate it
        // ourselves. This type only exists for logging, though, so an
        // unexpected byte sequence must not panic in the middle of a log
        // statement: invalid sequences are rendered as U+FFFD instead.
        let text = String::from_utf8_lossy(self.0);
        #[cfg(feature = "syntax-highlighting")]
        let text = highlight_xml(&text);
        f.write_str(&text)
    }
}
125
126/// Initiate a new stream
127///
128/// Initiate a new stream using the given I/O object `io`. The default
129/// XML namespace will be set to `stream_ns` and the stream header will use
130/// the attributes as set in `stream_header`, along with version `1.0`.
131///
132/// The returned object contains the stream header sent by the remote side
133/// as well as the internal parser state to continue the negotiation.
134pub async fn initiate_stream<Io: AsyncBufRead + AsyncWrite + Unpin>(
135    io: Io,
136    stream_ns: &'static str,
137    stream_header: StreamHeader<'_>,
138    timeouts: Timeouts,
139) -> Result<PendingFeaturesRecv<Io>, io::Error> {
140    let stream = InitiatingStream(RawXmlStream::new(io, stream_ns, timeouts));
141    stream.send_header(stream_header).await
142}
143
144/// Accept a new XML stream as responder
145///
146/// Prepares the responer side of an XML stream using the given I/O object
147/// `io`. The default XML namespace will be set to `stream_ns`.
148///
149/// The returned object contains the stream header sent by the remote side
150/// as well as the internal parser state to continue the negotiation.
151pub async fn accept_stream<Io: AsyncBufRead + AsyncWrite + Unpin>(
152    io: Io,
153    stream_ns: &'static str,
154    timeouts: Timeouts,
155) -> Result<AcceptedStream<Io>, io::Error> {
156    let mut stream = RawXmlStream::new(io, stream_ns, timeouts);
157    let header = StreamHeader::recv(Pin::new(&mut stream)).await?;
158    Ok(AcceptedStream { stream, header })
159}
160
161/// A non-success state which may occur while reading an XSO from a
162/// [`XmlStream`]
163#[derive(Debug)]
164pub enum ReadError {
165    /// The soft timeout of the stream triggered.
166    ///
167    /// User code should handle this by sending something into the stream
168    /// which causes the peer to send data before the hard timeout triggers.
169    SoftTimeout,
170
171    /// An I/O error occurred in the underlying I/O object.
172    ///
173    /// This is generally fatal.
174    HardError(io::Error),
175
176    /// A parse error occurred while processing the XSO.
177    ///
178    /// This is non-fatal and more XSOs may be read from the stream.
179    ParseError(xso::error::Error),
180
181    /// The stream footer was received.
182    ///
183    /// Any future read attempts will again return this error. The stream has
184    /// been closed by the peer and you should probably close it, too.
185    StreamFooterReceived,
186}
187
188impl fmt::Display for ReadError {
189    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
190        match self {
191            ReadError::SoftTimeout => write!(f, "soft timeout"),
192            ReadError::HardError(e) => write!(f, "hard error: {}", e),
193            ReadError::ParseError(e) => write!(f, "parse error: {}", e),
194            ReadError::StreamFooterReceived => write!(f, "stream footer received"),
195        }
196    }
197}
198
199impl core::error::Error for ReadError {
200    fn source(&self) -> Option<&(dyn core::error::Error + 'static)> {
201        match self {
202            ReadError::HardError(e) => Some(e),
203            ReadError::ParseError(e) => Some(e),
204            _ => None,
205        }
206    }
207}
208
/// State of the sending half of an XML stream.
enum WriteState {
    /// Elements may be sent.
    Open,
    /// Shutdown was requested; the stream footer still has to be sent.
    SendElementFoot,
    /// The stream footer has been handed to the underlying stream.
    FooterSent,
    /// Sending the stream footer failed; the sink is unusable.
    Failed,
}

impl WriteState {
    /// Return an error if a previous write error made the sink unusable.
    ///
    /// All states other than [`WriteState::Failed`] pass this check.
    fn check_ok(&self) -> io::Result<()> {
        match self {
            Self::Open | Self::SendElementFoot | Self::FooterSent => Ok(()),
            Self::Failed => {
                let err = io::Error::new(
                    io::ErrorKind::NotConnected,
                    "XML stream sink unusable because of previous write error",
                );
                Err(err)
            }
        }
    }

    /// Return an error unless new elements may still be sent.
    ///
    /// Fails once the stream footer is being sent or has been sent, and
    /// otherwise defers to [`Self::check_ok`].
    fn check_writable(&self) -> io::Result<()> {
        match self {
            Self::Open | Self::Failed => self.check_ok(),
            Self::SendElementFoot | Self::FooterSent => {
                let err = io::Error::new(
                    io::ErrorKind::NotConnected,
                    "stream footer already sent",
                );
                Err(err)
            }
        }
    }
}
237
238pin_project_lite::pin_project! {
239    /// XML stream
240    ///
241    /// This struct represents an
242    /// [RFC 6120](https://tools.ietf.org/html/rfc6120) XML stream, where the
243    /// payload consists of items of type `T` implementing [`FromXml`] and
244    /// [`AsXml`].
245    ///
246    /// **Note:** The XML stream is a low-level API which you should probably
247    /// not use directly. You may be looking for
248    /// [`StanzaStream`][`crate::stanzastream::StnazaStream`] instead.
249    pub struct XmlStream<Io, T: FromXml> {
250        #[pin]
251        inner: RawXmlStream<Io>,
252        read_state: Option<ReadXsoState<T>>,
253        write_state: WriteState,
254    }
255}
256
257impl<Io, T: FromXml> XmlStream<Io, T> {
258    /// Obtain a reference to the `Io` stream.
259    pub fn get_stream(&self) -> &Io {
260        self.inner.get_stream()
261    }
262}
263
264impl<Io: AsyncBufRead, T: FromXml> XmlStream<Io, T> {
265    fn wrap(inner: RawXmlStream<Io>) -> Self {
266        Self {
267            inner,
268            read_state: Some(ReadXsoState::default()),
269            write_state: WriteState::Open,
270        }
271    }
272
273    fn assert_retypable(&self) {
274        match self.read_state {
275            Some(ReadXsoState::PreData) => (),
276            Some(_) => panic!("cannot reset stream: XSO parsing in progress!"),
277            None => panic!("cannot reset stream: stream footer received!"),
278        }
279        match self.write_state.check_writable() {
280            Ok(()) => (),
281            Err(e) => panic!("cannot reset stream: {}", e),
282        }
283    }
284}
285
286impl<Io: AsyncBufRead + AsyncWrite + Unpin, T: FromXml + fmt::Debug> XmlStream<Io, T> {
287    /// Initiate a stream reset
288    ///
289    /// To actually send the stream header, call
290    /// [`send_header`][`InitiatingStream::send_header`] on the result.
291    ///
292    /// # Panics
293    ///
294    /// Attempting to reset the stream while an object is being received will
295    /// panic. This can generally only happen if you call `poll_next`
296    /// directly, as doing that is otherwise prevented by the borrowchecker.
297    ///
298    /// In addition, attempting to reset a stream which has been closed by
299    /// either side or which has had an I/O error will also cause a panic.
300    pub fn initiate_reset(self) -> InitiatingStream<Io> {
301        self.assert_retypable();
302
303        let mut stream = self.inner;
304        Pin::new(&mut stream).reset_state();
305        InitiatingStream(stream)
306    }
307
308    /// Trigger a stream reset on the initiator side and await the new stream
309    /// header.
310    ///
311    /// This is the responder-side counterpart to
312    /// [`initiate_reset`][`Self::initiate_reset`]. The element which causes
313    /// the stream reset must be passed as `barrier` and it will be sent
314    /// right before resetting the parser state. This way, the race condition
315    /// outlined in the [`xmlstream`][`self`] module's documentation is
316    /// guaranteed to be avoided.
317    ///
318    /// Note that you should not send the element passed as `barrier` down the
319    /// stream yourself, as this function takes care of it.
320    ///
321    /// # Stream resets without a triggering element
322    ///
323    /// These are not possible to do safely and not specified in RFC 6120,
324    /// hence they cannot be done in [`XmlStream`].
325    ///
326    /// # Panics
327    ///
328    /// Attempting to reset the stream while an object is being received will
329    /// panic. This can generally only happen if you call `poll_next`
330    /// directly, as doing that is otherwise prevented by the borrowchecker.
331    ///
332    /// In addition, attempting to reset a stream which has been closed by
333    /// either side or which has had an I/O error will also cause a panic.
334    pub async fn accept_reset<U: AsXml>(mut self, barrier: &U) -> io::Result<AcceptedStream<Io>> {
335        self.assert_retypable();
336        self.send(barrier).await?;
337
338        let mut stream = self.inner;
339        Pin::new(&mut stream).reset_state();
340        let header = StreamHeader::recv(Pin::new(&mut stream)).await?;
341        Ok(AcceptedStream { stream, header })
342    }
343
344    /// Discard all XML state and return the inner I/O object.
345    pub fn into_inner(self) -> Io {
346        self.assert_retypable();
347        self.inner.into_inner()
348    }
349
350    /// Box the underlying transport stream.
351    ///
352    /// This removes the specific type of the transport from the XML stream's
353    /// type signature.
354    pub fn box_stream(self) -> XmlStream<Box<dyn AsyncReadAndWrite + Send + 'static>, T>
355    where
356        Io: AsyncReadAndWrite + Send + 'static,
357    {
358        XmlStream {
359            inner: self.inner.box_stream(),
360            read_state: self.read_state,
361            write_state: self.write_state,
362        }
363    }
364}
365
366impl<Io: AsyncBufRead, T: FromXml + fmt::Debug> Stream for XmlStream<Io, T> {
367    type Item = Result<T, ReadError>;
368
369    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
370        let mut this = self.project();
371        let result = match this.read_state.as_mut() {
372            None => {
373                // awaiting eof.
374                return loop {
375                    match ready!(this.inner.as_mut().poll_next(cx)) {
376                        None => break Poll::Ready(None),
377                        Some(Ok(_)) => unreachable!("xml parser allowed data after stream footer"),
378                        Some(Err(RawError::Io(e))) => {
379                            break Poll::Ready(Some(Err(ReadError::HardError(e))))
380                        }
381                        // Swallow soft timeout, we don't want the user to trigger
382                        // anything here.
383                        Some(Err(RawError::SoftTimeout)) => continue,
384                    }
385                };
386            }
387            Some(read_state) => ready!(read_state.poll_advance(this.inner, cx)),
388        };
389        let result = match result {
390            Ok(v) => Poll::Ready(Some(Ok(v))),
391            Err(ReadXsoError::Hard(e)) => Poll::Ready(Some(Err(ReadError::HardError(e)))),
392            Err(ReadXsoError::Parse(e)) => Poll::Ready(Some(Err(ReadError::ParseError(e)))),
393            Err(ReadXsoError::Footer) => {
394                *this.read_state = None;
395                // Return early here, because we cannot allow recreation of
396                // another read state.
397                return Poll::Ready(Some(Err(ReadError::StreamFooterReceived)));
398            }
399            Err(ReadXsoError::SoftTimeout) => Poll::Ready(Some(Err(ReadError::SoftTimeout))),
400        };
401        *this.read_state = Some(ReadXsoState::default());
402        result
403    }
404}
405
406impl<Io: AsyncWrite, T: FromXml> XmlStream<Io, T> {
407    /// Initiate stream shutdown and poll for completion.
408    ///
409    /// Please see [`Self::shutdown`] for details.
410    pub fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), io::Error>> {
411        let mut this = self.project();
412        this.write_state.check_ok()?;
413        loop {
414            match this.write_state {
415                // Open => initiate closing.
416                WriteState::Open => {
417                    *this.write_state = WriteState::SendElementFoot;
418                }
419                // Sending => wait for readiness, then send.
420                WriteState::SendElementFoot => {
421                    match ready!(this.inner.as_mut().poll_ready(cx))
422                        .and_then(|_| this.inner.as_mut().start_send(Item::ElementFoot))
423                    {
424                        Ok(()) => {
425                            log::trace!("stream footer sent successfully");
426                        }
427                        // If it fails, we fail the sink immediately.
428                        Err(e) => {
429                            log::debug!(
430                                "omitting stream footer: failed to make stream ready: {}",
431                                e
432                            );
433                            *this.write_state = WriteState::Failed;
434                            return Poll::Ready(Err(e));
435                        }
436                    }
437                    *this.write_state = WriteState::FooterSent;
438                }
439                // Footer sent => just close the inner stream.
440                WriteState::FooterSent => break,
441                WriteState::Failed => unreachable!(), // caught by check_ok()
442            }
443        }
444        this.inner.poll_shutdown(cx)
445    }
446}
447
448impl<Io: AsyncWrite + Unpin, T: FromXml> XmlStream<Io, T> {
449    /// Send the stream footer and close the sender side of the underlying
450    /// transport.
451    ///
452    /// Unlike `poll_close` (from the `Sink` impls), this will not close the
453    /// receiving side of the underlying the transport. It is advisable to call
454    /// `poll_close` eventually after `poll_shutdown` in order to gracefully
455    /// handle situations where the remote side does not close the stream
456    /// cleanly.
457    pub fn shutdown(&mut self) -> Shutdown<'_, Io, T> {
458        Shutdown {
459            stream: Pin::new(self),
460        }
461    }
462}
463
464impl<'x, Io: AsyncWrite, T: FromXml, U: AsXml> Sink<&'x U> for XmlStream<Io, T> {
465    type Error = io::Error;
466
467    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
468        let this = self.project();
469        this.write_state.check_writable()?;
470        this.inner.poll_ready(cx)
471    }
472
473    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
474        let this = self.project();
475        this.write_state.check_writable()?;
476        this.inner.poll_flush(cx)
477    }
478
479    fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
480        ready!(self.as_mut().poll_shutdown(cx))?;
481        let this = self.project();
482        this.inner.poll_close(cx)
483    }
484
485    fn start_send(self: Pin<&mut Self>, item: &'x U) -> Result<(), Self::Error> {
486        let this = self.project();
487        this.write_state.check_writable()?;
488        this.inner.start_send_xso(item)
489    }
490}
491
492/// Future implementing [`XmlStream::shutdown`] using
493/// [`XmlStream::poll_shutdown`].
494pub struct Shutdown<'a, Io: AsyncWrite, T: FromXml> {
495    stream: Pin<&'a mut XmlStream<Io, T>>,
496}
497
498impl<Io: AsyncWrite, T: FromXml> Future for Shutdown<'_, Io, T> {
499    type Output = io::Result<()>;
500
501    fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
502        self.stream.as_mut().poll_shutdown(cx)
503    }
504}
505
506/// Convenience alias for an XML stream using [`XmppStreamElement`].
507pub type XmppStream<Io> = XmlStream<Io, FallibleStreamElement>;