tokio_xmpp/xmlstream/
common.rs

1// Copyright (c) 2024 Jonas Schäfer <jonas@zombofant.net>
2//
3// This Source Code Form is subject to the terms of the Mozilla Public
4// License, v. 2.0. If a copy of the MPL was not distributed with this
5// file, You can obtain one at http://mozilla.org/MPL/2.0/.
6
7use alloc::borrow::Cow;
8use core::{
9    future::Future,
10    pin::Pin,
11    task::{Context, Poll},
12    time::Duration,
13};
14use std::io;
15
16use futures::{ready, Sink, SinkExt, Stream, StreamExt};
17
18use bytes::{Buf, BytesMut};
19
20use tokio::{
21    io::{AsyncBufRead, AsyncWrite},
22    time::Instant,
23};
24
25use xso::{
26    exports::rxml::{
27        self, writer::TrackNamespace, xml_lang::XmlLangStack, xml_ncname, Event, Namespace,
28    },
29    AsXml, FromEventsBuilder, FromXml, Item,
30};
31
32use crate::connect::AsyncReadAndWrite;
33
34use super::capture::{log_enabled, log_recv, log_send, CaptureBufRead};
35
36use xmpp_parsers::ns::STREAM as XML_STREAM_NS;
37
/// Configuration for timeouts on an XML stream.
///
/// The defaults are tuned toward common desktop/laptop use and may not hold
/// up to extreme conditions (arctic satellite link, mobile internet on a
/// train in Brandenburg, Germany, and similar) and may be inefficient in
/// other conditions (stable server link, localhost communication).
///
/// See [`Timeouts::tight`] for a preset with much shorter durations.
#[derive(Debug, Clone, Copy)]
pub struct Timeouts {
    /// Maximum silence time before a
    /// [`ReadError::SoftTimeout`][`super::ReadError::SoftTimeout`] is
    /// returned.
    ///
    /// Soft timeouts are not fatal, but they must be handled by user code so
    /// that more data is read after at most [`Self::response_timeout`],
    /// starting from the moment the soft timeout is returned.
    pub read_timeout: Duration,

    /// Maximum silence after a soft timeout.
    ///
    /// If the stream is silent for longer than this time after a soft timeout
    /// has been emitted, a hard [`TimedOut`][`io::ErrorKind::TimedOut`]
    /// I/O error is returned and the stream is to be considered dead.
    pub response_timeout: Duration,
}
62
63impl Default for Timeouts {
64    fn default() -> Self {
65        Self {
66            read_timeout: Duration::new(300, 0),
67            response_timeout: Duration::new(300, 0),
68        }
69    }
70}
71
72impl Timeouts {
73    /// Tight timeouts suitable for communicating on a fast LAN or localhost.
74    pub fn tight() -> Self {
75        Self {
76            read_timeout: Duration::new(60, 0),
77            response_timeout: Duration::new(15, 0),
78        }
79    }
80
81    fn data_to_soft(&self) -> Duration {
82        self.read_timeout
83    }
84
85    fn soft_to_warn(&self) -> Duration {
86        self.response_timeout / 2
87    }
88
89    fn warn_to_hard(&self) -> Duration {
90        self.response_timeout / 2
91    }
92}
93
/// Escalation stage of the read timeout.
///
/// The stages trip in the order `Soft`, `Warn`, `Hard` while the stream stays
/// silent.
#[derive(Clone, Copy)]
enum TimeoutLevel {
    /// First stage; trips after the configured read timeout has elapsed
    /// without any data being received.
    Soft,

    /// Intermediate stage; trips after the first half of the response
    /// timeout has elapsed following the soft timeout.
    Warn,

    /// Final stage; trips after the second half of the response timeout has
    /// elapsed and keeps re-tripping until the stream is dropped.
    Hard,
}
100
/// Error yielded when reading events from a [`RawXmlStream`].
#[derive(Debug)]
pub(super) enum RawError {
    /// A fatal I/O error. Errors reported by the XML reader end up here, as
    /// does the hard read timeout (with kind
    /// [`TimedOut`][`io::ErrorKind::TimedOut`]).
    Io(io::Error),

    /// The soft read timeout elapsed. This is not fatal; reading may be
    /// attempted again.
    SoftTimeout,
}
106
107impl From<io::Error> for RawError {
108    fn from(other: io::Error) -> Self {
109        Self::Io(other)
110    }
111}
112
/// Runtime state of the read timeout: the configuration, the stage which
/// will trip next and the timer which trips it.
struct TimeoutState {
    /// Configuration for the timeouts.
    timeouts: Timeouts,

