tokio_xmpp/xmlstream/
common.rs

1// Copyright (c) 2024 Jonas Schäfer <jonas@zombofant.net>
2//
3// This Source Code Form is subject to the terms of the Mozilla Public
4// License, v. 2.0. If a copy of the MPL was not distributed with this
5// file, You can obtain one at http://mozilla.org/MPL/2.0/.
6
7use alloc::borrow::Cow;
8use core::{
9    future::Future,
10    pin::Pin,
11    task::{Context, Poll},
12    time::Duration,
13};
14use std::io;
15
16use futures::{ready, Sink, SinkExt, Stream, StreamExt};
17
18use bytes::{Buf, BytesMut};
19
20use tokio::{
21    io::{AsyncBufRead, AsyncWrite},
22    time::Instant,
23};
24
25use xso::{
26    exports::rxml::{
27        self, writer::TrackNamespace, xml_lang::XmlLangStack, xml_ncname, Event, Namespace,
28    },
29    AsXml, FromEventsBuilder, FromXml, Item,
30};
31
32use crate::connect::AsyncReadAndWrite;
33
34use super::capture::{log_enabled, log_recv, log_send, CaptureBufRead};
35
36use xmpp_parsers::ns::STREAM as XML_STREAM_NS;
37
/// Timeout configuration for an XML stream.
///
/// The default values target typical desktop/laptop usage. They may be too
/// strict under extreme conditions (arctic satellite link, mobile internet
/// on a train in Brandenburg, Germany, and similar) and needlessly lax under
/// very good conditions (stable server link, localhost communication).
#[derive(Debug, Clone, Copy)]
pub struct Timeouts {
    /// Longest period of silence tolerated before a
    /// [`ReadError::SoftTimeout`][`super::ReadError::SoftTimeout`] is
    /// returned.
    ///
    /// A soft timeout is not fatal. User code must however react to it in a
    /// way which causes more data to be read within at most
    /// [`Self::response_timeout`], counted from the moment the soft timeout
    /// is returned.
    pub read_timeout: Duration,

    /// Longest period of silence tolerated after a soft timeout.
    ///
    /// When the stream stays silent for longer than this after a soft
    /// timeout was emitted, a hard [`TimedOut`][`io::ErrorKind::TimedOut`]
    /// I/O error is returned and the stream is to be considered dead.
    pub response_timeout: Duration,
}

impl Default for Timeouts {
    /// Five minutes for both the read and the response timeout.
    fn default() -> Self {
        let five_minutes = Duration::new(300, 0);
        Self {
            read_timeout: five_minutes,
            response_timeout: five_minutes,
        }
    }
}

impl Timeouts {
    /// Tight timeouts suitable for communicating on a fast LAN or localhost.
    pub fn tight() -> Self {
        let read_timeout = Duration::new(60, 0);
        let response_timeout = Duration::new(15, 0);
        Self {
            read_timeout,
            response_timeout,
        }
    }

    /// Silence allowed after the last received data before the soft timeout
    /// trips.
    fn data_to_soft(&self) -> Duration {
        let Self { read_timeout, .. } = *self;
        read_timeout
    }

    /// Delay between the soft timeout and the warn level: the first half of
    /// the response timeout.
    fn soft_to_warn(&self) -> Duration {
        let Self {
            response_timeout, ..
        } = *self;
        response_timeout / 2
    }

    /// Delay between the warn level and the hard timeout: the second half of
    /// the response timeout.
    fn warn_to_hard(&self) -> Duration {
        let Self {
            response_timeout, ..
        } = *self;
        response_timeout / 2
    }
}
93
/// Escalation level of a read timeout.
///
/// `TimeoutState::poll` returns the level which has just tripped and then
/// arms the next one (`Soft` -> `Warn` -> `Hard`; `Hard` repeats).
#[derive(Clone, Copy)]
enum TimeoutLevel {
    /// First level; the stream's `poll_next` reports it as
    /// [`RawError::SoftTimeout`].
    Soft,
    /// Intermediate level; the stream's `poll_next` does not report it and
    /// simply polls again.
    Warn,
    /// Final level; the stream's `poll_next` reports it as an I/O error of
    /// kind [`io::ErrorKind::TimedOut`].
    Hard,
}
100
/// Error type yielded by the `Stream` implementation of `RawXmlStream`.
#[derive(Debug)]
pub(super) enum RawError {
    /// An I/O error; also used for the hard read timeout
    /// ([`io::ErrorKind::TimedOut`]).
    Io(io::Error),
    /// The soft read timeout elapsed without any data being received.
    SoftTimeout,
}
106
107impl From<io::Error> for RawError {
108    fn from(other: io::Error) -> Self {
109        Self::Io(other)
110    }
111}
112
/// Runtime state of the read timeout escalation: the configured durations,
/// the level which trips next and the timer which trips it.
struct TimeoutState {
    /// Configuration for the timeouts.
    timeouts: Timeouts,

