tokio_xmpp/xmlstream/
common.rs

1// Copyright (c) 2024 Jonas Schäfer <jonas@zombofant.net>
2//
3// This Source Code Form is subject to the terms of the Mozilla Public
4// License, v. 2.0. If a copy of the MPL was not distributed with this
5// file, You can obtain one at http://mozilla.org/MPL/2.0/.
6
7use alloc::borrow::Cow;
8use core::{
9    future::Future,
10    pin::Pin,
11    task::{Context, Poll},
12    time::Duration,
13};
14use std::io;
15
16use futures::{ready, Sink, SinkExt, Stream, StreamExt};
17
18use bytes::{Buf, BytesMut};
19
20use tokio::{
21    io::{AsyncBufRead, AsyncWrite},
22    time::Instant,
23};
24
25use xso::{
26    exports::rxml::{self, writer::TrackNamespace, xml_ncname, Event, Namespace},
27    AsXml, FromEventsBuilder, FromXml, Item,
28};
29
30use crate::connect::AsyncReadAndWrite;
31
32use super::capture::{log_enabled, log_recv, log_send, CaptureBufRead};
33
34use xmpp_parsers::ns::STREAM as XML_STREAM_NS;
35
/// Configuration for timeouts on an XML stream.
///
/// The defaults are tuned toward common desktop/laptop use and may not hold
/// up to extreme conditions (arctic satellite link, mobile internet on a
/// train in Brandenburg, Germany, and similar) and may be inefficient in
/// other conditions (stable server link, localhost communication).
#[derive(Debug, Clone, Copy)]
pub struct Timeouts {
    /// Maximum silence time before a
    /// [`ReadError::SoftTimeout`][`super::ReadError::SoftTimeout`] is
    /// returned.
    ///
    /// Soft timeouts are not fatal, but they must be handled by user code so
    /// that more data is read after at most [`Self::response_timeout`],
    /// starting from the moment the soft timeout is returned.
    pub read_timeout: Duration,

    /// Maximum silence after a soft timeout.
    ///
    /// If the stream is silent for longer than this time after a soft timeout
    /// has been emitted, a hard [`TimedOut`][`io::ErrorKind::TimedOut`]
    /// I/O error is returned and the stream is to be considered dead.
    pub response_timeout: Duration,
}

impl Default for Timeouts {
    /// General-purpose defaults: 300 seconds for both the read timeout and
    /// the response timeout.
    fn default() -> Self {
        Self {
            read_timeout: Duration::from_secs(300),
            response_timeout: Duration::from_secs(300),
        }
    }
}

impl Timeouts {
    /// Tight timeouts suitable for communicating on a fast LAN or localhost.
    pub fn tight() -> Self {
        Self {
            read_timeout: Duration::from_secs(60),
            response_timeout: Duration::from_secs(15),
        }
    }

    /// Silence allowed after the last received data before the soft timeout
    /// trips.
    fn data_to_soft(&self) -> Duration {
        self.read_timeout
    }

    /// Delay between the soft timeout and the intermediate warning stage:
    /// the first half of the response timeout (rounded down).
    fn soft_to_warn(&self) -> Duration {
        self.response_timeout / 2
    }

    /// Delay between the warning stage and the hard timeout: whatever is left
    /// of the response timeout after [`Self::soft_to_warn`].
    ///
    /// Computed as a remainder rather than a second division so that both
    /// stages always add up to exactly `response_timeout`, even when that
    /// value is an odd number of nanoseconds.
    fn warn_to_hard(&self) -> Duration {
        self.response_timeout - self.soft_to_warn()
    }
}
91
/// Escalation stage of the read timeout tracked by `TimeoutState`.
///
/// `TimeoutState::poll` returns the stage whose deadline just elapsed and
/// then arms the timer for the following stage.
#[derive(Clone, Copy)]
enum TimeoutLevel {
    /// First stage; surfaced by the stream's `poll_next` as
    /// `RawError::SoftTimeout`.
    Soft,
    /// Intermediate stage; `poll_next` takes no action on it and simply keeps
    /// polling.
    Warn,
    /// Final stage; surfaced by `poll_next` as an I/O error of kind
    /// `TimedOut`.
    Hard,
}
98
/// Error type yielded by the `Stream` implementation of `RawXmlStream`.
#[derive(Debug)]
pub(super) enum RawError {
    /// An error reported while reading from the parser, or the hard read
    /// timeout (as an error of kind `TimedOut`).
    Io(io::Error),
    /// The soft read timeout elapsed while no event was available.
    SoftTimeout,
}
104
105impl From<io::Error> for RawError {
106    fn from(other: io::Error) -> Self {
107        Self::Io(other)
108    }
109}
110
/// Runtime state of the staged read timeout: the configuration, the stage
/// that will trip next and the timer driving it.
struct TimeoutState {
    /// Configuration for the timeouts.
    timeouts: Timeouts,

