tokio_xmpp/xmlstream/common.rs
1// Copyright (c) 2024 Jonas Schäfer <jonas@zombofant.net>
2//
3// This Source Code Form is subject to the terms of the Mozilla Public
4// License, v. 2.0. If a copy of the MPL was not distributed with this
5// file, You can obtain one at http://mozilla.org/MPL/2.0/.
6
7use alloc::borrow::Cow;
8use core::{
9 future::Future,
10 pin::Pin,
11 task::{Context, Poll},
12 time::Duration,
13};
14use std::io;
15
16use futures::{ready, Sink, SinkExt, Stream, StreamExt};
17
18use bytes::{Buf, BytesMut};
19
20use tokio::{
21 io::{AsyncBufRead, AsyncWrite},
22 time::Instant,
23};
24
25use xso::{
26 exports::rxml::{self, writer::TrackNamespace, xml_ncname, Event, Namespace},
27 AsXml, FromEventsBuilder, FromXml, Item,
28};
29
30use crate::connect::AsyncReadAndWrite;
31
32use super::capture::{log_enabled, log_recv, log_send, CaptureBufRead};
33
34use xmpp_parsers::ns::STREAM as XML_STREAM_NS;
35
/// Configuration for timeouts on an XML stream.
///
/// The defaults are tuned toward common desktop/laptop use and may not hold
/// up to extreme conditions (arctic satellite link, mobile internet on a
/// train in Brandenburg, Germany, and similar) and may be inefficient in
/// other conditions (stable server link, localhost communication).
#[derive(Debug, Clone, Copy)]
pub struct Timeouts {
    /// Maximum silence time before a
    /// [`ReadError::SoftTimeout`][`super::ReadError::SoftTimeout`] is
    /// returned.
    ///
    /// Soft timeouts are not fatal, but they must be handled by user code so
    /// that more data is read after at most [`Self::response_timeout`],
    /// starting from the moment the soft timeout is returned.
    pub read_timeout: Duration,

    /// Maximum silence after a soft timeout.
    ///
    /// If the stream is silent for longer than this time after a soft timeout
    /// has been emitted, a hard [`TimedOut`][`io::ErrorKind::TimedOut`]
    /// I/O error is returned and the stream is to be considered dead.
    pub response_timeout: Duration,
}

impl Default for Timeouts {
    /// Five minutes for both the read and the response timeout.
    fn default() -> Self {
        let five_minutes = Duration::from_secs(300);
        Self {
            read_timeout: five_minutes,
            response_timeout: five_minutes,
        }
    }
}

impl Timeouts {
    /// Tight timeouts suitable for communicating on a fast LAN or localhost.
    pub fn tight() -> Self {
        Self {
            read_timeout: Duration::from_secs(60),
            response_timeout: Duration::from_secs(15),
        }
    }

    /// Half of the response timeout; the response window is split into two
    /// equally long stages (soft -> warn, warn -> hard).
    fn half_response(&self) -> Duration {
        self.response_timeout / 2
    }

    /// Silence allowed after data was received before the soft timeout trips.
    fn data_to_soft(&self) -> Duration {
        self.read_timeout
    }

    /// Silence allowed between the soft timeout and the warning stage.
    fn soft_to_warn(&self) -> Duration {
        self.half_response()
    }

    /// Silence allowed between the warning stage and the hard timeout.
    fn warn_to_hard(&self) -> Duration {
        self.half_response()
    }
}
91
/// Escalation stage of the read timeout.
///
/// `TimeoutState::poll` returns the stage which just tripped and then arms
/// the next one in the order `Soft` -> `Warn` -> `Hard`; `Hard` keeps
/// re-arming itself.
#[derive(Clone, Copy)]
enum TimeoutLevel {
    /// First stage; surfaced by the `Stream` impl as `RawError::SoftTimeout`.
    Soft,
    /// Intermediate stage; the `Stream` impl does not surface it and simply
    /// keeps polling.
    Warn,
    /// Final stage; surfaced by the `Stream` impl as a
    /// [`TimedOut`][`io::ErrorKind::TimedOut`] I/O error.
    Hard,
}
98
/// Error type yielded by the `Stream` implementation of `RawXmlStream`.
#[derive(Debug)]
pub(super) enum RawError {
    /// An I/O error. This includes errors reported by the XML reader as well
    /// as the hard read timeout, which is reported with
    /// [`io::ErrorKind::TimedOut`].
    Io(io::Error),