    /// Level of the next timeout which will trip.
    level: TimeoutLevel,

    /// Sleep timer used for read timeouts.
    // NOTE: even though we pretend we could deal with an !Unpin
    // RawXmlStream, we really can't: box_stream for example needs it,
    // but also all the typestate around the initial stream setup needs
    // to be able to move the stream around.
    // (That is why the timer is boxed and pinned on the heap here instead
    // of being a structurally pinned field.)
    deadline: Pin<Box<tokio::time::Sleep>>,
}
127
128impl TimeoutState {
129    fn new(timeouts: Timeouts) -> Self {
130        Self {
131            deadline: Box::pin(tokio::time::sleep(timeouts.data_to_soft())),
132            level: TimeoutLevel::Soft,
133            timeouts,
134        }
135    }
136
137    fn poll(&mut self, cx: &mut Context) -> Poll<TimeoutLevel> {
138        ready!(self.deadline.as_mut().poll(cx));
139        // Deadline elapsed!
140        let to_return = self.level;
141        let (next_level, next_duration) = match self.level {
142            TimeoutLevel::Soft => (TimeoutLevel::Warn, self.timeouts.soft_to_warn()),
143            TimeoutLevel::Warn => (TimeoutLevel::Hard, self.timeouts.warn_to_hard()),
144            // Something short-ish so that we fire this over and over until
145            // someone finally kills the stream for good.
146            TimeoutLevel::Hard => (TimeoutLevel::Hard, Duration::new(1, 0)),
147        };
148        self.level = next_level;
149        self.deadline.as_mut().reset(Instant::now() + next_duration);
150        Poll::Ready(to_return)
151    }
152
153    fn reset(&mut self) {
154        self.level = TimeoutLevel::Soft;
155        self.deadline
156            .as_mut()
157            .reset(Instant::now() + self.timeouts.data_to_soft());
158    }
159}
160
pin_project_lite::pin_project! {
    // Low-level XML stream: reads `rxml::Event`s from and writes serialised
    // `xso::Item`s to the wrapped `Io`, with read timeouts and a buffered
    // send path.
    //
    // NOTE: due to limitations of pin_project_lite, the field comments are
    // no doc comments. Luckily, this struct is only `pub(super)` anyway.
    #[project = RawXmlStreamProj]
    pub(super) struct RawXmlStream<Io> {
        // The parser used for deserialising data.
        #[pin]
        parser: rxml::AsyncReader<CaptureBufRead<Io>>,

        // Tracker for `xml:lang` data.
        lang_stack: XmlLangStack,

        // The writer used for serialising data.
        writer: rxml::writer::Encoder<rxml::writer::SimpleNamespaces>,

        // Read timeout state. It is reset whenever the parser returns
        // `Poll::Ready` and polled whenever the parser returns
        // `Poll::Pending` (see the `Stream` implementation).
        timeouts: TimeoutState,

        // The default namespace to declare on the stream header.
        stream_ns: &'static str,

        // Buffer containing serialised data which will then be sent through
        // the inner `Io`. Sending that serialised data happens in
        // `poll_ready` and `poll_flush`, while appending serialised data
        // happens in `start_send`.
        tx_buffer: BytesMut,

        // Position inside tx_buffer up to which to-be-sent data has already
        // been logged.
        tx_buffer_logged: usize,

        // This signifies the limit at the point of which the Sink will
        // refuse to accept more data: if the `tx_buffer`'s size grows beyond
        // that high water mark, poll_ready will return Poll::Pending until
        // it has managed to flush enough data down the inner writer.
        //
        // Note that poll_ready will always attempt to progress the writes,
        // which further reduces the chance of hitting this limit unless
        // either the underlying writer gets stuck (e.g. TCP connection
        // breaking in a timeouty way) or a lot of data is written in bulk.
        // In both cases, the backpressure created by poll_ready returning
        // Pending is desirable.
        //
        // However, there is a catch: We don't assert this condition
        // in `start_send` at all. The reason is that we cannot suspend
        // serialisation of an XSO in the middle of writing it: it has to be
        // written in one batch or you have to start over later (this has to
        // do with the iterator state borrowing the data and futures getting
        // cancelled e.g. in tokio::select!). In order to facilitate
        // implementing a `Sink<T: AsXml>` on top of `RawXmlStream`, we
        // cannot be strict about what is going on in `start_send`:
        // `poll_ready` does not know what kind of data will be written (so
        // it could not make a size estimate, even if that was at all
        // possible with AsXml) and `start_send` is not a coroutine. So if
        // `Sink<T: AsXml>` wants to use `RawXmlStream`, it must be able to
        // submit an entire XSO's items in one batch to `RawXmlStream` after
        // it has reported to be ready once. That may easily make the buffer
        // reach its high water mark.
        //
        // So if we checked that condition in `start_send` (as opposed to
        // `poll_ready`), we would cause situations where submitting XSOs
        // failed randomly (with a panic or other errors) and would have to
        // be retried later.
        //
        // While failing with e.g. io::ErrorKind::WouldBlock is something
        // that could be investigated later, it would still require being
        // able to make an accurate estimate of the number of bytes needed to
        // serialise any given `AsXml`, because as pointed out earlier, once
        // we have started, there is no going back.
        //
        // Finally, none of that hurts much because `RawXmlStream` is only an
        // internal API. The high-level APIs will always call `poll_ready`
        // before sending an XSO, which means that we won't *grossly* go over
        // the TX buffer high water mark---unless you send a really large
        // XSO at once.
        tx_buffer_high_water_mark: usize,
    }
}
238
239impl<Io: AsyncBufRead + AsyncWrite> RawXmlStream<Io> {
240    fn new_writer(
241        stream_ns: &'static str,
242    ) -> rxml::writer::Encoder<rxml::writer::SimpleNamespaces> {
243        let mut writer = rxml::writer::Encoder::new();
244        writer
245            .ns_tracker_mut()
246            .declare_fixed(Some(xml_ncname!("stream")), XML_STREAM_NS.into());
247        writer
248            .ns_tracker_mut()
249            .declare_fixed(None, stream_ns.into());
250        writer
251    }
252
253    pub(super) fn new(io: Io, stream_ns: &'static str, timeouts: Timeouts) -> Self {
254        let parser = rxml::Parser::default();
255        let mut io = CaptureBufRead::wrap(io);
256        if log_enabled() {
257            io.enable_capture();
258        }
259        Self {
260            parser: rxml::AsyncReader::wrap(io, parser),
261            writer: Self::new_writer(stream_ns),
262            lang_stack: XmlLangStack::new(),
263            timeouts: TimeoutState::new(timeouts),
264            tx_buffer_logged: 0,
265            stream_ns,
266            tx_buffer: BytesMut::new(),
267
268            // This basically means: "if we already have 2 kiB in our send
269            // buffer, do not accept more data".
270            // Please see the extensive words at
271            //`Self::tx_buffer_high_water_mark` for details.
272            tx_buffer_high_water_mark: 2048,
273        }
274    }
275
276    pub(super) fn reset_state(self: Pin<&mut Self>) {
277        let this = self.project();
278        *this.parser.parser_pinned() = rxml::Parser::default();
279        *this.lang_stack = XmlLangStack::new();
280        *this.writer = Self::new_writer(this.stream_ns);
281    }
282
283    pub(super) fn into_inner(self) -> Io {
284        self.parser.into_inner().0.into_inner()
285    }
286
287    /// Box the underlying transport stream.
288    ///
289    /// This removes the specific type of the transport from the XML stream's
290    /// type signature.
291    pub(super) fn box_stream(self) -> RawXmlStream<Box<dyn AsyncReadAndWrite + Send + 'static>>
292    where
293        Io: AsyncReadAndWrite + Send + 'static,
294    {
295        let (io, p) = self.parser.into_inner();
296        let mut io = CaptureBufRead::wrap(Box::new(io) as Box<_>);
297        if log_enabled() {
298            io.enable_capture();
299        }
300        let parser = rxml::AsyncReader::wrap(io, p);
301        RawXmlStream {
302            parser,
303            lang_stack: XmlLangStack::new(),
304            timeouts: self.timeouts,
305            writer: self.writer,
306            tx_buffer: self.tx_buffer,
307            tx_buffer_logged: self.tx_buffer_logged,
308            tx_buffer_high_water_mark: self.tx_buffer_high_water_mark,
309            stream_ns: self.stream_ns,
310        }
311    }
312}
313
314impl<Io: AsyncWrite> RawXmlStream<Io> {
315    /// Start sending an entire XSO.
316    ///
317    /// Unlike the `Sink` implementation, this provides nice syntax
318    /// highlighting for the serialised data in log outputs (if enabled) *and*
319    /// is error safe: if the XSO fails to serialise completely, it will be as
320    /// if it hadn't been attempted to serialise it at all.
321    ///
322    /// Note that, like with `start_send`, the caller is responsible for
323    /// ensuring that the stream is ready by polling
324    /// [`<Self as Sink>::poll_ready`] as needed.
325    pub(super) fn start_send_xso<T: AsXml>(self: Pin<&mut Self>, xso: &T) -> io::Result<()> {
326        let mut this = self.project();
327        let prev_len = this.tx_buffer.len();
328        match this.try_send_xso(xso) {
329            Ok(()) => Ok(()),
330            Err(e) => {
331                let curr_len = this.tx_buffer.len();
332                this.tx_buffer.truncate(prev_len);
333                log::trace!(
334                    "SEND failed: {}. Rewinding buffer by {} bytes.",
335                    e,
336                    curr_len - prev_len
337                );
338                Err(e)
339            }
340        }
341    }
342}
343
344impl<Io> RawXmlStream<Io> {
345    fn parser_pinned(self: Pin<&mut Self>) -> &mut rxml::Parser {
346        self.project().parser.parser_pinned()
347    }
348
349    fn stream_pinned(self: Pin<&mut Self>) -> Pin<&mut CaptureBufRead<Io>> {
350        self.project().parser.inner_pinned()
351    }
352
353    pub(super) fn get_stream(&self) -> &Io {
354        self.parser.inner().inner()
355    }
356}
357
358impl<Io: AsyncBufRead> Stream for RawXmlStream<Io> {
359    type Item = Result<rxml::Event, RawError>;
360
361    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
362        let mut this = self.project();
363        loop {
364            match this.parser.as_mut().poll_read(cx) {
365                Poll::Pending => (),
366                Poll::Ready(v) => {
367                    this.timeouts.reset();
368                    match v.transpose() {
369                        // Skip the XML declaration, nobody wants to hear about that.
370                        Some(Ok(rxml::Event::XmlDeclaration(_, _))) => continue,
371                        Some(Ok(event)) => {
372                            this.lang_stack.handle_event(&event);
373                            return Poll::Ready(Some(Ok(event)));
374                        }
375                        other => return Poll::Ready(other.map(|x| x.map_err(RawError::Io))),
376                    }
377                }
378            };
379
380            // poll_read returned pending... what do the timeouts have to say?
381            match ready!(this.timeouts.poll(cx)) {
382                TimeoutLevel::Soft => return Poll::Ready(Some(Err(RawError::SoftTimeout))),
383                TimeoutLevel::Warn => (),
384                TimeoutLevel::Hard => {
385                    return Poll::Ready(Some(Err(RawError::Io(io::Error::new(
386                        io::ErrorKind::TimedOut,
387                        "read and response timeouts elapsed",
388                    )))))
389                }
390            }
391        }
392    }
393}
394
395impl<Io: AsyncWrite> RawXmlStreamProj<'_, Io> {
396    fn flush_tx_log(&mut self) {
397        let range = &self.tx_buffer[*self.tx_buffer_logged..];
398        if range.is_empty() {
399            return;
400        }
401        log_send(range);
402        *self.tx_buffer_logged = self.tx_buffer.len();
403    }
404
405    fn start_send(&mut self, item: &xso::Item<'_>) -> io::Result<()> {
406        self.writer
407            .encode_into_bytes(item.as_rxml_item(), self.tx_buffer)
408            .map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, e))
409    }
410
411    fn try_send_xso<T: AsXml>(&mut self, xso: &T) -> io::Result<()> {
412        let iter = match xso.as_xml_iter() {
413            Ok(v) => v,
414            Err(e) => return Err(io::Error::new(io::ErrorKind::InvalidInput, e)),
415        };
416        for item in iter {
417            let item = match item {
418                Ok(v) => v,
419                Err(e) => return Err(io::Error::new(io::ErrorKind::InvalidInput, e)),
420            };
421            self.start_send(&item)?;
422        }
423        Ok(())
424    }
425
426    fn progress_write(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
427        self.flush_tx_log();
428        while !self.tx_buffer.is_empty() {
429            let written = match ready!(self
430                .parser
431                .as_mut()
432                .inner_pinned()
433                .poll_write(cx, self.tx_buffer))
434            {
435                Ok(v) => v,
436                Err(e) => return Poll::Ready(Err(e)),
437            };
438            self.tx_buffer.advance(written);
439            *self.tx_buffer_logged = self
440                .tx_buffer_logged
441                .checked_sub(written)
442                .expect("Buffer arithmetic error");
443        }
444        Poll::Ready(Ok(()))
445    }
446}
447
448impl<Io: AsyncWrite> RawXmlStream<Io> {
449    /// Flush all buffered data and shut down the sender side of the
450    /// underlying transport.
451    ///
452    /// Unlike `poll_close` (from the `Sink` impls), this will not close the
453    /// receiving side of the underlying the transport. It is advisable to call
454    /// `poll_close` eventually after `poll_shutdown` in order to gracefully
455    /// handle situations where the remote side does not close the stream
456    /// cleanly.
457    pub fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
458        ready!(self.as_mut().poll_flush(cx))?;
459        let this = self.project();
460        this.parser.inner_pinned().poll_shutdown(cx)
461    }
462}
463
464impl<'x, Io: AsyncWrite> Sink<xso::Item<'x>> for RawXmlStream<Io> {
465    type Error = io::Error;
466
467    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
468        let mut this = self.project();
469        match this.progress_write(cx) {
470            // No progress on write, but if we have enough space in the buffer
471            // it's ok nonetheless.
472            Poll::Pending => (),
473            // Some progress and it went fine, move on.
474            Poll::Ready(Ok(())) => (),
475            // Something went wrong -> return the error.
476            Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
477        }
478        if this.tx_buffer.len() < *this.tx_buffer_high_water_mark {
479            Poll::Ready(Ok(()))
480        } else {
481            Poll::Pending
482        }
483    }
484
485    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
486        let mut this = self.project();
487        ready!(this.progress_write(cx))?;
488        this.parser.as_mut().inner_pinned().poll_flush(cx)
489    }
490
491    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
492        let mut this = self.project();
493        ready!(this.progress_write(cx))?;
494        this.parser.as_mut().inner_pinned().poll_shutdown(cx)
495    }
496
497    fn start_send(self: Pin<&mut Self>, item: xso::Item<'x>) -> Result<(), Self::Error> {
498        let mut this = self.project();
499        this.start_send(&item)
500    }
501}
502
/// Error returned by the [`ReadXso`] future and the [`ReadXsoState`] helper.
pub(super) enum ReadXsoError {
    /// The outer element was closed before a child element could be read.
    ///
    /// This is typically the stream footer in XML stream applications.
    Footer,