    /// Level of the next timeout which will trip.
    level: TimeoutLevel,

    /// Sleep timer used for read timeouts.
    // NOTE: even though we pretend we could deal with an !Unpin
    // RawXmlStream, we really can't: box_stream for example needs it,
    // but also all the typestate around the initial stream setup needs
    // to be able to move the stream around.
    deadline: Pin<Box<tokio::time::Sleep>>,
}
125
126impl TimeoutState {
127    fn new(timeouts: Timeouts) -> Self {
128        Self {
129            deadline: Box::pin(tokio::time::sleep(timeouts.data_to_soft())),
130            level: TimeoutLevel::Soft,
131            timeouts,
132        }
133    }
134
135    fn poll(&mut self, cx: &mut Context) -> Poll<TimeoutLevel> {
136        ready!(self.deadline.as_mut().poll(cx));
137        // Deadline elapsed!
138        let to_return = self.level;
139        let (next_level, next_duration) = match self.level {
140            TimeoutLevel::Soft => (TimeoutLevel::Warn, self.timeouts.soft_to_warn()),
141            TimeoutLevel::Warn => (TimeoutLevel::Hard, self.timeouts.warn_to_hard()),
142            // Something short-ish so that we fire this over and over until
143            // someone finally kills the stream for good.
144            TimeoutLevel::Hard => (TimeoutLevel::Hard, Duration::new(1, 0)),
145        };
146        self.level = next_level;
147        self.deadline.as_mut().reset(Instant::now() + next_duration);
148        Poll::Ready(to_return)
149    }
150
151    fn reset(&mut self) {
152        self.level = TimeoutLevel::Soft;
153        self.deadline
154            .as_mut()
155            .reset((Instant::now() + self.timeouts.data_to_soft()).into());
156    }
157}
158
pin_project_lite::pin_project! {
    // Low-level XML stream: pairs an rxml parser reading from `Io` with an
    // rxml encoder whose output is buffered in `tx_buffer` before being
    // written to the same `Io`.
    //
    // NOTE: due to limitations of pin_project_lite, the field comments are
    // no doc comments. Luckily, this struct is only `pub(super)` anyway.
    #[project = RawXmlStreamProj]
    pub(super) struct RawXmlStream<Io> {
        // The parser used for deserialising data. It also owns the
        // (capture-wrapped) transport, which is why writes go through it.
        #[pin]
        parser: rxml::AsyncReader<CaptureBufRead<Io>>,

        // The writer used for serialising data.
        writer: rxml::writer::Encoder<rxml::writer::SimpleNamespaces>,

        // Read timeout state; consulted by the `Stream` implementation
        // whenever the parser has no event ready.
        timeouts: TimeoutState,

        // The default namespace to declare on the stream header.
        stream_ns: &'static str,

        // Buffer containing serialised data which will then be sent through
        // the inner `Io`. Sending that serialised data happens in
        // `poll_ready` and `poll_flush`, while appending serialised data
        // happens in `start_send`.
        tx_buffer: BytesMut,

        // Position inside tx_buffer up to which to-be-sent data has already
        // been logged.
        tx_buffer_logged: usize,

        // This signifies the limit at the point of which the Sink will
        // refuse to accept more data: if the `tx_buffer`'s size grows beyond
        // that high water mark, poll_ready will return Poll::Pending until
        // it has managed to flush enough data down the inner writer.
        //
        // Note that poll_ready will always attempt to progress the writes,
        // which further reduces the chance of hitting this limit unless
        // either the underlying writer gets stuck (e.g. TCP connection
        // breaking in a timeouty way) or a lot of data is written in bulk.
        // In both cases, the backpressure created by poll_ready returning
        // Pending is desirable.
        //
        // However, there is a catch: We don't assert this condition
        // in `start_send` at all. The reason is that we cannot suspend
        // serialisation of an XSO in the middle of writing it: it has to be
        // written in one batch or you have to start over later (this has to
        // do with the iterator state borrowing the data and futures getting
        // cancelled e.g. in tokio::select!). In order to facilitate
        // implementing a `Sink<T: AsXml>` on top of `RawXmlStream`, we
        // cannot be strict about what is going on in `start_send`:
        // `poll_ready` does not know what kind of data will be written (so
        // it could not make a size estimate, even if that was at all
        // possible with AsXml) and `start_send` is not a coroutine. So if
        // `Sink<T: AsXml>` wants to use `RawXmlStream`, it must be able to
        // submit an entire XSO's items in one batch to `RawXmlStream` after
        // it has reported to be ready once. That may easily make the buffer
        // reach its high water mark.
        //
        // So if we checked that condition in `start_send` (as opposed to
        // `poll_ready`), we would cause situations where submitting XSOs
        // failed randomly (with a panic or other errors) and would have to
        // be retried later.
        //
        // While failing with e.g. io::ErrorKind::WouldBlock is something
        // that could be investigated later, it would still require being
        // able to make an accurate estimate of the number of bytes needed to
        // serialise any given `AsXml`, because as pointed out earlier, once
        // we have started, there is no going back.
        //
        // Finally, none of that hurts much because `RawXmlStream` is only an
        // internal API. The high-level APIs will always call `poll_ready`
        // before sending an XSO, which means that we won't *grossly* go over
        // the TX buffer high water mark---unless you send a really large
        // XSO at once.
        tx_buffer_high_water_mark: usize,
    }
}
233
234impl<Io: AsyncBufRead + AsyncWrite> RawXmlStream<Io> {
235    fn new_writer(
236        stream_ns: &'static str,
237    ) -> rxml::writer::Encoder<rxml::writer::SimpleNamespaces> {
238        let mut writer = rxml::writer::Encoder::new();
239        writer
240            .ns_tracker_mut()
241            .declare_fixed(Some(xml_ncname!("stream")), XML_STREAM_NS.into());
242        writer
243            .ns_tracker_mut()
244            .declare_fixed(None, stream_ns.into());
245        writer
246    }
247
248    pub(super) fn new(io: Io, stream_ns: &'static str, timeouts: Timeouts) -> Self {
249        let parser = rxml::Parser::default();
250        let mut io = CaptureBufRead::wrap(io);
251        if log_enabled() {
252            io.enable_capture();
253        }
254        Self {
255            parser: rxml::AsyncReader::wrap(io, parser),
256            writer: Self::new_writer(stream_ns),
257            timeouts: TimeoutState::new(timeouts),
258            tx_buffer_logged: 0,
259            stream_ns,
260            tx_buffer: BytesMut::new(),
261
262            // This basically means: "if we already have 2 kiB in our send
263            // buffer, do not accept more data".
264            // Please see the extensive words at
265            //`Self::tx_buffer_high_water_mark` for details.
266            tx_buffer_high_water_mark: 2048,
267        }
268    }
269
270    pub(super) fn reset_state(self: Pin<&mut Self>) {
271        let this = self.project();
272        *this.parser.parser_pinned() = rxml::Parser::default();
273        *this.writer = Self::new_writer(this.stream_ns);
274    }
275
276    pub(super) fn into_inner(self) -> Io {
277        self.parser.into_inner().0.into_inner()
278    }
279
280    /// Box the underlying transport stream.
281    ///
282    /// This removes the specific type of the transport from the XML stream's
283    /// type signature.
284    pub(super) fn box_stream(self) -> RawXmlStream<Box<dyn AsyncReadAndWrite + Send + 'static>>
285    where
286        Io: AsyncReadAndWrite + Send + 'static,
287    {
288        let (io, p) = self.parser.into_inner();
289        let mut io = CaptureBufRead::wrap(Box::new(io) as Box<_>);
290        if log_enabled() {
291            io.enable_capture();
292        }
293        let parser = rxml::AsyncReader::wrap(io, p);
294        RawXmlStream {
295            parser,
296            timeouts: self.timeouts,
297            writer: self.writer,
298            tx_buffer: self.tx_buffer,
299            tx_buffer_logged: self.tx_buffer_logged,
300            tx_buffer_high_water_mark: self.tx_buffer_high_water_mark,
301            stream_ns: self.stream_ns,
302        }
303    }
304}
305
306impl<Io: AsyncWrite> RawXmlStream<Io> {
307    /// Start sending an entire XSO.
308    ///
309    /// Unlike the `Sink` implementation, this provides nice syntax
310    /// highlighting for the serialised data in log outputs (if enabled) *and*
311    /// is error safe: if the XSO fails to serialise completely, it will be as
312    /// if it hadn't been attempted to serialise it at all.
313    ///
314    /// Note that, like with `start_send`, the caller is responsible for
315    /// ensuring that the stream is ready by polling
316    /// [`<Self as Sink>::poll_ready`] as needed.
317    pub(super) fn start_send_xso<T: AsXml>(self: Pin<&mut Self>, xso: &T) -> io::Result<()> {
318        let mut this = self.project();
319        let prev_len = this.tx_buffer.len();
320        match this.try_send_xso(xso) {
321            Ok(()) => Ok(()),
322            Err(e) => {
323                let curr_len = this.tx_buffer.len();
324                this.tx_buffer.truncate(prev_len);
325                log::trace!(
326                    "SEND failed: {}. Rewinding buffer by {} bytes.",
327                    e,
328                    curr_len - prev_len
329                );
330                Err(e)
331            }
332        }
333    }
334}
335
336impl<Io> RawXmlStream<Io> {
337    fn parser_pinned(self: Pin<&mut Self>) -> &mut rxml::Parser {
338        self.project().parser.parser_pinned()
339    }
340
341    fn stream_pinned(self: Pin<&mut Self>) -> Pin<&mut CaptureBufRead<Io>> {
342        self.project().parser.inner_pinned()
343    }
344
345    pub(super) fn get_stream(&self) -> &Io {
346        self.parser.inner().inner()
347    }
348}
349
350impl<Io: AsyncBufRead> Stream for RawXmlStream<Io> {
351    type Item = Result<rxml::Event, RawError>;
352
353    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
354        let mut this = self.project();
355        loop {
356            match this.parser.as_mut().poll_read(cx) {
357                Poll::Pending => (),
358                Poll::Ready(v) => {
359                    this.timeouts.reset();
360                    match v.transpose() {
361                        // Skip the XML declaration, nobody wants to hear about that.
362                        Some(Ok(rxml::Event::XmlDeclaration(_, _))) => continue,
363                        other => return Poll::Ready(other.map(|x| x.map_err(RawError::Io))),
364                    }
365                }
366            };
367
368            // poll_read returned pending... what do the timeouts have to say?
369            match ready!(this.timeouts.poll(cx)) {
370                TimeoutLevel::Soft => return Poll::Ready(Some(Err(RawError::SoftTimeout))),
371                TimeoutLevel::Warn => (),
372                TimeoutLevel::Hard => {
373                    return Poll::Ready(Some(Err(RawError::Io(io::Error::new(
374                        io::ErrorKind::TimedOut,
375                        "read and response timeouts elapsed",
376                    )))))
377                }
378            }
379        }
380    }
381}
382
383impl<'x, Io: AsyncWrite> RawXmlStreamProj<'x, Io> {
384    fn flush_tx_log(&mut self) {
385        let range = &self.tx_buffer[*self.tx_buffer_logged..];
386        if range.len() == 0 {
387            return;
388        }
389        log_send(range);
390        *self.tx_buffer_logged = self.tx_buffer.len();
391    }
392
393    fn start_send(&mut self, item: &xso::Item<'_>) -> io::Result<()> {
394        self.writer
395            .encode_into_bytes(item.as_rxml_item(), self.tx_buffer)
396            .map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, e))
397    }
398
399    fn try_send_xso<T: AsXml>(&mut self, xso: &T) -> io::Result<()> {
400        let iter = match xso.as_xml_iter() {
401            Ok(v) => v,
402            Err(e) => return Err(io::Error::new(io::ErrorKind::InvalidInput, e)),
403        };
404        for item in iter {
405            let item = match item {
406                Ok(v) => v,
407                Err(e) => return Err(io::Error::new(io::ErrorKind::InvalidInput, e)),
408            };
409            self.start_send(&item)?;
410        }
411        Ok(())
412    }
413
414    fn progress_write(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
415        self.flush_tx_log();
416        while self.tx_buffer.len() > 0 {
417            let written = match ready!(self
418                .parser
419                .as_mut()
420                .inner_pinned()
421                .poll_write(cx, &self.tx_buffer))
422            {
423                Ok(v) => v,
424                Err(e) => return Poll::Ready(Err(e)),
425            };
426            self.tx_buffer.advance(written);
427            *self.tx_buffer_logged = self
428                .tx_buffer_logged
429                .checked_sub(written)
430                .expect("Buffer arithmetic error");
431        }
432        Poll::Ready(Ok(()))
433    }
434}
435
436impl<Io: AsyncWrite> RawXmlStream<Io> {
437    /// Flush all buffered data and shut down the sender side of the
438    /// underlying transport.
439    ///
440    /// Unlike `poll_close` (from the `Sink` impls), this will not close the
441    /// receiving side of the underlying the transport. It is advisable to call
442    /// `poll_close` eventually after `poll_shutdown` in order to gracefully
443    /// handle situations where the remote side does not close the stream
444    /// cleanly.
445    pub fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
446        ready!(self.as_mut().poll_flush(cx))?;
447        let this = self.project();
448        this.parser.inner_pinned().poll_shutdown(cx)
449    }
450}
451
452impl<'x, Io: AsyncWrite> Sink<xso::Item<'x>> for RawXmlStream<Io> {
453    type Error = io::Error;
454
455    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
456        let mut this = self.project();
457        match this.progress_write(cx) {
458            // No progress on write, but if we have enough space in the buffer
459            // it's ok nonetheless.
460            Poll::Pending => (),
461            // Some progress and it went fine, move on.
462            Poll::Ready(Ok(())) => (),
463            // Something went wrong -> return the error.
464            Poll::Ready(Err(e)) => return Poll::Ready(Err(e.into())),
465        }
466        if this.tx_buffer.len() < *this.tx_buffer_high_water_mark {
467            Poll::Ready(Ok(()))
468        } else {
469            Poll::Pending
470        }
471    }
472
473    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
474        let mut this = self.project();
475        ready!(this.progress_write(cx))?;
476        this.parser.as_mut().inner_pinned().poll_flush(cx)
477    }
478
479    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
480        let mut this = self.project();
481        ready!(this.progress_write(cx))?;
482        this.parser.as_mut().inner_pinned().poll_shutdown(cx)
483    }
484
485    fn start_send(self: Pin<&mut Self>, item: xso::Item<'x>) -> Result<(), Self::Error> {
486        let mut this = self.project();
487        this.start_send(&item)
488    }
489}
490
/// Error returned by the [`ReadXso`] future and the [`ReadXsoState`] helper.
///
/// Only [`Self::Parse`] leaves the stream at a well-defined nesting level;
/// see the individual variants for details.
pub(super) enum ReadXsoError {
    /// The outer element was closed before a child element could be read.
    ///
    /// This is typically the stream footer in XML stream applications.
    Footer,

