tokio_xmpp/xmlstream/mod.rs
1// Copyright (c) 2024 Jonas Schäfer <jonas@zombofant.net>
2//
3// This Source Code Form is subject to the terms of the Mozilla Public
4// License, v. 2.0. If a copy of the MPL was not distributed with this
5// file, You can obtain one at http://mozilla.org/MPL/2.0/.
6
7//! # RFC 6120 XML Streams
8//!
9//! **Note:** The XML stream is a low-level API which you should probably not
10//! use directly. You may be looking for
11//! [`StanzaStream`][`crate::stanzastream::StanzaStream`] instead.
12//!
13//! Establishing an XML stream is always a multi-stage process due to how
14//! stream negotiation works. Based on the values sent by the initiator in the
15//! stream header, the responder may choose to offer different features.
16//!
17//! In order to allow this, the following multi-step processes are defined.
18//!
19//! ## Initiating an XML stream
20//!
21//! To initiate an XML stream, you need to:
22//!
23//! 1. Call [`initiate_stream`] to obtain the [`PendingFeaturesRecv`] object.
24//! That object holds the stream header sent by the peer for inspection.
25//! 2. Call [`PendingFeaturesRecv::recv_features`] if you are content with
26//! the content of the stream header to obtain the [`XmlStream`] object and
27//! the features sent by the peer.
28//!
29//! ## Accepting an XML stream connection
30//!
31//! To accept an XML stream, you need to:
32//!
33//! 1. Call [`accept_stream`] to obtain the [`AcceptedStream`] object.
34//! That object holds the stream header sent by the peer for inspection.
35//! 2. Call [`AcceptedStream::send_header`] if you are content with
36//! the content of the stream header to obtain the [`PendingFeaturesSend`]
37//! object.
38//! 3. Call [`PendingFeaturesSend::send_features`] to send the stream features
39//! to the peer and obtain the [`XmlStream`] object.
40//!
41//! ## Mid-stream resets
42//!
43//! RFC 6120 describes a couple of situations where stream resets are executed
44//! during stream negotiation. During a stream reset, both parties drop their
45//! parser state and the stream is started from the beginning, with a new
46//! stream header sent by the initiator and received by the responder.
47//!
48//! Stream resets are inherently prone to race conditions. If the responder
49//! executes a read from the underlying transport between sending the element
50//! which triggers the stream reset and discarding its parser state, it may
51//! accidentally read the initiator's stream header into the *old* parser
52//! state instead of the post-reset parser state.
53//!
54//! Stream resets are executed with the [`XmlStream::initiate_reset`] and
55//! [`XmlStream::accept_reset`] functions, for initiator and responder,
56//! respectively. In order to avoid the race condition,
57//! [`XmlStream::accept_reset`] handles sending the last pre-reset element and
58//! resetting the stream in a single step.
59
60use core::fmt;
61use core::future::Future;
62use core::pin::Pin;
63use core::task::{Context, Poll};
64use std::io;
65#[cfg(feature = "syntax-highlighting")]
66use std::sync::LazyLock;
67
68use futures::{ready, Sink, SinkExt, Stream};
69
70use tokio::io::{AsyncBufRead, AsyncWrite};
71
72use xso::{AsXml, FromXml, Item};
73
74use crate::connect::AsyncReadAndWrite;
75
76mod capture;
77mod common;
78mod initiator;
79mod responder;
80#[cfg(test)]
81mod tests;
82pub(crate) mod xmpp;
83
84use self::common::{RawError, RawXmlStream, ReadXsoError, ReadXsoState};
85pub use self::common::{StreamHeader, Timeouts};
86pub use self::initiator::{InitiatingStream, PendingFeaturesRecv, RecvFeaturesError};
87pub use self::responder::{AcceptedStream, PendingFeaturesSend};
88pub use self::xmpp::{
89 FallibleStreamElement, RawStanzaHeader, StreamElementError, XmppStreamElement,
90};
91
/// Syntax definitions used to highlight logged XML.
#[cfg(feature = "syntax-highlighting")]
static PS: LazyLock<syntect::parsing::SyntaxSet> =
    LazyLock::new(syntect::parsing::SyntaxSet::load_defaults_newlines);

/// The XML syntax definition taken from [`PS`].
#[cfg(feature = "syntax-highlighting")]
static SYNTAX: LazyLock<&syntect::parsing::SyntaxReference> = LazyLock::new(|| {
    PS.find_syntax_by_extension("xml")
        .expect("syntect's default syntax set should include an XML syntax")
});

