tokio_xmpp/xmlstream/
mod.rs

1// Copyright (c) 2024 Jonas Schäfer <jonas@zombofant.net>
2//
3// This Source Code Form is subject to the terms of the Mozilla Public
4// License, v. 2.0. If a copy of the MPL was not distributed with this
5// file, You can obtain one at http://mozilla.org/MPL/2.0/.
6
7//! # RFC 6120 XML Streams
8//!
9//! **Note:** The XML stream is a low-level API which you should probably not
10//! use directly. You may be looking for
11//! [`StanzaStream`][`crate::stanzastream::StanzaStream`] instead.
12//!
13//! Establishing an XML stream is always a multi-stage process due to how
14//! stream negotiation works. Based on the values sent by the initiator in the
15//! stream header, the responder may choose to offer different features.
16//!
17//! In order to allow this, the following multi-step processes are defined.
18//!
19//! ## Initiating an XML stream
20//!
21//! To initiate an XML stream, you need to:
22//!
23//! 1. Call [`initiate_stream`] to obtain the [`PendingFeaturesRecv`] object.
24//!    That object holds the stream header sent by the peer for inspection.
25//! 2. Call [`PendingFeaturesRecv::recv_features`] if you are content with
26//!    the content of the stream header to obtain the [`XmlStream`] object and
27//!    the features sent by the peer.
28//!
29//! ## Accepting an XML stream connection
30//!
31//! To accept an XML stream, you need to:
32//!
33//! 1. Call [`accept_stream`] to obtain the [`AcceptedStream`] object.
34//!    That object holds the stream header sent by the peer for inspection.
35//! 2. Call [`AcceptedStream::send_header`] if you are content with
36//!    the content of the stream header to obtain the [`PendingFeaturesSend`]
37//!    object.
38//! 3. Call [`PendingFeaturesSend::send_features`] to send the stream features
39//!    to the peer and obtain the [`XmlStream`] object.
40//!
41//! ## Mid-stream resets
42//!
43//! RFC 6120 describes a couple of situations where stream resets are executed
44//! during stream negotiation. During a stream reset, both parties drop their
45//! parser state and the stream is started from the beginning, with a new
46//! stream header sent by the initiator and received by the responder.
47//!
48//! Stream resets are inherently prone to race conditions. If the responder
49//! executes a read from the underlying transport between sending the element
50//! which triggers the stream reset and discarding its parser state, it may
51//! accidentally read the initiator's stream header into the *old* parser
52//! state instead of the post-reset parser state.
53//!
54//! Stream resets are executed with the [`XmlStream::initiate_reset`] and
55//! [`XmlStream::accept_reset`] functions, for initiator and responder,
56//! respectively. In order to avoid the race condition,
57//! [`XmlStream::accept_reset`] handles sending the last pre-reset element and
58//! resetting the stream in a single step.
59
60use core::fmt;
61use core::future::Future;
62use core::pin::Pin;
63use core::task::{Context, Poll};
64use std::io;
65#[cfg(feature = "syntax-highlighting")]
66use std::sync::LazyLock;
67
68use futures::{ready, Sink, SinkExt, Stream};
69
70use tokio::io::{AsyncBufRead, AsyncWrite};
71
72use xso::{AsXml, FromXml, Item};
73
74use crate::connect::AsyncReadAndWrite;
75
76mod capture;
77mod common;
78mod initiator;
79mod responder;
80#[cfg(test)]
81mod tests;
82pub(crate) mod xmpp;
83
84use self::common::{RawError, RawXmlStream, ReadXsoError, ReadXsoState};
85pub use self::common::{StreamHeader, Timeouts};
86pub use self::initiator::{InitiatingStream, PendingFeaturesRecv};
87pub use self::responder::{AcceptedStream, PendingFeaturesSend};
88pub use self::xmpp::XmppStreamElement;
89
// Syntax definitions used for log highlighting: syntect's default set in its
// newline-aware variant, loaded lazily on first access.
#[cfg(feature = "syntax-highlighting")]
static PS: LazyLock<syntect::parsing::SyntaxSet> =
    LazyLock::new(syntect::parsing::SyntaxSet::load_defaults_newlines);

// The XML syntax definition, looked up in `PS` by the `xml` file extension.
// Initialisation panics if `PS` has no syntax registered for that extension.
#[cfg(feature = "syntax-highlighting")]
static SYNTAX: LazyLock<&syntect::parsing::SyntaxReference> =
    LazyLock::new(|| PS.find_syntax_by_extension("xml").unwrap());