    /// The soft read timeout elapsed without any event being read.
    SoftTimeout,
}
104
105impl From<io::Error> for RawError {
106 fn from(other: io::Error) -> Self {
107 Self::Io(other)
108 }
109}
110
/// Runtime state of the read timeout: the configuration, the stage which
/// trips next and the timer which is currently armed.
struct TimeoutState {
    /// Configuration for the timeouts.
    timeouts: Timeouts,

    /// Level of the next timeout which will trip.
    level: TimeoutLevel,

    /// Sleep timer used for read timeouts.
    // NOTE: even though we pretend we could deal with an !Unpin
    // RawXmlStream, we really can't: box_stream for example needs it,
    // but also all the typestate around the initial stream setup needs
    // to be able to move the stream around.
    // (Boxing the timer keeps this struct movable while the `Sleep` itself
    // stays pinned on the heap.)
    deadline: Pin<Box<tokio::time::Sleep>>,
}
125
126impl TimeoutState {
127 fn new(timeouts: Timeouts) -> Self {
128 Self {
129 deadline: Box::pin(tokio::time::sleep(timeouts.data_to_soft())),
130 level: TimeoutLevel::Soft,
131 timeouts,
132 }
133 }
134
135 fn poll(&mut self, cx: &mut Context) -> Poll<TimeoutLevel> {
136 ready!(self.deadline.as_mut().poll(cx));
137 // Deadline elapsed!
138 let to_return = self.level;
139 let (next_level, next_duration) = match self.level {
140 TimeoutLevel::Soft => (TimeoutLevel::Warn, self.timeouts.soft_to_warn()),
141 TimeoutLevel::Warn => (TimeoutLevel::Hard, self.timeouts.warn_to_hard()),
142 // Something short-ish so that we fire this over and over until
143 // someone finally kills the stream for good.
144 TimeoutLevel::Hard => (TimeoutLevel::Hard, Duration::new(1, 0)),
145 };
146 self.level = next_level;
147 self.deadline.as_mut().reset(Instant::now() + next_duration);
148 Poll::Ready(to_return)
149 }
150
151 fn reset(&mut self) {
152 self.level = TimeoutLevel::Soft;
153 self.deadline
154 .as_mut()
155 .reset((Instant::now() + self.timeouts.data_to_soft()).into());
156 }
157}
158
pin_project_lite::pin_project! {
    // Low-level XML stream: reads `rxml::Event`s from and writes
    // `xso::Item`s to a single `Io`, with read timeouts applied on the
    // receiving side.
    //
    // NOTE: due to limitations of pin_project_lite, the field comments are
    // no doc comments. Luckily, this struct is only `pub(super)` anyway.
    #[project = RawXmlStreamProj]
    pub(super) struct RawXmlStream<Io> {
        // The parser used for deserialising data. It also owns the `Io`
        // (wrapped in a `CaptureBufRead`), so writes go through it, too.
        #[pin]
        parser: rxml::AsyncReader<CaptureBufRead<Io>>,

        // The writer used for serialising data.
        writer: rxml::writer::Encoder<rxml::writer::SimpleNamespaces>,

        // Read timeout state; reset whenever the parser yields a result.
        timeouts: TimeoutState,

        // The default namespace to declare on the stream header.
        stream_ns: &'static str,

        // Buffer containing serialised data which will then be sent through
        // the inner `Io`. Sending that serialised data happens in
        // `poll_ready` and `poll_flush`, while appending serialised data
        // happens in `start_send`.
        tx_buffer: BytesMut,

        // Position inside tx_buffer up to which to-be-sent data has already
        // been logged.
        tx_buffer_logged: usize,

        // This signifies the limit at the point of which the Sink will
        // refuse to accept more data: if the `tx_buffer`'s size grows beyond
        // that high water mark, poll_ready will return Poll::Pending until
        // it has managed to flush enough data down the inner writer.
        //
        // Note that poll_ready will always attempt to progress the writes,
        // which further reduces the chance of hitting this limit unless
        // either the underlying writer gets stuck (e.g. TCP connection
        // breaking in a timeouty way) or a lot of data is written in bulk.
        // In both cases, the backpressure created by poll_ready returning
        // Pending is desirable.
        //
        // However, there is a catch: We don't assert this condition
        // in `start_send` at all. The reason is that we cannot suspend
        // serialisation of an XSO in the middle of writing it: it has to be
        // written in one batch or you have to start over later (this has to
        // do with the iterator state borrowing the data and futures getting
        // cancelled e.g. in tokio::select!). In order to facilitate
        // implementing a `Sink<T: AsXml>` on top of `RawXmlStream`, we
        // cannot be strict about what is going on in `start_send`:
        // `poll_ready` does not know what kind of data will be written (so
        // it could not make a size estimate, even if that was at all
        // possible with AsXml) and `start_send` is not a coroutine. So if
        // `Sink<T: AsXml>` wants to use `RawXmlStream`, it must be able to
        // submit an entire XSO's items in one batch to `RawXmlStream` after
        // it has reported to be ready once. That may easily make the buffer
        // reach its high water mark.
        //
        // So if we checked that condition in `start_send` (as opposed to
        // `poll_ready`), we would cause situations where submitting XSOs
        // failed randomly (with a panic or other errors) and would have to
        // be retried later.
        //
        // While failing with e.g. io::ErrorKind::WouldBlock is something
        // that could be investigated later, it would still require being
        // able to make an accurate estimate of the number of bytes needed to
        // serialise any given `AsXml`, because as pointed out earlier, once
        // we have started, there is no going back.
        //
        // Finally, none of that hurts much because `RawXmlStream` is only an
        // internal API. The high-level APIs will always call `poll_ready`
        // before sending an XSO, which means that we won't *grossly* go over
        // the TX buffer high water mark---unless you send a really large
        // XSO at once.
        tx_buffer_high_water_mark: usize,
    }
}
233
234impl<Io: AsyncBufRead + AsyncWrite> RawXmlStream<Io> {
235 fn new_writer(
236 stream_ns: &'static str,
237 ) -> rxml::writer::Encoder<rxml::writer::SimpleNamespaces> {
238 let mut writer = rxml::writer::Encoder::new();
239 writer
240 .ns_tracker_mut()
241 .declare_fixed(Some(xml_ncname!("stream")), XML_STREAM_NS.into());
242 writer
243 .ns_tracker_mut()
244 .declare_fixed(None, stream_ns.into());
245 writer
246 }
247
248 pub(super) fn new(io: Io, stream_ns: &'static str, timeouts: Timeouts) -> Self {
249 let parser = rxml::Parser::default();
250 let mut io = CaptureBufRead::wrap(io);
251 if log_enabled() {
252 io.enable_capture();
253 }
254 Self {
255 parser: rxml::AsyncReader::wrap(io, parser),
256 writer: Self::new_writer(stream_ns),
257 timeouts: TimeoutState::new(timeouts),
258 tx_buffer_logged: 0,
259 stream_ns,
260 tx_buffer: BytesMut::new(),
261
262 // This basically means: "if we already have 2 kiB in our send
263 // buffer, do not accept more data".
264 // Please see the extensive words at
265 //`Self::tx_buffer_high_water_mark` for details.
266 tx_buffer_high_water_mark: 2048,
267 }
268 }
269
270 pub(super) fn reset_state(self: Pin<&mut Self>) {
271 let this = self.project();
272 *this.parser.parser_pinned() = rxml::Parser::default();
273 *this.writer = Self::new_writer(this.stream_ns);
274 }
275
276 pub(super) fn into_inner(self) -> Io {
277 self.parser.into_inner().0.into_inner()
278 }
279
280 /// Box the underlying transport stream.
281 ///
282 /// This removes the specific type of the transport from the XML stream's
283 /// type signature.
284 pub(super) fn box_stream(self) -> RawXmlStream<Box<dyn AsyncReadAndWrite + Send + 'static>>
285 where
286 Io: AsyncReadAndWrite + Send + 'static,
287 {
288 let (io, p) = self.parser.into_inner();
289 let mut io = CaptureBufRead::wrap(Box::new(io) as Box<_>);
290 if log_enabled() {
291 io.enable_capture();
292 }
293 let parser = rxml::AsyncReader::wrap(io, p);
294 RawXmlStream {
295 parser,
296 timeouts: self.timeouts,
297 writer: self.writer,
298 tx_buffer: self.tx_buffer,
299 tx_buffer_logged: self.tx_buffer_logged,
300 tx_buffer_high_water_mark: self.tx_buffer_high_water_mark,
301 stream_ns: self.stream_ns,
302 }
303 }
304}
305
306impl<Io: AsyncWrite> RawXmlStream<Io> {
307 /// Start sending an entire XSO.
308 ///
309 /// Unlike the `Sink` implementation, this provides nice syntax
310 /// highlighting for the serialised data in log outputs (if enabled) *and*
311 /// is error safe: if the XSO fails to serialise completely, it will be as
312 /// if it hadn't been attempted to serialise it at all.
313 ///
314 /// Note that, like with `start_send`, the caller is responsible for
315 /// ensuring that the stream is ready by polling
316 /// [`<Self as Sink>::poll_ready`] as needed.
317 pub(super) fn start_send_xso<T: AsXml>(self: Pin<&mut Self>, xso: &T) -> io::Result<()> {
318 let mut this = self.project();
319 let prev_len = this.tx_buffer.len();
320 match this.try_send_xso(xso) {
321 Ok(()) => Ok(()),
322 Err(e) => {
323 let curr_len = this.tx_buffer.len();
324 this.tx_buffer.truncate(prev_len);
325 log::trace!(
326 "SEND failed: {}. Rewinding buffer by {} bytes.",
327 e,
328 curr_len - prev_len
329 );
330 Err(e)
331 }
332 }
333 }
334}
335
336impl<Io> RawXmlStream<Io> {
337 fn parser_pinned(self: Pin<&mut Self>) -> &mut rxml::Parser {
338 self.project().parser.parser_pinned()
339 }
340
341 fn stream_pinned(self: Pin<&mut Self>) -> Pin<&mut CaptureBufRead<Io>> {
342 self.project().parser.inner_pinned()
343 }
344
345 pub(super) fn get_stream(&self) -> &Io {
346 self.parser.inner().inner()
347 }
348}
349
350impl<Io: AsyncBufRead> Stream for RawXmlStream<Io> {
351 type Item = Result<rxml::Event, RawError>;
352
353 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
354 let mut this = self.project();
355 loop {
356 match this.parser.as_mut().poll_read(cx) {
357 Poll::Pending => (),
358 Poll::Ready(v) => {
359 this.timeouts.reset();
360 match v.transpose() {
361 // Skip the XML declaration, nobody wants to hear about that.
362 Some(Ok(rxml::Event::XmlDeclaration(_, _))) => continue,
363 other => return Poll::Ready(other.map(|x| x.map_err(RawError::Io))),
364 }
365 }
366 };
367
368 // poll_read returned pending... what do the timeouts have to say?
369 match ready!(this.timeouts.poll(cx)) {
370 TimeoutLevel::Soft => return Poll::Ready(Some(Err(RawError::SoftTimeout))),
371 TimeoutLevel::Warn => (),
372 TimeoutLevel::Hard => {
373 return Poll::Ready(Some(Err(RawError::Io(io::Error::new(
374 io::ErrorKind::TimedOut,
375 "read and response timeouts elapsed",
376 )))))
377 }
378 }
379 }
380 }
381}
382
383impl<'x, Io: AsyncWrite> RawXmlStreamProj<'x, Io> {
384 fn flush_tx_log(&mut self) {
385 let range = &self.tx_buffer[*self.tx_buffer_logged..];
386 if range.len() == 0 {
387 return;
388 }
389 log_send(range);
390 *self.tx_buffer_logged = self.tx_buffer.len();
391 }
392
393 fn start_send(&mut self, item: &xso::Item<'_>) -> io::Result<()> {
394 self.writer
395 .encode_into_bytes(item.as_rxml_item(), self.tx_buffer)
396 .map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, e))
397 }
398
399 fn try_send_xso<T: AsXml>(&mut self, xso: &T) -> io::Result<()> {
400 let iter = match xso.as_xml_iter() {
401 Ok(v) => v,
402 Err(e) => return Err(io::Error::new(io::ErrorKind::InvalidInput, e)),
403 };
404 for item in iter {
405 let item = match item {
406 Ok(v) => v,
407 Err(e) => return Err(io::Error::new(io::ErrorKind::InvalidInput, e)),
408 };
409 self.start_send(&item)?;
410 }
411 Ok(())
412 }
413
414 fn progress_write(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
415 self.flush_tx_log();
416 while self.tx_buffer.len() > 0 {
417 let written = match ready!(self
418 .parser
419 .as_mut()
420 .inner_pinned()
421 .poll_write(cx, &self.tx_buffer))
422 {
423 Ok(v) => v,
424 Err(e) => return Poll::Ready(Err(e)),
425 };
426 self.tx_buffer.advance(written);
427 *self.tx_buffer_logged = self
428 .tx_buffer_logged
429 .checked_sub(written)
430 .expect("Buffer arithmetic error");
431 }
432 Poll::Ready(Ok(()))
433 }
434}
435
436impl<Io: AsyncWrite> RawXmlStream<Io> {
437 /// Flush all buffered data and shut down the sender side of the
438 /// underlying transport.
439 ///
440 /// Unlike `poll_close` (from the `Sink` impls), this will not close the
441 /// receiving side of the underlying the transport. It is advisable to call
442 /// `poll_close` eventually after `poll_shutdown` in order to gracefully
443 /// handle situations where the remote side does not close the stream
444 /// cleanly.
445 pub fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
446 ready!(self.as_mut().poll_flush(cx))?;
447 let this = self.project();
448 this.parser.inner_pinned().poll_shutdown(cx)
449 }
450}
451
452impl<'x, Io: AsyncWrite> Sink<xso::Item<'x>> for RawXmlStream<Io> {
453 type Error = io::Error;
454
455 fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
456 let mut this = self.project();
457 match this.progress_write(cx) {
458 // No progress on write, but if we have enough space in the buffer
459 // it's ok nonetheless.
460 Poll::Pending => (),
461 // Some progress and it went fine, move on.
462 Poll::Ready(Ok(())) => (),
463 // Something went wrong -> return the error.
464 Poll::Ready(Err(e)) => return Poll::Ready(Err(e.into())),
465 }
466 if this.tx_buffer.len() < *this.tx_buffer_high_water_mark {
467 Poll::Ready(Ok(()))
468 } else {
469 Poll::Pending
470 }
471 }
472
473 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
474 let mut this = self.project();
475 ready!(this.progress_write(cx))?;
476 this.parser.as_mut().inner_pinned().poll_flush(cx)
477 }
478
479 fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
480 let mut this = self.project();
481 ready!(this.progress_write(cx))?;
482 this.parser.as_mut().inner_pinned().poll_shutdown(cx)
483 }
484
485 fn start_send(self: Pin<&mut Self>, item: xso::Item<'x>) -> Result<(), Self::Error> {
486 let mut this = self.project();
487 this.start_send(&item)
488 }
489}
490
/// Error returned by the [`ReadXso`] future and the [`ReadXsoState`] helper.
pub(super) enum ReadXsoError {
    /// The outer element was closed before a child element could be read.
    ///
    /// This is typically the stream footer in XML stream applications.
    Footer,