    /// Level of the next timeout which will trip.
    level: TimeoutLevel,

    /// Sleep timer used for read timeouts.
    // NOTE: even though we pretend we could deal with an !Unpin
    // RawXmlStream, we really can't: box_stream for example needs it,
    // but also all the typestate around the initial stream setup needs
    // to be able to move the stream around.
    deadline: Pin<Box<tokio::time::Sleep>>,
}
127
128impl TimeoutState {
129    fn new(timeouts: Timeouts) -> Self {
130        Self {
131            deadline: Box::pin(tokio::time::sleep(timeouts.data_to_soft())),
132            level: TimeoutLevel::Soft,
133            timeouts,
134        }
135    }
136
137    fn poll(&mut self, cx: &mut Context) -> Poll<TimeoutLevel> {
138        ready!(self.deadline.as_mut().poll(cx));
139        // Deadline elapsed!
140        let to_return = self.level;
141        let (next_level, next_duration) = match self.level {
142            TimeoutLevel::Soft => (TimeoutLevel::Warn, self.timeouts.soft_to_warn()),
143            TimeoutLevel::Warn => (TimeoutLevel::Hard, self.timeouts.warn_to_hard()),
144            // Something short-ish so that we fire this over and over until
145            // someone finally kills the stream for good.
146            TimeoutLevel::Hard => (TimeoutLevel::Hard, Duration::new(1, 0)),
147        };
148        self.level = next_level;
149        self.deadline.as_mut().reset(Instant::now() + next_duration);
150        Poll::Ready(to_return)
151    }
152
153    fn reset(&mut self) {
154        self.level = TimeoutLevel::Soft;
155        self.deadline
156            .as_mut()
157            .reset(Instant::now() + self.timeouts.data_to_soft());
158    }
159}
160
pin_project_lite::pin_project! {
    // NOTE: due to limitations of pin_project_lite, the field comments are
    // not doc comments. Luckily, this struct is only `pub(super)` anyway.
    //
    // Low-level XML stream: reads `rxml::Event`s from and writes
    // `xso::Item`s to an underlying `Io`, with read timeouts and a bounded
    // transmit buffer.
    #[project = RawXmlStreamProj]
    pub(super) struct RawXmlStream<Io> {
        // The parser used for deserialising data.
        #[pin]
        parser: rxml::AsyncReader<CaptureBufRead<Io>>,

        // Tracker for `xml:lang` data.
        lang_stack: XmlLangStack,

        // The writer used for serialising data.
        writer: rxml::writer::Encoder<rxml::writer::SimpleNamespaces>,

        // State of the read timeouts (configuration, next level to trip and
        // the timer). It is reset whenever the parser makes progress.
        timeouts: TimeoutState,

        // The default namespace to declare on the stream header.
        stream_ns: &'static str,

        // Buffer containing serialised data which will then be sent through
        // the inner `Io`. Sending that serialised data happens in
        // `poll_ready` and `poll_flush`, while appending serialised data
        // happens in `start_send`.
        tx_buffer: BytesMut,

        // Position inside tx_buffer up to which to-be-sent data has already
        // been logged.
        tx_buffer_logged: usize,

        // This signifies the limit at the point of which the Sink will
        // refuse to accept more data: if the `tx_buffer`'s size grows beyond
        // that high water mark, poll_ready will return Poll::Pending until
        // it has managed to flush enough data down the inner writer.
        //
        // Note that poll_ready will always attempt to progress the writes,
        // which further reduces the chance of hitting this limit unless
        // either the underlying writer gets stuck (e.g. TCP connection
        // breaking in a timeouty way) or a lot of data is written in bulk.
        // In both cases, the backpressure created by poll_ready returning
        // Pending is desirable.
        //
        // However, there is a catch: We don't assert this condition
        // in `start_send` at all. The reason is that we cannot suspend
        // serialisation of an XSO in the middle of writing it: it has to be
        // written in one batch or you have to start over later (this has to
        // do with the iterator state borrowing the data and futures getting
        // cancelled e.g. in tokio::select!). In order to facilitate
        // implementing a `Sink<T: AsXml>` on top of `RawXmlStream`, we
        // cannot be strict about what is going on in `start_send`:
        // `poll_ready` does not know what kind of data will be written (so
        // it could not make a size estimate, even if that was at all
        // possible with AsXml) and `start_send` is not a coroutine. So if
        // `Sink<T: AsXml>` wants to use `RawXmlStream`, it must be able to
        // submit an entire XSO's items in one batch to `RawXmlStream` after
        // it has reported to be ready once. That may easily make the buffer
        // reach its high water mark.
        //
        // So if we checked that condition in `start_send` (as opposed to
        // `poll_ready`), we would cause situations where submitting XSOs
        // failed randomly (with a panic or other errors) and would have to
        // be retried later.
        //
        // While failing with e.g. io::ErrorKind::WouldBlock is something
        // that could be investigated later, it would still require being
        // able to make an accurate estimate of the number of bytes needed to
        // serialise any given `AsXml`, because as pointed out earlier, once
        // we have started, there is no going back.
        //
        // Finally, none of that hurts much because `RawXmlStream` is only an
        // internal API. The high-level APIs will always call `poll_ready`
        // before sending an XSO, which means that we won't *grossly* go over
        // the TX buffer high water mark---unless you send a really large
        // XSO at once.
        tx_buffer_high_water_mark: usize,
    }
}
238
239impl<Io: AsyncBufRead + AsyncWrite> RawXmlStream<Io> {
240    fn new_writer(
241        stream_ns: &'static str,
242    ) -> rxml::writer::Encoder<rxml::writer::SimpleNamespaces> {
243        let mut writer = rxml::writer::Encoder::new();
244        writer
245            .ns_tracker_mut()
246            .declare_fixed(Some(xml_ncname!("stream")), XML_STREAM_NS.into());
247        writer
248            .ns_tracker_mut()
249            .declare_fixed(None, stream_ns.into());
250        writer
251    }
252
253    pub(super) fn new(io: Io, stream_ns: &'static str, timeouts: Timeouts) -> Self {
254        let parser = rxml::Parser::default();
255        let mut io = CaptureBufRead::wrap(io);
256        if log_enabled() {
257            io.enable_capture();
258        }
259        Self {
260            parser: rxml::AsyncReader::wrap(io, parser),
261            writer: Self::new_writer(stream_ns),
262            lang_stack: XmlLangStack::new(),
263            timeouts: TimeoutState::new(timeouts),
264            tx_buffer_logged: 0,
265            stream_ns,
266            tx_buffer: BytesMut::new(),
267
268            // This basically means: "if we already have 2 kiB in our send
269            // buffer, do not accept more data".
270            // Please see the extensive words at
271            //`Self::tx_buffer_high_water_mark` for details.
272            tx_buffer_high_water_mark: 2048,
273        }
274    }
275
276    pub(super) fn reset_state(self: Pin<&mut Self>) {
277        let this = self.project();
278        *this.parser.parser_pinned() = rxml::Parser::default();
279        *this.lang_stack = XmlLangStack::new();
280        *this.writer = Self::new_writer(this.stream_ns);
281    }
282
283    pub(super) fn into_inner(self) -> Io {
284        self.parser.into_inner().0.into_inner()
285    }
286
287    /// Box the underlying transport stream.
288    ///
289    /// This removes the specific type of the transport from the XML stream's
290    /// type signature.
291    pub(super) fn box_stream(self) -> RawXmlStream<Box<dyn AsyncReadAndWrite + Send + 'static>>
292    where
293        Io: AsyncReadAndWrite + Send + 'static,
294    {
295        let (io, p) = self.parser.into_inner();
296        let mut io = CaptureBufRead::wrap(Box::new(io) as Box<_>);
297        if log_enabled() {
298            io.enable_capture();
299        }
300        let parser = rxml::AsyncReader::wrap(io, p);
301        RawXmlStream {
302            parser,
303            lang_stack: XmlLangStack::new(),
304            timeouts: self.timeouts,
305            writer: self.writer,
306            tx_buffer: self.tx_buffer,
307            tx_buffer_logged: self.tx_buffer_logged,
308            tx_buffer_high_water_mark: self.tx_buffer_high_water_mark,
309            stream_ns: self.stream_ns,
310        }
311    }
312}
313
314impl<Io: AsyncWrite> RawXmlStream<Io> {
315    /// Start sending an entire XSO.
316    ///
317    /// Unlike the `Sink` implementation, this provides nice syntax
318    /// highlighting for the serialised data in log outputs (if enabled) *and*
319    /// is error safe: if the XSO fails to serialise completely, it will be as
320    /// if it hadn't been attempted to serialise it at all.
321    ///
322    /// Note that, like with `start_send`, the caller is responsible for
323    /// ensuring that the stream is ready by polling
324    /// [`<Self as Sink>::poll_ready`] as needed.
325    pub(super) fn start_send_xso<T: AsXml>(self: Pin<&mut Self>, xso: &T) -> io::Result<()> {
326        let mut this = self.project();
327        let prev_len = this.tx_buffer.len();
328        match this.try_send_xso(xso) {
329            Ok(()) => Ok(()),
330            Err(e) => {
331                let curr_len = this.tx_buffer.len();
332                this.tx_buffer.truncate(prev_len);
333                log::trace!(
334                    "SEND failed: {}. Rewinding buffer by {} bytes.",
335                    e,
336                    curr_len - prev_len
337                );
338                Err(e)
339            }
340        }
341    }
342}
343
344impl<Io> RawXmlStream<Io> {
345    fn parser_pinned(self: Pin<&mut Self>) -> &mut rxml::Parser {
346        self.project().parser.parser_pinned()
347    }
348
349    fn stream_pinned(self: Pin<&mut Self>) -> Pin<&mut CaptureBufRead<Io>> {
350        self.project().parser.inner_pinned()
351    }
352
353    pub(super) fn get_stream(&self) -> &Io {
354        self.parser.inner().inner()
355    }
356}
357
358impl<Io: AsyncBufRead> Stream for RawXmlStream<Io> {
359    type Item = Result<rxml::Event, RawError>;
360
361    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
362        let mut this = self.project();
363        loop {
364            match this.parser.as_mut().poll_read(cx) {
365                Poll::Pending => (),
366                Poll::Ready(v) => {
367                    this.timeouts.reset();
368                    match v.transpose() {
369                        // Skip the XML declaration, nobody wants to hear about that.
370                        Some(Ok(rxml::Event::XmlDeclaration(_, _))) => continue,
371                        Some(Ok(event)) => {
372                            this.lang_stack.handle_event(&event);
373                            return Poll::Ready(Some(Ok(event)));
374                        }
375                        other => return Poll::Ready(other.map(|x| x.map_err(RawError::Io))),
376                    }
377                }
378            };
379
380            // poll_read returned pending... what do the timeouts have to say?
381            match ready!(this.timeouts.poll(cx)) {
382                TimeoutLevel::Soft => return Poll::Ready(Some(Err(RawError::SoftTimeout))),
383                TimeoutLevel::Warn => (),
384                TimeoutLevel::Hard => {
385                    return Poll::Ready(Some(Err(RawError::Io(io::Error::new(
386                        io::ErrorKind::TimedOut,
387                        "read and response timeouts elapsed",
388                    )))))
389                }
390            }
391        }
392    }
393}
394
395impl<Io: AsyncWrite> RawXmlStreamProj<'_, Io> {
396    fn flush_tx_log(&mut self) {
397        let range = &self.tx_buffer[*self.tx_buffer_logged..];
398        if range.is_empty() {
399            return;
400        }
401        log_send(range);
402        *self.tx_buffer_logged = self.tx_buffer.len();
403    }
404
405    fn start_send(&mut self, item: &xso::Item<'_>) -> io::Result<()> {
406        self.writer
407            .encode_into_bytes(item.as_rxml_item(), self.tx_buffer)
408            .map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, e))
409    }
410
411    fn try_send_xso<T: AsXml>(&mut self, xso: &T) -> io::Result<()> {
412        let iter = match xso.as_xml_iter() {
413            Ok(v) => v,
414            Err(e) => return Err(io::Error::new(io::ErrorKind::InvalidInput, e)),
415        };
416        for item in iter {
417            let item = match item {
418                Ok(v) => v,
419                Err(e) => return Err(io::Error::new(io::ErrorKind::InvalidInput, e)),
420            };
421            self.start_send(&item)?;
422        }
423        Ok(())
424    }
425
426    fn progress_write(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
427        self.flush_tx_log();
428        while !self.tx_buffer.is_empty() {
429            let written = match ready!(self
430                .parser
431                .as_mut()
432                .inner_pinned()
433                .poll_write(cx, self.tx_buffer))
434            {
435                Ok(v) => v,
436                Err(e) => return Poll::Ready(Err(e)),
437            };
438            self.tx_buffer.advance(written);
439            *self.tx_buffer_logged = self
440                .tx_buffer_logged
441                .checked_sub(written)
442                .expect("Buffer arithmetic error");
443        }
444        Poll::Ready(Ok(()))
445    }
446}
447
448impl<Io: AsyncWrite> RawXmlStream<Io> {
449    /// Flush all buffered data and shut down the sender side of the
450    /// underlying transport.
451    ///
452    /// Unlike `poll_close` (from the `Sink` impls), this will not close the
453    /// receiving side of the underlying the transport. It is advisable to call
454    /// `poll_close` eventually after `poll_shutdown` in order to gracefully
455    /// handle situations where the remote side does not close the stream
456    /// cleanly.
457    pub fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
458        ready!(self.as_mut().poll_flush(cx))?;
459        let this = self.project();
460        this.parser.inner_pinned().poll_shutdown(cx)
461    }
462}
463
464impl<'x, Io: AsyncWrite> Sink<xso::Item<'x>> for RawXmlStream<Io> {
465    type Error = io::Error;
466
467    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
468        let mut this = self.project();
469        match this.progress_write(cx) {
470            // No progress on write, but if we have enough space in the buffer
471            // it's ok nonetheless.
472            Poll::Pending => (),
473            // Some progress and it went fine, move on.
474            Poll::Ready(Ok(())) => (),
475            // Something went wrong -> return the error.
476            Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
477        }
478        if this.tx_buffer.len() < *this.tx_buffer_high_water_mark {
479            Poll::Ready(Ok(()))
480        } else {
481            Poll::Pending
482        }
483    }
484
485    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
486        let mut this = self.project();
487        ready!(this.progress_write(cx))?;
488        this.parser.as_mut().inner_pinned().poll_flush(cx)
489    }
490
491    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
492        let mut this = self.project();
493        ready!(this.progress_write(cx))?;
494        this.parser.as_mut().inner_pinned().poll_shutdown(cx)
495    }
496
497    fn start_send(self: Pin<&mut Self>, item: xso::Item<'x>) -> Result<(), Self::Error> {
498        let mut this = self.project();
499        this.start_send(&item)
500    }
501}
502
/// Error returned by the [`ReadXso`] future and the [`ReadXsoState`] helper.
pub(super) enum ReadXsoError {
    /// The outer element was closed before a child element could be read.
    ///
    /// This is typically the stream footer in XML stream applications.
    Footer,

