tokio_xmpp/xmlstream/
initiator.rs

// Copyright (c) 2024 Jonas Schäfer <jonas@zombofant.net>
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
6
7use core::pin::Pin;
8use std::borrow::Cow;
9use std::io;
10
11use futures::SinkExt;
12
13use tokio::io::{AsyncBufRead, AsyncWrite};
14
15use xmpp_parsers::stream_features::StreamFeatures;
16
17use xso::{AsXml, FromXml};
18
19use super::{
20    common::{RawXmlStream, ReadXso, ReadXsoError, StreamHeader},
21    XmlStream,
22};
23
/// Type state for an initiator stream which has not yet sent its stream
/// header.
///
/// This is a thin wrapper around the underlying [`RawXmlStream`]; the wrapped
/// stream is only accessible from within the parent module.
///
/// To continue stream setup, call [`send_header`][`Self::send_header`], which
/// consumes this value and yields a [`PendingFeaturesRecv`] on success.
pub struct InitiatingStream<Io>(pub(super) RawXmlStream<Io>);
29
30impl<Io: AsyncBufRead + AsyncWrite + Unpin> InitiatingStream<Io> {
31    /// Send the stream header.
32    pub async fn send_header(
33        self,
34        header: StreamHeader<'_>,
35    ) -> io::Result<PendingFeaturesRecv<Io>> {
36        let Self(mut stream) = self;
37
38        header.send(Pin::new(&mut stream)).await?;
39        stream.flush().await?;
40        let header = StreamHeader::recv(Pin::new(&mut stream)).await?;
41        Ok(PendingFeaturesRecv { stream, header })
42    }
43}
44
/// Type state for an initiator stream which has sent and received the stream
/// header.
///
/// The header received from the peer can be inspected with
/// [`header`][`Self::header`] or extracted with
/// [`take_header`][`Self::take_header`].
///
/// To continue stream setup, call [`recv_features`][`Self::recv_features`]
/// (or [`skip_features`][`Self::skip_features`] if the peer does not send
/// stream features).
pub struct PendingFeaturesRecv<Io> {
    /// The underlying raw XML stream, carried over from the previous state.
    pub(super) stream: RawXmlStream<Io>,
    /// The stream header as received from the peer.
    pub(super) header: StreamHeader<'static>,
}
53
54impl<Io> PendingFeaturesRecv<Io> {
55    /// The stream header contents as sent by the peer.
56    pub fn header(&self) -> StreamHeader<'_> {
57        StreamHeader {
58            from: self.header.from.as_ref().map(|x| Cow::Borrowed(&**x)),
59            to: self.header.to.as_ref().map(|x| Cow::Borrowed(&**x)),
60            id: self.header.id.as_ref().map(|x| Cow::Borrowed(&**x)),
61        }
62    }
63
64    /// Extract the stream header contents as sent by the peer.
65    pub fn take_header(&mut self) -> StreamHeader<'static> {
66        self.header.take()
67    }
68}
69
70impl<Io: AsyncBufRead + AsyncWrite + Unpin> PendingFeaturesRecv<Io> {
71    /// Receive the responder's stream features.
72    ///
73    /// After the stream features have been received, the stream can be used
74    /// for exchanging stream-level elements (stanzas or "nonzas"). The Rust
75    /// type for these elements must be given as type parameter `T`.
76    pub async fn recv_features<T: FromXml + AsXml>(
77        self,
78    ) -> io::Result<(StreamFeatures, XmlStream<Io, T>)> {
79        let Self {
80            mut stream,
81            header: _,
82        } = self;
83        let features = loop {
84            match ReadXso::read_from(Pin::new(&mut stream)).await {
85                Ok(v) => break v,
86                Err(ReadXsoError::SoftTimeout) => (),
87                Err(ReadXsoError::Hard(e)) => return Err(e),
88                Err(ReadXsoError::Parse(e)) => {
89                    return Err(io::Error::new(io::ErrorKind::InvalidData, e))
90                }
91                Err(ReadXsoError::Footer) => {
92                    return Err(io::Error::new(
93                        io::ErrorKind::UnexpectedEof,
94                        "unexpected stream footer",
95                    ))
96                }
97            }
98        };
99        Ok((features, XmlStream::wrap(stream)))
100    }
101
102    /// Skip receiving the responder's stream features.
103    ///
104    /// The stream can be used for exchanging stream-level elements (stanzas
105    /// or "nonzas"). The Rust type for these elements must be given as type
106    /// parameter `T`.
107    ///
108    /// **Note:** Using this on RFC 6120 compliant streams where stream
109    /// features **are** sent after the stream header will cause a parse error
110    /// down the road (because the feature stream element cannot be handled).
111    /// The only place where this is useful is in
112    /// [XEP-0114](https://xmpp.org/extensions/xep-0114.html) connections.
113    pub fn skip_features<T: FromXml + AsXml>(self) -> XmlStream<Io, T> {
114        XmlStream::wrap(self.stream)
115    }
116}