tokio_xmpp/xmlstream/mod.rs
1// Copyright (c) 2024 Jonas Schäfer <jonas@zombofant.net>
2//
3// This Source Code Form is subject to the terms of the Mozilla Public
4// License, v. 2.0. If a copy of the MPL was not distributed with this
5// file, You can obtain one at http://mozilla.org/MPL/2.0/.
6
7//! # RFC 6120 XML Streams
8//!
9//! **Note:** The XML stream is a low-level API which you should probably not
10//! use directly. You may be looking for
11//! [`StanzaStream`][`crate::stanzastream::StanzaStream`] instead.
12//!
13//! Establishing an XML stream is always a multi-stage process due to how
14//! stream negotiation works. Based on the values sent by the initiator in the
15//! stream header, the responder may choose to offer different features.
16//!
17//! In order to allow this, the following multi-step processes are defined.
18//!
19//! ## Initiating an XML stream
20//!
21//! To initiate an XML stream, you need to:
22//!
23//! 1. Call [`initiate_stream`] to obtain the [`PendingFeaturesRecv`] object.
24//! That object holds the stream header sent by the peer for inspection.
25//! 2. Call [`PendingFeaturesRecv::recv_features`] if you are content with
26//! the content of the stream header to obtain the [`XmlStream`] object and
27//! the features sent by the peer.
28//!
29//! ## Accepting an XML stream connection
30//!
31//! To accept an XML stream, you need to:
32//!
33//! 1. Call [`accept_stream`] to obtain the [`AcceptedStream`] object.
34//! That object holds the stream header sent by the peer for inspection.
35//! 2. Call [`AcceptedStream::send_header`] if you are content with
36//! the content of the stream header to obtain the [`PendingFeaturesSend`]
37//! object.
38//! 3. Call [`PendingFeaturesSend::send_features`] to send the stream features
39//! to the peer and obtain the [`XmlStream`] object.
40//!
41//! ## Mid-stream resets
42//!
43//! RFC 6120 describes a couple of situations where stream resets are executed
44//! during stream negotiation. During a stream reset, both parties drop their
45//! parser state and the stream is started from the beginning, with a new
46//! stream header sent by the initiator and received by the responder.
47//!
48//! Stream resets are inherently prone to race conditions. If the responder
49//! executes a read from the underlying transport between sending the element
50//! which triggers the stream reset and discarding its parser state, it may
51//! accidentally read the initiator's stream header into the *old* parser
52//! state instead of the post-reset parser state.
53//!
54//! Stream resets are executed with the [`XmlStream::initiate_reset`] and
55//! [`XmlStream::accept_reset`] functions, for initiator and responder,
56//! respectively. In order to avoid the race condition,
57//! [`XmlStream::accept_reset`] handles sending the last pre-reset element and
58//! resetting the stream in a single step.
59
60use core::fmt;
61use core::future::Future;
62use core::pin::Pin;
63use core::task::{Context, Poll};
64use std::io;
65#[cfg(feature = "syntax-highlighting")]
66use std::sync::LazyLock;
67
68use futures::{ready, Sink, SinkExt, Stream};
69
70use tokio::io::{AsyncBufRead, AsyncWrite};
71
72use xso::{AsXml, FromXml, Item};
73
74use crate::connect::AsyncReadAndWrite;
75
76mod capture;
77mod common;
78mod initiator;
79mod responder;
80#[cfg(test)]
81mod tests;
82pub(crate) mod xmpp;
83
84use self::common::{RawError, RawXmlStream, ReadXsoError, ReadXsoState};
85pub use self::common::{StreamHeader, Timeouts};
86pub use self::initiator::{InitiatingStream, PendingFeaturesRecv};
87pub use self::responder::{AcceptedStream, PendingFeaturesSend};
88pub use self::xmpp::XmppStreamElement;
89
/// Syntax definitions shipped with syntect, loaded on first use via
/// `SyntaxSet::load_defaults_newlines`.
#[cfg(feature = "syntax-highlighting")]
static PS: LazyLock<syntect::parsing::SyntaxSet> =
    LazyLock::new(syntect::parsing::SyntaxSet::load_defaults_newlines);

