1// Copyright (c) 2025 Jonas Schäfer <jonas@zombofant.net>
2//
3// This Source Code Form is subject to the terms of the Mozilla Public
4// License, v. 2.0. If a copy of the MPL was not distributed with this
5// file, You can obtain one at http://mozilla.org/MPL/2.0/.
67use alloc::collections::BTreeMap;
8use alloc::sync::{Arc, Weak};
9use core::error::Error;
10use core::fmt;
11use core::future::Future;
12use core::ops::ControlFlow;
13use core::pin::Pin;
14use core::task::{ready, Context, Poll};
15use std::io;
16use std::sync::Mutex;
1718use futures::Stream;
19use tokio::sync::oneshot;
2021use xmpp_parsers::{
22 iq::{Iq, IqType},
23 stanza_error::StanzaError,
24};
2526use crate::{
27 event::make_id,
28 jid::Jid,
29 minidom::Element,
30 stanzastream::{StanzaState, StanzaToken},
31};
3233/// An IQ request payload
34pub enum IqRequest {
35/// Payload for a `type="get"` request
36Get(Element),
3738/// Payload for a `type="set"` request
39Set(Element),
40}
4142impl From<IqRequest> for IqType {
43fn from(other: IqRequest) -> IqType {
44match other {
45 IqRequest::Get(v) => Self::Get(v),
46 IqRequest::Set(v) => Self::Set(v),
47 }
48 }
49}
5051/// An IQ response payload
52pub enum IqResponse {
53/// Payload for a `type="result"` response.
54Result(Option<Element>),
5556/// Payload for a `type="error"` response.
57Error(StanzaError),
58}
5960impl From<IqResponse> for IqType {
61fn from(other: IqResponse) -> IqType {
62match other {
63 IqResponse::Result(v) => Self::Result(v),
64 IqResponse::Error(v) => Self::Error(v),
65 }
66 }
67}
/// Reasons why sending an IQ, or waiting for its response, failed locally.
#[derive(Debug)]
pub enum IqFailure {
    /// The stream worker let go of the request before a response arrived.
    ///
    /// This indicates an internal error inside tokio_xmpp; most likely, the
    /// stream has died with a panic.
    LostWorker,

    /// An I/O or serialisation error prevented the IQ from being sent.
    SendError(io::Error),
}

impl fmt::Display for IqFailure {
    /// Writes a short human-readable description of the failure.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        match self {
            IqFailure::SendError(e) => write!(f, "send error: {e}"),
            IqFailure::LostWorker => {
                f.write_str("disconnected from internal connection worker while sending IQ")
            }
        }
    }
}

