tokio_xmpp/xmlstream/
initiator.rs1use core::fmt;
8use core::pin::Pin;
9use std::borrow::Cow;
10use std::io;
11
12use futures::SinkExt;
13
14use tokio::io::{AsyncBufRead, AsyncWrite};
15
16use xmpp_parsers::{
17 stream_error::{ReceivedStreamError, StreamError},
18 stream_features::StreamFeatures,
19};
20
21use xso::{AsXml, FromXml};
22
23use super::{
24 common::{RawXmlStream, ReadXso, ReadXsoError, StreamHeader},
25 XmlStream,
26};
27
28pub struct InitiatingStream<Io>(pub(super) RawXmlStream<Io>);
33
34impl<Io: AsyncBufRead + AsyncWrite + Unpin> InitiatingStream<Io> {
35 pub async fn send_header(
37 self,
38 header: StreamHeader<'_>,
39 ) -> io::Result<PendingFeaturesRecv<Io>> {
40 let Self(mut stream) = self;
41
42 header.send(Pin::new(&mut stream)).await?;
43 stream.flush().await?;
44 let header = StreamHeader::recv(Pin::new(&mut stream)).await?;
45 Ok(PendingFeaturesRecv { stream, header })
46 }
47}
48
49#[derive(xso::FromXml)]
50#[xml()]
51enum StreamFeaturesPayload {
52 #[xml(transparent)]
53 Features(StreamFeatures),
54 #[xml(transparent)]
55 Error(StreamError),
56}
57
58#[derive(Debug)]
60pub enum RecvFeaturesError {
61 Io(io::Error),
63
64 StreamError(ReceivedStreamError),
66}
67
68impl fmt::Display for RecvFeaturesError {
69 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
70 match self {
71 Self::Io(e) => write!(f, "i/o error: {e}"),
72 Self::StreamError(e) => fmt::Display::fmt(&e, f),
73 }
74 }
75}
76
77impl core::error::Error for RecvFeaturesError {
78 fn source(&self) -> Option<&(dyn core::error::Error + 'static)> {
79 match self {
80 Self::Io(e) => Some(e),
81 Self::StreamError(e) => Some(e),
82 }
83 }
84}
85
86impl From<io::Error> for RecvFeaturesError {
87 fn from(other: io::Error) -> Self {
88 Self::Io(other)
89 }
90}
91
92pub struct PendingFeaturesRecv<Io> {
97 pub(super) stream: RawXmlStream<Io>,
98 pub(super) header: StreamHeader<'static>,
99}
100
101impl<Io> PendingFeaturesRecv<Io> {
102 pub fn header(&self) -> StreamHeader<'_> {
104 StreamHeader {
105 from: self.header.from.as_deref().map(Cow::Borrowed),
106 to: self.header.to.as_deref().map(Cow::Borrowed),
107 id: self.header.id.as_deref().map(Cow::Borrowed),
108 }
109 }
110
111 pub fn take_header(&mut self) -> StreamHeader<'static> {
113 self.header.take()
114 }
115}
116
117impl<Io: AsyncBufRead + AsyncWrite + Unpin> PendingFeaturesRecv<Io> {
118 pub async fn recv_features<T: FromXml + AsXml>(
131 self,
132 ) -> Result<(StreamFeatures, XmlStream<Io, T>), RecvFeaturesError> {
133 let Self {
134 mut stream,
135 header: _,
136 } = self;
137 let features = loop {
138 match ReadXso::read_from(Pin::new(&mut stream)).await {
139 Ok(StreamFeaturesPayload::Features(v)) => break v,
140 Ok(StreamFeaturesPayload::Error(v)) => {
141 return Err(RecvFeaturesError::StreamError(ReceivedStreamError(v)))
142 }
143 Err(ReadXsoError::SoftTimeout) => (),
144 Err(ReadXsoError::Hard(e)) => return Err(RecvFeaturesError::Io(e)),
145 Err(ReadXsoError::Parse(e)) => {
146 return Err(RecvFeaturesError::Io(io::Error::new(
147 io::ErrorKind::InvalidData,
148 e,
149 )))
150 }
151 Err(ReadXsoError::Footer) => {
152 return Err(RecvFeaturesError::Io(io::Error::new(
153 io::ErrorKind::UnexpectedEof,
154 "unexpected stream footer",
155 )))
156 }
157 }
158 };
159 Ok((features, XmlStream::wrap(stream)))
160 }
161
162 pub fn skip_features<T: FromXml + AsXml>(self) -> XmlStream<Io, T> {
174 XmlStream::wrap(self.stream)
175 }
176}