    /// A hard error occurred.
    ///
    /// This is either a real I/O error or an error from the XML parser.
    /// Neither is recoverable, because the nesting state is lost and
    /// in addition, XML errors are not recoverable because they indicate a
    /// not well-formed document.
    ///
    /// Unexpected end of input, non-whitespace text between XSOs and
    /// timeouts in the middle of an element are reported through this
    /// variant, too.
    Hard(io::Error),

    /// The underlying stream signalled a soft read timeout before a child
    /// element could be read.
    ///
    /// Note that soft timeouts which are triggered in the middle of receiving
    /// an element are converted to hard timeouts (i.e. I/O errors).
    ///
    /// This masking is intentional, because:
    /// - Returning a [`Self::SoftTimeout`] from the middle of parsing is not
    ///   possible without complicating the API.
    /// - There is no reason why the remote side should interrupt sending data
    ///   in the middle of an element except if it or the transport has failed
    ///   fatally.
    SoftTimeout,

    /// A parse error occurred.
    ///
    /// The XML structure was well-formed, but the data contained did not
    /// match the XSO which was attempted to be parsed. This error is
    /// recoverable: when this error is emitted, the XML stream is at the same
    /// nesting level as it was before the XSO was attempted to be read; all
    /// XML structure which belonged to the XSO which failed to parse has
    /// been consumed. This allows reading more XSOs even if one fails to
    /// parse.
    Parse(xso::error::Error),
}
543
544impl From<io::Error> for ReadXsoError {
545    fn from(other: io::Error) -> Self {
546        Self::Hard(other)
547    }
548}
549
550impl From<xso::error::Error> for ReadXsoError {
551    fn from(other: xso::error::Error) -> Self {
552        Self::Parse(other)
553    }
554}
555
/// State for reading an XSO from a `Stream<Item = Result<rxml::Event, ...>>`.
///
/// Due to pinning, it is simpler to implement the state machine in a
/// dedicated enum and let the actual (pinned) future pass the stream toward
/// this enum's function.
///
/// This is used by both [`ReadXso`] and the [`super::XmlStream`] itself.
#[derive(Default)]
pub(super) enum ReadXsoState<T: FromXml> {
    /// The [`rxml::Event::StartElement`] event was not seen yet.
    ///
    /// In this state, XML whitespace is ignored (as per RFC 6120 § 11.7), but
    /// other text data is rejected.
    #[default]
    PreData,