impl Error for IqFailure {
    /// Exposes the wrapped I/O error of [`IqFailure::SendError`]; the other
    /// variant has no underlying cause.
    fn source(&self) -> Option<&(dyn Error + 'static)> {
        match self {
            IqFailure::LostWorker => None,
            IqFailure::SendError(cause) => Some(cause),
        }
    }
}
/// Lookup key of a pending request: the peer address paired with the IQ id.
///
/// The tracker stores the request's `to` address here and looks responses
/// up by their `from` address, so the two have to compare equal for a
/// response to be matched.
type IqKey = (Option<Jid>, String);
/// Pending requests, mapping each key to the sink that resolves its token.
type IqMap = BTreeMap<IqKey, IqResponseSink>;
/// Guard which removes one entry from the tracker's map when it is dropped.
struct IqMapEntryHandle {
    /// Key of the map entry this handle is responsible for.
    key: IqKey,
    /// Weak reference to the map, so that an outstanding handle does not
    /// keep the map alive.
    map: Weak<Mutex<IqMap>>,
}
109110impl Drop for IqMapEntryHandle {
111fn drop(&mut self) {
112let Some(map) = self.map.upgrade() else {
113return;
114 };
115let Some(mut map) = map.lock().ok() else {
116return;
117 };
118 map.remove(&self.key);
119 }
120}
pin_project_lite::pin_project! {
    /// Handle for awaiting an IQ response.
    ///
    /// The `IqResponseToken` can be awaited and will generate a result once
    /// the Iq response has been received. Note that an `Ok(_)` result does
    /// **not** imply a successful execution of the remote command: It may
    /// contain a [`IqResponse::Error`] variant.
    ///
    /// An `Err(_)` result carries an [`IqFailure`] and is produced when the
    /// request could not be sent or when the internal response channel was
    /// closed without delivering a response.
    ///
    /// Note that there are no internal timeouts for Iq responses: If a reply
    /// never arrives, the [`IqResponseToken`] future will never complete.
    /// Most of the time, you should combine that token with something like
    /// [`tokio::time::timeout`].
    ///
    /// Dropping (cancelling) an `IqResponseToken` removes the internal
    /// bookkeeping required for tracking the response.
    pub struct IqResponseToken {
        // Removes the tracker's map entry when dropped. Taken out (set to
        // `None`) as soon as the future resolves.
        entry: Option<IqMapEntryHandle>,
        // State updates of the outgoing request stanza. `None` until a
        // stanza token is attached, and again once the stanza was sent.
        #[pin]
        stanza_token: Option<tokio_stream::wrappers::WatchStream<StanzaState>>,
        // Receiving end of the channel through which the tracker delivers
        // the response.
        #[pin]
        inner: oneshot::Receiver<Result<IqResponse, IqFailure>>,
    }
}
145146impl IqResponseToken {
147/// Tie a stanza token to this IQ response token.
148 ///
149 /// The stanza token should point at the IQ **request**, the response of
150 /// which this response token awaits.
151 ///
152 /// Awaiting the response token will then handle error states in the
153 /// stanza token and return IqFailure as appropriate.
154pub(crate) fn set_stanza_token(&mut self, token: StanzaToken) {
155assert!(self.stanza_token.is_none());
156self.stanza_token = Some(token.into_stream());
157 }
158}
159160impl Future for IqResponseToken {
161type Output = Result<IqResponse, IqFailure>;
162163fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
164let mut this = self.project();
165match this.inner.poll(cx) {
166 Poll::Ready(Ok(v)) => {
167// Drop the map entry handle to release some memory.
168this.entry.take();
169return Poll::Ready(v);
170 }
171 Poll::Ready(Err(_)) => {
172log::warn!("IqResponseToken oneshot::Receiver returned receive error!");
173// Drop the map entry handle to release some memory.
174this.entry.take();
175return Poll::Ready(Err(IqFailure::LostWorker));
176 }
177 Poll::Pending => (),
178 };
179180loop {
181match this.stanza_token.as_mut().as_pin_mut() {
182// We have a stanza token to look at, so we check its state.
183Some(stream) => match ready!(stream.poll_next(cx)) {
184// Still in the queue.
185Some(StanzaState::Queued) => (),
186187Some(StanzaState::Dropped) | None => {
188log::warn!("StanzaToken associated with IqResponseToken signalled that the Stanza was dropped before transmission.");
189// Drop the map entry handle to release some memory.
190this.entry.take();
191// Lost stanza stream: cannot ever get a reply.
192return Poll::Ready(Err(IqFailure::LostWorker));
193 }
194195Some(StanzaState::Failed { error }) => {
196// Drop the map entry handle to release some memory.
197this.entry.take();
198// Send error: cannot ever get a reply.
199return Poll::Ready(Err(IqFailure::SendError(error.into_io_error())));
200 }
201202Some(StanzaState::Sent { .. }) | Some(StanzaState::Acked { .. }) => {
203// Sent successfully, stop polling the stream: We do
204 // not care what happens after successful sending,
205 // the next step we expect is that this.inner
206 // completes.
207*this.stanza_token = None;
208return Poll::Pending;
209 }
210 },
211212// No StanzaToken to poll, so we return Poll::Pending and hope
213 // that we will get a response through this.inner eventually..
214None => return Poll::Pending,
215 }
216 }
217 }
218}
/// Sending half of the channel through which a response is delivered to the
/// matching [`IqResponseToken`].
struct IqResponseSink {
    /// Oneshot sender whose receiver lives inside the response token.
    inner: oneshot::Sender<Result<IqResponse, IqFailure>>,
}
223224impl IqResponseSink {
225fn complete(self, resp: IqResponse) {
226let _: Result<_, _> = self.inner.send(Ok(resp));
227 }
228}
/// Utility struct to track IQ responses.
///
/// Requests are registered with [`IqResponseTracker::allocate_iq_handle`];
/// incoming stanzas are matched against the registered requests with
/// [`IqResponseTracker::handle_iq`].
pub struct IqResponseTracker {
    // Pending requests. Entry handles hold a weak reference to this map so
    // that they can remove their own entry when dropped.
    map: Arc<Mutex<IqMap>>,
}
234235impl IqResponseTracker {
236/// Create a new empty response tracker.
237pub fn new() -> Self {
238Self {
239 map: Arc::new(Mutex::new(IqMap::new())),
240 }
241 }
242243/// Attempt to handle an IQ stanza as IQ response.
244 ///
245 /// Returns the IQ stanza unharmed if it is not an IQ response matching
246 /// any request which is still being tracked.
247pub fn handle_iq(&self, iq: Iq) -> ControlFlow<(), Iq> {
248let payload = match iq.payload {
249 IqType::Error(error) => IqResponse::Error(error),
250 IqType::Result(result) => IqResponse::Result(result),
251_ => return ControlFlow::Continue(iq),
252 };
253let key = (iq.from, iq.id);
254let mut map = self.map.lock().unwrap();
255match map.remove(&key) {
256None => {
257log::trace!("not handling IQ response from {:?} with id {:?}: no active tracker for this tuple", key.0, key.1);
258 ControlFlow::Continue(Iq {
259 from: key.0,
260 id: key.1,
261 to: iq.to,
262 payload: payload.into(),
263 })
264 }
265Some(sink) => {
266 sink.complete(payload);
267 ControlFlow::Break(())
268 }
269 }
270 }
271272/// Allocate a new IQ response tracking handle.
273 ///
274 /// This modifies the IQ to assign a unique ID.
275pub fn allocate_iq_handle(
276&self,
277 from: Option<Jid>,
278 to: Option<Jid>,
279 req: IqRequest,
280 ) -> (Iq, IqResponseToken) {
281let key = (to, make_id());
282let mut map = self.map.lock().unwrap();
283let (tx, rx) = oneshot::channel();
284let sink = IqResponseSink { inner: tx };
285assert!(map.get(&key).is_none());
286let token = IqResponseToken {
287 entry: Some(IqMapEntryHandle {
288 key: key.clone(),
289 map: Arc::downgrade(&self.map),
290 }),
291 stanza_token: None,
292 inner: rx,
293 };
294 map.insert(key.clone(), sink);
295 (
296 Iq {
297 from,
298 to: key.0,
299 id: key.1,
300 payload: req.into(),
301 },
302 token,
303 )
304 }
305}