// Colour theme for highlighted XML: a clone of the "Solarized (dark)" entry
// of syntect's default theme set. Indexing panics if that entry is missing.
#[cfg(feature = "syntax-highlighting")]
static THEME: LazyLock<syntect::highlighting::Theme> = LazyLock::new(|| {
    syntect::highlighting::ThemeSet::load_defaults().themes["Solarized (dark)"].clone()
});
102
/// Render `xml` with 24-bit terminal colour escapes.
///
/// The text is highlighted as a single line using the XML syntax and the
/// module's theme. A reset sequence (`ESC[0m`) is appended so that the
/// colours do not leak into whatever is printed afterwards.
///
/// # Panics
///
/// Panics if the highlighter reports an error for the given text.
#[cfg(feature = "syntax-highlighting")]
fn highlight_xml(xml: &str) -> String {
    let mut highlighter = syntect::easy::HighlightLines::new(&SYNTAX, &THEME);
    let styled = highlighter.highlight_line(xml, &PS).unwrap();
    let mut rendered = syntect::util::as_24_bit_terminal_escaped(&styled, false);
    rendered.push_str("\x1b[0m");
    rendered
}
111
/// Display adapter for logging a buffer of serialised XML.
///
/// The wrapped bytes are written as text. With the `syntax-highlighting`
/// feature enabled, the text is passed through `highlight_xml` first.
struct LogXsoBuf<'x>(&'x [u8]);

impl fmt::Display for LogXsoBuf<'_> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // The buffer is expected to hold UTF-8 only. Formatting a log line
        // must never panic, though, so instead of unwrapping the conversion
        // any invalid sequence is rendered as U+FFFD. Valid input is borrowed
        // without copying.
        let text = String::from_utf8_lossy(self.0);
        #[cfg(feature = "syntax-highlighting")]
        let text = highlight_xml(&text);
        f.write_str(&text)
    }
}
123
124/// Initiate a new stream
125///
126/// Initiate a new stream using the given I/O object `io`. The default
127/// XML namespace will be set to `stream_ns` and the stream header will use
128/// the attributes as set in `stream_header`, along with version `1.0`.
129///
130/// The returned object contains the stream header sent by the remote side
131/// as well as the internal parser state to continue the negotiation.
132pub async fn initiate_stream<Io: AsyncBufRead + AsyncWrite + Unpin>(
133    io: Io,
134    stream_ns: &'static str,
135    stream_header: StreamHeader<'_>,
136    timeouts: Timeouts,
137) -> Result<PendingFeaturesRecv<Io>, io::Error> {
138    let stream = InitiatingStream(RawXmlStream::new(io, stream_ns, timeouts));
139    stream.send_header(stream_header).await
140}
141
142/// Accept a new XML stream as responder
143///
144/// Prepares the responer side of an XML stream using the given I/O object
145/// `io`. The default XML namespace will be set to `stream_ns`.
146///
147/// The returned object contains the stream header sent by the remote side
148/// as well as the internal parser state to continue the negotiation.
149pub async fn accept_stream<Io: AsyncBufRead + AsyncWrite + Unpin>(
150    io: Io,
151    stream_ns: &'static str,
152    timeouts: Timeouts,
153) -> Result<AcceptedStream<Io>, io::Error> {
154    let mut stream = RawXmlStream::new(io, stream_ns, timeouts);
155    let header = StreamHeader::recv(Pin::new(&mut stream)).await?;
156    Ok(AcceptedStream { stream, header })
157}
158
/// A non-success state which may occur while reading an XSO from a
/// [`XmlStream`]
///
/// This is the error half of the items yielded by the [`Stream`]
/// implementation of [`XmlStream`].
#[derive(Debug)]
pub enum ReadError {
    /// The soft timeout of the stream triggered.
    ///
    /// User code should handle this by sending something into the stream
    /// which causes the peer to send data before the hard timeout triggers.
    SoftTimeout,

    /// An I/O error occurred in the underlying I/O object.
    ///
    /// This is generally fatal.
    HardError(io::Error),

    /// A parse error occurred while processing the XSO.
    ///
    /// This is non-fatal and more XSOs may be read from the stream.
    ParseError(xso::error::Error),