/// The syntax registered in [`PS`] for the `xml` file extension.
///
/// Initialisation panics if [`PS`] has no syntax for that extension.
#[cfg(feature = "syntax-highlighting")]
static SYNTAX: LazyLock<&syntect::parsing::SyntaxReference> = LazyLock::new(|| {
    let xml_syntax = PS.find_syntax_by_extension("xml");
    xml_syntax.unwrap()
});

/// A copy of the "Solarized (dark)" theme taken from syntect's default theme
/// set.
#[cfg(feature = "syntax-highlighting")]
static THEME: LazyLock<syntect::highlighting::Theme> = LazyLock::new(|| {
    let defaults = syntect::highlighting::ThemeSet::load_defaults();
    defaults.themes["Solarized (dark)"].clone()
});
102
/// Render `xml` with 24-bit terminal colour escapes.
///
/// The whole input is handed to the highlighter in a single call, using the
/// [`SYNTAX`] definition and the [`THEME`] colours. The returned string always
/// ends with the `ESC[0m` reset sequence so that the colours do not leak into
/// whatever is printed afterwards.
///
/// # Panics
///
/// Panics if the highlighter reports an error.
#[cfg(feature = "syntax-highlighting")]
fn highlight_xml(xml: &str) -> String {
    let mut highlighter = syntect::easy::HighlightLines::new(&SYNTAX, &THEME);
    let styled: Vec<_> = highlighter.highlight_line(xml, &PS).unwrap();
    let mut coloured = syntect::util::as_24_bit_terminal_escaped(&styled, false);
    coloured.push_str("\x1b[0m");
    coloured
}
111
/// Display adapter which renders a buffer of serialised XML for logging.
///
/// With the `syntax-highlighting` feature enabled, the text is additionally
/// passed through `highlight_xml` before it is written.
struct LogXsoBuf<'x>(&'x [u8]);

impl fmt::Display for LogXsoBuf<'_> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // The buffers logged through this type are expected to be UTF-8.
        // Decode lossily nevertheless: formatting a log line must never
        // panic, and for valid UTF-8 this borrows the input without copying.
        let text = String::from_utf8_lossy(self.0);
        #[cfg(feature = "syntax-highlighting")]
        let text = highlight_xml(&text);
        f.write_str(&text)
    }
}
123
124/// Initiate a new stream
125///
126/// Initiate a new stream using the given I/O object `io`. The default
127/// XML namespace will be set to `stream_ns` and the stream header will use
128/// the attributes as set in `stream_header`, along with version `1.0`.
129///
130/// The returned object contains the stream header sent by the remote side
131/// as well as the internal parser state to continue the negotiation.
132pub async fn initiate_stream<Io: AsyncBufRead + AsyncWrite + Unpin>(
133 io: Io,
134 stream_ns: &'static str,
135 stream_header: StreamHeader<'_>,
136 timeouts: Timeouts,
137) -> Result<PendingFeaturesRecv<Io>, io::Error> {
138 let stream = InitiatingStream(RawXmlStream::new(io, stream_ns, timeouts));
139 stream.send_header(stream_header).await
140}
141
142/// Accept a new XML stream as responder
143///
144/// Prepares the responer side of an XML stream using the given I/O object
145/// `io`. The default XML namespace will be set to `stream_ns`.
146///
147/// The returned object contains the stream header sent by the remote side
148/// as well as the internal parser state to continue the negotiation.
149pub async fn accept_stream<Io: AsyncBufRead + AsyncWrite + Unpin>(
150 io: Io,
151 stream_ns: &'static str,
152 timeouts: Timeouts,
153) -> Result<AcceptedStream<Io>, io::Error> {
154 let mut stream = RawXmlStream::new(io, stream_ns, timeouts);
155 let header = StreamHeader::recv(Pin::new(&mut stream)).await?;
156 Ok(AcceptedStream { stream, header })
157}
158
159/// A non-success state which may occur while reading an XSO from a
160/// [`XmlStream`]
161#[derive(Debug)]
162pub enum ReadError {
163 /// The soft timeout of the stream triggered.
164 ///
165 /// User code should handle this by sending something into the stream
166 /// which causes the peer to send data before the hard timeout triggers.
167 SoftTimeout,
168
169 /// An I/O error occurred in the underlying I/O object.
170 ///
171 /// This is generally fatal.
172 HardError(io::Error),
173
174 /// A parse error occurred while processing the XSO.
175 ///
176 /// This is non-fatal and more XSOs may be read from the stream.
177 ParseError(xso::error::Error),
178
179 /// The stream footer was received.
180 ///
181 /// Any future read attempts will again return this error. The stream has
182 /// been closed by the peer and you should probably close it, too.
183 StreamFooterReceived,
184}
185
186impl fmt::Display for ReadError {
187 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
188 match self {
189 ReadError::SoftTimeout => write!(f, "soft timeout"),
190 ReadError::HardError(e) => write!(f, "hard error: {}", e),
191 ReadError::ParseError(e) => write!(f, "parse error: {}", e),
192 ReadError::StreamFooterReceived => write!(f, "stream footer received"),
193 }
194 }
195}
196
197impl core::error::Error for ReadError {
198 fn source(&self) -> Option<&(dyn core::error::Error + 'static)> {
199 match self {
200 ReadError::HardError(e) => Some(e),
201 ReadError::ParseError(e) => Some(e),
202 _ => None,
203 }
204 }
205}
206
/// Progress of the write side of an XML stream towards shutdown.
enum WriteState {
    /// No shutdown has been started.
    Open,
    /// Shutdown was requested; the stream footer still has to be sent.
    SendElementFoot,
    /// The stream footer has been handed to the underlying stream.
    FooterSent,
    /// Sending the stream footer failed.
    Failed,
}

