tokio_xmpp/xmlstream/
initiator.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
// Copyright (c) 2024 Jonas Schäfer <jonas@zombofant.net>
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.

use core::pin::Pin;
use std::borrow::Cow;
use std::io;

use futures::SinkExt;

use tokio::io::{AsyncBufRead, AsyncWrite};

use xmpp_parsers::stream_features::StreamFeatures;

use xso::{AsXml, FromXml};

use super::{
    common::{RawXmlStream, ReadXso, ReadXsoError, StreamHeader},
    XmlStream,
};

/// Type state for an initiator stream which has not yet sent its stream
/// header.
///
/// This is a thin wrapper around the underlying [`RawXmlStream`]; the wrapped
/// stream is only accessible from the parent module.
///
/// To continue stream setup, call [`send_header`][`Self::send_header`].
pub struct InitiatingStream<Io>(pub(super) RawXmlStream<Io>);

impl<Io: AsyncBufRead + AsyncWrite + Unpin> InitiatingStream<Io> {
    /// Send our stream header and wait for the peer's stream header.
    ///
    /// The given `header` is written to the stream and the stream is
    /// flushed; afterwards, the peer's stream header is read. On success,
    /// the stream advances to the [`PendingFeaturesRecv`] state, which keeps
    /// the received header available for inspection.
    ///
    /// # Errors
    ///
    /// Any I/O error raised while sending, flushing or receiving is returned
    /// as-is.
    pub async fn send_header(
        self,
        header: StreamHeader<'_>,
    ) -> io::Result<PendingFeaturesRecv<Io>> {
        let mut stream = self.0;

        // Our header must be fully on the wire before we wait for the reply.
        header.send(Pin::new(&mut stream)).await?;
        stream.flush().await?;

        let peer_header = StreamHeader::recv(Pin::new(&mut stream)).await?;
        Ok(PendingFeaturesRecv {
            stream,
            header: peer_header,
        })
    }
}

/// Type state for an initiator stream which has sent and received the stream
/// header.
///
/// To continue stream setup, call [`recv_features`][`Self::recv_features`].
pub struct PendingFeaturesRecv<Io> {
    /// The underlying raw stream over which the headers were exchanged.
    pub(super) stream: RawXmlStream<Io>,
    /// The stream header as received from the peer.
    pub(super) header: StreamHeader<'static>,
}

impl<Io> PendingFeaturesRecv<Io> {
    /// Borrowed view of the stream header sent by the peer.
    ///
    /// Every present field of the returned header borrows from the header
    /// stored in `self`; nothing is copied.
    pub fn header(&self) -> StreamHeader<'_> {
        let Self { header: peer, .. } = self;
        StreamHeader {
            from: peer.from.as_deref().map(Cow::Borrowed),
            to: peer.to.as_deref().map(Cow::Borrowed),
            id: peer.id.as_deref().map(Cow::Borrowed),
        }
    }

    /// Move the peer's stream header contents out of this object.
    ///
    /// This delegates to `StreamHeader::take` on the stored header and hands
    /// the owned result to the caller.
    pub fn take_header(&mut self) -> StreamHeader<'static> {
        self.header.take()
    }
}

impl<Io: AsyncBufRead + AsyncWrite + Unpin> PendingFeaturesRecv<Io> {
    /// Receive the responder's stream features.
    ///
    /// After the stream features have been received, the stream can be used
    /// for exchanging stream-level elements (stanzas or "nonzas"). The Rust
    /// type for these elements must be given as type parameter `T`.
    pub async fn recv_features<T: FromXml + AsXml>(
        self,
    ) -> io::Result<(StreamFeatures, XmlStream<Io, T>)> {
        let Self {
            mut stream,
            header: _,
        } = self;
        let features = loop {
            match ReadXso::read_from(Pin::new(&mut stream)).await {
                Ok(v) => break v,
                Err(ReadXsoError::SoftTimeout) => (),
                Err(ReadXsoError::Hard(e)) => return Err(e),
                Err(ReadXsoError::Parse(e)) => {
                    return Err(io::Error::new(io::ErrorKind::InvalidData, e))
                }
                Err(ReadXsoError::Footer) => {
                    return Err(io::Error::new(
                        io::ErrorKind::UnexpectedEof,
                        "unexpected stream footer",
                    ))
                }
            }
        };
        Ok((features, XmlStream::wrap(stream)))
    }

    /// Skip receiving the responder's stream features.
    ///
    /// The stream can be used for exchanging stream-level elements (stanzas
    /// or "nonzas"). The Rust type for these elements must be given as type
    /// parameter `T`.
    ///
    /// **Note:** Using this on RFC 6120 compliant streams where stream
    /// features **are** sent after the stream header will cause a parse error
    /// down the road (because the feature stream element cannot be handled).
    /// The only place where this is useful is in
    /// [XEP-0114](https://xmpp.org/extensions/xep-0114.html) connections.
    pub fn skip_features<T: FromXml + AsXml>(self) -> XmlStream<Io, T> {
        XmlStream::wrap(self.stream)
    }
}