    /// A hard error occurred.
    ///
    /// This is either a real I/O error or an error from the XML parser.
    /// Neither is recoverable, because the nesting state is lost and
    /// in addition, XML errors are not recoverable because they indicate a
    /// not well-formed document.
    Hard(io::Error),

    /// The underlying stream signalled a soft read timeout before a child
    /// element could be read.
    ///
    /// Note that soft timeouts which are triggered in the middle of receiving
    /// an element are converted to hard timeouts (i.e. I/O errors).
    ///
    /// This masking is intentional, because:
    /// - Returning a [`Self::SoftTimeout`] from the middle of parsing is not
    ///   possible without complicating the API.
    /// - There is no reason why the remote side should interrupt sending data
    ///   in the middle of an element except if it or the transport has failed
    ///   fatally.
    SoftTimeout,

    /// A parse error occurred.
    ///
    /// The XML structure was well-formed, but the data contained did not
    /// match the XSO which was attempted to be parsed. This error is
    /// recoverable: when this error is emitted, the XML stream is at the same
    /// nesting level as it was before the XSO was attempted to be read; all
    /// XML structure which belonged to the XSO which failed to parse has
    /// been consumed. This allows reading more XSOs even if one fails to
    /// parse.
    Parse(xso::error::Error),
}
531
532impl From<io::Error> for ReadXsoError {
533 fn from(other: io::Error) -> Self {
534 Self::Hard(other)
535 }
536}
537
538impl From<xso::error::Error> for ReadXsoError {
539 fn from(other: xso::error::Error) -> Self {
540 Self::Parse(other)
541 }
542}
543
/// State for reading an XSO from a `Stream<Item = Result<rxml::Event, ...>>`.
///
/// Due to pinning, it is simpler to implement the state machine in a
/// dedicated enum and let the actual (pinned) future pass the stream toward
/// this enum's function.
///
/// This is used by both [`ReadXso`] and the [`super::XmlStream`] itself.
#[derive(Default)]
pub(super) enum ReadXsoState<T: FromXml> {
    /// The [`rxml::Event::StartElement`] event was not seen yet.
    ///
    /// In this state, XML whitespace is ignored (as per RFC 6120 § 11.7), but
    /// other text data is rejected.
    ///
    /// This is the initial (default) state.
    #[default]
    PreData,

