tokio_xmpp/xmlstream/common.rs
1// Copyright (c) 2024 Jonas Schäfer <jonas@zombofant.net>
2//
3// This Source Code Form is subject to the terms of the Mozilla Public
4// License, v. 2.0. If a copy of the MPL was not distributed with this
5// file, You can obtain one at http://mozilla.org/MPL/2.0/.
6
7use alloc::borrow::Cow;
8use core::{
9 future::Future,
10 pin::Pin,
11 task::{Context, Poll},
12 time::Duration,
13};
14use std::io;
15
16use futures::{ready, Sink, SinkExt, Stream, StreamExt};
17
18use bytes::{Buf, BytesMut};
19
20use tokio::{
21 io::{AsyncBufRead, AsyncWrite},
22 time::Instant,
23};
24
25use xso::{
26 exports::rxml::{
27 self, writer::TrackNamespace, xml_lang::XmlLangStack, xml_ncname, Event, Namespace,
28 },
29 AsXml, FromEventsBuilder, FromXml, Item,
30};
31
32use crate::connect::AsyncReadAndWrite;
33
34use super::capture::{log_enabled, log_recv, log_send, CaptureBufRead};
35
36use xmpp_parsers::ns::STREAM as XML_STREAM_NS;
37
/// Timeout configuration for an XML stream.
///
/// The defaults are tuned toward common desktop/laptop use and may not hold
/// up to extreme conditions (arctic satellite link, mobile internet on a
/// train in Brandenburg, Germany, and similar) and may be inefficient in
/// other conditions (stable server link, localhost communication).
#[derive(Debug, Clone, Copy)]
pub struct Timeouts {
    /// Maximum silence time before a
    /// [`ReadError::SoftTimeout`][`super::ReadError::SoftTimeout`] is
    /// returned.
    ///
    /// Soft timeouts are not fatal, but they must be handled by user code so
    /// that more data is read after at most [`Self::response_timeout`],
    /// starting from the moment the soft timeout is returned.
    pub read_timeout: Duration,

    /// Maximum silence after a soft timeout.
    ///
    /// If the stream is silent for longer than this time after a soft timeout
    /// has been emitted, a hard [`TimedOut`][`io::ErrorKind::TimedOut`]
    /// I/O error is returned and the stream is to be considered dead.
    pub response_timeout: Duration,
}

impl Default for Timeouts {
    /// Default timeouts: 300 seconds for both the read timeout and the
    /// response timeout.
    fn default() -> Self {
        let five_minutes = Duration::from_secs(300);
        Self {
            read_timeout: five_minutes,
            response_timeout: five_minutes,
        }
    }
}

impl Timeouts {
    /// Tight timeouts suitable for communicating on a fast LAN or localhost.
    pub fn tight() -> Self {
        Self {
            read_timeout: Duration::from_secs(60),
            response_timeout: Duration::from_secs(15),
        }
    }

    /// Silence allowed after the last received data before the soft timeout
    /// trips.
    fn data_to_soft(&self) -> Duration {
        self.read_timeout
    }

    /// Silence allowed between the soft timeout and the internal warning
    /// level: the first half of the response timeout.
    fn soft_to_warn(&self) -> Duration {
        self.response_timeout / 2
    }

    /// Silence allowed between the internal warning level and the hard
    /// timeout: the same duration as the soft-to-warn step, so both steps
    /// together make up the response timeout.
    fn warn_to_hard(&self) -> Duration {
        self.soft_to_warn()
    }
}
93
/// Escalation level of the read timeout.
///
/// While the stream stays silent, the levels trip in the order `Soft`,
/// `Warn`, `Hard`. Once `Hard` has been reached, it keeps tripping.
#[derive(Clone, Copy)]
enum TimeoutLevel {
    /// The read timeout elapsed. The stream reports this as
    /// [`RawError::SoftTimeout`].
    Soft,
    /// Half of the response timeout elapsed after the soft timeout. The
    /// stream does not report this level to its consumer.
    Warn,
    /// The second half of the response timeout elapsed as well. The stream
    /// reports this as an I/O error of kind `TimedOut`.
    Hard,
}
100
/// Error type of the items yielded by the `Stream` implementation of
/// [`RawXmlStream`].
#[derive(Debug)]
pub(super) enum RawError {
    /// An I/O error, either from reading the stream or synthesised for the
    /// hard read timeout.
    Io(io::Error),
    /// The read timeout elapsed without the parser yielding anything.
    ///
    /// See [`Timeouts::read_timeout`] for how this is meant to be handled.
    SoftTimeout,
}
106
107impl From<io::Error> for RawError {
108 fn from(other: io::Error) -> Self {
109 Self::Io(other)
110 }
111}
112
/// Runtime state of the read timeout: the configured durations, the level
/// which trips next and the timer which is currently armed.
struct TimeoutState {
    /// Configuration for the timeouts.
    timeouts: Timeouts,