    /// The stream footer was received.
    ///
    /// This is reported once. Afterwards, the stream only waits for the end
    /// of the underlying transport: soft timeouts are swallowed and further
    /// reads yield either `None` (end of stream) or a
    /// [`HardError`][`Self::HardError`]. The stream has been closed by the
    /// peer and you should probably close it, too.
    StreamFooterReceived,
}
185
186impl fmt::Display for ReadError {
187    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
188        match self {
189            ReadError::SoftTimeout => write!(f, "soft timeout"),
190            ReadError::HardError(e) => write!(f, "hard error: {}", e),
191            ReadError::ParseError(e) => write!(f, "parse error: {}", e),
192            ReadError::StreamFooterReceived => write!(f, "stream footer received"),
193        }
194    }
195}
196
197impl core::error::Error for ReadError {
198    fn source(&self) -> Option<&(dyn core::error::Error + 'static)> {
199        match self {
200            ReadError::HardError(e) => Some(e),
201            ReadError::ParseError(e) => Some(e),
202            _ => None,
203        }
204    }
205}
206
/// State of the sending half of an [`XmlStream`].
enum WriteState {
    /// Payload may be written; this is the state of a freshly wrapped stream.
    Open,
    /// Shutdown has been requested, the stream footer is yet to be sent.
    SendElementFoot,
    /// The stream footer has been handed to the underlying stream.
    FooterSent,
    /// Sending the stream footer failed; the sink is unusable.
    Failed,
}

impl WriteState {
    /// Fail with `NotConnected` if an earlier write error broke the sink.
    ///
    /// All other states, including those after the footer was queued or
    /// sent, are considered fine.
    fn check_ok(&self) -> io::Result<()> {
        match self {
            Self::Open | Self::SendElementFoot | Self::FooterSent => Ok(()),
            Self::Failed => Err(io::Error::new(
                io::ErrorKind::NotConnected,
                "XML stream sink unusable because of previous write error",
            )),
        }
    }

