Skip to main content

tokio_xmpp/component/
stream.rs

1//! Components in XMPP are services/gateways that are logged into an
2//! XMPP server under a JID consisting of just a domain name. They are
3//! allowed to use any user and resource identifiers in their stanzas.
4use futures::{task::Poll, Sink, Stream};
5use std::pin::Pin;
6use std::task::Context;
7
8use crate::{
9    component::Component,
10    connect::ServerConnector,
11    xmlstream::{XmppStream, XmppStreamElement},
12    Error, Stanza,
13};
14
15impl<C: ServerConnector> Stream for Component<C> {
16    type Item = Stanza;
17
18    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
19        loop {
20            match Pin::new(&mut self.stream).poll_next(cx) {
21                Poll::Ready(v) => match v.map(|v| v.map(|v| v.into_read_error()).flatten()) {
22                    Some(Ok(XmppStreamElement::Stanza(stanza))) => {
23                        return Poll::Ready(Some(stanza))
24                    }
25                    Some(Ok(_)) =>
26                    // unexpected
27                    {
28                        return Poll::Ready(None)
29                    }
30                    Some(Err(_)) => return Poll::Ready(None),
31                    None => return Poll::Ready(None),
32                },
33                Poll::Pending => return Poll::Pending,
34            }
35        }
36    }
37}
38
39impl<C: ServerConnector> Sink<Stanza> for Component<C> {
40    type Error = Error;
41
42    fn start_send(mut self: Pin<&mut Self>, item: Stanza) -> Result<(), Self::Error> {
43        Pin::new(&mut self.stream)
44            .start_send(&item)
45            .map_err(|e| e.into())
46    }
47
48    fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
49        <XmppStream<C::Stream> as Sink<&XmppStreamElement>>::poll_ready(
50            Pin::new(&mut self.stream),
51            cx,
52        )
53        .map_err(|e| e.into())
54    }
55
56    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
57        <XmppStream<C::Stream> as Sink<&XmppStreamElement>>::poll_flush(
58            Pin::new(&mut self.stream),
59            cx,
60        )
61        .map_err(|e| e.into())
62    }
63
64    fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
65        <XmppStream<C::Stream> as Sink<&XmppStreamElement>>::poll_close(
66            Pin::new(&mut self.stream),
67            cx,
68        )
69        .map_err(|e| e.into())
70    }
71}