    /// The [`rxml::Event::StartElement`] event was received.
    ///
    /// The inner value is the builder for the "return type" of this enum and
    /// the implementation in the [`xso`] crate does all the heavy lifting:
    /// we'll only send events in its general direction.
    // We use the fallible parsing here so that we don't have to do the depth
    // accounting ourselves.
    Parsing(<Result<T, xso::error::Error> as FromXml>::Builder),

    /// The parsing has completed (successful or not).
    ///
    /// This is a final state and attempting to advance the state will panic.
    /// This is in accordance with [`core::future::Future::poll`]'s contract,
    /// for which this enum is primarily used.
    Done,
}
576
577impl<T: FromXml> ReadXsoState<T> {
578 /// Progress reading the XSO from the given source.
579 ///
580 /// This attempts to parse a single XSO from the underlying stream,
581 /// while discarding any XML whitespace before the beginning of the XSO.
582 ///
583 /// If the XSO is parsed successfully, the method returns Ready with the
584 /// parsed value. If parsing fails or an I/O error occurs, an appropriate
585 /// error is returned.
586 ///
587 /// If parsing fails, the entire XML subtree belonging to the XSO is
588 /// nonetheless processed. That makes parse errors recoverable: After
589 /// `poll_advance` has returned Ready with either an Ok result or a
590 /// [`ReadXsoError::Parse`] error variant, another XSO can be read and the
591 /// XML parsing will be at the same nesting depth as it was before the
592 /// first call to `poll_advance`.
593 ///
594 /// Note that this guarantee does not hold for non-parse errors (i.e. for
595 /// the other variants of [`ReadXsoError`]): I/O errors as well as
596 /// occurrence of the outer closing element are fatal.
597 ///
598 /// The `source` passed to `poll_advance` should be the same on every
599 /// call.
600 pub(super) fn poll_advance<Io: AsyncBufRead>(
601 &mut self,
602 mut source: Pin<&mut RawXmlStream<Io>>,
603 cx: &mut Context<'_>,
604 ) -> Poll<Result<T, ReadXsoError>> {
605 loop {
606 // Disable text buffering before the start event. That way, we
607 // don't accumulate infinite amounts of XML whitespace caused by
608 // whitespace keepalives.
609 // (And also, we'll know faster when the remote side sends
610 // non-whitespace garbage.)
611 let text_buffering = match self {
612 ReadXsoState::PreData => false,
613 _ => true,
614 };
615 source
616 .as_mut()
617 .parser_pinned()
618 .set_text_buffering(text_buffering);
619
620 let ev = ready!(source.as_mut().poll_next(cx)).transpose();
621 match self {
622 ReadXsoState::PreData => {
623 log::trace!("ReadXsoState::PreData ev = {:?}", ev);
624 match ev {
625 Ok(Some(rxml::Event::XmlDeclaration(_, _))) => (),
626 Ok(Some(rxml::Event::Text(_, data))) => {
627 if xso::is_xml_whitespace(data.as_bytes()) {
628 log::trace!("Received {} bytes of whitespace", data.len());
629 source.as_mut().stream_pinned().discard_capture();
630 continue;
631 } else {
632 *self = ReadXsoState::Done;
633 return Poll::Ready(Err(io::Error::new(
634 io::ErrorKind::InvalidData,
635 "non-whitespace text content before XSO",
636 )
637 .into()));
638 }
639 }
640 Ok(Some(rxml::Event::StartElement(_, name, attrs))) => {
641 *self = ReadXsoState::Parsing(
642 <Result<T, xso::error::Error> as FromXml>::from_events(name, attrs)
643 .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?,
644 );
645 }
646 // Amounts to EOF, as we expect to start on the stream level.
647 Ok(Some(rxml::Event::EndElement(_))) => {
648 *self = ReadXsoState::Done;
649 return Poll::Ready(Err(ReadXsoError::Footer));
650 }
651 Ok(None) => {
652 *self = ReadXsoState::Done;
653 return Poll::Ready(Err(io::Error::new(
654 io::ErrorKind::InvalidData,
655 "eof before XSO started",
656 )
657 .into()));
658 }
659 Err(RawError::SoftTimeout) => {
660 *self = ReadXsoState::Done;
661 return Poll::Ready(Err(ReadXsoError::SoftTimeout));
662 }
663 Err(RawError::Io(e)) => {
664 *self = ReadXsoState::Done;
665 return Poll::Ready(Err(ReadXsoError::Hard(e)));
666 }
667 }
668 }
669 ReadXsoState::Parsing(builder) => {
670 log::trace!("ReadXsoState::Parsing ev = {:?}", ev);
671 let ev = match ev {
672 Ok(Some(ev)) => ev,
673 Ok(None) => {
674 *self = ReadXsoState::Done;
675 return Poll::Ready(Err(io::Error::new(
676 io::ErrorKind::UnexpectedEof,
677 "eof during XSO parsing",
678 )
679 .into()));
680 }
681 Err(RawError::Io(e)) => {
682 *self = ReadXsoState::Done;
683 return Poll::Ready(Err(e.into()));
684 }
685 Err(RawError::SoftTimeout) => {
686 // See also [`ReadXsoError::SoftTimeout`] for why
687 // we mask the SoftTimeout condition here.
688 *self = ReadXsoState::Done;
689 return Poll::Ready(Err(io::Error::new(
690 io::ErrorKind::TimedOut,
691 "read timeout during XSO parsing",
692 )
693 .into()));
694 }
695 };
696
697 match builder.feed(ev) {
698 Err(err) => {
699 *self = ReadXsoState::Done;
700 source.as_mut().stream_pinned().discard_capture();
701 return Poll::Ready(Err(io::Error::new(
702 io::ErrorKind::InvalidData,
703 err,
704 )
705 .into()));
706 }
707 Ok(Some(Err(err))) => {
708 *self = ReadXsoState::Done;
709 log_recv(Some(&err), source.as_mut().stream_pinned().take_capture());
710 return Poll::Ready(Err(ReadXsoError::Parse(err)));
711 }
712 Ok(Some(Ok(value))) => {
713 *self = ReadXsoState::Done;
714 log_recv(None, source.as_mut().stream_pinned().take_capture());
715 return Poll::Ready(Ok(value));
716 }
717 Ok(None) => (),
718 }
719 }
720
721 // The error talks about "future", simply because that is
722 // where `Self` is used (inside `core::future::Future::poll`).
723 ReadXsoState::Done => panic!("future polled after completion"),
724 }
725 }
726 }
727}
728
/// Future to read a single XSO from a stream.
pub(super) struct ReadXso<'x, Io, T: FromXml> {
    /// Stream to read the XSO from.
    inner: Pin<&'x mut RawXmlStream<Io>>,

    /// Current state of parsing.
    state: ReadXsoState<T>,
}
737
738impl<'x, Io: AsyncBufRead, T: FromXml> ReadXso<'x, Io, T> {
739 /// Start reading a single XSO from a stream.
740 pub(super) fn read_from(stream: Pin<&'x mut RawXmlStream<Io>>) -> Self {
741 Self {
742 inner: stream,
743 state: ReadXsoState::PreData,
744 }
745 }
746}
747
748impl<'x, Io: AsyncBufRead, T: FromXml> Future for ReadXso<'x, Io, T>
749where
750 T::Builder: Unpin,
751{
752 type Output = Result<T, ReadXsoError>;
753
754 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
755 let this = self.get_mut();
756 this.state.poll_advance(this.inner.as_mut(), cx)
757 }
758}
759
/// Contains metadata from an XML stream header.
///
/// The default value has all attributes unset.
#[derive(Default, Debug)]
pub struct StreamHeader<'x> {
    /// The optional `from` attribute.
    pub from: Option<Cow<'x, str>>,

    /// The optional `to` attribute.
    pub to: Option<Cow<'x, str>>,

    /// The optional `id` attribute.
    pub id: Option<Cow<'x, str>>,
}
772
773impl<'x> StreamHeader<'x> {
774 /// Take the contents and return them as new object.
775 ///
776 /// `self` will be left with all its parts set to `None`.
777 pub fn take(&mut self) -> Self {
778 Self {
779 from: self.from.take(),
780 to: self.to.take(),
781 id: self.id.take(),
782 }
783 }
784
785 pub(super) async fn send<Io: AsyncWrite>(
786 self,
787 mut stream: Pin<&mut RawXmlStream<Io>>,
788 ) -> io::Result<()> {
789 stream
790 .send(Item::XmlDeclaration(rxml::XmlVersion::V1_0))
791 .await?;
792 stream
793 .send(Item::ElementHeadStart(
794 Namespace::from(XML_STREAM_NS),
795 Cow::Borrowed(xml_ncname!("stream")),
796 ))
797 .await?;
798 if let Some(from) = self.from {
799 stream
800 .send(Item::Attribute(
801 Namespace::NONE,
802 Cow::Borrowed(xml_ncname!("from")),
803 from,
804 ))
805 .await?;
806 }
807 if let Some(to) = self.to {
808 stream
809 .send(Item::Attribute(
810 Namespace::NONE,
811 Cow::Borrowed(xml_ncname!("to")),
812 to,
813 ))
814 .await?;
815 }
816 if let Some(id) = self.id {
817 stream
818 .send(Item::Attribute(
819 Namespace::NONE,
820 Cow::Borrowed(xml_ncname!("id")),
821 id,
822 ))
823 .await?;
824 }
825 stream
826 .send(Item::Attribute(
827 Namespace::NONE,
828 Cow::Borrowed(xml_ncname!("version")),
829 Cow::Borrowed("1.0"),
830 ))
831 .await?;
832 stream.send(Item::ElementHeadEnd).await?;
833 Ok(())
834 }
835}
836
837impl StreamHeader<'static> {
838 pub(super) async fn recv<Io: AsyncBufRead>(
839 mut stream: Pin<&mut RawXmlStream<Io>>,
840 ) -> io::Result<Self> {
841 loop {
842 match stream.as_mut().next().await {
843 Some(Err(RawError::Io(e))) => return Err(e),
844 Some(Err(RawError::SoftTimeout)) => (),
845 Some(Ok(Event::StartElement(_, (ns, name), mut attrs))) => {
846 if ns != XML_STREAM_NS || name != "stream" {
847 return Err(io::Error::new(
848 io::ErrorKind::InvalidData,
849 "unknown stream header",
850 ));
851 }
852
853 match attrs.remove(Namespace::none(), "version") {
854 Some(v) => {
855 if v != "1.0" {
856 return Err(io::Error::new(
857 io::ErrorKind::InvalidData,
858 format!("unsuppored stream version: {}", v),
859 ));
860 }
861 }
862 None => {
863 return Err(io::Error::new(
864 io::ErrorKind::InvalidData,
865 "required `version` attribute missing",
866 ))
867 }
868 }
869
870 let from = attrs.remove(Namespace::none(), "from");
871 let to = attrs.remove(Namespace::none(), "to");
872 let id = attrs.remove(Namespace::none(), "id");
873 let _ = attrs.remove(Namespace::xml(), "lang");
874
875 if let Some(((ns, name), _)) = attrs.into_iter().next() {
876 return Err(io::Error::new(
877 io::ErrorKind::InvalidData,
878 format!("unexpected stream header attribute: {{{}}}{}", ns, name),
879 ));
880 }
881
882 return Ok(StreamHeader {
883 from: from.map(Cow::Owned),
884 to: to.map(Cow::Owned),
885 id: id.map(Cow::Owned),
886 });
887 }
888 Some(Ok(Event::Text(_, _))) | Some(Ok(Event::EndElement(_))) => {
889 return Err(io::Error::new(
890 io::ErrorKind::UnexpectedEof,
891 "unexpected content before stream header",
892 ))
893 }
894 // We cannot loop infinitely here because the XML parser will
895 // prevent more than one XML declaration from being parsed.
896 Some(Ok(Event::XmlDeclaration(_, _))) => (),
897 None => {
898 return Err(io::Error::new(
899 io::ErrorKind::UnexpectedEof,
900 "eof before stream header",
901 ))
902 }
903 }
904 }
905 }
906}