tokio_xmpp/component/
stream.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
//! Components in XMPP are services/gateways that are logged into an
//! XMPP server under a JID consisting of just a domain name. They are
//! allowed to use any user and resource identifiers in their stanzas.
use futures::{task::Poll, Sink, Stream};
use std::pin::Pin;
use std::task::Context;

use crate::{
    component::Component,
    connect::ServerConnector,
    xmlstream::{XmppStream, XmppStreamElement},
    Error, Stanza,
};

impl<C: ServerConnector> Stream for Component<C> {
    type Item = Stanza;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
        loop {
            match Pin::new(&mut self.stream).poll_next(cx) {
                Poll::Ready(Some(Ok(XmppStreamElement::Stanza(stanza)))) => {
                    return Poll::Ready(Some(stanza))
                }
                Poll::Ready(Some(Ok(_))) =>
                // unexpected
                {
                    return Poll::Ready(None)
                }
                Poll::Ready(Some(Err(_))) => return Poll::Ready(None),
                Poll::Ready(None) => return Poll::Ready(None),
                Poll::Pending => return Poll::Pending,
            }
        }
    }
}

impl<C: ServerConnector> Sink<Stanza> for Component<C> {
    type Error = Error;

    fn start_send(mut self: Pin<&mut Self>, item: Stanza) -> Result<(), Self::Error> {
        Pin::new(&mut self.stream)
            .start_send(&item)
            .map_err(|e| e.into())
    }

    fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
        <XmppStream<C::Stream> as Sink<&XmppStreamElement>>::poll_ready(
            Pin::new(&mut self.stream),
            cx,
        )
        .map_err(|e| e.into())
    }

    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
        <XmppStream<C::Stream> as Sink<&XmppStreamElement>>::poll_flush(
            Pin::new(&mut self.stream),
            cx,
        )
        .map_err(|e| e.into())
    }

    fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
        <XmppStream<C::Stream> as Sink<&XmppStreamElement>>::poll_close(
            Pin::new(&mut self.stream),
            cx,
        )
        .map_err(|e| e.into())
    }
}