    /// A hard error occurred.
    ///
    /// This is either a real I/O error or an error from the XML parser.
    /// Neither is recoverable: in both cases the nesting state is lost, and
    /// XML errors additionally indicate a document which is not
    /// well-formed.
    Hard(io::Error),

    /// The underlying stream signalled a soft read timeout before a child
    /// element could be read.
    ///
    /// Note that soft timeouts which are triggered in the middle of receiving
    /// an element are converted to hard timeouts (i.e. I/O errors).
    ///
    /// This masking is intentional, because:
    /// - Returning a [`Self::SoftTimeout`] from the middle of parsing is not
    ///   possible without complicating the API.
    /// - There is no reason why the remote side should interrupt sending data
    ///   in the middle of an element except if it or the transport has failed
    ///   fatally.
    SoftTimeout,

    /// A parse error occurred.
    ///
    /// The XML structure was well-formed, but the data contained did not
    /// match the XSO which was attempted to be parsed. This error is
    /// recoverable: when this error is emitted, the XML stream is at the same
    /// nesting level as it was before the XSO was attempted to be read; all
    /// XML structure which belonged to the XSO which failed to parse has
    /// been consumed. This makes it possible to read more XSOs even if one
    /// fails to parse.
    Parse(xso::error::Error),
}
543
544impl From<io::Error> for ReadXsoError {
545    fn from(other: io::Error) -> Self {
546        Self::Hard(other)
547    }
548}
549
550impl From<xso::error::Error> for ReadXsoError {
551    fn from(other: xso::error::Error) -> Self {
552        Self::Parse(other)
553    }
554}
555
/// State for reading an XSO from a `Stream<Item = Result<rxml::Event, ...>>`.
///
/// Due to pinning, it is simpler to implement the state machine in a
/// dedicated enum and let the actual (pinned) future pass the stream toward
/// this enum's function.
///
/// This is used by both [`ReadXso`] and the [`super::XmlStream`] itself.
#[derive(Default)]
pub(super) enum ReadXsoState<T: FromXml> {
    /// The [`rxml::Event::StartElement`] event was not seen yet.
    ///
    /// In this state, XML whitespace is ignored (as per RFC 6120 § 11.7), but
    /// other text data is rejected.
    ///
    /// This is the initial (and default) state.
    #[default]
    PreData,

