Skip to main content

tokio_xmpp/client/
worker.rs

1// Copyright (c) 2025 xmpp-rs contributors.
2//
3// This Source Code Form is subject to the terms of the Mozilla Public
4// License, v. 2.0. If a copy of the MPL was not distributed with this
5// file, You can obtain one at http://mozilla.org/MPL/2.0/.
6
7use crate::client::iq;
8use crate::stanzastream::StanzaReceiver;
9use crate::stanzastream::{Event as StanzaStreamEvent, StreamEvent};
10use crate::{Event, Stanza};
11use core::ops::ControlFlow;
12use futures::StreamExt;
13use tokio::sync::mpsc;
14use tokio::sync::oneshot;
15use xmpp_parsers::jid::Jid;
16use xmpp_parsers::stream_features::StreamFeatures;
17
18/// Worker to drive the [`crate::stanzastream`] of a client in the background and continue to
19/// acknowledge IQs, even when the client is not polled.
20pub struct ClientWorker {
21    // Receiver from the StanzaStream
22    stream_rx: StanzaReceiver,
23    // Sender to the client (worker-to-frontend)
24    stanza_w2f_tx: mpsc::Sender<Event>,
25    // Shutdown signal receiver from frontend
26    shutdown_rx: oneshot::Receiver<()>,
27    // JID of the logged-in client
28    bound_jid: Option<Jid>,
29    // Stream features of the currently connected stream
30    features: Option<StreamFeatures>,
31    // Response tracker for IQs
32    iq_response_tracker: iq::IqResponseTracker,
33}
34
35impl ClientWorker {
36    pub fn new(
37        stream_rx: StanzaReceiver,
38        iq_response_tracker: iq::IqResponseTracker,
39        depth: usize,
40    ) -> (Self, oneshot::Sender<()>, mpsc::Receiver<Event>) {
41        let (shutdown_tx, shutdown_rx) = oneshot::channel();
42
43        // worker-to-frontend connection
44        let (stanza_w2f_tx, stanza_w2f_rx) = mpsc::channel(depth);
45
46        let worker = Self {
47            stream_rx,
48            stanza_w2f_tx,
49            iq_response_tracker,
50            shutdown_rx,
51            bound_jid: None,
52            features: None,
53        };
54
55        (worker, shutdown_tx, stanza_w2f_rx)
56    }
57
58    pub async fn run(mut self) -> StanzaReceiver {
59        loop {
60            tokio::select! {
61                _ = &mut self.shutdown_rx => {
62                    return self.stream_rx;
63                }
64                Some(event) = self.stream_rx.next() => {
65                    self.handle_event(event).await;
66                }
67            }
68        }
69    }
70
71    async fn handle_event(&mut self, event: StanzaStreamEvent) {
72        let send_event = match event {
73            StanzaStreamEvent::Stanza(st) => match st {
74                Stanza::Iq(iq) => match self.iq_response_tracker.handle_iq(iq) {
75                    ControlFlow::Break(()) => return,
76                    ControlFlow::Continue(iq) => Event::Stanza(Stanza::Iq(iq)),
77                },
78                other => Event::Stanza(other),
79            },
80            StanzaStreamEvent::Stream(StreamEvent::Reset {
81                bound_jid,
82                features,
83            }) => {
84                self.iq_response_tracker
85                    .set_account_jid(bound_jid.to_bare());
86
87                Event::Online {
88                    bound_jid,
89                    features,
90                    resumed: false,
91                }
92            }
93            StanzaStreamEvent::Stream(StreamEvent::Resumed) => Event::Online {
94                bound_jid: self.bound_jid.as_ref().unwrap().clone(),
95                features: self.features.as_ref().unwrap().clone(),
96                resumed: true,
97            },
98            StanzaStreamEvent::Stream(StreamEvent::Suspended) => return,
99        };
100
101        let Ok(()) = self.stanza_w2f_tx.send(send_event).await else {
102            panic!("All clients have been dropped.");
103        };
104    }
105}