    /// Fail with `NotConnected` unless new payload may still be written.
    ///
    /// Writing is only possible while the stream is open: a failed sink
    /// reports the error of [`Self::check_ok`], and once shutdown has begun
    /// the footer-specific error is reported instead.
    fn check_writable(&self) -> io::Result<()> {
        match self {
            Self::Open => Ok(()),
            Self::Failed => self.check_ok(),
            Self::SendElementFoot | Self::FooterSent => Err(io::Error::new(
                io::ErrorKind::NotConnected,
                "stream footer already sent",
            )),
        }
    }
}
235
pin_project_lite::pin_project! {
    /// XML stream
    ///
    /// This struct represents an
    /// [RFC 6120](https://tools.ietf.org/html/rfc6120) XML stream, where the
    /// payload consists of items of type `T` implementing [`FromXml`] and
    /// [`AsXml`].
    ///
    /// **Note:** The XML stream is a low-level API which you should probably
    /// not use directly. You may be looking for
    /// [`StanzaStream`][`crate::stanzastream::StanzaStream`] instead.
    pub struct XmlStream<Io, T: FromXml> {
        // The raw XML stream all reads and writes are delegated to.
        #[pin]
        inner: RawXmlStream<Io>,
        // Parser state of the XSO currently being received; `None` once the
        // stream footer has been received.
        read_state: Option<ReadXsoState<T>>,
        // Whether payload may still be written, the footer is being or has
        // been sent, or a write error occurred.
        write_state: WriteState,
    }
}
254
255impl<Io, T: FromXml> XmlStream<Io, T> {
256    /// Obtain a reference to the `Io` stream.
257    pub fn get_stream(&self) -> &Io {
258        self.inner.get_stream()
259    }
260}
261
262impl<Io: AsyncBufRead, T: FromXml + AsXml> XmlStream<Io, T> {
263    fn wrap(inner: RawXmlStream<Io>) -> Self {
264        Self {
265            inner,
266            read_state: Some(ReadXsoState::default()),
267            write_state: WriteState::Open,
268        }
269    }
270
271    fn assert_retypable(&self) {
272        match self.read_state {
273            Some(ReadXsoState::PreData) => (),
274            Some(_) => panic!("cannot reset stream: XSO parsing in progress!"),
275            None => panic!("cannot reset stream: stream footer received!"),
276        }
277        match self.write_state.check_writable() {
278            Ok(()) => (),
279            Err(e) => panic!("cannot reset stream: {}", e),
280        }
281    }
282}
283
284impl<Io: AsyncBufRead + AsyncWrite + Unpin, T: FromXml + AsXml + fmt::Debug> XmlStream<Io, T> {
285    /// Initiate a stream reset
286    ///
287    /// To actually send the stream header, call
288    /// [`send_header`][`InitiatingStream::send_header`] on the result.
289    ///
290    /// # Panics
291    ///
292    /// Attempting to reset the stream while an object is being received will
293    /// panic. This can generally only happen if you call `poll_next`
294    /// directly, as doing that is otherwise prevented by the borrowchecker.
295    ///
296    /// In addition, attempting to reset a stream which has been closed by
297    /// either side or which has had an I/O error will also cause a panic.
298    pub fn initiate_reset(self) -> InitiatingStream<Io> {
299        self.assert_retypable();
300
301        let mut stream = self.inner;
302        Pin::new(&mut stream).reset_state();
303        InitiatingStream(stream)
304    }
305
306    /// Trigger a stream reset on the initiator side and await the new stream
307    /// header.
308    ///
309    /// This is the responder-side counterpart to
310    /// [`initiate_reset`][`Self::initiate_reset`]. The element which causes
311    /// the stream reset must be passed as `barrier` and it will be sent
312    /// right before resetting the parser state. This way, the race condition
313    /// outlined in the [`xmlstream`][`self`] module's documentation is
314    /// guaranteed to be avoided.
315    ///
316    /// Note that you should not send the element passed as `barrier` down the
317    /// stream yourself, as this function takes care of it.
318    ///
319    /// # Stream resets without a triggering element
320    ///
321    /// These are not possible to do safely and not specified in RFC 6120,
322    /// hence they cannot be done in [`XmlStream`].
323    ///
324    /// # Panics
325    ///
326    /// Attempting to reset the stream while an object is being received will
327    /// panic. This can generally only happen if you call `poll_next`
328    /// directly, as doing that is otherwise prevented by the borrowchecker.
329    ///
330    /// In addition, attempting to reset a stream which has been closed by
331    /// either side or which has had an I/O error will also cause a panic.
332    pub async fn accept_reset(mut self, barrier: &T) -> io::Result<AcceptedStream<Io>> {
333        self.assert_retypable();
334        self.send(barrier).await?;
335
336        let mut stream = self.inner;
337        Pin::new(&mut stream).reset_state();
338        let header = StreamHeader::recv(Pin::new(&mut stream)).await?;
339        Ok(AcceptedStream { stream, header })
340    }
341
342    /// Discard all XML state and return the inner I/O object.
343    pub fn into_inner(self) -> Io {
344        self.assert_retypable();
345        self.inner.into_inner()
346    }
347
348    /// Box the underlying transport stream.
349    ///
350    /// This removes the specific type of the transport from the XML stream's
351    /// type signature.
352    pub fn box_stream(self) -> XmlStream<Box<dyn AsyncReadAndWrite + Send + 'static>, T>
353    where
354        Io: AsyncReadAndWrite + Send + 'static,
355    {
356        XmlStream {
357            inner: self.inner.box_stream(),
358            read_state: self.read_state,
359            write_state: self.write_state,
360        }
361    }
362}
363
impl<Io: AsyncBufRead, T: FromXml + AsXml + fmt::Debug> Stream for XmlStream<Io, T> {
    type Item = Result<T, ReadError>;

