1use futures::StreamExt;
8use tokio_xmpp::{
9 Event as TokioXmppEvent, Stanza,
10 parsers::{disco::DiscoInfoQuery, iq::Iq, roster::Roster},
11};
12
13use crate::{Agent, Event, iq, message, presence};
14
15pub async fn wait_for_events(agent: &mut Agent) -> Vec<Event> {
17 if let Some(event) = agent.client.next().await {
18 let mut events = Vec::new();
19
20 match event {
21 TokioXmppEvent::Online { resumed: false, .. } => {
22 let presence = presence::send::make_initial_presence(
23 &agent.disco,
24 &agent.get_config().await.website,
25 )
26 .into();
27 let _ = agent.client.send_stanza(presence).await;
28 events.push(Event::Online);
29 let iq = Iq::from_get(
31 "roster",
32 Roster {
33 ver: None,
34 items: vec![],
35 },
36 )
37 .into();
38 let _ = agent.client.send_stanza(iq).await;
39
40 let iq = Iq::from_get("disco-account", DiscoInfoQuery { node: None }).into();
42 let _ = agent.client.send_stanza(iq).await;
43 agent.awaiting_disco_bookmarks_type = true;
44 }
45 TokioXmppEvent::Online { resumed: true, .. } => {}
46 TokioXmppEvent::Disconnected(e) => {
47 events.push(Event::Disconnected(e));
48 }
49 TokioXmppEvent::Stanza(Stanza::Iq(iq)) => {
50 #[cfg(feature = "escape-hatch")]
51 events.push(Event::Iq(iq.clone()));
52
53 let new_events = iq::handle_iq(agent, iq).await;
54 events.extend(new_events);
55 }
56 TokioXmppEvent::Stanza(Stanza::Message(message)) => {
57 #[cfg(feature = "escape-hatch")]
58 events.push(Event::Message(message.clone()));
59
60 let new_events = message::receive::handle_message(agent, message).await;
61 events.extend(new_events);
62 }
63 TokioXmppEvent::Stanza(Stanza::Presence(presence)) => {
64 #[cfg(feature = "escape-hatch")]
65 events.push(Event::Presence(presence.clone()));
66
67 let new_events = presence::receive::handle_presence(agent, presence).await;
68 events.extend(new_events);
69 }
70 }
71
72 events
73 } else {
74 panic!("xmpp::Agent should never receive None event (stream closed, no reconnect)");
78 }
79}