// tokio_xmpp/xmlstream/common.rs

// Copyright (c) 2024 Jonas Schäfer <jonas@zombofant.net>
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
6
7use alloc::borrow::Cow;
8use core::{
9    future::Future,
10    pin::Pin,
11    task::{Context, Poll},
12    time::Duration,
13};
14use std::io;
15
16use futures::{ready, Sink, SinkExt, Stream, StreamExt};
17
18use bytes::{Buf, BytesMut};
19
20use tokio::{
21    io::{AsyncBufRead, AsyncWrite},
22    time::Instant,
23};
24
25use xso::{
26    exports::rxml::{self, writer::TrackNamespace, xml_ncname, Event, Namespace},
27    AsXml, FromEventsBuilder, FromXml, Item,
28};
29
30use crate::connect::AsyncReadAndWrite;
31
32use super::capture::{log_enabled, log_recv, log_send, CaptureBufRead};
33
34use xmpp_parsers::ns::STREAM as XML_STREAM_NS;
35
/// Timeout configuration for an XML stream.
///
/// The defaults are chosen with ordinary desktop/laptop connectivity in
/// mind. They may be too strict for very poor links (satellite uplinks,
/// mobile data on a moving train and the like) and needlessly lax for very
/// reliable ones (server links, localhost communication).
#[derive(Debug, Clone, Copy)]
pub struct Timeouts {
    /// How long the stream may stay silent before a
    /// [`ReadError::SoftTimeout`][`super::ReadError::SoftTimeout`] is
    /// reported.
    ///
    /// A soft timeout is not fatal. However, user code has to handle it so
    /// that more data is read within [`Self::response_timeout`], counted
    /// from the moment the soft timeout was returned.
    pub read_timeout: Duration,

    /// How long the stream may stay silent after a soft timeout.
    ///
    /// Once this much time has passed after a soft timeout without any data
    /// arriving, a hard [`TimedOut`][`io::ErrorKind::TimedOut`] I/O error is
    /// returned and the stream must be considered dead.
    pub response_timeout: Duration,
}

impl Default for Timeouts {
    /// Five minutes for both the read and the response timeout.
    fn default() -> Self {
        let five_minutes = Duration::from_secs(300);
        Self {
            read_timeout: five_minutes,
            response_timeout: five_minutes,
        }
    }
}

impl Timeouts {
    /// Short timeouts for fast, reliable links such as a LAN or localhost.
    ///
    /// The soft timeout fires after 60 s of silence. The hard timeout fires
    /// 15 s after that.
    pub fn tight() -> Self {
        Self {
            read_timeout: Duration::from_secs(60),
            response_timeout: Duration::from_secs(15),
        }
    }

    /// Silence allowed after the last received data before the soft
    /// timeout fires.
    fn data_to_soft(&self) -> Duration {
        self.read_timeout
    }

    /// Delay from the soft timeout to the internal warning stage. This is
    /// the first half of the response timeout.
    fn soft_to_warn(&self) -> Duration {
        self.response_timeout / 2
    }

