tokio_xmpp/client/
iq.rs

1// Copyright (c) 2025 Jonas Schäfer <jonas@zombofant.net>
2//
3// This Source Code Form is subject to the terms of the Mozilla Public
4// License, v. 2.0. If a copy of the MPL was not distributed with this
5// file, You can obtain one at http://mozilla.org/MPL/2.0/.
6
7use alloc::collections::BTreeMap;
8use alloc::sync::{Arc, Weak};
9use core::error::Error;
10use core::fmt;
11use core::future::Future;
12use core::ops::ControlFlow;
13use core::pin::Pin;
14use core::task::{ready, Context, Poll};
15use std::io;
16use std::sync::Mutex;
17
18use futures::Stream;
19use tokio::sync::oneshot;
20
21use xmpp_parsers::{iq::Iq, stanza_error::StanzaError};
22
23use crate::{
24    event::make_id,
25    jid::Jid,
26    minidom::Element,
27    stanzastream::{StanzaState, StanzaToken},
28};
29
/// An IQ request payload.
///
/// The variant selects the kind of request stanza which is built from the
/// payload: [`IqRequest::Get`] is turned into an `Iq::Get` and
/// [`IqRequest::Set`] into an `Iq::Set`.
#[derive(Debug)]
pub enum IqRequest {
    /// Payload for a `type="get"` request
    Get(Element),

    /// Payload for a `type="set"` request
    Set(Element),
}
39
40impl IqRequest {
41    fn into_iq(self, from: Option<Jid>, to: Option<Jid>, id: String) -> Iq {
42        match self {
43            Self::Get(payload) => Iq::Get {
44                from,
45                to,
46                id,
47                payload,
48            },
49            Self::Set(payload) => Iq::Set {
50                from,
51                to,
52                id,
53                payload,
54            },
55        }
56    }
57}
58
/// An IQ response payload.
///
/// This is what an awaited [`IqResponseToken`] resolves to once the peer
/// has answered. Both variants represent a *received* reply; a reply of
/// `type="error"` is therefore reported as [`IqResponse::Error`] and not as
/// an [`IqFailure`].
#[derive(Debug)]
pub enum IqResponse {
    /// Payload for a `type="result"` response.
    Result(Option<Element>),

    /// Payload for a `type="error"` response.
    Error(StanzaError),
}
68
69impl IqResponse {
70    fn into_iq(self, from: Option<Jid>, to: Option<Jid>, id: String) -> Iq {
71        match self {
72            Self::Error(error) => Iq::Error {
73                from,
74                to,
75                id,
76                error,
77                payload: None,
78            },
79            Self::Result(payload) => Iq::Result {
80                from,
81                to,
82                id,
83                payload,
84            },
85        }
86    }
87}
88
/// Reasons why no IQ response could be obtained at all.
///
/// This is distinct from a `type="error"` reply sent by the peer, which is
/// a successfully received response.
#[derive(Debug)]
pub enum IqFailure {
    /// Internal error inside tokio_xmpp which caused the stream worker to
    /// drop the token before the response was received.
    ///
    /// Most likely, this means that the stream has died with a panic. It is
    /// also reported when the request stanza was dropped before it could be
    /// transmitted.
    LostWorker,

    /// The IQ failed to send because of an I/O or serialisation error.
    SendError(io::Error),
}

impl fmt::Display for IqFailure {
    /// Render a short human-readable description of the failure.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        match self {
            Self::SendError(cause) => write!(f, "send error: {}", cause),
            Self::LostWorker => {
                f.write_str("disconnected from internal connection worker while sending IQ")
            }
        }
    }
}