    /// The [`rxml::Event::StartElement`] event was received.
    ///
    /// The inner value is the builder for the "return type" of this enum and
    /// the implementation in the [`xso`] crate does all the heavy lifting:
    /// we'll only send events in its general direction.
    // We use the fallible parsing here so that we don't have to do the depth
    // accounting ourselves.
    Parsing(<Result<T, xso::error::Error> as FromXml>::Builder),

    /// The parsing has completed (successfully or not).
    ///
    /// This is a final state and attempting to advance the state will panic.
    /// This is in accordance with [`core::future::Future::poll`]'s contract,
    /// for which this enum is primarily used.
    Done,
}
588
589impl<T: FromXml> ReadXsoState<T> {
590    /// Progress reading the XSO from the given source.
591    ///
592    /// This attempts to parse a single XSO from the underlying stream,
593    /// while discarding any XML whitespace before the beginning of the XSO.
594    ///
595    /// If the XSO is parsed successfully, the method returns Ready with the
596    /// parsed value. If parsing fails or an I/O error occurs, an appropriate
597    /// error is returned.
598    ///
599    /// If parsing fails, the entire XML subtree belonging to the XSO is
600    /// nonetheless processed. That makes parse errors recoverable: After
601    /// `poll_advance` has returned Ready with either  an Ok result or a
602    /// [`ReadXsoError::Parse`] error variant, another XSO can be read and the
603    /// XML parsing will be at the same nesting depth as it was before the
604    /// first call to `poll_advance`.
605    ///
606    /// Note that this guarantee does not hold for non-parse errors (i.e. for
607    /// the other variants of [`ReadXsoError`]): I/O errors as well as
608    /// occurrence of the outer closing element are fatal.
609    ///
610    /// The `source` passed to `poll_advance` should be the same on every
611    /// call.
612    pub(super) fn poll_advance<Io: AsyncBufRead>(
613        &mut self,
614        mut source: Pin<&mut RawXmlStream<Io>>,
615        cx: &mut Context<'_>,
616    ) -> Poll<Result<T, ReadXsoError>> {
617        loop {
618            // Disable text buffering before the start event. That way, we
619            // don't accumulate infinite amounts of XML whitespace caused by
620            // whitespace keepalives.
621            // (And also, we'll know faster when the remote side sends
622            // non-whitespace garbage.)
623            let text_buffering = !matches!(self, ReadXsoState::PreData);
624            source
625                .as_mut()
626                .parser_pinned()
627                .set_text_buffering(text_buffering);
628
629            let ev = ready!(source.as_mut().poll_next(cx)).transpose();
630            match self {
631                ReadXsoState::PreData => {
632                    log::trace!("ReadXsoState::PreData ev = {:?}", ev);
633                    match ev {
634                        Ok(Some(rxml::Event::XmlDeclaration(_, _))) => (),
635                        Ok(Some(rxml::Event::Text(_, data))) => {
636                            if xso::is_xml_whitespace(data.as_bytes()) {
637                                log::trace!("Received {} bytes of whitespace", data.len());
638                                source.as_mut().stream_pinned().discard_capture();
639                                continue;
640                            } else {
641                                *self = ReadXsoState::Done;
642                                return Poll::Ready(Err(io::Error::new(
643                                    io::ErrorKind::InvalidData,
644                                    "non-whitespace text content before XSO",
645                                )
646                                .into()));
647                            }
648                        }
649                        Ok(Some(rxml::Event::StartElement(_, name, attrs))) => {
650                            let source_tmp = source.as_mut();
651                            let ctx = xso::Context::empty()
652                                .with_language(source_tmp.lang_stack.current());
653                            *self = ReadXsoState::Parsing(
654                                <Result<T, xso::error::Error> as FromXml>::from_events(
655                                    name, attrs, &ctx,
656                                )
657                                .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?,
658                            );
659                        }
660                        // Amounts to EOF, as we expect to start on the stream level.
661                        Ok(Some(rxml::Event::EndElement(_))) => {
662                            *self = ReadXsoState::Done;
663                            return Poll::Ready(Err(ReadXsoError::Footer));
664                        }
665                        Ok(None) => {
666                            *self = ReadXsoState::Done;
667                            return Poll::Ready(Err(io::Error::new(
668                                io::ErrorKind::InvalidData,
669                                "eof before XSO started",
670                            )
671                            .into()));
672                        }
673                        Err(RawError::SoftTimeout) => {
674                            *self = ReadXsoState::Done;
675                            return Poll::Ready(Err(ReadXsoError::SoftTimeout));
676                        }
677                        Err(RawError::Io(e)) => {
678                            *self = ReadXsoState::Done;
679                            return Poll::Ready(Err(ReadXsoError::Hard(e)));
680                        }
681                    }
682                }
683                ReadXsoState::Parsing(builder) => {
684                    log::trace!("ReadXsoState::Parsing ev = {:?}", ev);
685                    let ev = match ev {
686                        Ok(Some(ev)) => ev,
687                        Ok(None) => {
688                            *self = ReadXsoState::Done;
689                            return Poll::Ready(Err(io::Error::new(
690                                io::ErrorKind::UnexpectedEof,
691                                "eof during XSO parsing",
692                            )
693                            .into()));
694                        }
695                        Err(RawError::Io(e)) => {
696                            *self = ReadXsoState::Done;
697                            return Poll::Ready(Err(e.into()));
698                        }
699                        Err(RawError::SoftTimeout) => {
700                            // See also [`ReadXsoError::SoftTimeout`] for why
701                            // we mask the SoftTimeout condition here.
702                            *self = ReadXsoState::Done;
703                            return Poll::Ready(Err(io::Error::new(
704                                io::ErrorKind::TimedOut,
705                                "read timeout during XSO parsing",
706                            )
707                            .into()));
708                        }
709                    };
710
711                    let source_tmp = source.as_mut();
712                    let ctx = xso::Context::empty().with_language(source_tmp.lang_stack.current());
713                    match builder.feed(ev, &ctx) {
714                        Err(err) => {
715                            *self = ReadXsoState::Done;
716                            source.as_mut().stream_pinned().discard_capture();
717                            return Poll::Ready(Err(io::Error::new(
718                                io::ErrorKind::InvalidData,
719                                err,
720                            )
721                            .into()));
722                        }
723                        Ok(Some(Err(err))) => {
724                            *self = ReadXsoState::Done;
725                            log_recv(Some(&err), source.as_mut().stream_pinned().take_capture());
726                            return Poll::Ready(Err(ReadXsoError::Parse(err)));
727                        }
728                        Ok(Some(Ok(value))) => {
729                            *self = ReadXsoState::Done;
730                            log_recv(None, source.as_mut().stream_pinned().take_capture());
731                            return Poll::Ready(Ok(value));
732                        }
733                        Ok(None) => (),
734                    }
735                }
736
737                // The error talks about "future", simply because that is
738                // where `Self` is used (inside `core::future::Future::poll`).
739                ReadXsoState::Done => panic!("future polled after completion"),
740            }
741        }
742    }
743}
744
/// Future to read a single XSO from a stream.
pub(super) struct ReadXso<'x, Io, T: FromXml> {
    /// Stream to read the XSO from.
    inner: Pin<&'x mut RawXmlStream<Io>>,

    /// Current state of parsing.
    state: ReadXsoState<T>,
}
753
754impl<'x, Io: AsyncBufRead, T: FromXml> ReadXso<'x, Io, T> {
755    /// Start reading a single XSO from a stream.
756    pub(super) fn read_from(stream: Pin<&'x mut RawXmlStream<Io>>) -> Self {
757        Self {
758            inner: stream,
759            state: ReadXsoState::PreData,
760        }
761    }
762}
763
764impl<Io: AsyncBufRead, T: FromXml> Future for ReadXso<'_, Io, T>
765where
766    T::Builder: Unpin,
767{
768    type Output = Result<T, ReadXsoError>;
769
770    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
771        let this = self.get_mut();
772        this.state.poll_advance(this.inner.as_mut(), cx)
773    }
774}
775
/// Metadata carried by an XML stream header.
#[derive(Debug, Default)]
pub struct StreamHeader<'x> {
    /// Value of the `from` attribute, if present.
    pub from: Option<Cow<'x, str>>,

    /// Value of the `to` attribute, if present.
    pub to: Option<Cow<'x, str>>,

    /// Value of the `id` attribute, if present.
    pub id: Option<Cow<'x, str>>,
}
788
789impl StreamHeader<'_> {
790    /// Take the contents and return them as new object.
791    ///
792    /// `self` will be left with all its parts set to `None`.
793    pub fn take(&mut self) -> Self {
794        Self {
795            from: self.from.take(),
796            to: self.to.take(),
797            id: self.id.take(),
798        }
799    }
800
801    pub(super) async fn send<Io: AsyncWrite>(
802        self,
803        mut stream: Pin<&mut RawXmlStream<Io>>,
804    ) -> io::Result<()> {
805        stream
806            .send(Item::XmlDeclaration(rxml::XmlVersion::V1_0))
807            .await?;
808        stream
809            .send(Item::ElementHeadStart(
810                Namespace::from(XML_STREAM_NS),
811                Cow::Borrowed(xml_ncname!("stream")),
812            ))
813            .await?;
814        if let Some(from) = self.from {
815            stream
816                .send(Item::Attribute(
817                    Namespace::NONE,
818                    Cow::Borrowed(xml_ncname!("from")),
819                    from,
820                ))
821                .await?;
822        }
823        if let Some(to) = self.to {
824            stream
825                .send(Item::Attribute(
826                    Namespace::NONE,
827                    Cow::Borrowed(xml_ncname!("to")),
828                    to,
829                ))
830                .await?;
831        }
832        if let Some(id) = self.id {
833            stream
834                .send(Item::Attribute(
835                    Namespace::NONE,
836                    Cow::Borrowed(xml_ncname!("id")),
837                    id,
838                ))
839                .await?;
840        }
841        stream
842            .send(Item::Attribute(
843                Namespace::NONE,
844                Cow::Borrowed(xml_ncname!("version")),
845                Cow::Borrowed("1.0"),
846            ))
847            .await?;
848        stream.send(Item::ElementHeadEnd).await?;
849        Ok(())
850    }
851}
852
853impl StreamHeader<'static> {
854    pub(super) async fn recv<Io: AsyncBufRead>(
855        mut stream: Pin<&mut RawXmlStream<Io>>,
856    ) -> io::Result<Self> {
857        loop {
858            match stream.as_mut().next().await {
859                Some(Err(RawError::Io(e))) => return Err(e),
860                Some(Err(RawError::SoftTimeout)) => (),
861                Some(Ok(Event::StartElement(_, (ns, name), mut attrs))) => {
862                    if ns != XML_STREAM_NS || name != "stream" {
863                        return Err(io::Error::new(
864                            io::ErrorKind::InvalidData,
865                            "unknown stream header",
866                        ));
867                    }
868
869                    match attrs.remove(Namespace::none(), "version") {
870                        Some(v) => {
871                            if v != "1.0" {
872                                return Err(io::Error::new(
873                                    io::ErrorKind::InvalidData,
874                                    format!("unsuppored stream version: {}", v),
875                                ));
876                            }
877                        }
878                        None => {
879                            return Err(io::Error::new(
880                                io::ErrorKind::InvalidData,
881                                "required `version` attribute missing",
882                            ))
883                        }
884                    }
885
886                    let from = attrs.remove(Namespace::none(), "from");
887                    let to = attrs.remove(Namespace::none(), "to");
888                    let id = attrs.remove(Namespace::none(), "id");
889                    let _ = attrs.remove(Namespace::xml(), "lang");
890
891                    if let Some(((ns, name), _)) = attrs.into_iter().next() {
892                        return Err(io::Error::new(
893                            io::ErrorKind::InvalidData,
894                            format!("unexpected stream header attribute: {{{}}}{}", ns, name),
895                        ));
896                    }
897
898                    return Ok(StreamHeader {
899                        from: from.map(Cow::Owned),
900                        to: to.map(Cow::Owned),
901                        id: id.map(Cow::Owned),
902                    });
903                }
904                Some(Ok(Event::Text(_, _))) | Some(Ok(Event::EndElement(_))) => {
905                    return Err(io::Error::new(
906                        io::ErrorKind::UnexpectedEof,
907                        "unexpected content before stream header",
908                    ))
909                }
910                // We cannot loop infinitely here because the XML parser will
911                // prevent more than one XML declaration from being parsed.
912                Some(Ok(Event::XmlDeclaration(_, _))) => (),
913                None => {
914                    return Err(io::Error::new(
915                        io::ErrorKind::UnexpectedEof,
916                        "eof before stream header",
917                    ))
918                }
919            }
920        }
921    }
922}