tokio_xmpp/client/
worker.rs1use crate::client::iq;
8use crate::stanzastream::StanzaReceiver;
9use crate::stanzastream::{Event as StanzaStreamEvent, StreamEvent};
10use crate::{Event, Stanza};
11use core::ops::ControlFlow;
12use futures::StreamExt;
13use tokio::sync::mpsc;
14use tokio::sync::oneshot;
15use xmpp_parsers::jid::Jid;
16use xmpp_parsers::stream_features::StreamFeatures;
17
18pub struct ClientWorker {
21 stream_rx: StanzaReceiver,
23 stanza_w2f_tx: mpsc::Sender<Event>,
25 shutdown_rx: oneshot::Receiver<()>,
27 bound_jid: Option<Jid>,
29 features: Option<StreamFeatures>,
31 iq_response_tracker: iq::IqResponseTracker,
33}
34
35impl ClientWorker {
36 pub fn new(
37 stream_rx: StanzaReceiver,
38 iq_response_tracker: iq::IqResponseTracker,
39 depth: usize,
40 ) -> (Self, oneshot::Sender<()>, mpsc::Receiver<Event>) {
41 let (shutdown_tx, shutdown_rx) = oneshot::channel();
42
43 let (stanza_w2f_tx, stanza_w2f_rx) = mpsc::channel(depth);
45
46 let worker = Self {
47 stream_rx,
48 stanza_w2f_tx,
49 iq_response_tracker,
50 shutdown_rx,
51 bound_jid: None,
52 features: None,
53 };
54
55 (worker, shutdown_tx, stanza_w2f_rx)
56 }
57
58 pub async fn run(mut self) -> StanzaReceiver {
59 loop {
60 tokio::select! {
61 _ = &mut self.shutdown_rx => {
62 return self.stream_rx;
63 }
64 Some(event) = self.stream_rx.next() => {
65 self.handle_event(event).await;
66 }
67 }
68 }
69 }
70
71 async fn handle_event(&mut self, event: StanzaStreamEvent) {
72 let send_event = match event {
73 StanzaStreamEvent::Stanza(st) => match st {
74 Stanza::Iq(iq) => match self.iq_response_tracker.handle_iq(iq) {
75 ControlFlow::Break(()) => return,
76 ControlFlow::Continue(iq) => Event::Stanza(Stanza::Iq(iq)),
77 },
78 other => Event::Stanza(other),
79 },
80 StanzaStreamEvent::Stream(StreamEvent::Reset {
81 bound_jid,
82 features,
83 }) => {
84 self.iq_response_tracker
85 .set_account_jid(bound_jid.to_bare());
86
87 Event::Online {
88 bound_jid,
89 features,
90 resumed: false,
91 }
92 }
93 StanzaStreamEvent::Stream(StreamEvent::Resumed) => Event::Online {
94 bound_jid: self.bound_jid.as_ref().unwrap().clone(),
95 features: self.features.as_ref().unwrap().clone(),
96 resumed: true,
97 },
98 StanzaStreamEvent::Stream(StreamEvent::Suspended) => return,
99 };
100
101 let Ok(()) = self.stanza_w2f_tx.send(send_event).await else {
102 panic!("All clients have been dropped.");
103 };
104 }
105}