tokio_xmpp/xmlstream/
initiator.rs

1// Copyright (c) 2024 Jonas Schäfer <jonas@zombofant.net>
2//
3// This Source Code Form is subject to the terms of the Mozilla Public
4// License, v. 2.0. If a copy of the MPL was not distributed with this
5// file, You can obtain one at http://mozilla.org/MPL/2.0/.
6
7use core::fmt;
8use core::pin::Pin;
9use std::borrow::Cow;
10use std::io;
11
12use futures::SinkExt;
13
14use tokio::io::{AsyncBufRead, AsyncWrite};
15
16use xmpp_parsers::{
17    stream_error::{ReceivedStreamError, StreamError},
18    stream_features::StreamFeatures,
19};
20
21use xso::{AsXml, FromXml};
22
23use super::{
24    common::{RawXmlStream, ReadXso, ReadXsoError, StreamHeader},
25    XmlStream,
26};
27
28/// Type state for an initiator stream which has not yet sent its stream
29/// header.
30///
31/// To continue stream setup, call [`send_header`][`Self::send_header`].
32pub struct InitiatingStream<Io>(pub(super) RawXmlStream<Io>);
33
34impl<Io: AsyncBufRead + AsyncWrite + Unpin> InitiatingStream<Io> {
35    /// Send the stream header.
36    pub async fn send_header(
37        self,
38        header: StreamHeader<'_>,
39    ) -> io::Result<PendingFeaturesRecv<Io>> {
40        let Self(mut stream) = self;
41
42        header.send(Pin::new(&mut stream)).await?;
43        stream.flush().await?;
44        let header = StreamHeader::recv(Pin::new(&mut stream)).await?;
45        Ok(PendingFeaturesRecv { stream, header })
46    }
47}
48
49#[derive(xso::FromXml)]
50#[xml()]
51enum StreamFeaturesPayload {
52    #[xml(transparent)]
53    Features(StreamFeatures),
54    #[xml(transparent)]
55    Error(StreamError),
56}
57
58/// Error conditions when receiving stream features
59#[derive(Debug)]
60pub enum RecvFeaturesError {
61    /// I/o error while receiving stream features
62    Io(io::Error),
63
64    /// Received a stream error instead of stream features
65    StreamError(ReceivedStreamError),
66}
67
68impl fmt::Display for RecvFeaturesError {
69    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
70        match self {
71            Self::Io(e) => write!(f, "i/o error: {e}"),
72            Self::StreamError(e) => fmt::Display::fmt(&e, f),
73        }
74    }
75}
76
77impl core::error::Error for RecvFeaturesError {
78    fn source(&self) -> Option<&(dyn core::error::Error + 'static)> {
79        match self {
80            Self::Io(e) => Some(e),
81            Self::StreamError(e) => Some(e),
82        }
83    }
84}
85
86impl From<io::Error> for RecvFeaturesError {
87    fn from(other: io::Error) -> Self {
88        Self::Io(other)
89    }
90}
91
92/// Type state for an initiator stream which has sent and received the stream
93/// header.
94///
95/// To continue stream setup, call [`recv_features`][`Self::recv_features`].
96pub struct PendingFeaturesRecv<Io> {
97    pub(super) stream: RawXmlStream<Io>,
98    pub(super) header: StreamHeader<'static>,
99}
100
101impl<Io> PendingFeaturesRecv<Io> {
102    /// The stream header contents as sent by the peer.
103    pub fn header(&self) -> StreamHeader<'_> {
104        StreamHeader {
105            from: self.header.from.as_deref().map(Cow::Borrowed),
106            to: self.header.to.as_deref().map(Cow::Borrowed),
107            id: self.header.id.as_deref().map(Cow::Borrowed),
108        }
109    }
110
111    /// Extract the stream header contents as sent by the peer.
112    pub fn take_header(&mut self) -> StreamHeader<'static> {
113        self.header.take()
114    }
115}
116
117impl<Io: AsyncBufRead + AsyncWrite + Unpin> PendingFeaturesRecv<Io> {
118    /// Receive the responder's stream features.
119    ///
120    /// After the stream features have been received, the stream can be used
121    /// for exchanging stream-level elements (stanzas or "nonzas"). The Rust
122    /// type for these elements must be given as type parameter `T`.
123    ///
124    /// If the peer sends a stream error instead of features, the error is
125    /// returned as [`RecvFeaturesError::StreamError`].
126    ///
127    /// If the peer sends any payload which is neither stream features nor
128    /// a stream error, an [`io::Error`][`std::io::Error`] with
129    /// [`InvalidData`][`io::ErrorKind::InvalidData`] kind is returned.
130    pub async fn recv_features<T: FromXml + AsXml>(
131        self,
132    ) -> Result<(StreamFeatures, XmlStream<Io, T>), RecvFeaturesError> {
133        let Self {
134            mut stream,
135            header: _,
136        } = self;
137        let features = loop {
138            match ReadXso::read_from(Pin::new(&mut stream)).await {
139                Ok(StreamFeaturesPayload::Features(v)) => break v,
140                Ok(StreamFeaturesPayload::Error(v)) => {
141                    return Err(RecvFeaturesError::StreamError(ReceivedStreamError(v)))
142                }
143                Err(ReadXsoError::SoftTimeout) => (),
144                Err(ReadXsoError::Hard(e)) => return Err(RecvFeaturesError::Io(e)),
145                Err(ReadXsoError::Parse(e)) => {
146                    return Err(RecvFeaturesError::Io(io::Error::new(
147                        io::ErrorKind::InvalidData,
148                        e,
149                    )))
150                }
151                Err(ReadXsoError::Footer) => {
152                    return Err(RecvFeaturesError::Io(io::Error::new(
153                        io::ErrorKind::UnexpectedEof,
154                        "unexpected stream footer",
155                    )))
156                }
157            }
158        };
159        Ok((features, XmlStream::wrap(stream)))
160    }
161
162    /// Skip receiving the responder's stream features.
163    ///
164    /// The stream can be used for exchanging stream-level elements (stanzas
165    /// or "nonzas"). The Rust type for these elements must be given as type
166    /// parameter `T`.
167    ///
168    /// **Note:** Using this on RFC 6120 compliant streams where stream
169    /// features **are** sent after the stream header will cause a parse error
170    /// down the road (because the feature stream element cannot be handled).
171    /// The only place where this is useful is in
172    /// [XEP-0114](https://xmpp.org/extensions/xep-0114.html) connections.
173    pub fn skip_features<T: FromXml + AsXml>(self) -> XmlStream<Io, T> {
174        XmlStream::wrap(self.stream)
175    }
176}