    /// Delay from the warning stage to the hard timeout. This is the second
    /// half of the response timeout, the same value as
    /// [`Self::soft_to_warn`].
    fn warn_to_hard(&self) -> Duration {
        self.soft_to_warn()
    }
}
91
/// Escalation stage of the read timeout: what happens the next time the
/// timeout timer expires.
#[derive(Clone, Copy)]
enum TimeoutLevel {
    /// Reported to the reader as a soft, recoverable timeout.
    Soft,
    /// Intermediate stage inside the response timeout. It is not reported
    /// to the reader.
    Warn,
    /// Reported to the reader as a fatal `TimedOut` I/O error.
    Hard,
}
98
99#[derive(Debug)]
100pub(super) enum RawError {
101    Io(io::Error),
102    SoftTimeout,
103}
104
105impl From<io::Error> for RawError {
106    fn from(other: io::Error) -> Self {
107        Self::Io(other)
108    }
109}
110
111struct TimeoutState {
112    /// Configuration for the timeouts.
113    timeouts: Timeouts,
114
115    /// Level of the next timeout which will trip.
116    level: TimeoutLevel,
117
118    /// Sleep timer used for read timeouts.
119    // NOTE: even though we pretend we could deal with an !Unpin
120    // RawXmlStream, we really can't: box_stream for example needs it,
121    // but also all the typestate around the initial stream setup needs
122    // to be able to move the stream around.
123    deadline: Pin<Box<tokio::time::Sleep>>,
124}
125
126impl TimeoutState {
127    fn new(timeouts: Timeouts) -> Self {
128        Self {
129            deadline: Box::pin(tokio::time::sleep(timeouts.data_to_soft())),
130            level: TimeoutLevel::Soft,
131            timeouts,
132        }
133    }
134
135    fn poll(&mut self, cx: &mut Context) -> Poll<TimeoutLevel> {
136        ready!(self.deadline.as_mut().poll(cx));
137        // Deadline elapsed!
138        let to_return = self.level;
139        let (next_level, next_duration) = match self.level {
140            TimeoutLevel::Soft => (TimeoutLevel::Warn, self.timeouts.soft_to_warn()),
141            TimeoutLevel::Warn => (TimeoutLevel::Hard, self.timeouts.warn_to_hard()),
142            // Something short-ish so that we fire this over and over until
143            // someone finally kills the stream for good.
144            TimeoutLevel::Hard => (TimeoutLevel::Hard, Duration::new(1, 0)),
145        };
146        self.level = next_level;
147        self.deadline.as_mut().reset(Instant::now() + next_duration);
148        Poll::Ready(to_return)
149    }
150
151    fn reset(&mut self) {
152        self.level = TimeoutLevel::Soft;
153        self.deadline
154            .as_mut()
155            .reset(Instant::now() + self.timeouts.data_to_soft());
156    }
157}
158
159pin_project_lite::pin_project! {
160    // NOTE: due to limitations of pin_project_lite, the field comments are
161    // no doc comments. Luckily, this struct is only `pub(super)` anyway.
162    #[project = RawXmlStreamProj]
163    pub(super) struct RawXmlStream<Io> {
164        // The parser used for deserialising data.
165        #[pin]
166        parser: rxml::AsyncReader<CaptureBufRead<Io>>,
167
168        // The writer used for serialising data.
169        writer: rxml::writer::Encoder<rxml::writer::SimpleNamespaces>,
170
171        timeouts: TimeoutState,
172
173        // The default namespace to declare on the stream header.
174        stream_ns: &'static str,
175
176        // Buffer containing serialised data which will then be sent through
177        // the inner `Io`. Sending that serialised data happens in
178        // `poll_ready` and `poll_flush`, while appending serialised data
179        // happens in `start_send`.
180        tx_buffer: BytesMut,
181
182        // Position inside tx_buffer up to which to-be-sent data has already
183        // been logged.
184        tx_buffer_logged: usize,
185
186        // This signifies the limit at the point of which the Sink will
187        // refuse to accept more data: if the `tx_buffer`'s size grows beyond
188        // that high water mark, poll_ready will return Poll::Pending until
189        // it has managed to flush enough data down the inner writer.
190        //
191        // Note that poll_ready will always attempt to progress the writes,
192        // which further reduces the chance of hitting this limit unless
193        // either the underlying writer gets stuck (e.g. TCP connection
194        // breaking in a timeouty way) or a lot of data is written in bulk.
195        // In both cases, the backpressure created by poll_ready returning
196        // Pending is desirable.
197        //
198        // However, there is a catch: We don't assert this condition
199        // in `start_send` at all. The reason is that we cannot suspend
200        // serialisation of an XSO in the middle of writing it: it has to be
201        // written in one batch or you have to start over later (this has to
202        // do with the iterator state borrowing the data and futures getting
203        // cancelled e.g. in tokio::select!). In order to facilitate
204        // implementing a `Sink<T: AsXml>` on top of `RawXmlStream`, we
205        // cannot be strict about what is going on in `start_send`:
206        // `poll_ready` does not know what kind of data will be written (so
207        // it could not make a size estimate, even if that was at all
208        // possible with AsXml) and `start_send` is not a coroutine. So if
209        // `Sink<T: AsXml>` wants to use `RawXmlStream`, it must be able to
210        // submit an entire XSO's items in one batch to `RawXmlStream` after
211        // it has reported to be ready once. That may easily make the buffer
212        // reach its high water mark.
213        //
214        // So if we checked that condition in `start_send` (as opposed to
215        // `poll_ready`), we would cause situations where submitting XSOs
216        // failed randomly (with a panic or other errors) and would have to
217        // be retried later.
218        //
219        // While failing with e.g. io::ErrorKind::WouldBlock is something
220        // that could be investigated later, it would still require being
221        // able to make an accurate estimate of the number of bytes needed to
222        // serialise any given `AsXml`, because as pointed out earlier, once
223        // we have started, there is no going back.
224        //
225        // Finally, none of that hurts much because `RawXmlStream` is only an
226        // internal API. The high-level APIs will always call `poll_ready`
227        // before sending an XSO, which means that we won't *grossly* go over
228        // the TX buffer high water mark---unless you send a really large
229        // XSO at once.
230        tx_buffer_high_water_mark: usize,
231    }
232}
233
234impl<Io: AsyncBufRead + AsyncWrite> RawXmlStream<Io> {
235    fn new_writer(
236        stream_ns: &'static str,
237    ) -> rxml::writer::Encoder<rxml::writer::SimpleNamespaces> {
238        let mut writer = rxml::writer::Encoder::new();
239        writer
240            .ns_tracker_mut()
241            .declare_fixed(Some(xml_ncname!("stream")), XML_STREAM_NS.into());
242        writer
243            .ns_tracker_mut()
244            .declare_fixed(None, stream_ns.into());
245        writer
246    }
247
248    pub(super) fn new(io: Io, stream_ns: &'static str, timeouts: Timeouts) -> Self {
249        let parser = rxml::Parser::default();
250        let mut io = CaptureBufRead::wrap(io);
251        if log_enabled() {
252            io.enable_capture();
253        }
254        Self {
255            parser: rxml::AsyncReader::wrap(io, parser),
256            writer: Self::new_writer(stream_ns),
257            timeouts: TimeoutState::new(timeouts),
258            tx_buffer_logged: 0,
259            stream_ns,
260            tx_buffer: BytesMut::new(),
261
262            // This basically means: "if we already have 2 kiB in our send
263            // buffer, do not accept more data".
264            // Please see the extensive words at
265            //`Self::tx_buffer_high_water_mark` for details.
266            tx_buffer_high_water_mark: 2048,
267        }
268    }
269
270    pub(super) fn reset_state(self: Pin<&mut Self>) {
271        let this = self.project();
272        *this.parser.parser_pinned() = rxml::Parser::default();
273        *this.writer = Self::new_writer(this.stream_ns);
274    }
275
276    pub(super) fn into_inner(self) -> Io {
277        self.parser.into_inner().0.into_inner()
278    }
279
280    /// Box the underlying transport stream.
281    ///
282    /// This removes the specific type of the transport from the XML stream's
283    /// type signature.
284    pub(super) fn box_stream(self) -> RawXmlStream<Box<dyn AsyncReadAndWrite + Send + 'static>>
285    where
286        Io: AsyncReadAndWrite + Send + 'static,
287    {
288        let (io, p) = self.parser.into_inner();
289        let mut io = CaptureBufRead::wrap(Box::new(io) as Box<_>);
290        if log_enabled() {
291            io.enable_capture();
292        }
293        let parser = rxml::AsyncReader::wrap(io, p);
294        RawXmlStream {
295            parser,
296            timeouts: self.timeouts,
297            writer: self.writer,
298            tx_buffer: self.tx_buffer,
299            tx_buffer_logged: self.tx_buffer_logged,
300            tx_buffer_high_water_mark: self.tx_buffer_high_water_mark,
301            stream_ns: self.stream_ns,
302        }
303    }
304}
305
306impl<Io: AsyncWrite> RawXmlStream<Io> {
307    /// Start sending an entire XSO.
308    ///
309    /// Unlike the `Sink` implementation, this provides nice syntax
310    /// highlighting for the serialised data in log outputs (if enabled) *and*
311    /// is error safe: if the XSO fails to serialise completely, it will be as
312    /// if it hadn't been attempted to serialise it at all.
313    ///
314    /// Note that, like with `start_send`, the caller is responsible for
315    /// ensuring that the stream is ready by polling
316    /// [`<Self as Sink>::poll_ready`] as needed.
317    pub(super) fn start_send_xso<T: AsXml>(self: Pin<&mut Self>, xso: &T) -> io::Result<()> {
318        let mut this = self.project();
319        let prev_len = this.tx_buffer.len();
320        match this.try_send_xso(xso) {
321            Ok(()) => Ok(()),
322            Err(e) => {
323                let curr_len = this.tx_buffer.len();
324                this.tx_buffer.truncate(prev_len);
325                log::trace!(
326                    "SEND failed: {}. Rewinding buffer by {} bytes.",
327                    e,
328                    curr_len - prev_len
329                );
330                Err(e)
331            }
332        }
333    }
334}
335
336impl<Io> RawXmlStream<Io> {
337    fn parser_pinned(self: Pin<&mut Self>) -> &mut rxml::Parser {
338        self.project().parser.parser_pinned()
339    }
340
341    fn stream_pinned(self: Pin<&mut Self>) -> Pin<&mut CaptureBufRead<Io>> {
342        self.project().parser.inner_pinned()
343    }
344
345    pub(super) fn get_stream(&self) -> &Io {
346        self.parser.inner().inner()
347    }
348}
349
350impl<Io: AsyncBufRead> Stream for RawXmlStream<Io> {
351    type Item = Result<rxml::Event, RawError>;
352
353    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
354        let mut this = self.project();
355        loop {
356            match this.parser.as_mut().poll_read(cx) {
357                Poll::Pending => (),
358                Poll::Ready(v) => {
359                    this.timeouts.reset();
360                    match v.transpose() {
361                        // Skip the XML declaration, nobody wants to hear about that.
362                        Some(Ok(rxml::Event::XmlDeclaration(_, _))) => continue,
363                        other => return Poll::Ready(other.map(|x| x.map_err(RawError::Io))),
364                    }
365                }
366            };
367
368            // poll_read returned pending... what do the timeouts have to say?
369            match ready!(this.timeouts.poll(cx)) {
370                TimeoutLevel::Soft => return Poll::Ready(Some(Err(RawError::SoftTimeout))),
371                TimeoutLevel::Warn => (),
372                TimeoutLevel::Hard => {
373                    return Poll::Ready(Some(Err(RawError::Io(io::Error::new(
374                        io::ErrorKind::TimedOut,
375                        "read and response timeouts elapsed",
376                    )))))
377                }
378            }
379        }
380    }
381}
382
383impl<Io: AsyncWrite> RawXmlStreamProj<'_, Io> {
384    fn flush_tx_log(&mut self) {
385        let range = &self.tx_buffer[*self.tx_buffer_logged..];
386        if range.is_empty() {
387            return;
388        }
389        log_send(range);
390        *self.tx_buffer_logged = self.tx_buffer.len();
391    }
392
393    fn start_send(&mut self, item: &xso::Item<'_>) -> io::Result<()> {
394        self.writer
395            .encode_into_bytes(item.as_rxml_item(), self.tx_buffer)
396            .map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, e))
397    }
398
399    fn try_send_xso<T: AsXml>(&mut self, xso: &T) -> io::Result<()> {
400        let iter = match xso.as_xml_iter() {
401            Ok(v) => v,
402            Err(e) => return Err(io::Error::new(io::ErrorKind::InvalidInput, e)),
403        };
404        for item in iter {
405            let item = match item {
406                Ok(v) => v,
407                Err(e) => return Err(io::Error::new(io::ErrorKind::InvalidInput, e)),
408            };
409            self.start_send(&item)?;
410        }
411        Ok(())
412    }
413
414    fn progress_write(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
415        self.flush_tx_log();
416        while !self.tx_buffer.is_empty() {
417            let written = match ready!(self
418                .parser
419                .as_mut()
420                .inner_pinned()
421                .poll_write(cx, self.tx_buffer))
422            {
423                Ok(v) => v,
424                Err(e) => return Poll::Ready(Err(e)),
425            };
426            self.tx_buffer.advance(written);
427            *self.tx_buffer_logged = self
428                .tx_buffer_logged
429                .checked_sub(written)
430                .expect("Buffer arithmetic error");
431        }
432        Poll::Ready(Ok(()))
433    }
434}
435
436impl<Io: AsyncWrite> RawXmlStream<Io> {
437    /// Flush all buffered data and shut down the sender side of the
438    /// underlying transport.
439    ///
440    /// Unlike `poll_close` (from the `Sink` impls), this will not close the
441    /// receiving side of the underlying the transport. It is advisable to call
442    /// `poll_close` eventually after `poll_shutdown` in order to gracefully
443    /// handle situations where the remote side does not close the stream
444    /// cleanly.
445    pub fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
446        ready!(self.as_mut().poll_flush(cx))?;
447        let this = self.project();
448        this.parser.inner_pinned().poll_shutdown(cx)
449    }
450}
451
452impl<'x, Io: AsyncWrite> Sink<xso::Item<'x>> for RawXmlStream<Io> {
453    type Error = io::Error;
454
455    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
456        let mut this = self.project();
457        match this.progress_write(cx) {
458            // No progress on write, but if we have enough space in the buffer
459            // it's ok nonetheless.
460            Poll::Pending => (),
461            // Some progress and it went fine, move on.
462            Poll::Ready(Ok(())) => (),
463            // Something went wrong -> return the error.
464            Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
465        }
466        if this.tx_buffer.len() < *this.tx_buffer_high_water_mark {
467            Poll::Ready(Ok(()))
468        } else {
469            Poll::Pending
470        }
471    }
472
473    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
474        let mut this = self.project();
475        ready!(this.progress_write(cx))?;
476        this.parser.as_mut().inner_pinned().poll_flush(cx)
477    }
478
479    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
480        let mut this = self.project();
481        ready!(this.progress_write(cx))?;
482        this.parser.as_mut().inner_pinned().poll_shutdown(cx)
483    }
484
485    fn start_send(self: Pin<&mut Self>, item: xso::Item<'x>) -> Result<(), Self::Error> {
486        let mut this = self.project();
487        this.start_send(&item)
488    }
489}
490
491/// Error returned by the [`ReadXso`] future and the [`ReadXsoState`] helper.
492pub(super) enum ReadXsoError {
493    /// The outer element was closed before a child element could be read.
494    ///
495    /// This is typically the stream footer in XML stream applications.
496    Footer,
497
498    /// A hard error occurred.
499    ///
500    /// This is either a real I/O error or an error from the XML parser.
501    /// Neither are recoverable, because the nesting state is lost and
502    /// in addition, XML errors are not recoverable because they indicate a
503    /// not well-formed document.
504    Hard(io::Error),
505
506    /// The underlying stream signalled a soft read timeout before a child
507    /// element could be read.
508    ///
509    /// Note that soft timeouts which are triggered in the middle of receiving
510    /// an element are converted to hard timeouts (i.e. I/O errors).
511    ///
512    /// This masking is intentional, because:
513    /// - Returning a [`Self::SoftTimeout`] from the middle of parsing is not
514    ///   possible without complicating the API.
515    /// - There is no reason why the remote side should interrupt sending data
516    ///   in the middle of an element except if it or the transport has failed
517    ///   fatally.
518    SoftTimeout,
519
520    /// A parse error occurred.
521    ///
522    /// The XML structure was well-formed, but the data contained did not
523    /// match the XSO which was attempted to be parsed. This error is
524    /// recoverable: when this error is emitted, the XML stream is at the same
525    /// nesting level as it was before the XSO was attempted to be read; all
526    /// XML structure which belonged to the XSO which failed to parse has
527    /// been consumed. This allows to read more XSOs even if one fails to
528    /// parse.
529    Parse(xso::error::Error),
530}
531
532impl From<io::Error> for ReadXsoError {
533    fn from(other: io::Error) -> Self {
534        Self::Hard(other)
535    }
536}
537
538impl From<xso::error::Error> for ReadXsoError {
539    fn from(other: xso::error::Error) -> Self {
540        Self::Parse(other)
541    }
542}
543
544/// State for reading an XSO from a `Stream<Item = Result<rxml::Event, ...>>`.
545///
546/// Due to pinning, it is simpler to implement the statemachine in a dedicated
547/// enum and let the actual (pinned) future pass the stream toward this enum's
548/// function.
549///
550/// This is used by both [`ReadXso`] and the [`super::XmlStream`] itself.
551#[derive(Default)]
552pub(super) enum ReadXsoState<T: FromXml> {
553    /// The [`rxml::Event::StartElement`] event was not seen yet.
554    ///
555    /// In this state, XML whitespace is ignored (as per RFC 6120 § 11.7), but
556    /// other text data is rejected.
557    #[default]
558    PreData,
559
560    /// The [`rxml::Event::StartElement`] event was received.
561    ///
562    /// The inner value is the builder for the "return type" of this enum and
563    /// the implementation in the [`xso`] crate does all the heavy lifting:
564    /// we'll only send events in its general direction.
565    // We use the fallible parsing here so that we don't have to do the depth
566    // accounting ourselves.
567    Parsing(<Result<T, xso::error::Error> as FromXml>::Builder),
568
569    /// The parsing has completed (successful or not).
570    ///
571    /// This is a final state and attempting to advance the state will panic.
572    /// This is in accordance with [`core::future::Future::poll`]'s contract,
573    /// for which this enum is primarily used.
574    Done,
575}
576
577impl<T: FromXml> ReadXsoState<T> {
578    /// Progress reading the XSO from the given source.
579    ///
580    /// This attempts to parse a single XSO from the underlying stream,
581    /// while discarding any XML whitespace before the beginning of the XSO.
582    ///
583    /// If the XSO is parsed successfully, the method returns Ready with the
584    /// parsed value. If parsing fails or an I/O error occurs, an appropriate
585    /// error is returned.
586    ///
587    /// If parsing fails, the entire XML subtree belonging to the XSO is
588    /// nonetheless processed. That makes parse errors recoverable: After
589    /// `poll_advance` has returned Ready with either  an Ok result or a
590    /// [`ReadXsoError::Parse`] error variant, another XSO can be read and the
591    /// XML parsing will be at the same nesting depth as it was before the
592    /// first call to `poll_advance`.
593    ///
594    /// Note that this guarantee does not hold for non-parse errors (i.e. for
595    /// the other variants of [`ReadXsoError`]): I/O errors as well as
596    /// occurrence of the outer closing element are fatal.
597    ///
598    /// The `source` passed to `poll_advance` should be the same on every
599    /// call.
600    pub(super) fn poll_advance<Io: AsyncBufRead>(
601        &mut self,
602        mut source: Pin<&mut RawXmlStream<Io>>,
603        cx: &mut Context<'_>,
604    ) -> Poll<Result<T, ReadXsoError>> {
605        loop {
606            // Disable text buffering before the start event. That way, we
607            // don't accumulate infinite amounts of XML whitespace caused by
608            // whitespace keepalives.
609            // (And also, we'll know faster when the remote side sends
610            // non-whitespace garbage.)
611            let text_buffering = !matches!(self, ReadXsoState::PreData);
612            source
613                .as_mut()
614                .parser_pinned()
615                .set_text_buffering(text_buffering);
616
617            let ev = ready!(source.as_mut().poll_next(cx)).transpose();
618            match self {
619                ReadXsoState::PreData => {
620                    log::trace!("ReadXsoState::PreData ev = {:?}", ev);
621                    match ev {
622                        Ok(Some(rxml::Event::XmlDeclaration(_, _))) => (),
623                        Ok(Some(rxml::Event::Text(_, data))) => {
624                            if xso::is_xml_whitespace(data.as_bytes()) {
625                                log::trace!("Received {} bytes of whitespace", data.len());
626                                source.as_mut().stream_pinned().discard_capture();
627                                continue;
628                            } else {
629                                *self = ReadXsoState::Done;
630                                return Poll::Ready(Err(io::Error::new(
631                                    io::ErrorKind::InvalidData,
632                                    "non-whitespace text content before XSO",
633                                )
634                                .into()));
635                            }
636                        }
637                        Ok(Some(rxml::Event::StartElement(_, name, attrs))) => {
638                            *self = ReadXsoState::Parsing(
639                                <Result<T, xso::error::Error> as FromXml>::from_events(name, attrs)
640                                    .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?,
641                            );
642                        }
643                        // Amounts to EOF, as we expect to start on the stream level.
644                        Ok(Some(rxml::Event::EndElement(_))) => {
645                            *self = ReadXsoState::Done;
646                            return Poll::Ready(Err(ReadXsoError::Footer));
647                        }
648                        Ok(None) => {
649                            *self = ReadXsoState::Done;
650                            return Poll::Ready(Err(io::Error::new(
651                                io::ErrorKind::InvalidData,
652                                "eof before XSO started",
653                            )
654                            .into()));
655                        }
656                        Err(RawError::SoftTimeout) => {
657                            *self = ReadXsoState::Done;
658                            return Poll::Ready(Err(ReadXsoError::SoftTimeout));
659                        }
660                        Err(RawError::Io(e)) => {
661                            *self = ReadXsoState::Done;
662                            return Poll::Ready(Err(ReadXsoError::Hard(e)));
663                        }
664                    }
665                }
666                ReadXsoState::Parsing(builder) => {
667                    log::trace!("ReadXsoState::Parsing ev = {:?}", ev);
668                    let ev = match ev {
669                        Ok(Some(ev)) => ev,
670                        Ok(None) => {
671                            *self = ReadXsoState::Done;
672                            return Poll::Ready(Err(io::Error::new(
673                                io::ErrorKind::UnexpectedEof,
674                                "eof during XSO parsing",
675                            )
676                            .into()));
677                        }
678                        Err(RawError::Io(e)) => {
679                            *self = ReadXsoState::Done;
680                            return Poll::Ready(Err(e.into()));
681                        }
682                        Err(RawError::SoftTimeout) => {
683                            // See also [`ReadXsoError::SoftTimeout`] for why
684                            // we mask the SoftTimeout condition here.
685                            *self = ReadXsoState::Done;
686                            return Poll::Ready(Err(io::Error::new(
687                                io::ErrorKind::TimedOut,
688                                "read timeout during XSO parsing",
689                            )
690                            .into()));
691                        }
692                    };
693
694                    match builder.feed(ev) {
695                        Err(err) => {
696                            *self = ReadXsoState::Done;
697                            source.as_mut().stream_pinned().discard_capture();
698                            return Poll::Ready(Err(io::Error::new(
699                                io::ErrorKind::InvalidData,
700                                err,
701                            )
702                            .into()));
703                        }
704                        Ok(Some(Err(err))) => {
705                            *self = ReadXsoState::Done;
706                            log_recv(Some(&err), source.as_mut().stream_pinned().take_capture());
707                            return Poll::Ready(Err(ReadXsoError::Parse(err)));
708                        }
709                        Ok(Some(Ok(value))) => {
710                            *self = ReadXsoState::Done;
711                            log_recv(None, source.as_mut().stream_pinned().take_capture());
712                            return Poll::Ready(Ok(value));
713                        }
714                        Ok(None) => (),
715                    }
716                }
717
718                // The error talks about "future", simply because that is
719                // where `Self` is used (inside `core::future::Future::poll`).
720                ReadXsoState::Done => panic!("future polled after completion"),
721            }
722        }
723    }
724}
725
726/// Future to read a single XSO from a stream.
727pub(super) struct ReadXso<'x, Io, T: FromXml> {
728    /// Stream to read the future from.
729    inner: Pin<&'x mut RawXmlStream<Io>>,
730
731    /// Current state of parsing.
732    state: ReadXsoState<T>,
733}
734
735impl<'x, Io: AsyncBufRead, T: FromXml> ReadXso<'x, Io, T> {
736    /// Start reading a single XSO from a stream.
737    pub(super) fn read_from(stream: Pin<&'x mut RawXmlStream<Io>>) -> Self {
738        Self {
739            inner: stream,
740            state: ReadXsoState::PreData,
741        }
742    }
743}
744
745impl<Io: AsyncBufRead, T: FromXml> Future for ReadXso<'_, Io, T>
746where
747    T::Builder: Unpin,
748{
749    type Output = Result<T, ReadXsoError>;
750
751    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
752        let this = self.get_mut();
753        this.state.poll_advance(this.inner.as_mut(), cx)
754    }
755}
756
/// Metadata carried by an XML stream header.
#[derive(Default, Debug)]
pub struct StreamHeader<'x> {
    /// Value of the `from` attribute, if present.
    pub from: Option<Cow<'x, str>>,

    /// Value of the `to` attribute, if present.
    pub to: Option<Cow<'x, str>>,

    /// Value of the `id` attribute, if present.
    pub id: Option<Cow<'x, str>>,
}
769
770impl StreamHeader<'_> {
771    /// Take the contents and return them as new object.
772    ///
773    /// `self` will be left with all its parts set to `None`.
774    pub fn take(&mut self) -> Self {
775        Self {
776            from: self.from.take(),
777            to: self.to.take(),
778            id: self.id.take(),
779        }
780    }
781
782    pub(super) async fn send<Io: AsyncWrite>(
783        self,
784        mut stream: Pin<&mut RawXmlStream<Io>>,
785    ) -> io::Result<()> {
786        stream
787            .send(Item::XmlDeclaration(rxml::XmlVersion::V1_0))
788            .await?;
789        stream
790            .send(Item::ElementHeadStart(
791                Namespace::from(XML_STREAM_NS),
792                Cow::Borrowed(xml_ncname!("stream")),
793            ))
794            .await?;
795        if let Some(from) = self.from {
796            stream
797                .send(Item::Attribute(
798                    Namespace::NONE,
799                    Cow::Borrowed(xml_ncname!("from")),
800                    from,
801                ))
802                .await?;
803        }
804        if let Some(to) = self.to {
805            stream
806                .send(Item::Attribute(
807                    Namespace::NONE,
808                    Cow::Borrowed(xml_ncname!("to")),
809                    to,
810                ))
811                .await?;
812        }
813        if let Some(id) = self.id {
814            stream
815                .send(Item::Attribute(
816                    Namespace::NONE,
817                    Cow::Borrowed(xml_ncname!("id")),
818                    id,
819                ))
820                .await?;
821        }
822        stream
823            .send(Item::Attribute(
824                Namespace::NONE,
825                Cow::Borrowed(xml_ncname!("version")),
826                Cow::Borrowed("1.0"),
827            ))
828            .await?;
829        stream.send(Item::ElementHeadEnd).await?;
830        Ok(())
831    }
832}
833
834impl StreamHeader<'static> {
835    pub(super) async fn recv<Io: AsyncBufRead>(
836        mut stream: Pin<&mut RawXmlStream<Io>>,
837    ) -> io::Result<Self> {
838        loop {
839            match stream.as_mut().next().await {
840                Some(Err(RawError::Io(e))) => return Err(e),
841                Some(Err(RawError::SoftTimeout)) => (),
842                Some(Ok(Event::StartElement(_, (ns, name), mut attrs))) => {
843                    if ns != XML_STREAM_NS || name != "stream" {
844                        return Err(io::Error::new(
845                            io::ErrorKind::InvalidData,
846                            "unknown stream header",
847                        ));
848                    }
849
850                    match attrs.remove(Namespace::none(), "version") {
851                        Some(v) => {
852                            if v != "1.0" {
853                                return Err(io::Error::new(
854                                    io::ErrorKind::InvalidData,
855                                    format!("unsuppored stream version: {}", v),
856                                ));
857                            }
858                        }
859                        None => {
860                            return Err(io::Error::new(
861                                io::ErrorKind::InvalidData,
862                                "required `version` attribute missing",
863                            ))
864                        }
865                    }
866
867                    let from = attrs.remove(Namespace::none(), "from");
868                    let to = attrs.remove(Namespace::none(), "to");
869                    let id = attrs.remove(Namespace::none(), "id");
870                    let _ = attrs.remove(Namespace::xml(), "lang");
871
872                    if let Some(((ns, name), _)) = attrs.into_iter().next() {
873                        return Err(io::Error::new(
874                            io::ErrorKind::InvalidData,
875                            format!("unexpected stream header attribute: {{{}}}{}", ns, name),
876                        ));
877                    }
878
879                    return Ok(StreamHeader {
880                        from: from.map(Cow::Owned),
881                        to: to.map(Cow::Owned),
882                        id: id.map(Cow::Owned),
883                    });
884                }
885                Some(Ok(Event::Text(_, _))) | Some(Ok(Event::EndElement(_))) => {
886                    return Err(io::Error::new(
887                        io::ErrorKind::UnexpectedEof,
888                        "unexpected content before stream header",
889                    ))
890                }
891                // We cannot loop infinitely here because the XML parser will
892                // prevent more than one XML declaration from being parsed.
893                Some(Ok(Event::XmlDeclaration(_, _))) => (),
894                None => {
895                    return Err(io::Error::new(
896                        io::ErrorKind::UnexpectedEof,
897                        "eof before stream header",
898                    ))
899                }
900            }
901        }
902    }
903}