/// Colour theme used when highlighting logged XML.
#[cfg(feature = "syntax-highlighting")]
static THEME: LazyLock<syntect::highlighting::Theme> = LazyLock::new(|| {
    // The freshly loaded set is discarded anyway, so move the theme out of
    // it instead of cloning it.
    let mut themes = syntect::highlighting::ThemeSet::load_defaults().themes;
    themes
        .remove("Solarized (dark)")
        .expect("syntect's default theme set should include \"Solarized (dark)\"")
});
104
/// Render `xml` with 24-bit ANSI terminal colours for log output.
///
/// On success the coloured text is terminated with a reset sequence
/// (`ESC[0m`). If syntect fails to highlight the input, the text is returned
/// unchanged instead of panicking. This is called from a `Display` impl used
/// for logging, and a purely cosmetic failure must not abort the caller.
#[cfg(feature = "syntax-highlighting")]
fn highlight_xml(xml: &str) -> String {
    let mut highlighter = syntect::easy::HighlightLines::new(&SYNTAX, &THEME);
    match highlighter.highlight_line(xml, &PS) {
        Ok(ranges) => {
            let mut escaped = syntect::util::as_24_bit_terminal_escaped(&ranges[..], false);
            escaped.push_str("\x1b[0m");
            escaped
        }
        Err(_) => xml.to_owned(),
    }
}
113
/// Adapter that formats a raw XML byte buffer via [`fmt::Display`] for
/// logging.
///
/// The buffer is expected to hold UTF-8, since the XML this crate generates
/// is UTF-8. Invalid sequences are rendered as U+FFFD rather than causing a
/// panic. A panic inside a log statement would otherwise take down a caller
/// that is merely trying to log.
struct LogXsoBuf<'x>(&'x [u8]);

impl fmt::Display for LogXsoBuf<'_> {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        // Borrows when the input is valid UTF-8; only allocates to replace
        // invalid sequences.
        let text = String::from_utf8_lossy(self.0);
        #[cfg(feature = "syntax-highlighting")]
        let text = highlight_xml(&text);
        f.write_str(&text)
    }
}
125
126/// Initiate a new stream
127///
128/// Initiate a new stream using the given I/O object `io`. The default
129/// XML namespace will be set to `stream_ns` and the stream header will use
130/// the attributes as set in `stream_header`, along with version `1.0`.
131///
132/// The returned object contains the stream header sent by the remote side
133/// as well as the internal parser state to continue the negotiation.
134pub async fn initiate_stream<Io: AsyncBufRead + AsyncWrite + Unpin>(
135 io: Io,
136 stream_ns: &'static str,
137 stream_header: StreamHeader<'_>,
138 timeouts: Timeouts,
139) -> Result<PendingFeaturesRecv<Io>, io::Error> {
140 let stream = InitiatingStream(RawXmlStream::new(io, stream_ns, timeouts));
141 stream.send_header(stream_header).await
142}
143
144/// Accept a new XML stream as responder
145///
146/// Prepares the responer side of an XML stream using the given I/O object
147/// `io`. The default XML namespace will be set to `stream_ns`.
148///
149/// The returned object contains the stream header sent by the remote side
150/// as well as the internal parser state to continue the negotiation.
151pub async fn accept_stream<Io: AsyncBufRead + AsyncWrite + Unpin>(
152 io: Io,
153 stream_ns: &'static str,
154 timeouts: Timeouts,
155) -> Result<AcceptedStream<Io>, io::Error> {
156 let mut stream = RawXmlStream::new(io, stream_ns, timeouts);
157 let header = StreamHeader::recv(Pin::new(&mut stream)).await?;
158 Ok(AcceptedStream { stream, header })
159}
160
161/// A non-success state which may occur while reading an XSO from a
162/// [`XmlStream`]
163#[derive(Debug)]
164pub enum ReadError {
165 /// The soft timeout of the stream triggered.
166 ///
167 /// User code should handle this by sending something into the stream
168 /// which causes the peer to send data before the hard timeout triggers.
169 SoftTimeout,
170
171 /// An I/O error occurred in the underlying I/O object.
172 ///
173 /// This is generally fatal.
174 HardError(io::Error),
175
176 /// A parse error occurred while processing the XSO.
177 ///
178 /// This is non-fatal and more XSOs may be read from the stream.
179 ParseError(xso::error::Error),
180
181 /// The stream footer was received.
182 ///
183 /// Any future read attempts will again return this error. The stream has
184 /// been closed by the peer and you should probably close it, too.
185 StreamFooterReceived,
186}
187
188impl fmt::Display for ReadError {
189 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
190 match self {
191 ReadError::SoftTimeout => write!(f, "soft timeout"),
192 ReadError::HardError(e) => write!(f, "hard error: {}", e),
193 ReadError::ParseError(e) => write!(f, "parse error: {}", e),
194 ReadError::StreamFooterReceived => write!(f, "stream footer received"),
195 }
196 }
197}
198
199impl core::error::Error for ReadError {
200 fn source(&self) -> Option<&(dyn core::error::Error + 'static)> {
201 match self {
202 ReadError::HardError(e) => Some(e),
203 ReadError::ParseError(e) => Some(e),
204 _ => None,
205 }
206 }
207}
208
/// Progress of the write side of an XML stream.
enum WriteState {
    /// Elements may be written.
    Open,
    /// Shutdown has begun; the stream footer still has to be handed to the
    /// inner stream.
    SendElementFoot,
    /// The stream footer has been handed to the inner stream.
    FooterSent,
    /// A previous write failed (in this module: handing over the stream
    /// footer); the sink must not be used any more.
    Failed,
}

