tokio_xmpp/xmlstream/
mod.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
// Copyright (c) 2024 Jonas Schäfer <jonas@zombofant.net>
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.

//! # RFC 6120 XML Streams
//!
//! **Note:** The XML stream is a low-level API which you should probably not
//! use directly.
//!
//! Establishing an XML stream is always a multi-stage process due to how
//! stream negotiation works. Based on the values sent by the initiator in the
//! stream header, the responder may choose to offer different features.
//!
//! In order to allow this, the following multi-step processes are defined.
//!
//! ## Initiating an XML stream
//!
//! To initiate an XML stream, you need to:
//!
//! 1. Call [`initiate_stream`] to obtain the [`PendingFeaturesRecv`] object.
//!    That object holds the stream header sent by the peer for inspection.
//! 2. Call [`PendingFeaturesRecv::recv_features`] if you are content with
//!    the content of the stream header to obtain the [`XmlStream`] object and
//!    the features sent by the peer.
//!
//! ## Accepting an XML stream connection
//!
//! To accept an XML stream, you need to:
//!
//! 1. Call [`accept_stream`] to obtain the [`AcceptedStream`] object.
//!    That object holds the stream header sent by the peer for inspection.
//! 2. Call [`AcceptedStream::send_header`] if you are content with
//!    the content of the stream header to obtain the [`PendingFeaturesSend`]
//!    object.
//! 3. Call [`PendingFeaturesSend::send_features`] to send the stream features
//!    to the peer and obtain the [`XmlStream`] object.
//!
//! ## Mid-stream resets
//!
//! RFC 6120 describes a couple of situations where stream resets are executed
//! during stream negotiation. During a stream reset, both parties drop their
//! parser state and the stream is started from the beginning, with a new
//! stream header sent by the initiator and received by the responder.
//!
//! Stream resets are inherently prone to race conditions. If the responder
//! executes a read from the underlying transport between sending the element
//! which triggers the stream reset and discarding its parser state, it may
//! accidentally read the initiator's stream header into the *old* parser
//! state instead of the post-reset parser state.
//!
//! Stream resets are executed with the [`XmlStream::initiate_reset`] and
//! [`XmlStream::accept_reset`] functions, for initiator and responder,
//! respectively. In order to avoid the race condition,
//! [`XmlStream::accept_reset`] handles sending the last pre-reset element and
//! resetting the stream in a single step.

use core::fmt;
use core::future::Future;
use core::pin::Pin;
use core::task::{Context, Poll};
use std::io;
#[cfg(feature = "syntax-highlighting")]
use std::sync::LazyLock;

use futures::{ready, Sink, SinkExt, Stream};

use tokio::io::{AsyncBufRead, AsyncWrite};

use xso::{AsXml, FromXml, Item};

mod capture;
mod common;
mod initiator;
mod responder;
#[cfg(test)]
mod tests;
pub(crate) mod xmpp;

use self::common::{RawError, RawXmlStream, ReadXsoError, ReadXsoState};
pub use self::common::{StreamHeader, Timeouts};
pub use self::initiator::{InitiatingStream, PendingFeaturesRecv};
pub use self::responder::{AcceptedStream, PendingFeaturesSend};
pub use self::xmpp::XmppStreamElement;

#[cfg(feature = "syntax-highlighting")]
static PS: LazyLock<syntect::parsing::SyntaxSet> =
    LazyLock::new(syntect::parsing::SyntaxSet::load_defaults_newlines);

#[cfg(feature = "syntax-highlighting")]
static SYNTAX: LazyLock<&syntect::parsing::SyntaxReference> =
    LazyLock::new(|| PS.find_syntax_by_extension("xml").unwrap());

#[cfg(feature = "syntax-highlighting")]
static THEME: LazyLock<syntect::highlighting::Theme> = LazyLock::new(|| {
    syntect::highlighting::ThemeSet::load_defaults().themes["Solarized (dark)"].clone()
});

#[cfg(feature = "syntax-highlighting")]
fn highlight_xml(xml: &str) -> String {
    let mut h = syntect::easy::HighlightLines::new(&SYNTAX, &THEME);
    let ranges: Vec<_> = h.highlight_line(&xml, &PS).unwrap();
    let mut escaped = syntect::util::as_24_bit_terminal_escaped(&ranges[..], false);
    escaped += "\x1b[0m";
    escaped
}

