xmpp/
event_loop.rs

1// Copyright (c) 2023 xmpp-rs contributors.
2//
3// This Source Code Form is subject to the terms of the Mozilla Public
4// License, v. 2.0. If a copy of the MPL was not distributed with this
5// file, You can obtain one at http://mozilla.org/MPL/2.0/.
6
7use futures::StreamExt;
8use tokio_xmpp::{
9    parsers::{disco::DiscoInfoQuery, iq::Iq, roster::Roster},
10    Event as TokioXmppEvent, Stanza,
11};
12
13use crate::{iq, message, presence, Agent, Event};
14
15/// Wait for new events, or Error::Disconnected when stream is closed and will not reconnect.
16pub async fn wait_for_events(agent: &mut Agent) -> Vec<Event> {
17    if let Some(event) = agent.client.next().await {
18        let mut events = Vec::new();
19
20        match event {
21            TokioXmppEvent::Online { resumed: false, .. } => {
22                let presence =
23                    presence::send::make_initial_presence(&agent.disco, &agent.node).into();
24                let _ = agent.client.send_stanza(presence).await;
25                events.push(Event::Online);
26                // TODO: only send this when the ContactList feature is enabled.
27                let iq = Iq::from_get(
28                    "roster",
29                    Roster {
30                        ver: None,
31                        items: vec![],
32                    },
33                )
34                .into();
35                let _ = agent.client.send_stanza(iq).await;
36
37                // Query account disco to know what bookmarks spec is used
38                let iq = Iq::from_get("disco-account", DiscoInfoQuery { node: None }).into();
39                let _ = agent.client.send_stanza(iq).await;
40                agent.awaiting_disco_bookmarks_type = true;
41            }
42            TokioXmppEvent::Online { resumed: true, .. } => {}
43            TokioXmppEvent::Disconnected(e) => {
44                events.push(Event::Disconnected(e));
45            }
46            TokioXmppEvent::Stanza(Stanza::Iq(iq)) => {
47                #[cfg(feature = "escape-hatch")]
48                events.push(Event::Iq(iq.clone()));
49
50                let new_events = iq::handle_iq(agent, iq).await;
51                events.extend(new_events);
52            }
53            TokioXmppEvent::Stanza(Stanza::Message(message)) => {
54                #[cfg(feature = "escape-hatch")]
55                events.push(Event::Message(message.clone()));
56
57                let new_events = message::receive::handle_message(agent, message).await;
58                events.extend(new_events);
59            }
60            TokioXmppEvent::Stanza(Stanza::Presence(presence)) => {
61                #[cfg(feature = "escape-hatch")]
62                events.push(Event::Presence(presence.clone()));
63
64                let new_events = presence::receive::handle_presence(agent, presence).await;
65                events.extend(new_events);
66            }
67        }
68
69        events
70    } else {
71        // Stream was closed and not opening again because TokioXmppClient reconnect is false
72        // However we set reconnect true in agent builder so this should never happen and indicates
73        // logic error in tokio_xmpp::AsyncClient::poll_next
74        panic!("xmpp::Agent should never receive None event (stream closed, no reconnect)");
75    }
76}