impl WriteState {
    /// Succeeds unless a previous write error left the sink unusable.
    ///
    /// Fails with [`io::ErrorKind::NotConnected`] in the `Failed` state.
    fn check_ok(&self) -> io::Result<()> {
        if matches!(self, WriteState::Failed) {
            Err(io::Error::new(
                io::ErrorKind::NotConnected,
                "XML stream sink unusable because of previous write error",
            ))
        } else {
            Ok(())
        }
    }

    /// Like [`Self::check_ok`], but additionally rejects writes once the
    /// stream footer is being, or has been, sent.
    fn check_writable(&self) -> io::Result<()> {
        let footer_pending_or_sent =
            matches!(self, WriteState::SendElementFoot | WriteState::FooterSent);
        if footer_pending_or_sent {
            return Err(io::Error::new(
                io::ErrorKind::NotConnected,
                "stream footer already sent",
            ));
        }
        self.check_ok()
    }
}
237
238pin_project_lite::pin_project! {
239 /// XML stream
240 ///
241 /// This struct represents an
242 /// [RFC 6120](https://tools.ietf.org/html/rfc6120) XML stream, where the
243 /// payload consists of items of type `T` implementing [`FromXml`] and
244 /// [`AsXml`].
245 ///
246 /// **Note:** The XML stream is a low-level API which you should probably
247 /// not use directly. You may be looking for
248 /// [`StanzaStream`][`crate::stanzastream::StnazaStream`] instead.
249 pub struct XmlStream<Io, T: FromXml> {
250 #[pin]
251 inner: RawXmlStream<Io>,
252 read_state: Option<ReadXsoState<T>>,
253 write_state: WriteState,
254 }
255}
256
257impl<Io, T: FromXml> XmlStream<Io, T> {
258 /// Obtain a reference to the `Io` stream.
259 pub fn get_stream(&self) -> &Io {
260 self.inner.get_stream()
261 }
262}
263
264impl<Io: AsyncBufRead, T: FromXml> XmlStream<Io, T> {
265 fn wrap(inner: RawXmlStream<Io>) -> Self {
266 Self {
267 inner,
268 read_state: Some(ReadXsoState::default()),
269 write_state: WriteState::Open,
270 }
271 }
272
273 fn assert_retypable(&self) {
274 match self.read_state {
275 Some(ReadXsoState::PreData) => (),
276 Some(_) => panic!("cannot reset stream: XSO parsing in progress!"),
277 None => panic!("cannot reset stream: stream footer received!"),
278 }
279 match self.write_state.check_writable() {
280 Ok(()) => (),
281 Err(e) => panic!("cannot reset stream: {}", e),
282 }
283 }
284}
285
286impl<Io: AsyncBufRead + AsyncWrite + Unpin, T: FromXml + fmt::Debug> XmlStream<Io, T> {
287 /// Initiate a stream reset
288 ///
289 /// To actually send the stream header, call
290 /// [`send_header`][`InitiatingStream::send_header`] on the result.
291 ///
292 /// # Panics
293 ///
294 /// Attempting to reset the stream while an object is being received will
295 /// panic. This can generally only happen if you call `poll_next`
296 /// directly, as doing that is otherwise prevented by the borrowchecker.
297 ///
298 /// In addition, attempting to reset a stream which has been closed by
299 /// either side or which has had an I/O error will also cause a panic.
300 pub fn initiate_reset(self) -> InitiatingStream<Io> {
301 self.assert_retypable();
302
303 let mut stream = self.inner;
304 Pin::new(&mut stream).reset_state();
305 InitiatingStream(stream)
306 }
307
308 /// Trigger a stream reset on the initiator side and await the new stream
309 /// header.
310 ///
311 /// This is the responder-side counterpart to
312 /// [`initiate_reset`][`Self::initiate_reset`]. The element which causes
313 /// the stream reset must be passed as `barrier` and it will be sent
314 /// right before resetting the parser state. This way, the race condition
315 /// outlined in the [`xmlstream`][`self`] module's documentation is
316 /// guaranteed to be avoided.
317 ///
318 /// Note that you should not send the element passed as `barrier` down the
319 /// stream yourself, as this function takes care of it.
320 ///
321 /// # Stream resets without a triggering element
322 ///
323 /// These are not possible to do safely and not specified in RFC 6120,
324 /// hence they cannot be done in [`XmlStream`].
325 ///
326 /// # Panics
327 ///
328 /// Attempting to reset the stream while an object is being received will
329 /// panic. This can generally only happen if you call `poll_next`
330 /// directly, as doing that is otherwise prevented by the borrowchecker.
331 ///
332 /// In addition, attempting to reset a stream which has been closed by
333 /// either side or which has had an I/O error will also cause a panic.
334 pub async fn accept_reset<U: AsXml>(mut self, barrier: &U) -> io::Result<AcceptedStream<Io>> {
335 self.assert_retypable();
336 self.send(barrier).await?;
337
338 let mut stream = self.inner;
339 Pin::new(&mut stream).reset_state();
340 let header = StreamHeader::recv(Pin::new(&mut stream)).await?;
341 Ok(AcceptedStream { stream, header })
342 }
343
344 /// Discard all XML state and return the inner I/O object.
345 pub fn into_inner(self) -> Io {
346 self.assert_retypable();
347 self.inner.into_inner()
348 }
349
350 /// Box the underlying transport stream.
351 ///
352 /// This removes the specific type of the transport from the XML stream's
353 /// type signature.
354 pub fn box_stream(self) -> XmlStream<Box<dyn AsyncReadAndWrite + Send + 'static>, T>
355 where
356 Io: AsyncReadAndWrite + Send + 'static,
357 {
358 XmlStream {
359 inner: self.inner.box_stream(),
360 read_state: self.read_state,
361 write_state: self.write_state,
362 }
363 }
364}
365
/// Reading XSOs from the stream.
///
/// Each item is either a parsed `T` or a [`ReadError`]. After
/// [`ReadError::StreamFooterReceived`] has been yielded, the stream only
/// drains the transport. It yields `None` on end-of-file and
/// [`ReadError::HardError`] on I/O errors, and it silently ignores soft
/// timeouts.
impl<Io: AsyncBufRead, T: FromXml + fmt::Debug> Stream for XmlStream<Io, T> {
    type Item = Result<T, ReadError>;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let mut this = self.project();
        let result = match this.read_state.as_mut() {
            // `None` means the stream footer has already been reported.
            None => {
                // awaiting eof.
                return loop {
                    match ready!(this.inner.as_mut().poll_next(cx)) {
                        None => break Poll::Ready(None),
                        // The parser is expected to reject any data after the
                        // footer, so this would indicate a parser bug.
                        Some(Ok(_)) => unreachable!("xml parser allowed data after stream footer"),
                        Some(Err(RawError::Io(e))) => {
                            break Poll::Ready(Some(Err(ReadError::HardError(e))))
                        }
                        // Swallow soft timeout, we don't want the user to trigger
                        // anything here.
                        Some(Err(RawError::SoftTimeout)) => continue,
                    }
                };
            }
            // Continue (or start) parsing the next XSO. While more input is
            // needed, `ready!` returns `Poll::Pending` early and the
            // partially filled read state is kept for the next poll.
            Some(read_state) => ready!(read_state.poll_advance(this.inner, cx)),
        };
        let result = match result {
            Ok(v) => Poll::Ready(Some(Ok(v))),
            Err(ReadXsoError::Hard(e)) => Poll::Ready(Some(Err(ReadError::HardError(e)))),
            Err(ReadXsoError::Parse(e)) => Poll::Ready(Some(Err(ReadError::ParseError(e)))),
            Err(ReadXsoError::Footer) => {
                *this.read_state = None;
                // Return early here, because we cannot allow recreation of
                // another read state.
                return Poll::Ready(Some(Err(ReadError::StreamFooterReceived)));
            }
            Err(ReadXsoError::SoftTimeout) => Poll::Ready(Some(Err(ReadError::SoftTimeout))),
        };
        // Every outcome except the footer starts a fresh read state, so the
        // next poll begins parsing a new XSO.
        // NOTE(review): for `SoftTimeout` this also discards any in-progress
        // parse state. Presumably `poll_advance` only reports soft timeouts
        // between XSOs; confirm in `common.rs`.
        *this.read_state = Some(ReadXsoState::default());
        result
    }
}
405
406impl<Io: AsyncWrite, T: FromXml> XmlStream<Io, T> {
407 /// Initiate stream shutdown and poll for completion.
408 ///
409 /// Please see [`Self::shutdown`] for details.
410 pub fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), io::Error>> {
411 let mut this = self.project();
412 this.write_state.check_ok()?;
413 loop {
414 match this.write_state {
415 // Open => initiate closing.
416 WriteState::Open => {
417 *this.write_state = WriteState::SendElementFoot;
418 }
419 // Sending => wait for readiness, then send.
420 WriteState::SendElementFoot => {
421 match ready!(this.inner.as_mut().poll_ready(cx))
422 .and_then(|_| this.inner.as_mut().start_send(Item::ElementFoot))
423 {
424 Ok(()) => {
425 log::trace!("stream footer sent successfully");
426 }
427 // If it fails, we fail the sink immediately.
428 Err(e) => {
429 log::debug!(
430 "omitting stream footer: failed to make stream ready: {}",
431 e
432 );
433 *this.write_state = WriteState::Failed;
434 return Poll::Ready(Err(e));
435 }
436 }
437 *this.write_state = WriteState::FooterSent;
438 }
439 // Footer sent => just close the inner stream.
440 WriteState::FooterSent => break,
441 WriteState::Failed => unreachable!(), // caught by check_ok()
442 }
443 }
444 this.inner.poll_shutdown(cx)
445 }
446}
447
448impl<Io: AsyncWrite + Unpin, T: FromXml> XmlStream<Io, T> {
449 /// Send the stream footer and close the sender side of the underlying
450 /// transport.
451 ///
452 /// Unlike `poll_close` (from the `Sink` impls), this will not close the
453 /// receiving side of the underlying the transport. It is advisable to call
454 /// `poll_close` eventually after `poll_shutdown` in order to gracefully
455 /// handle situations where the remote side does not close the stream
456 /// cleanly.
457 pub fn shutdown(&mut self) -> Shutdown<'_, Io, T> {
458 Shutdown {
459 stream: Pin::new(self),
460 }
461 }
462}
463
464impl<'x, Io: AsyncWrite, T: FromXml, U: AsXml> Sink<&'x U> for XmlStream<Io, T> {
465 type Error = io::Error;
466
467 fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
468 let this = self.project();
469 this.write_state.check_writable()?;
470 this.inner.poll_ready(cx)
471 }
472
473 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
474 let this = self.project();
475 this.write_state.check_writable()?;
476 this.inner.poll_flush(cx)
477 }
478
479 fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
480 ready!(self.as_mut().poll_shutdown(cx))?;
481 let this = self.project();
482 this.inner.poll_close(cx)
483 }
484
485 fn start_send(self: Pin<&mut Self>, item: &'x U) -> Result<(), Self::Error> {
486 let this = self.project();
487 this.write_state.check_writable()?;
488 this.inner.start_send_xso(item)
489 }
490}
491
492/// Future implementing [`XmlStream::shutdown`] using
493/// [`XmlStream::poll_shutdown`].
494pub struct Shutdown<'a, Io: AsyncWrite, T: FromXml> {
495 stream: Pin<&'a mut XmlStream<Io, T>>,
496}
497
498impl<Io: AsyncWrite, T: FromXml> Future for Shutdown<'_, Io, T> {
499 type Output = io::Result<()>;
500
501 fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
502 self.stream.as_mut().poll_shutdown(cx)
503 }
504}
505
/// Convenience alias for an XML stream using [`XmppStreamElement`].
///
/// The stream's item type is [`FallibleStreamElement`].
// NOTE(review): presumably `FallibleStreamElement` wraps `XmppStreamElement`
// so that per-element failures surface as items; confirm in `xmpp.rs`.
pub type XmppStream<Io> = XmlStream<Io, FallibleStreamElement>;