xmpp/
stream.rs

1// Copyright (c) 2024-2099 xmpp-rs contributors.
2//
3// This Source Code Form is subject to the terms of the Mozilla Public
4// License, v. 2.0. If a copy of the MPL was not distributed with this
5// file, You can obtain one at http://mozilla.org/MPL/2.0/.
6
7use std::io;
8use std::ops::ControlFlow;
9
10use crate::jid::Jid;
11use futures::stream::StreamExt;
12use tokio::{
13    sync::mpsc::{UnboundedReceiver, UnboundedSender},
14    sync::oneshot,
15    time::{timeout, Duration},
16};
17use tokio_xmpp::{
18    parsers::{message::Message, presence::Presence},
19    Client as TokioXmppClient, Event as TokioXmppEvent, IqFailure, IqRequest, IqResponse, Stanza,
20};
21
22use crate::Event;
23
24#[derive(Debug)]
25pub enum Request {
26    SendMessage {
27        message: Message,
28        response: oneshot::Sender<io::Result<()>>,
29    },
30    SendPresence {
31        presence: Presence,
32        response: oneshot::Sender<io::Result<()>>,
33    },
34    SendIq {
35        to: Option<Jid>,
36        data: IqRequest,
37        response: oneshot::Sender<Result<IqResponse, IqFailure>>,
38    },
39    Disconnect {
40        response: oneshot::Sender<io::Result<()>>,
41    },
42}
43
44#[derive(Debug)]
45pub enum NonTransactional {
46    Presence(Presence),
47    Message(Message),
48}
49
50impl From<Message> for NonTransactional {
51    fn from(message: Message) -> Self {
52        Self::Message(message)
53    }
54}
55
56impl From<Presence> for NonTransactional {
57    fn from(presence: Presence) -> Self {
58        Self::Presence(presence)
59    }
60}
61
62pub(crate) enum RequestAction {
63    Disconnect,
64    LostClient,
65}
66
67pub(crate) async fn handle_requests(
68    client: &mut TokioXmppClient,
69    req: Option<Request>,
70) -> ControlFlow<RequestAction, ()> {
71    match req {
72        Some(Request::Disconnect { response }) => {
73            let _ = response.send(Ok(()));
74            ControlFlow::Break(RequestAction::Disconnect)
75        }
76        Some(Request::SendPresence { presence, response }) => {
77            let _ = client.send_stanza(presence.into()).await;
78            let _ = response.send(Ok(()));
79            ControlFlow::Continue(())
80        }
81        Some(Request::SendMessage { message, response }) => {
82            let _ = client.send_stanza(message.into()).await;
83            let _ = response.send(Ok(()));
84            ControlFlow::Continue(())
85        }
86        Some(Request::SendIq { to, data, response }) => {
87            let token = client.send_iq(to, data).await;
88            tokio::pin!(token);
89            let custom_err = Err(IqFailure::SendError(io::Error::new(
90                io::ErrorKind::ConnectionReset,
91                "Timeout",
92            )));
93            let result = timeout(Duration::from_secs(10), token).await;
94            debug!("BAR000: {result:?}");
95            let result2 = result.unwrap_or(custom_err);
96            let _ = response.send(result2);
97            ControlFlow::Continue(())
98        }
99        None => {
100            // Lost client.
101            ControlFlow::Break(RequestAction::LostClient)
102        }
103    }
104}
105
106pub(crate) async fn xml_stream_worker(
107    mut client: TokioXmppClient,
108    mut to_server: UnboundedReceiver<Request>,
109    mut from_server: UnboundedSender<NonTransactional>,
110    events: UnboundedSender<Event>,
111) {
112    // Get the client online, then continue to normal handling
113    // TODO: Isn't that the first event we should get in any case? No need for a loop.
114    while let Some(event) = client.next().await {
115        match event {
116            TokioXmppEvent::Online { resumed: false, .. } => break,
117            _ => (),
118        }
119    }
120
121    debug!("Sending Event::Online");
122    let _ = events.send(Event::Online);
123
124    let mut disconnect = false;
125    loop {
126        tokio::select! {
127            req = to_server.recv() => match handle_requests(&mut client, req).await {
128                ControlFlow::Break(_) => disconnect = true,
129                ControlFlow::Continue(_) => (),
130            },
131            msg = client.next() => match msg {
132                Some(TokioXmppEvent::Stanza(stanza)) => {
133                    println!("Stanza: {:?}", stanza);
134                    let _ = match stanza {
135                        Stanza::Message(st) => from_server.send(st.into()),
136                        Stanza::Presence(st) => from_server.send(st.into()),
137                        _ => Ok(()),
138                    };
139                }
140                Some(event) => println!("TokioXmppEvent: {event:?}"),
141                None => break,
142            }
143        }
144    }
145
146    if disconnect {
147        let _ = client.send_end().await;
148        return ();
149    }
150}