    /// A hard error occurred.
    ///
    /// This is either a real I/O error or an error from the XML parser.
    /// Neither are recoverable, because the nesting state is lost and
    /// in addition, XML errors are not recoverable because they indicate a
    /// not well-formed document.
    Hard(io::Error),

    /// The underlying stream signalled a soft read timeout before a child
    /// element could be read.
    ///
    /// Note that soft timeouts which are triggered in the middle of receiving
    /// an element are converted to hard timeouts (i.e. I/O errors).
    ///
    /// This masking is intentional, because:
    /// - Returning a [`Self::SoftTimeout`] from the middle of parsing is not
    ///   possible without complicating the API.
    /// - There is no reason why the remote side should interrupt sending data
    ///   in the middle of an element except if it or the transport has failed
    ///   fatally.
    SoftTimeout,

    /// A parse error occurred.
    ///
    /// The XML structure was well-formed, but the data contained did not
    /// match the XSO which was attempted to be parsed. This error is
    /// recoverable: when this error is emitted, the XML stream is at the same
    /// nesting level as it was before the XSO was attempted to be read; all
    /// XML structure which belonged to the XSO which failed to parse has
    /// been consumed. This makes it possible to read more XSOs even if one
    /// fails to parse.
    Parse(xso::error::Error),
}
531
532impl From<io::Error> for ReadXsoError {
533    fn from(other: io::Error) -> Self {
534        Self::Hard(other)
535    }
536}
537
538impl From<xso::error::Error> for ReadXsoError {
539    fn from(other: xso::error::Error) -> Self {
540        Self::Parse(other)
541    }
542}
543
/// State for reading an XSO from a `Stream<Item = Result<rxml::Event, ...>>`.
///
/// Due to pinning, it is simpler to implement the state machine in a
/// dedicated enum and let the actual (pinned) future pass the stream toward
/// this enum's function.
///
/// This is used by both [`ReadXso`] and the [`super::XmlStream`] itself.
#[derive(Default)]
pub(super) enum ReadXsoState<T: FromXml> {
    /// The [`rxml::Event::StartElement`] event was not seen yet.
    ///
    /// In this state, XML whitespace is ignored (as per RFC 6120 § 11.7), but
    /// other text data is rejected.
    #[default]
    PreData,