struct LogXsoBuf<'x>(&'x [u8]);

impl<'x> fmt::Display for LogXsoBuf<'x> {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        // We always generate UTF-8, so this should be good... I think.
        let text = std::str::from_utf8(&self.0).unwrap();
        #[cfg(feature = "syntax-highlighting")]
        let text = highlight_xml(text);
        f.write_str(&text)
    }
}

/// Initiate a new stream
///
/// Initiate a new stream using the given I/O object `io`. The default
/// XML namespace will be set to `stream_ns` and the stream header will use
/// the attributes as set in `stream_header`, along with version `1.0`.
///
/// The returned object contains the stream header sent by the remote side
/// as well as the internal parser state to continue the negotiation.
pub async fn initiate_stream<Io: AsyncBufRead + AsyncWrite + Unpin>(
    io: Io,
    stream_ns: &'static str,
    stream_header: StreamHeader<'_>,
    timeouts: Timeouts,
) -> Result<PendingFeaturesRecv<Io>, io::Error> {
    let stream = InitiatingStream(RawXmlStream::new(io, stream_ns, timeouts));
    stream.send_header(stream_header).await
}

/// Accept a new XML stream as responder
///
/// Prepares the responer side of an XML stream using the given I/O object
/// `io`. The default XML namespace will be set to `stream_ns`.
///
/// The returned object contains the stream header sent by the remote side
/// as well as the internal parser state to continue the negotiation.
pub async fn accept_stream<Io: AsyncBufRead + AsyncWrite + Unpin>(
    io: Io,
    stream_ns: &'static str,
    timeouts: Timeouts,
) -> Result<AcceptedStream<Io>, io::Error> {
    let mut stream = RawXmlStream::new(io, stream_ns, timeouts);
    let header = StreamHeader::recv(Pin::new(&mut stream)).await?;
    Ok(AcceptedStream { stream, header })
}

/// A non-success state which may occur while reading an XSO from a
/// [`XmlStream`]
#[derive(Debug)]
pub enum ReadError {
    /// The soft timeout of the stream triggered.
    ///
    /// User code should handle this by sending something into the stream
    /// which causes the peer to send data before the hard timeout triggers.
    SoftTimeout,

    /// An I/O error occurred in the underlying I/O object.
    ///
    /// This is generally fatal.
    HardError(io::Error),

    /// A parse error occurred while processing the XSO.
    ///
    /// This is non-fatal and more XSOs may be read from the stream.
    ParseError(xso::error::Error),

    /// The stream footer was received.
    ///
    /// Any future read attempts will again return this error. The stream has
    /// been closed by the peer and you should probably close it, too.
    StreamFooterReceived,
}

impl fmt::Display for ReadError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            ReadError::SoftTimeout => write!(f, "soft timeout"),
            ReadError::HardError(e) => write!(f, "hard error: {}", e),
            ReadError::ParseError(e) => write!(f, "parse error: {}", e),
            ReadError::StreamFooterReceived => write!(f, "stream footer received"),
        }
    }
}

impl core::error::Error for ReadError {
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        match self {
            ReadError::HardError(e) => Some(e),
            ReadError::ParseError(e) => Some(e),
            _ => None,
        }
    }
}

enum WriteState {
    Open,
    SendElementFoot,
    FooterSent,
    Failed,
}

impl WriteState {
    fn check_ok(&self) -> io::Result<()> {
        match self {
            WriteState::Failed => Err(io::Error::new(
                io::ErrorKind::NotConnected,
                "XML stream sink unusable because of previous write error",
            )),
            WriteState::Open | WriteState::SendElementFoot | WriteState::FooterSent => Ok(()),
        }
    }

    fn check_writable(&self) -> io::Result<()> {
        match self {
            WriteState::SendElementFoot | WriteState::FooterSent => Err(io::Error::new(
                io::ErrorKind::NotConnected,
                "stream footer already sent",
            )),
            WriteState::Failed | WriteState::Open => self.check_ok(),
        }
    }
}

pin_project_lite::pin_project! {
    /// XML stream
    ///
    /// This struct represents an
    /// [RFC 6120](https://tools.ietf.org/html/rfc6120) XML stream, where the
    /// payload consists of items of type `T` implementing [`FromXml`] and
    /// [`AsXml`].
    pub struct XmlStream<Io, T: FromXml> {
        #[pin]
        inner: RawXmlStream<Io>,
        read_state: Option<ReadXsoState<T>>,
        write_state: WriteState,
    }
}