    /// Level of the next timeout which will trip.
    level: TimeoutLevel,

    /// Sleep timer used for read timeouts.
    // NOTE: even though we pretend we could deal with an !Unpin
    // RawXmlStream, we really can't: box_stream for example needs it,
    // but also all the typestate around the initial stream setup needs
    // to be able to move the stream around.
    deadline: Pin<Box<tokio::time::Sleep>>,
}
127
128impl TimeoutState {
129 fn new(timeouts: Timeouts) -> Self {
130 Self {
131 deadline: Box::pin(tokio::time::sleep(timeouts.data_to_soft())),
132 level: TimeoutLevel::Soft,
133 timeouts,
134 }
135 }
136
137 fn poll(&mut self, cx: &mut Context) -> Poll<TimeoutLevel> {
138 ready!(self.deadline.as_mut().poll(cx));
139 // Deadline elapsed!
140 let to_return = self.level;
141 let (next_level, next_duration) = match self.level {
142 TimeoutLevel::Soft => (TimeoutLevel::Warn, self.timeouts.soft_to_warn()),
143 TimeoutLevel::Warn => (TimeoutLevel::Hard, self.timeouts.warn_to_hard()),
144 // Something short-ish so that we fire this over and over until
145 // someone finally kills the stream for good.
146 TimeoutLevel::Hard => (TimeoutLevel::Hard, Duration::new(1, 0)),
147 };
148 self.level = next_level;
149 self.deadline.as_mut().reset(Instant::now() + next_duration);
150 Poll::Ready(to_return)
151 }
152
153 fn reset(&mut self) {
154 self.level = TimeoutLevel::Soft;
155 self.deadline
156 .as_mut()
157 .reset(Instant::now() + self.timeouts.data_to_soft());
158 }
159}
160
pin_project_lite::pin_project! {
    // Low-level XML stream: parses incoming bytes into rxml events and
    // serialises outgoing items into a transmit buffer which is written to
    // the same underlying I/O object.
    //
    // NOTE: due to limitations of pin_project_lite, the field comments are
    // no doc comments. Luckily, this struct is only `pub(super)` anyway.
    #[project = RawXmlStreamProj]
    pub(super) struct RawXmlStream<Io> {
        // The parser used for deserialising data. It also owns the
        // underlying I/O object (wrapped for capturing received data).
        #[pin]
        parser: rxml::AsyncReader<CaptureBufRead<Io>>,

        // Tracker for `xml:lang` data.
        lang_stack: XmlLangStack,

        // The writer used for serialising data.
        writer: rxml::writer::Encoder<rxml::writer::SimpleNamespaces>,

        // State of the read timeouts. It is polled by the `Stream`
        // implementation whenever the parser has nothing to offer and is
        // reset whenever the parser yields a result.
        timeouts: TimeoutState,

        // The default namespace to declare on the stream header.
        stream_ns: &'static str,

        // Buffer containing serialised data which will then be sent through
        // the inner `Io`. Sending that serialised data happens in
        // `poll_ready` and `poll_flush`, while appending serialised data
        // happens in `start_send`.
        tx_buffer: BytesMut,

        // Position inside tx_buffer up to which to-be-sent data has already
        // been logged.
        tx_buffer_logged: usize,

        // This signifies the limit at the point of which the Sink will
        // refuse to accept more data: if the `tx_buffer`'s size grows beyond
        // that high water mark, poll_ready will return Poll::Pending until
        // it has managed to flush enough data down the inner writer.
        //
        // Note that poll_ready will always attempt to progress the writes,
        // which further reduces the chance of hitting this limit unless
        // either the underlying writer gets stuck (e.g. TCP connection
        // breaking in a timeouty way) or a lot of data is written in bulk.
        // In both cases, the backpressure created by poll_ready returning
        // Pending is desirable.
        //
        // However, there is a catch: We don't assert this condition
        // in `start_send` at all. The reason is that we cannot suspend
        // serialisation of an XSO in the middle of writing it: it has to be
        // written in one batch or you have to start over later (this has to
        // do with the iterator state borrowing the data and futures getting
        // cancelled e.g. in tokio::select!). In order to facilitate
        // implementing a `Sink<T: AsXml>` on top of `RawXmlStream`, we
        // cannot be strict about what is going on in `start_send`:
        // `poll_ready` does not know what kind of data will be written (so
        // it could not make a size estimate, even if that was at all
        // possible with AsXml) and `start_send` is not a coroutine. So if
        // `Sink<T: AsXml>` wants to use `RawXmlStream`, it must be able to
        // submit an entire XSO's items in one batch to `RawXmlStream` after
        // it has reported to be ready once. That may easily make the buffer
        // reach its high water mark.
        //
        // So if we checked that condition in `start_send` (as opposed to
        // `poll_ready`), we would cause situations where submitting XSOs
        // failed randomly (with a panic or other errors) and would have to
        // be retried later.
        //
        // While failing with e.g. io::ErrorKind::WouldBlock is something
        // that could be investigated later, it would still require being
        // able to make an accurate estimate of the number of bytes needed to
        // serialise any given `AsXml`, because as pointed out earlier, once
        // we have started, there is no going back.
        //
        // Finally, none of that hurts much because `RawXmlStream` is only an
        // internal API. The high-level APIs will always call `poll_ready`
        // before sending an XSO, which means that we won't *grossly* go over
        // the TX buffer high water mark---unless you send a really large
        // XSO at once.
        tx_buffer_high_water_mark: usize,
    }
}
238
239impl<Io: AsyncBufRead + AsyncWrite> RawXmlStream<Io> {
240 fn new_writer(
241 stream_ns: &'static str,
242 ) -> rxml::writer::Encoder<rxml::writer::SimpleNamespaces> {
243 let mut writer = rxml::writer::Encoder::new();
244 writer
245 .ns_tracker_mut()
246 .declare_fixed(Some(xml_ncname!("stream")), XML_STREAM_NS.into());
247 writer
248 .ns_tracker_mut()
249 .declare_fixed(None, stream_ns.into());
250 writer
251 }
252
253 pub(super) fn new(io: Io, stream_ns: &'static str, timeouts: Timeouts) -> Self {
254 let parser = rxml::Parser::default();
255 let mut io = CaptureBufRead::wrap(io);
256 if log_enabled() {
257 io.enable_capture();
258 }
259 Self {
260 parser: rxml::AsyncReader::wrap(io, parser),
261 writer: Self::new_writer(stream_ns),
262 lang_stack: XmlLangStack::new(),
263 timeouts: TimeoutState::new(timeouts),
264 tx_buffer_logged: 0,
265 stream_ns,
266 tx_buffer: BytesMut::new(),
267
268 // This basically means: "if we already have 2 kiB in our send
269 // buffer, do not accept more data".
270 // Please see the extensive words at
271 //`Self::tx_buffer_high_water_mark` for details.
272 tx_buffer_high_water_mark: 2048,
273 }
274 }
275
276 pub(super) fn reset_state(self: Pin<&mut Self>) {
277 let this = self.project();
278 *this.parser.parser_pinned() = rxml::Parser::default();
279 *this.lang_stack = XmlLangStack::new();
280 *this.writer = Self::new_writer(this.stream_ns);
281 }
282
283 pub(super) fn into_inner(self) -> Io {
284 self.parser.into_inner().0.into_inner()
285 }
286
287 /// Box the underlying transport stream.
288 ///
289 /// This removes the specific type of the transport from the XML stream's
290 /// type signature.
291 pub(super) fn box_stream(self) -> RawXmlStream<Box<dyn AsyncReadAndWrite + Send + 'static>>
292 where
293 Io: AsyncReadAndWrite + Send + 'static,
294 {
295 let (io, p) = self.parser.into_inner();
296 let mut io = CaptureBufRead::wrap(Box::new(io) as Box<_>);
297 if log_enabled() {
298 io.enable_capture();
299 }
300 let parser = rxml::AsyncReader::wrap(io, p);
301 RawXmlStream {
302 parser,
303 lang_stack: XmlLangStack::new(),
304 timeouts: self.timeouts,
305 writer: self.writer,
306 tx_buffer: self.tx_buffer,
307 tx_buffer_logged: self.tx_buffer_logged,
308 tx_buffer_high_water_mark: self.tx_buffer_high_water_mark,
309 stream_ns: self.stream_ns,
310 }
311 }
312}
313
314impl<Io: AsyncWrite> RawXmlStream<Io> {
315 /// Start sending an entire XSO.
316 ///
317 /// Unlike the `Sink` implementation, this provides nice syntax
318 /// highlighting for the serialised data in log outputs (if enabled) *and*
319 /// is error safe: if the XSO fails to serialise completely, it will be as
320 /// if it hadn't been attempted to serialise it at all.
321 ///
322 /// Note that, like with `start_send`, the caller is responsible for
323 /// ensuring that the stream is ready by polling
324 /// [`<Self as Sink>::poll_ready`] as needed.
325 pub(super) fn start_send_xso<T: AsXml>(self: Pin<&mut Self>, xso: &T) -> io::Result<()> {
326 let mut this = self.project();
327 let prev_len = this.tx_buffer.len();
328 match this.try_send_xso(xso) {
329 Ok(()) => Ok(()),
330 Err(e) => {
331 let curr_len = this.tx_buffer.len();
332 this.tx_buffer.truncate(prev_len);
333 log::trace!(
334 "SEND failed: {}. Rewinding buffer by {} bytes.",
335 e,
336 curr_len - prev_len
337 );
338 Err(e)
339 }
340 }
341 }
342}
343
344impl<Io> RawXmlStream<Io> {
345 fn parser_pinned(self: Pin<&mut Self>) -> &mut rxml::Parser {
346 self.project().parser.parser_pinned()
347 }
348
349 fn stream_pinned(self: Pin<&mut Self>) -> Pin<&mut CaptureBufRead<Io>> {
350 self.project().parser.inner_pinned()
351 }
352
353 pub(super) fn get_stream(&self) -> &Io {
354 self.parser.inner().inner()
355 }
356}
357
358impl<Io: AsyncBufRead> Stream for RawXmlStream<Io> {
359 type Item = Result<rxml::Event, RawError>;
360
361 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
362 let mut this = self.project();
363 loop {
364 match this.parser.as_mut().poll_read(cx) {
365 Poll::Pending => (),
366 Poll::Ready(v) => {
367 this.timeouts.reset();
368 match v.transpose() {
369 // Skip the XML declaration, nobody wants to hear about that.
370 Some(Ok(rxml::Event::XmlDeclaration(_, _))) => continue,
371 Some(Ok(event)) => {
372 this.lang_stack.handle_event(&event);
373 return Poll::Ready(Some(Ok(event)));
374 }
375 other => return Poll::Ready(other.map(|x| x.map_err(RawError::Io))),
376 }
377 }
378 };
379
380 // poll_read returned pending... what do the timeouts have to say?
381 match ready!(this.timeouts.poll(cx)) {
382 TimeoutLevel::Soft => return Poll::Ready(Some(Err(RawError::SoftTimeout))),
383 TimeoutLevel::Warn => (),
384 TimeoutLevel::Hard => {
385 return Poll::Ready(Some(Err(RawError::Io(io::Error::new(
386 io::ErrorKind::TimedOut,
387 "read and response timeouts elapsed",
388 )))))
389 }
390 }
391 }
392 }
393}
394
395impl<Io: AsyncWrite> RawXmlStreamProj<'_, Io> {
396 fn flush_tx_log(&mut self) {
397 let range = &self.tx_buffer[*self.tx_buffer_logged..];
398 if range.is_empty() {
399 return;
400 }
401 log_send(range);
402 *self.tx_buffer_logged = self.tx_buffer.len();
403 }
404
405 fn start_send(&mut self, item: &xso::Item<'_>) -> io::Result<()> {
406 self.writer
407 .encode_into_bytes(item.as_rxml_item(), self.tx_buffer)
408 .map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, e))
409 }
410
411 fn try_send_xso<T: AsXml>(&mut self, xso: &T) -> io::Result<()> {
412 let iter = match xso.as_xml_iter() {
413 Ok(v) => v,
414 Err(e) => return Err(io::Error::new(io::ErrorKind::InvalidInput, e)),
415 };
416 for item in iter {
417 let item = match item {
418 Ok(v) => v,
419 Err(e) => return Err(io::Error::new(io::ErrorKind::InvalidInput, e)),
420 };
421 self.start_send(&item)?;
422 }
423 Ok(())
424 }
425
426 fn progress_write(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
427 self.flush_tx_log();
428 while !self.tx_buffer.is_empty() {
429 let written = match ready!(self
430 .parser
431 .as_mut()
432 .inner_pinned()
433 .poll_write(cx, self.tx_buffer))
434 {
435 Ok(v) => v,
436 Err(e) => return Poll::Ready(Err(e)),
437 };
438 self.tx_buffer.advance(written);
439 *self.tx_buffer_logged = self
440 .tx_buffer_logged
441 .checked_sub(written)
442 .expect("Buffer arithmetic error");
443 }
444 Poll::Ready(Ok(()))
445 }
446}
447
448impl<Io: AsyncWrite> RawXmlStream<Io> {
449 /// Flush all buffered data and shut down the sender side of the
450 /// underlying transport.
451 ///
452 /// Unlike `poll_close` (from the `Sink` impls), this will not close the
453 /// receiving side of the underlying the transport. It is advisable to call
454 /// `poll_close` eventually after `poll_shutdown` in order to gracefully
455 /// handle situations where the remote side does not close the stream
456 /// cleanly.
457 pub fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
458 ready!(self.as_mut().poll_flush(cx))?;
459 let this = self.project();
460 this.parser.inner_pinned().poll_shutdown(cx)
461 }
462}
463
464impl<'x, Io: AsyncWrite> Sink<xso::Item<'x>> for RawXmlStream<Io> {
465 type Error = io::Error;
466
467 fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
468 let mut this = self.project();
469 match this.progress_write(cx) {
470 // No progress on write, but if we have enough space in the buffer
471 // it's ok nonetheless.
472 Poll::Pending => (),
473 // Some progress and it went fine, move on.
474 Poll::Ready(Ok(())) => (),
475 // Something went wrong -> return the error.
476 Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
477 }
478 if this.tx_buffer.len() < *this.tx_buffer_high_water_mark {
479 Poll::Ready(Ok(()))
480 } else {
481 Poll::Pending
482 }
483 }
484
485 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
486 let mut this = self.project();
487 ready!(this.progress_write(cx))?;
488 this.parser.as_mut().inner_pinned().poll_flush(cx)
489 }
490
491 fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
492 let mut this = self.project();
493 ready!(this.progress_write(cx))?;
494 this.parser.as_mut().inner_pinned().poll_shutdown(cx)
495 }
496
497 fn start_send(self: Pin<&mut Self>, item: xso::Item<'x>) -> Result<(), Self::Error> {
498 let mut this = self.project();
499 this.start_send(&item)
500 }
501}
502
/// Error returned by the [`ReadXso`] future and the [`ReadXsoState`] helper.
///
/// Only [`Self::Parse`] leaves the stream in a state from which further XSOs
/// can be read; see the individual variants for details.
pub(super) enum ReadXsoError {
    /// The outer element was closed before a child element could be read.
    ///
    /// This is typically the stream footer in XML stream applications.
    Footer,