    /// The [`rxml::Event::StartElement`] event was received.
    ///
    /// The inner value is the builder for the "return type" of this enum and
    /// the implementation in the [`xso`] crate does all the heavy lifting:
    /// we'll only send events in its general direction.
    // We use the fallible parsing here so that we don't have to do the depth
    // accounting ourselves.
    Parsing(<Result<T, xso::error::Error> as FromXml>::Builder),

    /// The parsing has completed (successfully or not).
    ///
    /// This is a final state and attempting to advance the state will panic.
    /// This is in accordance with [`core::future::Future::poll`]'s contract,
    /// for which this enum is primarily used.
    Done,
}
588
589impl<T: FromXml> ReadXsoState<T> {
590    /// Progress reading the XSO from the given source.
591    ///
592    /// This attempts to parse a single XSO from the underlying stream,
593    /// while discarding any XML whitespace before the beginning of the XSO.
594    ///
595    /// If the XSO is parsed successfully, the method returns Ready with the
596    /// parsed value. If parsing fails or an I/O error occurs, an appropriate
597    /// error is returned.
598    ///
599    /// If parsing fails, the entire XML subtree belonging to the XSO is
600    /// nonetheless processed. That makes parse errors recoverable: After
601    /// `poll_advance` has returned Ready with either  an Ok result or a
602    /// [`ReadXsoError::Parse`] error variant, another XSO can be read and the
603    /// XML parsing will be at the same nesting depth as it was before the
604    /// first call to `poll_advance`.
605    ///
606    /// Note that this guarantee does not hold for non-parse errors (i.e. for
607    /// the other variants of [`ReadXsoError`]): I/O errors as well as
608    /// occurrence of the outer closing element are fatal.
609    ///
610    /// The `source` passed to `poll_advance` should be the same on every
611    /// call.
612    pub(super) fn poll_advance<Io: AsyncBufRead>(
613        &mut self,
614        mut source: Pin<&mut RawXmlStream<Io>>,
615        cx: &mut Context<'_>,
616    ) -> Poll<Result<T, ReadXsoError>> {
617        loop {
618            // Disable text buffering before the start event. That way, we
619            // don't accumulate infinite amounts of XML whitespace caused by
620            // whitespace keepalives.
621            // (And also, we'll know faster when the remote side sends
622            // non-whitespace garbage.)
623            let text_buffering = !matches!(self, ReadXsoState::PreData);
624            source
625                .as_mut()
626                .parser_pinned()
627                .set_text_buffering(text_buffering);
628
629            let ev = ready!(source.as_mut().poll_next(cx)).transpose();
630            match self {
631                ReadXsoState::PreData => {
632                    log::trace!("ReadXsoState::PreData ev = {:?}", ev);
633                    match ev {
634                        Ok(Some(rxml::Event::XmlDeclaration(_, _))) => (),
635                        Ok(Some(rxml::Event::Text(_, data))) => {
636                            if xso::is_xml_whitespace(data.as_bytes()) {
637                                log::trace!("Received {} bytes of whitespace", data.len());
638                                source.as_mut().stream_pinned().discard_capture();
639                                continue;
640                            } else {
641                                *self = ReadXsoState::Done;
642                                return Poll::Ready(Err(io::Error::new(
643                                    io::ErrorKind::InvalidData,
644                                    "non-whitespace text content before XSO",
645                                )
646                                .into()));
647                            }
648                        }
649                        Ok(Some(rxml::Event::StartElement(_, name, attrs))) => {
650                            let source_tmp = source.as_mut();
651                            let ctx = xso::Context::new(source_tmp.lang_stack.current());
652                            *self = ReadXsoState::Parsing(
653                                <Result<T, xso::error::Error> as FromXml>::from_events(
654                                    name, attrs, &ctx,
655                                )
656                                .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?,
657                            );
658                        }
659                        // Amounts to EOF, as we expect to start on the stream level.
660                        Ok(Some(rxml::Event::EndElement(_))) => {
661                            *self = ReadXsoState::Done;
662                            return Poll::Ready(Err(ReadXsoError::Footer));
663                        }
664                        Ok(None) => {
665                            *self = ReadXsoState::Done;
666                            return Poll::Ready(Err(io::Error::new(
667                                io::ErrorKind::InvalidData,
668                                "eof before XSO started",
669                            )
670                            .into()));
671                        }
672                        Err(RawError::SoftTimeout) => {
673                            *self = ReadXsoState::Done;
674                            return Poll::Ready(Err(ReadXsoError::SoftTimeout));
675                        }
676                        Err(RawError::Io(e)) => {
677                            *self = ReadXsoState::Done;
678                            return Poll::Ready(Err(ReadXsoError::Hard(e)));
679                        }
680                    }
681                }
682                ReadXsoState::Parsing(builder) => {
683                    log::trace!("ReadXsoState::Parsing ev = {:?}", ev);
684                    let ev = match ev {
685                        Ok(Some(ev)) => ev,
686                        Ok(None) => {
687                            *self = ReadXsoState::Done;
688                            return Poll::Ready(Err(io::Error::new(
689                                io::ErrorKind::UnexpectedEof,
690                                "eof during XSO parsing",
691                            )
692                            .into()));
693                        }
694                        Err(RawError::Io(e)) => {
695                            *self = ReadXsoState::Done;
696                            return Poll::Ready(Err(e.into()));
697                        }
698                        Err(RawError::SoftTimeout) => {
699                            // See also [`ReadXsoError::SoftTimeout`] for why
700                            // we mask the SoftTimeout condition here.
701                            *self = ReadXsoState::Done;
702                            return Poll::Ready(Err(io::Error::new(
703                                io::ErrorKind::TimedOut,
704                                "read timeout during XSO parsing",
705                            )
706                            .into()));
707                        }
708                    };
709
710                    let source_tmp = source.as_mut();
711                    let ctx = xso::Context::new(source_tmp.lang_stack.current());
712                    match builder.feed(ev, &ctx) {
713                        Err(err) => {
714                            *self = ReadXsoState::Done;
715                            source.as_mut().stream_pinned().discard_capture();
716                            return Poll::Ready(Err(io::Error::new(
717                                io::ErrorKind::InvalidData,
718                                err,
719                            )
720                            .into()));
721                        }
722                        Ok(Some(Err(err))) => {
723                            *self = ReadXsoState::Done;
724                            log_recv(Some(&err), source.as_mut().stream_pinned().take_capture());
725                            return Poll::Ready(Err(ReadXsoError::Parse(err)));
726                        }
727                        Ok(Some(Ok(value))) => {
728                            *self = ReadXsoState::Done;
729                            log_recv(None, source.as_mut().stream_pinned().take_capture());
730                            return Poll::Ready(Ok(value));
731                        }
732                        Ok(None) => (),
733                    }
734                }
735
736                // The error talks about "future", simply because that is
737                // where `Self` is used (inside `core::future::Future::poll`).
738                ReadXsoState::Done => panic!("future polled after completion"),
739            }
740        }
741    }
742}
743
/// Future to read a single XSO from a stream.
///
/// Created via `ReadXso::read_from`; the actual work is done by
/// [`ReadXsoState::poll_advance`].
pub(super) struct ReadXso<'x, Io, T: FromXml> {
    /// Stream to read the XSO from.
    inner: Pin<&'x mut RawXmlStream<Io>>,

    /// Current state of parsing.
    state: ReadXsoState<T>,
}
752
753impl<'x, Io: AsyncBufRead, T: FromXml> ReadXso<'x, Io, T> {
754    /// Start reading a single XSO from a stream.
755    pub(super) fn read_from(stream: Pin<&'x mut RawXmlStream<Io>>) -> Self {
756        Self {
757            inner: stream,
758            state: ReadXsoState::PreData,
759        }
760    }
761}
762
763impl<Io: AsyncBufRead, T: FromXml> Future for ReadXso<'_, Io, T>
764where
765    T::Builder: Unpin,
766{
767    type Output = Result<T, ReadXsoError>;
768
769    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
770        let this = self.get_mut();
771        this.state.poll_advance(this.inner.as_mut(), cx)
772    }
773}
774
/// Metadata carried by an XML stream header.
///
/// Every field is optional; the [`Default`] value has all of them set to
/// `None`.
#[derive(Debug, Default)]
pub struct StreamHeader<'x> {
    /// Value of the `from` attribute, if any.
    pub from: Option<Cow<'x, str>>,

    /// Value of the `to` attribute, if any.
    pub to: Option<Cow<'x, str>>,

    /// Value of the `id` attribute, if any.
    pub id: Option<Cow<'x, str>>,
}
787
788impl StreamHeader<'_> {
789    /// Take the contents and return them as new object.
790    ///
791    /// `self` will be left with all its parts set to `None`.
792    pub fn take(&mut self) -> Self {
793        Self {
794            from: self.from.take(),
795            to: self.to.take(),
796            id: self.id.take(),
797        }
798    }
799
800    pub(super) async fn send<Io: AsyncWrite>(
801        self,
802        mut stream: Pin<&mut RawXmlStream<Io>>,
803    ) -> io::Result<()> {
804        stream
805            .send(Item::XmlDeclaration(rxml::XmlVersion::V1_0))
806            .await?;
807        stream
808            .send(Item::ElementHeadStart(
809                Namespace::from(XML_STREAM_NS),
810                Cow::Borrowed(xml_ncname!("stream")),
811            ))
812            .await?;
813        if let Some(from) = self.from {
814            stream
815                .send(Item::Attribute(
816                    Namespace::NONE,
817                    Cow::Borrowed(xml_ncname!("from")),
818                    from,
819                ))
820                .await?;
821        }
822        if let Some(to) = self.to {
823            stream
824                .send(Item::Attribute(
825                    Namespace::NONE,
826                    Cow::Borrowed(xml_ncname!("to")),
827                    to,
828                ))
829                .await?;
830        }
831        if let Some(id) = self.id {
832            stream
833                .send(Item::Attribute(
834                    Namespace::NONE,
835                    Cow::Borrowed(xml_ncname!("id")),
836                    id,
837                ))
838                .await?;
839        }
840        stream
841            .send(Item::Attribute(
842                Namespace::NONE,
843                Cow::Borrowed(xml_ncname!("version")),
844                Cow::Borrowed("1.0"),
845            ))
846            .await?;
847        stream.send(Item::ElementHeadEnd).await?;
848        Ok(())
849    }
850}
851
852impl StreamHeader<'static> {
853    pub(super) async fn recv<Io: AsyncBufRead>(
854        mut stream: Pin<&mut RawXmlStream<Io>>,
855    ) -> io::Result<Self> {
856        loop {
857            match stream.as_mut().next().await {
858                Some(Err(RawError::Io(e))) => return Err(e),
859                Some(Err(RawError::SoftTimeout)) => (),
860                Some(Ok(Event::StartElement(_, (ns, name), mut attrs))) => {
861                    if ns != XML_STREAM_NS || name != "stream" {
862                        return Err(io::Error::new(
863                            io::ErrorKind::InvalidData,
864                            "unknown stream header",
865                        ));
866                    }
867
868                    match attrs.remove(Namespace::none(), "version") {
869                        Some(v) => {
870                            if v != "1.0" {
871                                return Err(io::Error::new(
872                                    io::ErrorKind::InvalidData,
873                                    format!("unsuppored stream version: {}", v),
874                                ));
875                            }
876                        }
877                        None => {
878                            return Err(io::Error::new(
879                                io::ErrorKind::InvalidData,
880                                "required `version` attribute missing",
881                            ))
882                        }
883                    }
884
885                    let from = attrs.remove(Namespace::none(), "from");
886                    let to = attrs.remove(Namespace::none(), "to");
887                    let id = attrs.remove(Namespace::none(), "id");
888                    let _ = attrs.remove(Namespace::xml(), "lang");
889
890                    if let Some(((ns, name), _)) = attrs.into_iter().next() {
891                        return Err(io::Error::new(
892                            io::ErrorKind::InvalidData,
893                            format!("unexpected stream header attribute: {{{}}}{}", ns, name),
894                        ));
895                    }
896
897                    return Ok(StreamHeader {
898                        from: from.map(Cow::Owned),
899                        to: to.map(Cow::Owned),
900                        id: id.map(Cow::Owned),
901                    });
902                }
903                Some(Ok(Event::Text(_, _))) | Some(Ok(Event::EndElement(_))) => {
904                    return Err(io::Error::new(
905                        io::ErrorKind::UnexpectedEof,
906                        "unexpected content before stream header",
907                    ))
908                }
909                // We cannot loop infinitely here because the XML parser will
910                // prevent more than one XML declaration from being parsed.
911                Some(Ok(Event::XmlDeclaration(_, _))) => (),
912                None => {
913                    return Err(io::Error::new(
914                        io::ErrorKind::UnexpectedEof,
915                        "eof before stream header",
916                    ))
917                }
918            }
919        }
920    }
921}