impl WriteState {
    /// Return an error if the write side is in the failed state.
    fn check_ok(&self) -> io::Result<()> {
        match self {
            Self::Open | Self::SendElementFoot | Self::FooterSent => Ok(()),
            Self::Failed => {
                let reason = "XML stream sink unusable because of previous write error";
                Err(io::Error::new(io::ErrorKind::NotConnected, reason))
            }
        }
    }

    /// Return an error unless more stream elements may be written.
    ///
    /// Fails once shutdown has begun (footer pending or sent) and, via
    /// [`Self::check_ok`], when the write side has failed.
    fn check_writable(&self) -> io::Result<()> {
        match self {
            Self::Open | Self::Failed => self.check_ok(),
            Self::SendElementFoot | Self::FooterSent => {
                let reason = "stream footer already sent";
                Err(io::Error::new(io::ErrorKind::NotConnected, reason))
            }
        }
    }
}
235
pin_project_lite::pin_project! {
    /// XML stream
    ///
    /// This struct represents an
    /// [RFC 6120](https://tools.ietf.org/html/rfc6120) XML stream, where the
    /// payload consists of items of type `T` implementing [`FromXml`] and
    /// [`AsXml`].
    ///
    /// **Note:** The XML stream is a low-level API which you should probably
    /// not use directly. You may be looking for
    /// [`StanzaStream`][`crate::stanzastream::StanzaStream`] instead.
    pub struct XmlStream<Io, T: FromXml> {
        // The raw stream all reads and writes are delegated to; structurally
        // pinned so that it can be polled through `Pin<&mut Self>`.
        #[pin]
        inner: RawXmlStream<Io>,
        // Parser state of the XSO currently being read; set to `None` once
        // the stream footer has been received.
        read_state: Option<ReadXsoState<T>>,
        // How far the write side has progressed towards shutdown.
        write_state: WriteState,
    }
}
254
impl<Io, T: FromXml> XmlStream<Io, T> {
    /// Obtain a reference to the `Io` stream.
    ///
    /// This delegates to `get_stream` of the inner raw XML stream.
    pub fn get_stream(&self) -> &Io {
        self.inner.get_stream()
    }
}
261
262impl<Io: AsyncBufRead, T: FromXml + AsXml> XmlStream<Io, T> {
263 fn wrap(inner: RawXmlStream<Io>) -> Self {
264 Self {
265 inner,
266 read_state: Some(ReadXsoState::default()),
267 write_state: WriteState::Open,
268 }
269 }
270
271 fn assert_retypable(&self) {
272 match self.read_state {
273 Some(ReadXsoState::PreData) => (),
274 Some(_) => panic!("cannot reset stream: XSO parsing in progress!"),
275 None => panic!("cannot reset stream: stream footer received!"),
276 }
277 match self.write_state.check_writable() {
278 Ok(()) => (),
279 Err(e) => panic!("cannot reset stream: {}", e),
280 }
281 }
282}
283
284impl<Io: AsyncBufRead + AsyncWrite + Unpin, T: FromXml + AsXml + fmt::Debug> XmlStream<Io, T> {
285 /// Initiate a stream reset
286 ///
287 /// To actually send the stream header, call
288 /// [`send_header`][`InitiatingStream::send_header`] on the result.
289 ///
290 /// # Panics
291 ///
292 /// Attempting to reset the stream while an object is being received will
293 /// panic. This can generally only happen if you call `poll_next`
294 /// directly, as doing that is otherwise prevented by the borrowchecker.
295 ///
296 /// In addition, attempting to reset a stream which has been closed by
297 /// either side or which has had an I/O error will also cause a panic.
298 pub fn initiate_reset(self) -> InitiatingStream<Io> {
299 self.assert_retypable();
300
301 let mut stream = self.inner;
302 Pin::new(&mut stream).reset_state();
303 InitiatingStream(stream)
304 }
305
306 /// Trigger a stream reset on the initiator side and await the new stream
307 /// header.
308 ///
309 /// This is the responder-side counterpart to
310 /// [`initiate_reset`][`Self::initiate_reset`]. The element which causes
311 /// the stream reset must be passed as `barrier` and it will be sent
312 /// right before resetting the parser state. This way, the race condition
313 /// outlined in the [`xmlstream`][`self`] module's documentation is
314 /// guaranteed to be avoided.
315 ///
316 /// Note that you should not send the element passed as `barrier` down the
317 /// stream yourself, as this function takes care of it.
318 ///
319 /// # Stream resets without a triggering element
320 ///
321 /// These are not possible to do safely and not specified in RFC 6120,
322 /// hence they cannot be done in [`XmlStream`].
323 ///
324 /// # Panics
325 ///
326 /// Attempting to reset the stream while an object is being received will
327 /// panic. This can generally only happen if you call `poll_next`
328 /// directly, as doing that is otherwise prevented by the borrowchecker.
329 ///
330 /// In addition, attempting to reset a stream which has been closed by
331 /// either side or which has had an I/O error will also cause a panic.
332 pub async fn accept_reset(mut self, barrier: &T) -> io::Result<AcceptedStream<Io>> {
333 self.assert_retypable();
334 self.send(barrier).await?;
335
336 let mut stream = self.inner;
337 Pin::new(&mut stream).reset_state();
338 let header = StreamHeader::recv(Pin::new(&mut stream)).await?;
339 Ok(AcceptedStream { stream, header })
340 }
341
342 /// Discard all XML state and return the inner I/O object.
343 pub fn into_inner(self) -> Io {
344 self.assert_retypable();
345 self.inner.into_inner()
346 }
347
348 /// Box the underlying transport stream.
349 ///
350 /// This removes the specific type of the transport from the XML stream's
351 /// type signature.
352 pub fn box_stream(self) -> XmlStream<Box<dyn AsyncReadAndWrite + Send + 'static>, T>
353 where
354 Io: AsyncReadAndWrite + Send + 'static,
355 {
356 XmlStream {
357 inner: self.inner.box_stream(),
358 read_state: self.read_state,
359 write_state: self.write_state,
360 }
361 }
362}
363
364impl<Io: AsyncBufRead, T: FromXml + AsXml + fmt::Debug> Stream for XmlStream<Io, T> {
365 type Item = Result<T, ReadError>;
366
367 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
368 let mut this = self.project();
369 let result = match this.read_state.as_mut() {
370 None => {
371 // awaiting eof.
372 return loop {
373 match ready!(this.inner.as_mut().poll_next(cx)) {
374 None => break Poll::Ready(None),
375 Some(Ok(_)) => unreachable!("xml parser allowed data after stream footer"),
376 Some(Err(RawError::Io(e))) => {
377 break Poll::Ready(Some(Err(ReadError::HardError(e))))
378 }
379 // Swallow soft timeout, we don't want the user to trigger
380 // anything here.
381 Some(Err(RawError::SoftTimeout)) => continue,
382 }
383 };
384 }
385 Some(read_state) => ready!(read_state.poll_advance(this.inner, cx)),
386 };
387 let result = match result {
388 Ok(v) => Poll::Ready(Some(Ok(v))),
389 Err(ReadXsoError::Hard(e)) => Poll::Ready(Some(Err(ReadError::HardError(e)))),
390 Err(ReadXsoError::Parse(e)) => Poll::Ready(Some(Err(ReadError::ParseError(e)))),
391 Err(ReadXsoError::Footer) => {
392 *this.read_state = None;
393 // Return early here, because we cannot allow recreation of
394 // another read state.
395 return Poll::Ready(Some(Err(ReadError::StreamFooterReceived)));
396 }
397 Err(ReadXsoError::SoftTimeout) => Poll::Ready(Some(Err(ReadError::SoftTimeout))),
398 };
399 *this.read_state = Some(ReadXsoState::default());
400 result
401 }
402}
403
404impl<Io: AsyncWrite, T: FromXml + AsXml> XmlStream<Io, T> {
405 /// Initiate stream shutdown and poll for completion.
406 ///
407 /// Please see [`Self::shutdown`] for details.
408 pub fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), io::Error>> {
409 let mut this = self.project();
410 this.write_state.check_ok()?;
411 loop {
412 match this.write_state {
413 // Open => initiate closing.
414 WriteState::Open => {
415 *this.write_state = WriteState::SendElementFoot;
416 }
417 // Sending => wait for readiness, then send.
418 WriteState::SendElementFoot => {
419 match ready!(this.inner.as_mut().poll_ready(cx))
420 .and_then(|_| this.inner.as_mut().start_send(Item::ElementFoot))
421 {
422 Ok(()) => {
423 log::trace!("stream footer sent successfully");
424 }
425 // If it fails, we fail the sink immediately.
426 Err(e) => {
427 log::debug!(
428 "omitting stream footer: failed to make stream ready: {}",
429 e
430 );
431 *this.write_state = WriteState::Failed;
432 return Poll::Ready(Err(e));
433 }
434 }
435 *this.write_state = WriteState::FooterSent;
436 }
437 // Footer sent => just close the inner stream.
438 WriteState::FooterSent => break,
439 WriteState::Failed => unreachable!(), // caught by check_ok()
440 }
441 }
442 this.inner.poll_shutdown(cx)
443 }
444}
445
446impl<Io: AsyncWrite + Unpin, T: FromXml + AsXml> XmlStream<Io, T> {
447 /// Send the stream footer and close the sender side of the underlying
448 /// transport.
449 ///
450 /// Unlike `poll_close` (from the `Sink` impls), this will not close the
451 /// receiving side of the underlying the transport. It is advisable to call
452 /// `poll_close` eventually after `poll_shutdown` in order to gracefully
453 /// handle situations where the remote side does not close the stream
454 /// cleanly.
455 pub fn shutdown(&mut self) -> Shutdown<'_, Io, T> {
456 Shutdown {
457 stream: Pin::new(self),
458 }
459 }
460}
461
462impl<'x, Io: AsyncWrite, T: FromXml + AsXml, U: AsXml> Sink<&'x U> for XmlStream<Io, T> {
463 type Error = io::Error;
464
465 fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
466 let this = self.project();
467 this.write_state.check_writable()?;
468 this.inner.poll_ready(cx)
469 }
470
471 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
472 let this = self.project();
473 this.write_state.check_writable()?;
474 this.inner.poll_flush(cx)
475 }
476
477 fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
478 ready!(self.as_mut().poll_shutdown(cx))?;
479 let this = self.project();
480 this.inner.poll_close(cx)
481 }
482
483 fn start_send(self: Pin<&mut Self>, item: &'x U) -> Result<(), Self::Error> {
484 let this = self.project();
485 this.write_state.check_writable()?;
486 this.inner.start_send_xso(item)
487 }
488}
489
490/// Future implementing [`XmlStream::shutdown`] using
491/// [`XmlStream::poll_shutdown`].
492pub struct Shutdown<'a, Io: AsyncWrite, T: FromXml + AsXml> {
493 stream: Pin<&'a mut XmlStream<Io, T>>,
494}
495
496impl<'a, Io: AsyncWrite, T: FromXml + AsXml> Future for Shutdown<'a, Io, T> {
497 type Output = io::Result<()>;
498
499 fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
500 self.stream.as_mut().poll_shutdown(cx)
501 }
502}
503
/// Convenience alias for an XML stream using [`XmppStreamElement`].
///
/// That is, an [`XmlStream`] whose payload item type `T` is fixed to
/// [`XmppStreamElement`], leaving only the I/O type `Io` open.
pub type XmppStream<Io> = XmlStream<Io, XmppStreamElement>;