    /// Attempt to read the next XSO of type `T` from the stream.
    ///
    /// While the stream is open, this advances the current read state and
    /// yields its outcome: a parsed value or a [`ReadError`]. When the
    /// stream footer is encountered, [`ReadError::StreamFooterReceived`] is
    /// yielded once; from then on, the raw stream is only drained until it
    /// ends, which yields `None`, or fails, which yields
    /// [`ReadError::HardError`].
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let mut this = self.project();
        let result = match this.read_state.as_mut() {
            // No read state means the stream footer has already been seen.
            None => {
                // awaiting eof.
                return loop {
                    match ready!(this.inner.as_mut().poll_next(cx)) {
                        None => break Poll::Ready(None),
                        Some(Ok(_)) => unreachable!("xml parser allowed data after stream footer"),
                        Some(Err(RawError::Io(e))) => {
                            break Poll::Ready(Some(Err(ReadError::HardError(e))))
                        }
                        // Swallow soft timeout, we don't want the user to trigger
                        // anything here.
                        Some(Err(RawError::SoftTimeout)) => continue,
                    }
                };
            }
            // Returns `Poll::Pending` from this function until the read state
            // has produced an outcome.
            Some(read_state) => ready!(read_state.poll_advance(this.inner, cx)),
        };
        // Translate the internal outcome into the public item type.
        let result = match result {
            Ok(v) => Poll::Ready(Some(Ok(v))),
            Err(ReadXsoError::Hard(e)) => Poll::Ready(Some(Err(ReadError::HardError(e)))),
            Err(ReadXsoError::Parse(e)) => Poll::Ready(Some(Err(ReadError::ParseError(e)))),
            Err(ReadXsoError::Footer) => {
                *this.read_state = None;
                // Return early here, because we cannot allow recreation of
                // another read state.
                return Poll::Ready(Some(Err(ReadError::StreamFooterReceived)));
            }
            Err(ReadXsoError::SoftTimeout) => Poll::Ready(Some(Err(ReadError::SoftTimeout))),
        };
        // Every outcome other than the footer starts over with a fresh read
        // state for the next call.
        *this.read_state = Some(ReadXsoState::default());
        result
    }
}
403
404impl<Io: AsyncWrite, T: FromXml + AsXml> XmlStream<Io, T> {
405    /// Initiate stream shutdown and poll for completion.
406    ///
407    /// Please see [`Self::shutdown`] for details.
408    pub fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), io::Error>> {
409        let mut this = self.project();
410        this.write_state.check_ok()?;
411        loop {
412            match this.write_state {
413                // Open => initiate closing.
414                WriteState::Open => {
415                    *this.write_state = WriteState::SendElementFoot;
416                }
417                // Sending => wait for readiness, then send.
418                WriteState::SendElementFoot => {
419                    match ready!(this.inner.as_mut().poll_ready(cx))
420                        .and_then(|_| this.inner.as_mut().start_send(Item::ElementFoot))
421                    {
422                        Ok(()) => {
423                            log::trace!("stream footer sent successfully");
424                        }
425                        // If it fails, we fail the sink immediately.
426                        Err(e) => {
427                            log::debug!(
428                                "omitting stream footer: failed to make stream ready: {}",
429                                e
430                            );
431                            *this.write_state = WriteState::Failed;
432                            return Poll::Ready(Err(e));
433                        }
434                    }
435                    *this.write_state = WriteState::FooterSent;
436                }
437                // Footer sent => just close the inner stream.
438                WriteState::FooterSent => break,
439                WriteState::Failed => unreachable!(), // caught by check_ok()
440            }
441        }
442        this.inner.poll_shutdown(cx)
443    }
444}
445
446impl<Io: AsyncWrite + Unpin, T: FromXml + AsXml> XmlStream<Io, T> {
447    /// Send the stream footer and close the sender side of the underlying
448    /// transport.
449    ///
450    /// Unlike `poll_close` (from the `Sink` impls), this will not close the
451    /// receiving side of the underlying the transport. It is advisable to call
452    /// `poll_close` eventually after `poll_shutdown` in order to gracefully
453    /// handle situations where the remote side does not close the stream
454    /// cleanly.
455    pub fn shutdown(&mut self) -> Shutdown<'_, Io, T> {
456        Shutdown {
457            stream: Pin::new(self),
458        }
459    }
460}
461
462impl<'x, Io: AsyncWrite, T: FromXml + AsXml, U: AsXml> Sink<&'x U> for XmlStream<Io, T> {
463    type Error = io::Error;
464
465    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
466        let this = self.project();
467        this.write_state.check_writable()?;
468        this.inner.poll_ready(cx)
469    }
470
471    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
472        let this = self.project();
473        this.write_state.check_writable()?;
474        this.inner.poll_flush(cx)
475    }
476
477    fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
478        ready!(self.as_mut().poll_shutdown(cx))?;
479        let this = self.project();
480        this.inner.poll_close(cx)
481    }
482
483    fn start_send(self: Pin<&mut Self>, item: &'x U) -> Result<(), Self::Error> {
484        let this = self.project();
485        this.write_state.check_writable()?;
486        this.inner.start_send_xso(item)
487    }
488}
489
490/// Future implementing [`XmlStream::shutdown`] using
491/// [`XmlStream::poll_shutdown`].
492pub struct Shutdown<'a, Io: AsyncWrite, T: FromXml + AsXml> {
493    stream: Pin<&'a mut XmlStream<Io, T>>,
494}
495
496impl<'a, Io: AsyncWrite, T: FromXml + AsXml> Future for Shutdown<'a, Io, T> {
497    type Output = io::Result<()>;
498
499    fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
500        self.stream.as_mut().poll_shutdown(cx)
501    }
502}
503
/// Convenience alias for an XML stream using [`XmppStreamElement`].
///
/// This is [`XmlStream`] with its payload type `T` fixed to
/// [`XmppStreamElement`]; only the transport type `Io` remains generic.
pub type XmppStream<Io> = XmlStream<Io, XmppStreamElement>;