    /// The [`rxml::Event::StartElement`] event was received.
    ///
    /// The inner value is the builder for the "return type" of this enum and
    /// the implementation in the [`xso`] crate does all the heavy lifting:
    /// we'll only send events in its general direction.
    // We use the fallible parsing here so that we don't have to do the depth
    // accounting ourselves.
    Parsing(<Result<T, xso::error::Error> as FromXml>::Builder),

    /// The parsing has completed (successfully or not).
    ///
    /// This is a final state and attempting to advance the state will panic.
    /// This is in accordance with [`core::future::Future::poll`]'s contract,
    /// for which this enum is primarily used.
    Done,
}
576
577impl<T: FromXml> ReadXsoState<T> {
578    /// Progress reading the XSO from the given source.
579    ///
580    /// This attempts to parse a single XSO from the underlying stream,
581    /// while discarding any XML whitespace before the beginning of the XSO.
582    ///
583    /// If the XSO is parsed successfully, the method returns Ready with the
584    /// parsed value. If parsing fails or an I/O error occurs, an appropriate
585    /// error is returned.
586    ///
587    /// If parsing fails, the entire XML subtree belonging to the XSO is
588    /// nonetheless processed. That makes parse errors recoverable: After
589    /// `poll_advance` has returned Ready with either  an Ok result or a
590    /// [`ReadXsoError::Parse`] error variant, another XSO can be read and the
591    /// XML parsing will be at the same nesting depth as it was before the
592    /// first call to `poll_advance`.
593    ///
594    /// Note that this guarantee does not hold for non-parse errors (i.e. for
595    /// the other variants of [`ReadXsoError`]): I/O errors as well as
596    /// occurrence of the outer closing element are fatal.
597    ///
598    /// The `source` passed to `poll_advance` should be the same on every
599    /// call.
600    pub(super) fn poll_advance<Io: AsyncBufRead>(
601        &mut self,
602        mut source: Pin<&mut RawXmlStream<Io>>,
603        cx: &mut Context<'_>,
604    ) -> Poll<Result<T, ReadXsoError>> {
605        loop {
606            // Disable text buffering before the start event. That way, we
607            // don't accumulate infinite amounts of XML whitespace caused by
608            // whitespace keepalives.
609            // (And also, we'll know faster when the remote side sends
610            // non-whitespace garbage.)
611            let text_buffering = match self {
612                ReadXsoState::PreData => false,
613                _ => true,
614            };
615            source
616                .as_mut()
617                .parser_pinned()
618                .set_text_buffering(text_buffering);
619
620            let ev = ready!(source.as_mut().poll_next(cx)).transpose();
621            match self {
622                ReadXsoState::PreData => {
623                    log::trace!("ReadXsoState::PreData ev = {:?}", ev);
624                    match ev {
625                        Ok(Some(rxml::Event::XmlDeclaration(_, _))) => (),
626                        Ok(Some(rxml::Event::Text(_, data))) => {
627                            if xso::is_xml_whitespace(data.as_bytes()) {
628                                log::trace!("Received {} bytes of whitespace", data.len());
629                                source.as_mut().stream_pinned().discard_capture();
630                                continue;
631                            } else {
632                                *self = ReadXsoState::Done;
633                                return Poll::Ready(Err(io::Error::new(
634                                    io::ErrorKind::InvalidData,
635                                    "non-whitespace text content before XSO",
636                                )
637                                .into()));
638                            }
639                        }
640                        Ok(Some(rxml::Event::StartElement(_, name, attrs))) => {
641                            *self = ReadXsoState::Parsing(
642                                <Result<T, xso::error::Error> as FromXml>::from_events(name, attrs)
643                                    .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?,
644                            );
645                        }
646                        // Amounts to EOF, as we expect to start on the stream level.
647                        Ok(Some(rxml::Event::EndElement(_))) => {
648                            *self = ReadXsoState::Done;
649                            return Poll::Ready(Err(ReadXsoError::Footer));
650                        }
651                        Ok(None) => {
652                            *self = ReadXsoState::Done;
653                            return Poll::Ready(Err(io::Error::new(
654                                io::ErrorKind::InvalidData,
655                                "eof before XSO started",
656                            )
657                            .into()));
658                        }
659                        Err(RawError::SoftTimeout) => {
660                            *self = ReadXsoState::Done;
661                            return Poll::Ready(Err(ReadXsoError::SoftTimeout));
662                        }
663                        Err(RawError::Io(e)) => {
664                            *self = ReadXsoState::Done;
665                            return Poll::Ready(Err(ReadXsoError::Hard(e)));
666                        }
667                    }
668                }
669                ReadXsoState::Parsing(builder) => {
670                    log::trace!("ReadXsoState::Parsing ev = {:?}", ev);
671                    let ev = match ev {
672                        Ok(Some(ev)) => ev,
673                        Ok(None) => {
674                            *self = ReadXsoState::Done;
675                            return Poll::Ready(Err(io::Error::new(
676                                io::ErrorKind::UnexpectedEof,
677                                "eof during XSO parsing",
678                            )
679                            .into()));
680                        }
681                        Err(RawError::Io(e)) => {
682                            *self = ReadXsoState::Done;
683                            return Poll::Ready(Err(e.into()));
684                        }
685                        Err(RawError::SoftTimeout) => {
686                            // See also [`ReadXsoError::SoftTimeout`] for why
687                            // we mask the SoftTimeout condition here.
688                            *self = ReadXsoState::Done;
689                            return Poll::Ready(Err(io::Error::new(
690                                io::ErrorKind::TimedOut,
691                                "read timeout during XSO parsing",
692                            )
693                            .into()));
694                        }
695                    };
696
697                    match builder.feed(ev) {
698                        Err(err) => {
699                            *self = ReadXsoState::Done;
700                            source.as_mut().stream_pinned().discard_capture();
701                            return Poll::Ready(Err(io::Error::new(
702                                io::ErrorKind::InvalidData,
703                                err,
704                            )
705                            .into()));
706                        }
707                        Ok(Some(Err(err))) => {
708                            *self = ReadXsoState::Done;
709                            log_recv(Some(&err), source.as_mut().stream_pinned().take_capture());
710                            return Poll::Ready(Err(ReadXsoError::Parse(err)));
711                        }
712                        Ok(Some(Ok(value))) => {
713                            *self = ReadXsoState::Done;
714                            log_recv(None, source.as_mut().stream_pinned().take_capture());
715                            return Poll::Ready(Ok(value));
716                        }
717                        Ok(None) => (),
718                    }
719                }
720
721                // The error talks about "future", simply because that is
722                // where `Self` is used (inside `core::future::Future::poll`).
723                ReadXsoState::Done => panic!("future polled after completion"),
724            }
725        }
726    }
727}
728
/// Future to read a single XSO from a stream.
///
/// Created through `ReadXso::read_from`; resolves to the parsed value or a
/// [`ReadXsoError`].
pub(super) struct ReadXso<'x, Io, T: FromXml> {
    /// Stream to read the XSO from.
    inner: Pin<&'x mut RawXmlStream<Io>>,

    /// Current state of parsing.
    state: ReadXsoState<T>,
}
737
738impl<'x, Io: AsyncBufRead, T: FromXml> ReadXso<'x, Io, T> {
739    /// Start reading a single XSO from a stream.
740    pub(super) fn read_from(stream: Pin<&'x mut RawXmlStream<Io>>) -> Self {
741        Self {
742            inner: stream,
743            state: ReadXsoState::PreData,
744        }
745    }
746}
747
748impl<'x, Io: AsyncBufRead, T: FromXml> Future for ReadXso<'x, Io, T>
749where
750    T::Builder: Unpin,
751{
752    type Output = Result<T, ReadXsoError>;
753
754    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
755        let this = self.get_mut();
756        this.state.poll_advance(this.inner.as_mut(), cx)
757    }
758}
759
/// Contains metadata from an XML stream header.
///
/// All fields are optional; the derived default has every field set to
/// `None`.
#[derive(Default, Debug)]
pub struct StreamHeader<'x> {
    /// The optional `from` attribute.
    pub from: Option<Cow<'x, str>>,

    /// The optional `to` attribute.
    pub to: Option<Cow<'x, str>>,

    /// The optional `id` attribute.
    pub id: Option<Cow<'x, str>>,
}
772
773impl<'x> StreamHeader<'x> {
774    /// Take the contents and return them as new object.
775    ///
776    /// `self` will be left with all its parts set to `None`.
777    pub fn take(&mut self) -> Self {
778        Self {
779            from: self.from.take(),
780            to: self.to.take(),
781            id: self.id.take(),
782        }
783    }
784
785    pub(super) async fn send<Io: AsyncWrite>(
786        self,
787        mut stream: Pin<&mut RawXmlStream<Io>>,
788    ) -> io::Result<()> {
789        stream
790            .send(Item::XmlDeclaration(rxml::XmlVersion::V1_0))
791            .await?;
792        stream
793            .send(Item::ElementHeadStart(
794                Namespace::from(XML_STREAM_NS),
795                Cow::Borrowed(xml_ncname!("stream")),
796            ))
797            .await?;
798        if let Some(from) = self.from {
799            stream
800                .send(Item::Attribute(
801                    Namespace::NONE,
802                    Cow::Borrowed(xml_ncname!("from")),
803                    from,
804                ))
805                .await?;
806        }
807        if let Some(to) = self.to {
808            stream
809                .send(Item::Attribute(
810                    Namespace::NONE,
811                    Cow::Borrowed(xml_ncname!("to")),
812                    to,
813                ))
814                .await?;
815        }
816        if let Some(id) = self.id {
817            stream
818                .send(Item::Attribute(
819                    Namespace::NONE,
820                    Cow::Borrowed(xml_ncname!("id")),
821                    id,
822                ))
823                .await?;
824        }
825        stream
826            .send(Item::Attribute(
827                Namespace::NONE,
828                Cow::Borrowed(xml_ncname!("version")),
829                Cow::Borrowed("1.0"),
830            ))
831            .await?;
832        stream.send(Item::ElementHeadEnd).await?;
833        Ok(())
834    }
835}
836
837impl StreamHeader<'static> {
838    pub(super) async fn recv<Io: AsyncBufRead>(
839        mut stream: Pin<&mut RawXmlStream<Io>>,
840    ) -> io::Result<Self> {
841        loop {
842            match stream.as_mut().next().await {
843                Some(Err(RawError::Io(e))) => return Err(e),
844                Some(Err(RawError::SoftTimeout)) => (),
845                Some(Ok(Event::StartElement(_, (ns, name), mut attrs))) => {
846                    if ns != XML_STREAM_NS || name != "stream" {
847                        return Err(io::Error::new(
848                            io::ErrorKind::InvalidData,
849                            "unknown stream header",
850                        ));
851                    }
852
853                    match attrs.remove(Namespace::none(), "version") {
854                        Some(v) => {
855                            if v != "1.0" {
856                                return Err(io::Error::new(
857                                    io::ErrorKind::InvalidData,
858                                    format!("unsuppored stream version: {}", v),
859                                ));
860                            }
861                        }
862                        None => {
863                            return Err(io::Error::new(
864                                io::ErrorKind::InvalidData,
865                                "required `version` attribute missing",
866                            ))
867                        }
868                    }
869
870                    let from = attrs.remove(Namespace::none(), "from");
871                    let to = attrs.remove(Namespace::none(), "to");
872                    let id = attrs.remove(Namespace::none(), "id");
873                    let _ = attrs.remove(Namespace::xml(), "lang");
874
875                    if let Some(((ns, name), _)) = attrs.into_iter().next() {
876                        return Err(io::Error::new(
877                            io::ErrorKind::InvalidData,
878                            format!("unexpected stream header attribute: {{{}}}{}", ns, name),
879                        ));
880                    }
881
882                    return Ok(StreamHeader {
883                        from: from.map(Cow::Owned),
884                        to: to.map(Cow::Owned),
885                        id: id.map(Cow::Owned),
886                    });
887                }
888                Some(Ok(Event::Text(_, _))) | Some(Ok(Event::EndElement(_))) => {
889                    return Err(io::Error::new(
890                        io::ErrorKind::UnexpectedEof,
891                        "unexpected content before stream header",
892                    ))
893                }
894                // We cannot loop infinitely here because the XML parser will
895                // prevent more than one XML declaration from being parsed.
896                Some(Ok(Event::XmlDeclaration(_, _))) => (),
897                None => {
898                    return Err(io::Error::new(
899                        io::ErrorKind::UnexpectedEof,
900                        "eof before stream header",
901                    ))
902                }
903            }
904        }
905    }
906}