impl<Io, T: FromXml> XmlStream<Io, T> {
    /// Obtain a reference to the `Io` stream.
    pub fn get_stream(&self) -> &Io {
        self.inner.get_stream()
    }
}

impl<Io: AsyncBufRead, T: FromXml + AsXml> XmlStream<Io, T> {
    fn wrap(inner: RawXmlStream<Io>) -> Self {
        Self {
            inner,
            read_state: Some(ReadXsoState::default()),
            write_state: WriteState::Open,
        }
    }

    fn assert_retypable(&self) {
        match self.read_state {
            Some(ReadXsoState::PreData) => (),
            Some(_) => panic!("cannot reset stream: XSO parsing in progress!"),
            None => panic!("cannot reset stream: stream footer received!"),
        }
        match self.write_state.check_writable() {
            Ok(()) => (),
            Err(e) => panic!("cannot reset stream: {}", e),
        }
    }
}

impl<Io: AsyncBufRead + AsyncWrite + Unpin, T: FromXml + AsXml + fmt::Debug> XmlStream<Io, T> {
    /// Initiate a stream reset
    ///
    /// To actually send the stream header, call
    /// [`send_header`][`InitiatingStream::send_header`] on the result.
    ///
    /// # Panics
    ///
    /// Attempting to reset the stream while an object is being received will
    /// panic. This can generally only happen if you call `poll_next`
    /// directly, as doing that is otherwise prevented by the borrowchecker.
    ///
    /// In addition, attempting to reset a stream which has been closed by
    /// either side or which has had an I/O error will also cause a panic.
    pub fn initiate_reset(self) -> InitiatingStream<Io> {
        self.assert_retypable();

        let mut stream = self.inner;
        Pin::new(&mut stream).reset_state();
        InitiatingStream(stream)
    }

    /// Trigger a stream reset on the initiator side and await the new stream
    /// header.
    ///
    /// This is the responder-side counterpart to
    /// [`initiate_reset`][`Self::initiate_reset`]. The element which causes
    /// the stream reset must be passed as `barrier` and it will be sent
    /// right before resetting the parser state. This way, the race condition
    /// outlined in the [`xmlstream`][`self`] module's documentation is
    /// guaranteed to be avoided.
    ///
    /// Note that you should not send the element passed as `barrier` down the
    /// stream yourself, as this function takes care of it.
    ///
    /// # Stream resets without a triggering element
    ///
    /// These are not possible to do safely and not specified in RFC 6120,
    /// hence they cannot be done in [`XmlStream`].
    ///
    /// # Panics
    ///
    /// Attempting to reset the stream while an object is being received will
    /// panic. This can generally only happen if you call `poll_next`
    /// directly, as doing that is otherwise prevented by the borrowchecker.
    ///
    /// In addition, attempting to reset a stream which has been closed by
    /// either side or which has had an I/O error will also cause a panic.
    pub async fn accept_reset(mut self, barrier: &T) -> io::Result<AcceptedStream<Io>> {
        self.assert_retypable();
        self.send(barrier).await?;

        let mut stream = self.inner;
        Pin::new(&mut stream).reset_state();
        let header = StreamHeader::recv(Pin::new(&mut stream)).await?;
        Ok(AcceptedStream { stream, header })
    }

    /// Discard all XML state and return the inner I/O object.
    pub fn into_inner(self) -> Io {
        self.assert_retypable();
        self.inner.into_inner()
    }
}

