xmpp/event_loop.rs
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67
// Copyright (c) 2023 xmpp-rs contributors.
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
use futures::StreamExt;
use tokio_xmpp::{
parsers::{disco::DiscoInfoQuery, iq::Iq, roster::Roster},
Event as TokioXmppEvent, Stanza,
};
use crate::{iq, message, presence, Agent, Event};
/// Wait for new events, or Error::Disconnected when stream is closed and will not reconnect.
pub async fn wait_for_events(agent: &mut Agent) -> Vec<Event> {
if let Some(event) = agent.client.next().await {
let mut events = Vec::new();
match event {
TokioXmppEvent::Online { resumed: false, .. } => {
let presence =
presence::send::make_initial_presence(&agent.disco, &agent.node).into();
let _ = agent.client.send_stanza(presence).await;
events.push(Event::Online);
// TODO: only send this when the ContactList feature is enabled.
let iq = Iq::from_get(
"roster",
Roster {
ver: None,
items: vec![],
},
)
.into();
let _ = agent.client.send_stanza(iq).await;
// Query account disco to know what bookmarks spec is used
let iq = Iq::from_get("disco-account", DiscoInfoQuery { node: None }).into();
let _ = agent.client.send_stanza(iq).await;
agent.awaiting_disco_bookmarks_type = true;
}
TokioXmppEvent::Online { resumed: true, .. } => {}
TokioXmppEvent::Disconnected(e) => {
events.push(Event::Disconnected(e));
}
TokioXmppEvent::Stanza(Stanza::Iq(iq)) => {
let new_events = iq::handle_iq(agent, iq).await;
events.extend(new_events);
}
TokioXmppEvent::Stanza(Stanza::Message(message)) => {
let new_events = message::receive::handle_message(agent, message).await;
events.extend(new_events);
}
TokioXmppEvent::Stanza(Stanza::Presence(presence)) => {
let new_events = presence::receive::handle_presence(agent, presence).await;
events.extend(new_events);
}
}
events
} else {
// Stream was closed and not opening again because TokioXmppClient reconnect is false
// However we set reconnect true in agent builder so this should never happen and indicates
// logic error in tokio_xmpp::AsyncClient::poll_next
panic!("xmpp::Agent should never receive None event (stream closed, no reconnect)");
}
}