    /// A hard error occurred.
    ///
    /// This is either a real I/O error or an error from the XML parser.
    /// Neither is recoverable, because the nesting state is lost and
    /// in addition, XML errors are not recoverable because they indicate a
    /// not well-formed document.
    Hard(io::Error),

    /// The underlying stream signalled a soft read timeout before a child
    /// element could be read.
    ///
    /// Note that soft timeouts which are triggered in the middle of receiving
    /// an element are converted to hard timeouts (i.e. I/O errors).
    ///
    /// This masking is intentional, because:
    /// - Returning a [`Self::SoftTimeout`] from the middle of parsing is not
    ///   possible without complicating the API.
    /// - There is no reason why the remote side should interrupt sending data
    ///   in the middle of an element except if it or the transport has failed
    ///   fatally.
    SoftTimeout,

    /// A parse error occurred.
    ///
    /// The XML structure was well-formed, but the data contained did not
    /// match the XSO which was attempted to be parsed. This error is
    /// recoverable: when this error is emitted, the XML stream is at the same
    /// nesting level as it was before the XSO was attempted to be read; all
    /// XML structure which belonged to the XSO which failed to parse has
    /// been consumed. This allows to read more XSOs even if one fails to
    /// parse.
    Parse(xso::error::Error),
}
543
544impl From<io::Error> for ReadXsoError {
545 fn from(other: io::Error) -> Self {
546 Self::Hard(other)
547 }
548}
549
550impl From<xso::error::Error> for ReadXsoError {
551 fn from(other: xso::error::Error) -> Self {
552 Self::Parse(other)
553 }
554}
555
/// State for reading an XSO from a `Stream<Item = Result<rxml::Event, ...>>`.
///
/// Due to pinning, it is simpler to implement the statemachine in a dedicated
/// enum and let the actual (pinned) future pass the stream toward this enum's
/// function.
///
/// This is used by both [`ReadXso`] and the [`super::XmlStream`] itself.
#[derive(Default)]
pub(super) enum ReadXsoState<T: FromXml> {
    /// The [`rxml::Event::StartElement`] event was not seen yet.
    ///
    /// In this state, XML whitespace is ignored (as per RFC 6120 § 11.7), but
    /// other text data is rejected. Text buffering of the parser is switched
    /// off while in this state.
    #[default]
    PreData,