impl<Io: AsyncBufRead, T: FromXml + AsXml + fmt::Debug> Stream for XmlStream<Io, T> {
    type Item = Result<T, ReadError>;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let mut this = self.project();
        let result = match this.read_state.as_mut() {
            None => {
                // awaiting eof.
                return loop {
                    match ready!(this.inner.as_mut().poll_next(cx)) {
                        None => break Poll::Ready(None),
                        Some(Ok(_)) => unreachable!("xml parser allowed data after stream footer"),
                        Some(Err(RawError::Io(e))) => {
                            break Poll::Ready(Some(Err(ReadError::HardError(e))))
                        }
                        // Swallow soft timeout, we don't want the user to trigger
                        // anything here.
                        Some(Err(RawError::SoftTimeout)) => continue,
                    }
                };
            }
            Some(read_state) => ready!(read_state.poll_advance(this.inner, cx)),
        };
        let result = match result {
            Ok(v) => Poll::Ready(Some(Ok(v))),
            Err(ReadXsoError::Hard(e)) => Poll::Ready(Some(Err(ReadError::HardError(e)))),
            Err(ReadXsoError::Parse(e)) => Poll::Ready(Some(Err(ReadError::ParseError(e)))),
            Err(ReadXsoError::Footer) => {
                *this.read_state = None;
                // Return early here, because we cannot allow recreation of
                // another read state.
                return Poll::Ready(Some(Err(ReadError::StreamFooterReceived)));
            }
            Err(ReadXsoError::SoftTimeout) => Poll::Ready(Some(Err(ReadError::SoftTimeout))),
        };
        *this.read_state = Some(ReadXsoState::default());
        result
    }
}

impl<Io: AsyncWrite, T: FromXml + AsXml> XmlStream<Io, T> {
    /// Initiate stream shutdown and poll for completion.
    ///
    /// Please see [`Self::shutdown`] for details.
    pub fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), io::Error>> {
        let mut this = self.project();
        this.write_state.check_ok()?;
        loop {
            match this.write_state {
                // Open => initiate closing.
                WriteState::Open => {
                    *this.write_state = WriteState::SendElementFoot;
                }
                // Sending => wait for readiness, then send.
                WriteState::SendElementFoot => {
                    match ready!(this.inner.as_mut().poll_ready(cx))
                        .and_then(|_| this.inner.as_mut().start_send(Item::ElementFoot))
                    {
                        Ok(()) => {
                            log::trace!("stream footer sent successfully");
                        }
                        // If it fails, we fail the sink immediately.
                        Err(e) => {
                            log::debug!(
                                "omitting stream footer: failed to make stream ready: {}",
                                e
                            );
                            *this.write_state = WriteState::Failed;
                            return Poll::Ready(Err(e));
                        }
                    }
                    *this.write_state = WriteState::FooterSent;
                }
                // Footer sent => just close the inner stream.
                WriteState::FooterSent => break,
                WriteState::Failed => unreachable!(), // caught by check_ok()
            }
        }
        this.inner.poll_shutdown(cx)
    }
}

impl<Io: AsyncWrite + Unpin, T: FromXml + AsXml> XmlStream<Io, T> {
    /// Send the stream footer and close the sender side of the underlying
    /// transport.
    ///
    /// Unlike `poll_close` (from the `Sink` impls), this will not close the
    /// receiving side of the underlying the transport. It is advisable to call
    /// `poll_close` eventually after `poll_shutdown` in order to gracefully
    /// handle situations where the remote side does not close the stream
    /// cleanly.
    pub fn shutdown(&mut self) -> Shutdown<'_, Io, T> {
        Shutdown {
            stream: Pin::new(self),
        }
    }
}

impl<'x, Io: AsyncWrite, T: FromXml + AsXml> Sink<&'x T> for XmlStream<Io, T> {
    type Error = io::Error;

    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        let this = self.project();
        this.write_state.check_writable()?;
        this.inner.poll_ready(cx)
    }

    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        let this = self.project();
        this.write_state.check_writable()?;
        this.inner.poll_flush(cx)
    }

    fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        ready!(self.as_mut().poll_shutdown(cx))?;
        let this = self.project();
        this.inner.poll_close(cx)
    }

    fn start_send(self: Pin<&mut Self>, item: &'x T) -> Result<(), Self::Error> {
        let this = self.project();
        this.write_state.check_writable()?;
        this.inner.start_send_xso(item)
    }
}

/// Future implementing [`XmlStream::shutdown`] using
/// [`XmlStream::poll_shutdown`].
pub struct Shutdown<'a, Io: AsyncWrite, T: FromXml + AsXml> {
    stream: Pin<&'a mut XmlStream<Io, T>>,
}

impl<'a, Io: AsyncWrite, T: FromXml + AsXml> Future for Shutdown<'a, Io, T> {
    type Output = io::Result<()>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
        self.stream.as_mut().poll_shutdown(cx)
    }
}

/// Convenience alias for an XML stream using [`XmppStreamElement`].
pub type XmppStream<Io> = XmlStream<Io, XmppStreamElement>;