tokio_xmpp/component/
stream.rs1use futures::{task::Poll, Sink, Stream};
5use std::pin::Pin;
6use std::task::Context;
7
8use crate::{
9 component::Component,
10 connect::ServerConnector,
11 xmlstream::{XmppStream, XmppStreamElement},
12 Error, Stanza,
13};
14
15impl<C: ServerConnector> Stream for Component<C> {
16 type Item = Stanza;
17
18 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
19 loop {
20 match Pin::new(&mut self.stream).poll_next(cx) {
21 Poll::Ready(Some(Ok(XmppStreamElement::Stanza(stanza)))) => {
22 return Poll::Ready(Some(stanza))
23 }
24 Poll::Ready(Some(Ok(_))) =>
25 {
27 return Poll::Ready(None)
28 }
29 Poll::Ready(Some(Err(_))) => return Poll::Ready(None),
30 Poll::Ready(None) => return Poll::Ready(None),
31 Poll::Pending => return Poll::Pending,
32 }
33 }
34 }
35}
36
37impl<C: ServerConnector> Sink<Stanza> for Component<C> {
38 type Error = Error;
39
40 fn start_send(mut self: Pin<&mut Self>, item: Stanza) -> Result<(), Self::Error> {
41 Pin::new(&mut self.stream)
42 .start_send(&item)
43 .map_err(|e| e.into())
44 }
45
46 fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
47 <XmppStream<C::Stream> as Sink<&XmppStreamElement>>::poll_ready(
48 Pin::new(&mut self.stream),
49 cx,
50 )
51 .map_err(|e| e.into())
52 }
53
54 fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
55 <XmppStream<C::Stream> as Sink<&XmppStreamElement>>::poll_flush(
56 Pin::new(&mut self.stream),
57 cx,
58 )
59 .map_err(|e| e.into())
60 }
61
62 fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
63 <XmppStream<C::Stream> as Sink<&XmppStreamElement>>::poll_close(
64 Pin::new(&mut self.stream),
65 cx,
66 )
67 .map_err(|e| e.into())
68 }
69}