tokio_xmpp/client/
stream.rs

1// Copyright (c) 2019 Emmanuel Gil Peyrot <linkmauve@linkmauve.fr>
2//
3// This Source Code Form is subject to the terms of the Mozilla Public
4// License, v. 2.0. If a copy of the MPL was not distributed with this
5// file, You can obtain one at http://mozilla.org/MPL/2.0/.
6
7use core::ops::ControlFlow;
8use core::{pin::Pin, task::Context};
9use futures::{ready, task::Poll, Stream};
10
11use crate::{
12    client::Client,
13    stanzastream::{Event as StanzaStreamEvent, StreamEvent},
14    Event, Stanza,
15};
16
17/// Incoming XMPP events
18///
19/// In an `async fn` you may want to use this with `use
20/// futures::stream::StreamExt;`
21impl Stream for Client {
22    type Item = Event;
23
24    /// Low-level read on the XMPP stream, allowing the underlying
25    /// machinery to:
26    ///
27    /// * connect,
28    /// * starttls,
29    /// * authenticate,
30    /// * bind a session, and finally
31    /// * receive stanzas
32    ///
33    /// ...for your client
34    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
35        loop {
36            return Poll::Ready(match ready!(Pin::new(&mut self.stream).poll_next(cx)) {
37                None => None,
38                Some(StanzaStreamEvent::Stanza(st)) => match st {
39                    Stanza::Iq(iq) => match self.iq_response_tracker.handle_iq(iq) {
40                        ControlFlow::Break(()) => continue,
41                        ControlFlow::Continue(iq) => Some(Event::Stanza(Stanza::Iq(iq))),
42                    },
43                    other => Some(Event::Stanza(other)),
44                },
45                Some(StanzaStreamEvent::Stream(StreamEvent::Reset {
46                    bound_jid,
47                    features,
48                })) => {
49                    self.features = Some(features);
50                    self.bound_jid = Some(bound_jid.clone());
51                    Some(Event::Online {
52                        bound_jid,
53                        resumed: false,
54                    })
55                }
56                Some(StanzaStreamEvent::Stream(StreamEvent::Resumed)) => Some(Event::Online {
57                    bound_jid: self.bound_jid.as_ref().unwrap().clone(),
58                    resumed: true,
59                }),
60                Some(StanzaStreamEvent::Stream(StreamEvent::Suspended)) => continue,
61            });
62        }
63    }
64}