tokio_xmpp/client/
stream.rs1use core::ops::ControlFlow;
8use core::{pin::Pin, task::Context};
9use futures::{ready, task::Poll, Stream};
10
11use crate::{
12 client::Client,
13 stanzastream::{Event as StanzaStreamEvent, StreamEvent},
14 Event, Stanza,
15};
16
17impl Stream for Client {
22 type Item = Event;
23
24 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
35 loop {
36 return Poll::Ready(match ready!(Pin::new(&mut self.stream).poll_next(cx)) {
37 None => None,
38 Some(StanzaStreamEvent::Stanza(st)) => match st {
39 Stanza::Iq(iq) => match self.iq_response_tracker.handle_iq(iq) {
40 ControlFlow::Break(()) => continue,
41 ControlFlow::Continue(iq) => Some(Event::Stanza(Stanza::Iq(iq))),
42 },
43 other => Some(Event::Stanza(other)),
44 },
45 Some(StanzaStreamEvent::Stream(StreamEvent::Reset {
46 bound_jid,
47 features,
48 })) => {
49 self.features = Some(features);
50 self.bound_jid = Some(bound_jid.clone());
51
52 self.iq_response_tracker
53 .set_account_jid(bound_jid.to_bare());
54
55 Some(Event::Online {
56 bound_jid,
57 resumed: false,
58 })
59 }
60 Some(StanzaStreamEvent::Stream(StreamEvent::Resumed)) => Some(Event::Online {
61 bound_jid: self.bound_jid.as_ref().unwrap().clone(),
62 resumed: true,
63 }),
64 Some(StanzaStreamEvent::Stream(StreamEvent::Suspended)) => continue,
65 });
66 }
67 }
68}