    /// The [`rxml::Event::StartElement`] event was received.
    ///
    /// The inner value is the builder for the "return type" of this enum and
    /// the implementation in the [`xso`] crate does all the heavy lifting:
    /// we'll only send events in its general direction.
    // We use the fallible parsing here so that we don't have to do the depth
    // accounting ourselves.
    Parsing(<Result<T, xso::error::Error> as FromXml>::Builder),

    /// The parsing has completed (successful or not).
    ///
    /// This is a final state and attempting to advance the state will panic.
    /// This is in accordance with [`core::future::Future::poll`]'s contract,
    /// for which this enum is primarily used.
    Done,
}
588
589impl<T: FromXml> ReadXsoState<T> {
590 /// Progress reading the XSO from the given source.
591 ///
592 /// This attempts to parse a single XSO from the underlying stream,
593 /// while discarding any XML whitespace before the beginning of the XSO.
594 ///
595 /// If the XSO is parsed successfully, the method returns Ready with the
596 /// parsed value. If parsing fails or an I/O error occurs, an appropriate
597 /// error is returned.
598 ///
599 /// If parsing fails, the entire XML subtree belonging to the XSO is
600 /// nonetheless processed. That makes parse errors recoverable: After
601 /// `poll_advance` has returned Ready with either an Ok result or a
602 /// [`ReadXsoError::Parse`] error variant, another XSO can be read and the
603 /// XML parsing will be at the same nesting depth as it was before the
604 /// first call to `poll_advance`.
605 ///
606 /// Note that this guarantee does not hold for non-parse errors (i.e. for
607 /// the other variants of [`ReadXsoError`]): I/O errors as well as
608 /// occurrence of the outer closing element are fatal.
609 ///
610 /// The `source` passed to `poll_advance` should be the same on every
611 /// call.
612 pub(super) fn poll_advance<Io: AsyncBufRead>(
613 &mut self,
614 mut source: Pin<&mut RawXmlStream<Io>>,
615 cx: &mut Context<'_>,
616 ) -> Poll<Result<T, ReadXsoError>> {
617 loop {
618 // Disable text buffering before the start event. That way, we
619 // don't accumulate infinite amounts of XML whitespace caused by
620 // whitespace keepalives.
621 // (And also, we'll know faster when the remote side sends
622 // non-whitespace garbage.)
623 let text_buffering = !matches!(self, ReadXsoState::PreData);
624 source
625 .as_mut()
626 .parser_pinned()
627 .set_text_buffering(text_buffering);
628
629 let ev = ready!(source.as_mut().poll_next(cx)).transpose();
630 match self {
631 ReadXsoState::PreData => {
632 log::trace!("ReadXsoState::PreData ev = {:?}", ev);
633 match ev {
634 Ok(Some(rxml::Event::XmlDeclaration(_, _))) => (),
635 Ok(Some(rxml::Event::Text(_, data))) => {
636 if xso::is_xml_whitespace(data.as_bytes()) {
637 log::trace!("Received {} bytes of whitespace", data.len());
638 source.as_mut().stream_pinned().discard_capture();
639 continue;
640 } else {
641 *self = ReadXsoState::Done;
642 return Poll::Ready(Err(io::Error::new(
643 io::ErrorKind::InvalidData,
644 "non-whitespace text content before XSO",
645 )
646 .into()));
647 }
648 }
649 Ok(Some(rxml::Event::StartElement(_, name, attrs))) => {
650 let source_tmp = source.as_mut();
651 let ctx = xso::Context::empty()
652 .with_language(source_tmp.lang_stack.current());
653 *self = ReadXsoState::Parsing(
654 <Result<T, xso::error::Error> as FromXml>::from_events(
655 name, attrs, &ctx,
656 )
657 .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?,
658 );
659 }
660 // Amounts to EOF, as we expect to start on the stream level.
661 Ok(Some(rxml::Event::EndElement(_))) => {
662 *self = ReadXsoState::Done;
663 return Poll::Ready(Err(ReadXsoError::Footer));
664 }
665 Ok(None) => {
666 *self = ReadXsoState::Done;
667 return Poll::Ready(Err(io::Error::new(
668 io::ErrorKind::InvalidData,
669 "eof before XSO started",
670 )
671 .into()));
672 }
673 Err(RawError::SoftTimeout) => {
674 *self = ReadXsoState::Done;
675 return Poll::Ready(Err(ReadXsoError::SoftTimeout));
676 }
677 Err(RawError::Io(e)) => {
678 *self = ReadXsoState::Done;
679 return Poll::Ready(Err(ReadXsoError::Hard(e)));
680 }
681 }
682 }
683 ReadXsoState::Parsing(builder) => {
684 log::trace!("ReadXsoState::Parsing ev = {:?}", ev);
685 let ev = match ev {
686 Ok(Some(ev)) => ev,
687 Ok(None) => {
688 *self = ReadXsoState::Done;
689 return Poll::Ready(Err(io::Error::new(
690 io::ErrorKind::UnexpectedEof,
691 "eof during XSO parsing",
692 )
693 .into()));
694 }
695 Err(RawError::Io(e)) => {
696 *self = ReadXsoState::Done;
697 return Poll::Ready(Err(e.into()));
698 }
699 Err(RawError::SoftTimeout) => {
700 // See also [`ReadXsoError::SoftTimeout`] for why
701 // we mask the SoftTimeout condition here.
702 *self = ReadXsoState::Done;
703 return Poll::Ready(Err(io::Error::new(
704 io::ErrorKind::TimedOut,
705 "read timeout during XSO parsing",
706 )
707 .into()));
708 }
709 };
710
711 let source_tmp = source.as_mut();
712 let ctx = xso::Context::empty().with_language(source_tmp.lang_stack.current());
713 match builder.feed(ev, &ctx) {
714 Err(err) => {
715 *self = ReadXsoState::Done;
716 source.as_mut().stream_pinned().discard_capture();
717 return Poll::Ready(Err(io::Error::new(
718 io::ErrorKind::InvalidData,
719 err,
720 )
721 .into()));
722 }
723 Ok(Some(Err(err))) => {
724 *self = ReadXsoState::Done;
725 log_recv(Some(&err), source.as_mut().stream_pinned().take_capture());
726 return Poll::Ready(Err(ReadXsoError::Parse(err)));
727 }
728 Ok(Some(Ok(value))) => {
729 *self = ReadXsoState::Done;
730 log_recv(None, source.as_mut().stream_pinned().take_capture());
731 return Poll::Ready(Ok(value));
732 }
733 Ok(None) => (),
734 }
735 }
736
737 // The error talks about "future", simply because that is
738 // where `Self` is used (inside `core::future::Future::poll`).
739 ReadXsoState::Done => panic!("future polled after completion"),
740 }
741 }
742 }
743}
744
/// Future to read a single XSO from a stream.
///
/// Created through [`ReadXso::read_from`].
pub(super) struct ReadXso<'x, Io, T: FromXml> {
    /// Stream to read the XSO from.
    inner: Pin<&'x mut RawXmlStream<Io>>,

    /// Current state of parsing.
    state: ReadXsoState<T>,
}
753
754impl<'x, Io: AsyncBufRead, T: FromXml> ReadXso<'x, Io, T> {
755 /// Start reading a single XSO from a stream.
756 pub(super) fn read_from(stream: Pin<&'x mut RawXmlStream<Io>>) -> Self {
757 Self {
758 inner: stream,
759 state: ReadXsoState::PreData,
760 }
761 }
762}
763
764impl<Io: AsyncBufRead, T: FromXml> Future for ReadXso<'_, Io, T>
765where
766 T::Builder: Unpin,
767{
768 type Output = Result<T, ReadXsoError>;
769
770 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
771 let this = self.get_mut();
772 this.state.poll_advance(this.inner.as_mut(), cx)
773 }
774}
775
/// Contains metadata from an XML stream header.
///
/// These are the `from`, `to` and `id` attributes of the stream's root
/// element. Each of them is optional.
#[derive(Default, Debug)]
pub struct StreamHeader<'x> {
    /// The optional `from` attribute.
    pub from: Option<Cow<'x, str>>,

    /// The optional `to` attribute.
    pub to: Option<Cow<'x, str>>,

    /// The optional `id` attribute.
    pub id: Option<Cow<'x, str>>,
}
788
789impl StreamHeader<'_> {
790 /// Take the contents and return them as new object.
791 ///
792 /// `self` will be left with all its parts set to `None`.
793 pub fn take(&mut self) -> Self {
794 Self {
795 from: self.from.take(),
796 to: self.to.take(),
797 id: self.id.take(),
798 }
799 }
800
801 pub(super) async fn send<Io: AsyncWrite>(
802 self,
803 mut stream: Pin<&mut RawXmlStream<Io>>,
804 ) -> io::Result<()> {
805 stream
806 .send(Item::XmlDeclaration(rxml::XmlVersion::V1_0))
807 .await?;
808 stream
809 .send(Item::ElementHeadStart(
810 Namespace::from(XML_STREAM_NS),
811 Cow::Borrowed(xml_ncname!("stream")),
812 ))
813 .await?;
814 if let Some(from) = self.from {
815 stream
816 .send(Item::Attribute(
817 Namespace::NONE,
818 Cow::Borrowed(xml_ncname!("from")),
819 from,
820 ))
821 .await?;
822 }
823 if let Some(to) = self.to {
824 stream
825 .send(Item::Attribute(
826 Namespace::NONE,
827 Cow::Borrowed(xml_ncname!("to")),
828 to,
829 ))
830 .await?;
831 }
832 if let Some(id) = self.id {
833 stream
834 .send(Item::Attribute(
835 Namespace::NONE,
836 Cow::Borrowed(xml_ncname!("id")),
837 id,
838 ))
839 .await?;
840 }
841 stream
842 .send(Item::Attribute(
843 Namespace::NONE,
844 Cow::Borrowed(xml_ncname!("version")),
845 Cow::Borrowed("1.0"),
846 ))
847 .await?;
848 stream.send(Item::ElementHeadEnd).await?;
849 Ok(())
850 }
851}
852
853impl StreamHeader<'static> {
854 pub(super) async fn recv<Io: AsyncBufRead>(
855 mut stream: Pin<&mut RawXmlStream<Io>>,
856 ) -> io::Result<Self> {
857 loop {
858 match stream.as_mut().next().await {
859 Some(Err(RawError::Io(e))) => return Err(e),
860 Some(Err(RawError::SoftTimeout)) => (),
861 Some(Ok(Event::StartElement(_, (ns, name), mut attrs))) => {
862 if ns != XML_STREAM_NS || name != "stream" {
863 return Err(io::Error::new(
864 io::ErrorKind::InvalidData,
865 "unknown stream header",
866 ));
867 }
868
869 match attrs.remove(Namespace::none(), "version") {
870 Some(v) => {
871 if v != "1.0" {
872 return Err(io::Error::new(
873 io::ErrorKind::InvalidData,
874 format!("unsuppored stream version: {}", v),
875 ));
876 }
877 }
878 None => {
879 return Err(io::Error::new(
880 io::ErrorKind::InvalidData,
881 "required `version` attribute missing",
882 ))
883 }
884 }
885
886 let from = attrs.remove(Namespace::none(), "from");
887 let to = attrs.remove(Namespace::none(), "to");
888 let id = attrs.remove(Namespace::none(), "id");
889 let _ = attrs.remove(Namespace::xml(), "lang");
890
891 if let Some(((ns, name), _)) = attrs.into_iter().next() {
892 return Err(io::Error::new(
893 io::ErrorKind::InvalidData,
894 format!("unexpected stream header attribute: {{{}}}{}", ns, name),
895 ));
896 }
897
898 return Ok(StreamHeader {
899 from: from.map(Cow::Owned),
900 to: to.map(Cow::Owned),
901 id: id.map(Cow::Owned),
902 });
903 }
904 Some(Ok(Event::Text(_, _))) | Some(Ok(Event::EndElement(_))) => {
905 return Err(io::Error::new(
906 io::ErrorKind::UnexpectedEof,
907 "unexpected content before stream header",
908 ))
909 }
910 // We cannot loop infinitely here because the XML parser will
911 // prevent more than one XML declaration from being parsed.
912 Some(Ok(Event::XmlDeclaration(_, _))) => (),
913 None => {
914 return Err(io::Error::new(
915 io::ErrorKind::UnexpectedEof,
916 "eof before stream header",
917 ))
918 }
919 }
920 }
921 }
922}