1use std::io;
8use std::ops::ControlFlow;
9
10use crate::jid::Jid;
11use futures::stream::StreamExt;
12use tokio::{
13 sync::mpsc::{UnboundedReceiver, UnboundedSender},
14 sync::oneshot,
15 time::{timeout, Duration},
16};
17use tokio_xmpp::{
18 parsers::{message::Message, presence::Presence},
19 Client as TokioXmppClient, Event as TokioXmppEvent, IqFailure, IqRequest, IqResponse, Stanza,
20};
21
22use crate::Event;
23
24#[derive(Debug)]
25pub enum Request {
26 SendMessage {
27 message: Message,
28 response: oneshot::Sender<io::Result<()>>,
29 },
30 SendPresence {
31 presence: Presence,
32 response: oneshot::Sender<io::Result<()>>,
33 },
34 SendIq {
35 to: Option<Jid>,
36 data: IqRequest,
37 response: oneshot::Sender<Result<IqResponse, IqFailure>>,
38 },
39 Disconnect {
40 response: oneshot::Sender<io::Result<()>>,
41 },
42}
43
44#[derive(Debug)]
45pub enum NonTransactional {
46 Presence(Presence),
47 Message(Message),
48}
49
50impl From<Message> for NonTransactional {
51 fn from(message: Message) -> Self {
52 Self::Message(message)
53 }
54}
55
56impl From<Presence> for NonTransactional {
57 fn from(presence: Presence) -> Self {
58 Self::Presence(presence)
59 }
60}
61
62pub(crate) enum RequestAction {
63 Disconnect,
64 LostClient,
65}
66
67pub(crate) async fn handle_requests(
68 client: &mut TokioXmppClient,
69 req: Option<Request>,
70) -> ControlFlow<RequestAction, ()> {
71 match req {
72 Some(Request::Disconnect { response }) => {
73 let _ = response.send(Ok(()));
74 ControlFlow::Break(RequestAction::Disconnect)
75 }
76 Some(Request::SendPresence { presence, response }) => {
77 let _ = client.send_stanza(presence.into()).await;
78 let _ = response.send(Ok(()));
79 ControlFlow::Continue(())
80 }
81 Some(Request::SendMessage { message, response }) => {
82 let _ = client.send_stanza(message.into()).await;
83 let _ = response.send(Ok(()));
84 ControlFlow::Continue(())
85 }
86 Some(Request::SendIq { to, data, response }) => {
87 let token = client.send_iq(to, data).await;
88 tokio::pin!(token);
89 let custom_err = Err(IqFailure::SendError(io::Error::new(
90 io::ErrorKind::ConnectionReset,
91 "Timeout",
92 )));
93 let result = timeout(Duration::from_secs(10), token).await;
94 debug!("BAR000: {result:?}");
95 let result2 = result.unwrap_or(custom_err);
96 let _ = response.send(result2);
97 ControlFlow::Continue(())
98 }
99 None => {
100 ControlFlow::Break(RequestAction::LostClient)
102 }
103 }
104}
105
106pub(crate) async fn xml_stream_worker(
107 mut client: TokioXmppClient,
108 mut to_server: UnboundedReceiver<Request>,
109 mut from_server: UnboundedSender<NonTransactional>,
110 events: UnboundedSender<Event>,
111) {
112 while let Some(event) = client.next().await {
115 match event {
116 TokioXmppEvent::Online { resumed: false, .. } => break,
117 _ => (),
118 }
119 }
120
121 debug!("Sending Event::Online");
122 let _ = events.send(Event::Online);
123
124 let mut disconnect = false;
125 loop {
126 tokio::select! {
127 req = to_server.recv() => match handle_requests(&mut client, req).await {
128 ControlFlow::Break(_) => disconnect = true,
129 ControlFlow::Continue(_) => (),
130 },
131 msg = client.next() => match msg {
132 Some(TokioXmppEvent::Stanza(stanza)) => {
133 println!("Stanza: {:?}", stanza);
134 let _ = match stanza {
135 Stanza::Message(st) => from_server.send(st.into()),
136 Stanza::Presence(st) => from_server.send(st.into()),
137 _ => Ok(()),
138 };
139 }
140 Some(event) => println!("TokioXmppEvent: {event:?}"),
141 None => break,
142 }
143 }
144 }
145
146 if disconnect {
147 let _ = client.send_end().await;
148 return ();
149 }
150}