tokio_xmpp/component/
stream.rs1use futures::{task::Poll, Sink, Stream};
5use std::pin::Pin;
6use std::task::Context;
7
8use crate::{
9 component::Component,
10 connect::ServerConnector,
11 xmlstream::{XmppStream, XmppStreamElement},
12 Error, Stanza,
13};
14
15impl<C: ServerConnector> Stream for Component<C> {
16 type Item = Stanza;
17
18 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
19 loop {
20 match Pin::new(&mut self.stream).poll_next(cx) {
21 Poll::Ready(v) => match v.map(|v| v.map(|v| v.into_read_error()).flatten()) {
22 Some(Ok(XmppStreamElement::Stanza(stanza))) => {
23 return Poll::Ready(Some(stanza))
24 }
25 Some(Ok(_)) =>
26 {
28 return Poll::Ready(None)
29 }
30 Some(Err(_)) => return Poll::Ready(None),
31 None => return Poll::Ready(None),
32 },
33 Poll::Pending => return Poll::Pending,
34 }
35 }
36 }
37}
38
39impl<C: ServerConnector> Sink<Stanza> for Component<C> {
40 type Error = Error;
41
42 fn start_send(mut self: Pin<&mut Self>, item: Stanza) -> Result<(), Self::Error> {
43 Pin::new(&mut self.stream)
44 .start_send(&item)
45 .map_err(|e| e.into())
46 }
47
48 fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
49 <XmppStream<C::Stream> as Sink<&XmppStreamElement>>::poll_ready(
50 Pin::new(&mut self.stream),
51 cx,
52 )
53 .map_err(|e| e.into())
54 }
55
56 fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
57 <XmppStream<C::Stream> as Sink<&XmppStreamElement>>::poll_flush(
58 Pin::new(&mut self.stream),
59 cx,
60 )
61 .map_err(|e| e.into())
62 }
63
64 fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
65 <XmppStream<C::Stream> as Sink<&XmppStreamElement>>::poll_close(
66 Pin::new(&mut self.stream),
67 cx,
68 )
69 .map_err(|e| e.into())
70 }
71}