impl Error for IqFailure {
    /// Expose the underlying I/O error for [`IqFailure::SendError`]; a lost
    /// worker has no underlying cause.
    fn source(&self) -> Option<&(dyn Error + 'static)> {
        match self {
            Self::LostWorker => None,
            Self::SendError(cause) => Some(cause),
        }
    }
}
121
/// Key identifying a pending request: the address of the peer paired with
/// the stanza ID.
///
/// Requests are registered under their `to` address and responses are
/// looked up by their `from` address, so a response only matches if it
/// comes from the address the request was sent to.
type IqKey = (Option<Jid>, String);
/// Pending requests, mapping each key to the sink which resolves the
/// corresponding [`IqResponseToken`].
type IqMap = BTreeMap<IqKey, IqResponseSink>;
124
/// Guard tying the lifetime of an entry in the [`IqMap`] to a value.
///
/// When the handle is dropped, the entry stored under `key` is removed from
/// the map (if the map still exists).
#[derive(Debug)]
struct IqMapEntryHandle {
    // Key of the map entry owned by this handle.
    key: IqKey,
    // Weak reference, so that outstanding handles do not keep the map of a
    // dropped tracker alive.
    map: Weak<Mutex<IqMap>>,
}
130
131impl Drop for IqMapEntryHandle {
132    fn drop(&mut self) {
133        let Some(map) = self.map.upgrade() else {
134            return;
135        };
136        let Some(mut map) = map.lock().ok() else {
137            return;
138        };
139        map.remove(&self.key);
140    }
141}
142
pin_project_lite::pin_project! {
    /// Handle for awaiting an IQ response.
    ///
    /// The `IqResponseToken` can be awaited and will generate a result once
    /// the Iq response has been received. Note that an `Ok(_)` result does
    /// **not** imply a successful execution of the remote command: It may
    /// contain an [`IqResponse::Error`] variant.
    ///
    /// Note that there are no internal timeouts for Iq responses: If a reply
    /// never arrives, the [`IqResponseToken`] future will never complete.
    /// Most of the time, you should combine that token with something like
    /// [`tokio::time::timeout`].
    ///
    /// Dropping (cancelling) an `IqResponseToken` removes the internal
    /// bookkeeping required for tracking the response.
    #[derive(Debug)]
    pub struct IqResponseToken {
        // Guard for the tracker map entry; cleared as soon as the future
        // resolves so that the entry is removed early.
        entry: Option<IqMapEntryHandle>,
        // State stream of the request stanza, if one was attached via
        // `set_stanza_token`. Reset to `None` once the stanza was sent.
        #[pin]
        stanza_token: Option<tokio_stream::wrappers::WatchStream<StanzaState>>,
        // Receiving half of the channel on which the response is delivered.
        #[pin]
        inner: oneshot::Receiver<Result<IqResponse, IqFailure>>,
    }
}
167
168impl IqResponseToken {
169    /// Tie a stanza token to this IQ response token.
170    ///
171    /// The stanza token should point at the IQ **request**, the response of
172    /// which this response token awaits.
173    ///
174    /// Awaiting the response token will then handle error states in the
175    /// stanza token and return IqFailure as appropriate.
176    pub(crate) fn set_stanza_token(&mut self, token: StanzaToken) {
177        assert!(self.stanza_token.is_none());
178        self.stanza_token = Some(token.into_stream());
179    }
180}
181
182impl Future for IqResponseToken {
183    type Output = Result<IqResponse, IqFailure>;
184
185    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
186        let mut this = self.project();
187        match this.inner.poll(cx) {
188            Poll::Ready(Ok(v)) => {
189                // Drop the map entry handle to release some memory.
190                this.entry.take();
191                return Poll::Ready(v);
192            }
193            Poll::Ready(Err(_)) => {
194                log::warn!("IqResponseToken oneshot::Receiver returned receive error!");
195                // Drop the map entry handle to release some memory.
196                this.entry.take();
197                return Poll::Ready(Err(IqFailure::LostWorker));
198            }
199            Poll::Pending => (),
200        };
201
202        loop {
203            match this.stanza_token.as_mut().as_pin_mut() {
204                // We have a stanza token to look at, so we check its state.
205                Some(stream) => match ready!(stream.poll_next(cx)) {
206                    // Still in the queue.
207                    Some(StanzaState::Queued) => (),
208
209                    Some(StanzaState::Dropped) | None => {
210                        log::warn!("StanzaToken associated with IqResponseToken signalled that the Stanza was dropped before transmission.");
211                        // Drop the map entry handle to release some memory.
212                        this.entry.take();
213                        // Lost stanza stream: cannot ever get a reply.
214                        return Poll::Ready(Err(IqFailure::LostWorker));
215                    }
216
217                    Some(StanzaState::Failed { error }) => {
218                        // Drop the map entry handle to release some memory.
219                        this.entry.take();
220                        // Send error: cannot ever get a reply.
221                        return Poll::Ready(Err(IqFailure::SendError(error.into_io_error())));
222                    }
223
224                    Some(StanzaState::Sent { .. }) | Some(StanzaState::Acked { .. }) => {
225                        // Sent successfully, stop polling the stream: We do
226                        // not care what happens after successful sending,
227                        // the next step we expect is that this.inner
228                        // completes.
229                        *this.stanza_token = None;
230                        return Poll::Pending;
231                    }
232                },
233
234                // No StanzaToken to poll, so we return Poll::Pending and hope
235                // that we will get a response through this.inner eventually..
236                None => return Poll::Pending,
237            }
238        }
239    }
240}
241
/// Sending half of the channel on which a response is delivered to the
/// matching [`IqResponseToken`].
#[derive(Debug)]
struct IqResponseSink {
    // One-shot sender; consumed when the response is delivered.
    inner: oneshot::Sender<Result<IqResponse, IqFailure>>,
}
246
247impl IqResponseSink {
248    fn complete(self, resp: IqResponse) {
249        let _: Result<_, _> = self.inner.send(Ok(resp));
250    }
251}
252
/// Utility struct to track IQ responses.
///
/// Requests are registered with
/// [`allocate_iq_handle`][`IqResponseTracker::allocate_iq_handle`] and
/// incoming stanzas are matched against them with
/// [`handle_iq`][`IqResponseTracker::handle_iq`].
#[derive(Debug)]
pub struct IqResponseTracker {
    // Pending requests; shared so that response tokens can remove their own
    // entry when they are dropped.
    map: Arc<Mutex<IqMap>>,
}
258
259impl IqResponseTracker {
260    /// Create a new empty response tracker.
261    pub fn new() -> Self {
262        Self {
263            map: Arc::new(Mutex::new(IqMap::new())),
264        }
265    }
266
267    /// Attempt to handle an IQ stanza as IQ response.
268    ///
269    /// Returns the IQ stanza unharmed if it is not an IQ response matching
270    /// any request which is still being tracked.
271    pub fn handle_iq(&self, iq: Iq) -> ControlFlow<(), Iq> {
272        let (from, to, id, payload) = match iq {
273            Iq::Error {
274                from,
275                to,
276                id,
277                error,
278                payload: _,
279            } => (from, to, id, IqResponse::Error(error)),
280            Iq::Result {
281                from,
282                to,
283                id,
284                payload,
285            } => (from, to, id, IqResponse::Result(payload)),
286            _ => return ControlFlow::Continue(iq),
287        };
288        let key = (from, id);
289        let mut map = self.map.lock().unwrap();
290        match map.remove(&key) {
291            None => {
292                log::trace!("not handling IQ response from {:?} with id {:?}: no active tracker for this tuple", key.0, key.1);
293                ControlFlow::Continue(payload.into_iq(key.0, to, key.1))
294            }
295            Some(sink) => {
296                sink.complete(payload);
297                ControlFlow::Break(())
298            }
299        }
300    }
301
302    /// Allocate a new IQ response tracking handle.
303    ///
304    /// This modifies the IQ to assign a unique ID.
305    pub fn allocate_iq_handle(
306        &self,
307        from: Option<Jid>,
308        to: Option<Jid>,
309        req: IqRequest,
310    ) -> (Iq, IqResponseToken) {
311        let key = (to, make_id());
312        let mut map = self.map.lock().unwrap();
313        let (tx, rx) = oneshot::channel();
314        let sink = IqResponseSink { inner: tx };
315        assert!(map.get(&key).is_none());
316        let token = IqResponseToken {
317            entry: Some(IqMapEntryHandle {
318                key: key.clone(),
319                map: Arc::downgrade(&self.map),
320            }),
321            stanza_token: None,
322            inner: rx,
323        };
324        map.insert(key.clone(), sink);
325        (req.into_iq(from, key.0, key.1), token)
326    }
327}