xmpp/
stream.rs

1// Copyright (c) 2024-2099 xmpp-rs contributors.
2//
3// This Source Code Form is subject to the terms of the Mozilla Public
4// License, v. 2.0. If a copy of the MPL was not distributed with this
5// file, You can obtain one at http://mozilla.org/MPL/2.0/.
6
7use std::io;
8
9use crate::jid::Jid;
10use futures::stream::StreamExt;
11use tokio::{
12    sync::mpsc::{UnboundedReceiver, UnboundedSender},
13    sync::oneshot,
14    time::{timeout, Duration},
15};
16use tokio_xmpp::{
17    parsers::{message::Message, presence::Presence},
18    Client as TokioXmppClient, Event as TokioXmppEvent, IqFailure, IqRequest, IqResponse,
19    IqResponseToken, Stanza,
20};
21
22use crate::Event;
23
24#[derive(Debug)]
25pub enum Request {
26    SendMessage {
27        message: Message,
28        response: oneshot::Sender<io::Result<()>>,
29    },
30    SendPresence {
31        presence: Presence,
32        response: oneshot::Sender<io::Result<()>>,
33    },
34    SendIq {
35        to: Option<Jid>,
36        data: IqRequest,
37        response: oneshot::Sender<Result<IqResponse, IqFailure>>,
38    },
39    Disconnect {
40        response: oneshot::Sender<io::Result<()>>,
41    },
42}
43
44#[derive(Debug)]
45pub enum NonTransactional {
46    Presence(Presence),
47    Message(Message),
48}
49
50impl From<Message> for NonTransactional {
51    fn from(message: Message) -> Self {
52        Self::Message(message)
53    }
54}
55
56impl From<Presence> for NonTransactional {
57    fn from(presence: Presence) -> Self {
58        Self::Presence(presence)
59    }
60}
61
62pub(crate) enum RequestAction {
63    Noop,
64    IqTracking {
65        token: IqResponseToken,
66        response: oneshot::Sender<Result<IqResponse, IqFailure>>,
67    },
68    Disconnect,
69    LostClient,
70}
71
72pub(crate) async fn handle_requests(
73    client: &mut TokioXmppClient,
74    req: Option<Request>,
75) -> RequestAction {
76    match req {
77        Some(Request::Disconnect { response }) => {
78            let _ = response.send(Ok(()));
79            RequestAction::Disconnect
80        }
81        Some(Request::SendPresence { presence, response }) => {
82            let _ = client.send_stanza(presence.into()).await;
83            let _ = response.send(Ok(()));
84            RequestAction::Noop
85        }
86        Some(Request::SendMessage { message, response }) => {
87            let _ = client.send_stanza(message.into()).await;
88            let _ = response.send(Ok(()));
89            RequestAction::Noop
90        }
91        Some(Request::SendIq { to, data, response }) => {
92            let token = client.send_iq(to, data).await;
93            RequestAction::IqTracking { token, response }
94        }
95        None => {
96            // Lost client.
97            RequestAction::LostClient
98        }
99    }
100}
101
102pub(crate) async fn resolve_iqs(
103    pending_iqs: &mut Vec<(
104        IqResponseToken,
105        oneshot::Sender<Result<IqResponse, IqFailure>>,
106    )>,
107) -> () {
108    for (token, response) in pending_iqs.drain(..) {
109        let custom_err = Err(IqFailure::SendError(io::Error::new(
110            io::ErrorKind::ConnectionReset,
111            "Timeout",
112        )));
113        tokio::pin!(token);
114        let result = timeout(Duration::from_secs(10), token).await;
115        let result = result.unwrap_or(custom_err);
116        let _ = response.send(result);
117    }
118}
119
120pub(crate) async fn xml_stream_worker(
121    mut client: TokioXmppClient,
122    mut to_server: UnboundedReceiver<Request>,
123    from_server: UnboundedSender<NonTransactional>,
124    events: UnboundedSender<Event>,
125) {
126    let mut pending_iqs: Vec<(
127        IqResponseToken,
128        oneshot::Sender<Result<IqResponse, IqFailure>>,
129    )> = Vec::new();
130
131    // Get the client online, then continue to normal handling
132    // TODO: Isn't that the first event we should get in any case? No need for a loop.
133    while let Some(event) = client.next().await {
134        match event {
135            TokioXmppEvent::Online { resumed: false, .. } => break,
136            _ => (),
137        }
138    }
139
140    debug!("Sending Event::Online");
141    let _ = events.send(Event::Online);
142
143    let mut disconnect = false;
144    loop {
145        tokio::select! {
146            _ = resolve_iqs(&mut pending_iqs) => (),
147            req = to_server.recv() => match handle_requests(&mut client, req).await {
148                RequestAction::Disconnect | RequestAction::LostClient => {
149                    disconnect = true;
150                    break;
151                }
152                RequestAction::IqTracking { token, response } => {
153                    let _ = pending_iqs.push((token, response));
154                },
155                RequestAction::Noop => (),
156            },
157            msg = client.next() => match msg {
158                Some(TokioXmppEvent::Stanza(stanza)) => {
159                    debug!("Stanza: {:?}", stanza);
160                    let _ = match stanza {
161                        Stanza::Message(st) => from_server.send(st.into()),
162                        Stanza::Presence(st) => from_server.send(st.into()),
163                        _ => Ok(()),
164                    };
165                }
166                Some(event) => debug!("TokioXmppEvent: {event:?}"),
167                None => break,
168            }
169        }
170    }
171
172    if disconnect {
173        let _ = client.send_end().await;
174        return ();
175    }
176}