1use std::io;
8
9use crate::jid::Jid;
10use futures::stream::StreamExt;
11use tokio::{
12 sync::mpsc::{UnboundedReceiver, UnboundedSender},
13 sync::oneshot,
14 time::{timeout, Duration},
15};
16use tokio_xmpp::{
17 parsers::{message::Message, presence::Presence},
18 Client as TokioXmppClient, Event as TokioXmppEvent, IqFailure, IqRequest, IqResponse,
19 IqResponseToken, Stanza,
20};
21
22use crate::Event;
23
24#[derive(Debug)]
25pub enum Request {
26 SendMessage {
27 message: Message,
28 response: oneshot::Sender<io::Result<()>>,
29 },
30 SendPresence {
31 presence: Presence,
32 response: oneshot::Sender<io::Result<()>>,
33 },
34 SendIq {
35 to: Option<Jid>,
36 data: IqRequest,
37 response: oneshot::Sender<Result<IqResponse, IqFailure>>,
38 },
39 Disconnect {
40 response: oneshot::Sender<io::Result<()>>,
41 },
42}
43
44#[derive(Debug)]
45pub enum NonTransactional {
46 Presence(Presence),
47 Message(Message),
48}
49
50impl From<Message> for NonTransactional {
51 fn from(message: Message) -> Self {
52 Self::Message(message)
53 }
54}
55
56impl From<Presence> for NonTransactional {
57 fn from(presence: Presence) -> Self {
58 Self::Presence(presence)
59 }
60}
61
62pub(crate) enum RequestAction {
63 Noop,
64 IqTracking {
65 token: IqResponseToken,
66 response: oneshot::Sender<Result<IqResponse, IqFailure>>,
67 },
68 Disconnect,
69 LostClient,
70}
71
72pub(crate) async fn handle_requests(
73 client: &mut TokioXmppClient,
74 req: Option<Request>,
75) -> RequestAction {
76 match req {
77 Some(Request::Disconnect { response }) => {
78 let _ = response.send(Ok(()));
79 RequestAction::Disconnect
80 }
81 Some(Request::SendPresence { presence, response }) => {
82 let _ = client.send_stanza(presence.into()).await;
83 let _ = response.send(Ok(()));
84 RequestAction::Noop
85 }
86 Some(Request::SendMessage { message, response }) => {
87 let _ = client.send_stanza(message.into()).await;
88 let _ = response.send(Ok(()));
89 RequestAction::Noop
90 }
91 Some(Request::SendIq { to, data, response }) => {
92 let token = client.send_iq(to, data).await;
93 RequestAction::IqTracking { token, response }
94 }
95 None => {
96 RequestAction::LostClient
98 }
99 }
100}
101
102pub(crate) async fn resolve_iqs(
103 pending_iqs: &mut Vec<(
104 IqResponseToken,
105 oneshot::Sender<Result<IqResponse, IqFailure>>,
106 )>,
107) -> () {
108 for (token, response) in pending_iqs.drain(..) {
109 let custom_err = Err(IqFailure::SendError(io::Error::new(
110 io::ErrorKind::ConnectionReset,
111 "Timeout",
112 )));
113 tokio::pin!(token);
114 let result = timeout(Duration::from_secs(10), token).await;
115 let result = result.unwrap_or(custom_err);
116 let _ = response.send(result);
117 }
118}
119
120pub(crate) async fn xml_stream_worker(
121 mut client: TokioXmppClient,
122 mut to_server: UnboundedReceiver<Request>,
123 from_server: UnboundedSender<NonTransactional>,
124 events: UnboundedSender<Event>,
125) {
126 let mut pending_iqs: Vec<(
127 IqResponseToken,
128 oneshot::Sender<Result<IqResponse, IqFailure>>,
129 )> = Vec::new();
130
131 while let Some(event) = client.next().await {
134 match event {
135 TokioXmppEvent::Online { resumed: false, .. } => break,
136 _ => (),
137 }
138 }
139
140 debug!("Sending Event::Online");
141 let _ = events.send(Event::Online);
142
143 let mut disconnect = false;
144 loop {
145 tokio::select! {
146 _ = resolve_iqs(&mut pending_iqs) => (),
147 req = to_server.recv() => match handle_requests(&mut client, req).await {
148 RequestAction::Disconnect | RequestAction::LostClient => {
149 disconnect = true;
150 break;
151 }
152 RequestAction::IqTracking { token, response } => {
153 let _ = pending_iqs.push((token, response));
154 },
155 RequestAction::Noop => (),
156 },
157 msg = client.next() => match msg {
158 Some(TokioXmppEvent::Stanza(stanza)) => {
159 debug!("Stanza: {:?}", stanza);
160 let _ = match stanza {
161 Stanza::Message(st) => from_server.send(st.into()),
162 Stanza::Presence(st) => from_server.send(st.into()),
163 _ => Ok(()),
164 };
165 }
166 Some(event) => debug!("TokioXmppEvent: {event:?}"),
167 None => break,
168 }
169 }
170 }
171
172 if disconnect {
173 let _ = client.send_end().await;
174 return ();
175 }
176}