tokio_xmpp/stanzastream/
queue.rs

1// Copyright (c) 2019 Emmanuel Gil Peyrot <linkmauve@linkmauve.fr>
2//
3// This Source Code Form is subject to the terms of the Mozilla Public
4// License, v. 2.0. If a copy of the MPL was not distributed with this
5// file, You can obtain one at http://mozilla.org/MPL/2.0/.
6
7use core::cmp::Ordering;
8use core::fmt;
9use core::task::{Context, Poll};
10use std::collections::VecDeque;
11use std::io;
12
13use futures::ready;
14
15use tokio::sync::{mpsc, watch};
16
17use crate::Stanza;
18
/// Cloneable stand-in for an [`io::Error`].
///
/// Only the [`io::ErrorKind`] and the rendered message of the source error
/// are retained. Unlike [`io::Error`] itself, this type implements `Clone`.
#[derive(Debug, Clone)]
pub struct OpaqueIoError {
    /// Kind of the error this value was created from.
    kind: io::ErrorKind,
    /// `Display` rendering of the error this value was created from.
    message: String,
}

impl OpaqueIoError {
    /// Return the [`io::ErrorKind`] of the captured error.
    pub fn kind(&self) -> io::ErrorKind {
        self.kind
    }

    /// Consume `self` and build a fresh [`io::Error`] carrying the captured
    /// kind and message.
    pub fn into_io_error(self) -> io::Error {
        let Self { kind, message } = self;
        io::Error::new(kind, message)
    }

    /// Build a fresh [`io::Error`] carrying the captured kind and a copy of
    /// the captured message, leaving `self` intact.
    pub fn to_io_error(&self) -> io::Error {
        self.clone().into_io_error()
    }
}

impl From<io::Error> for OpaqueIoError {
    /// Capture an owned error; equivalent to converting from a reference.
    fn from(other: io::Error) -> Self {
        Self::from(&other)
    }
}

impl From<&io::Error> for OpaqueIoError {
    /// Capture the kind and the `Display` rendering of `other`.
    fn from(other: &io::Error) -> Self {
        let kind = other.kind();
        let message = other.to_string();
        Self { kind, message }
    }
}

impl fmt::Display for OpaqueIoError {
    /// Write the captured message verbatim.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(self.message.as_str())
    }
}

impl core::error::Error for OpaqueIoError {}
61
/// The five stages of stanza transmission.
///
/// The derived ordering follows the declaration order (`Queued` < `Sent` <
/// `Acked` < `Failed` < `Dropped`). A "has reached at least this stage"
/// comparison is therefore also satisfied by `Failed` and `Dropped`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum StanzaStage {
    /// The stanza sits in the transmit queue; it has neither been serialised
    /// nor handed to the stream yet.
    Queued,

    /// The stanza was serialised successfully and placed into the transmit
    /// buffers.
    Sent,

    /// The peer acknowledged the stanza using XEP-0198 or comparable means.
    ///
    /// **Note:** This stage is only ever reached on streams where XEP-0198
    /// was successfully negotiated.
    Acked,

    /// Serialising or transmitting the stanza failed.
    Failed,

    /// The stanza was removed from the transmit queue before it could be
    /// sent.
    ///
    /// This may happen if the stream breaks in a fatal, panic-like way.
    Dropped,
}
89
90impl From<&StanzaState> for StanzaStage {
91    fn from(other: &StanzaState) -> Self {
92        match other {
93            StanzaState::Queued => Self::Queued,
94            StanzaState::Sent { .. } => Self::Sent,
95            StanzaState::Acked { .. } => Self::Acked,
96            StanzaState::Failed { .. } => Self::Failed,
97            StanzaState::Dropped => Self::Dropped,
98        }
99    }
100}
101
102impl PartialEq<StanzaStage> for StanzaState {
103    fn eq(&self, other: &StanzaStage) -> bool {
104        StanzaStage::from(self).eq(other)
105    }
106}
107
108impl PartialEq<StanzaState> for StanzaStage {
109    fn eq(&self, other: &StanzaState) -> bool {
110        self.eq(&Self::from(other))
111    }
112}
113
114impl PartialOrd<StanzaStage> for StanzaState {
115    fn partial_cmp(&self, other: &StanzaStage) -> Option<Ordering> {
116        StanzaStage::from(self).partial_cmp(other)
117    }
118}
119
120impl PartialOrd<StanzaState> for StanzaStage {
121    fn partial_cmp(&self, other: &StanzaState) -> Option<Ordering> {
122        self.partial_cmp(&Self::from(other))
123    }
124}
125
126/// State of a stanza in transit to the peer.
127#[derive(Debug, Clone)]
128pub enum StanzaState {
129    /// The stanza has been enqueued in the local queue but not sent yet.
130    Queued,
131
132    /// The stanza has been sent to the server, but there is no proof that it
133    /// has been received by the server yet.
134    Sent {
135        /*
136        /// The time from when the stanza was enqueued until the time it was
137        /// sent on the stream.
138        queue_delay: Duration,
139        */
140    },
141
142    /// Confirmation that the stanza has been seen by the server has been
143    /// received.
144    Acked {
145        /*
146        /// The time from when the stanza was enqueued until the time it was
147        /// sent on the stream.
148        queue_delay: Duration,
149
150        /// The time between sending the stanza on the stream and receiving
151        /// confirmation from the server.
152        ack_delay: Duration,
153        */
154    },
155
156    /// Sending the stanza has failed in a non-recoverable manner.
157    Failed {
158        /// The error which caused the sending to fail.
159        error: OpaqueIoError,
160    },
161
162    /// The stanza was dropped out of the queue for unspecified reasons,
163    /// such as the stream breaking in a fatal, panick-y way.
164    Dropped,
165}
166
167/// Track stanza transmission through the
168/// [`StanzaStream`][`super::StanzaStream`] up to the peer.
169#[derive(Clone)]
170pub struct StanzaToken {
171    inner: watch::Receiver<StanzaState>,
172}
173
174impl StanzaToken {
175    /// Wait for the stanza transmission to reach the given state.
176    ///
177    /// If the stanza is removed from tracking before that state is reached,
178    /// `None` is returned.
179    pub async fn wait_for(&mut self, state: StanzaStage) -> Option<StanzaState> {
180        self.inner
181            .wait_for(|st| *st >= state)
182            .await
183            .map(|x| x.clone())
184            .ok()
185    }
186
187    pub(crate) fn into_stream(self) -> tokio_stream::wrappers::WatchStream<StanzaState> {
188        tokio_stream::wrappers::WatchStream::new(self.inner)
189    }
190
191    /// Read the current transmission state.
192    pub fn state(&self) -> StanzaState {
193        self.inner.borrow().clone()
194    }
195}
196
197pub(super) struct QueueEntry {
198    pub stanza: Box<Stanza>,
199    pub token: watch::Sender<StanzaState>,
200}
201
202impl QueueEntry {
203    pub fn untracked(st: Box<Stanza>) -> Self {
204        Self::tracked(st).0
205    }
206
207    pub fn tracked(st: Box<Stanza>) -> (Self, StanzaToken) {
208        let (tx, rx) = watch::channel(StanzaState::Queued);
209        let token = StanzaToken { inner: rx };
210        (
211            QueueEntry {
212                stanza: st,
213                token: tx,
214            },
215            token,
216        )
217    }
218}
219
220/// Reference to a transmit queue entry.
221///
222/// On drop, the entry is returned to the queue.
223pub(super) struct TransmitQueueRef<'x, T> {
224    q: &'x mut VecDeque<T>,
225}
226
227impl<'x, T> TransmitQueueRef<'x, T> {
228    /// Take the item out of the queue.
229    pub fn take(self) -> T {
230        // Unwrap: when this type is created, a check is made that the queue
231        // actually has a front item and because we borrow, that also cannot
232        // change.
233        self.q.pop_front().unwrap()
234    }
235}
236
237/// A transmit queue coupled to an [`mpsc::Receiver`].
238///
239/// The transmit queue will by default only allow one element to reside in the
240/// queue outside the inner `Receiver`: the main queueing happens inside the
241/// receiver and is governed by its queue depth and associated backpressure.
242///
243/// However, the queue does allow prepending elements to the front, which is
244/// useful for retransmitting items.
245pub(super) struct TransmitQueue<T: Unpin> {
246    inner: mpsc::Receiver<T>,
247    peek: VecDeque<T>,
248}
249
250impl<T: Unpin> TransmitQueue<T> {
251    /// Create a new transmission queue around an existing mpsc receiver.
252    pub fn wrap(ch: mpsc::Receiver<T>) -> Self {
253        Self {
254            inner: ch,
255            peek: VecDeque::with_capacity(1),
256        }
257    }
258
259    /// Create a new mpsc channel and wrap the receiving side in a
260    /// transmission queue
261    pub fn channel(depth: usize) -> (mpsc::Sender<T>, Self) {
262        let (tx, rx) = mpsc::channel(depth);
263        (tx, Self::wrap(rx))
264    }
265
266    /// Poll the queue for the next item to transmit.
267    pub fn poll_next(&mut self, cx: &mut Context) -> Poll<Option<TransmitQueueRef<'_, T>>> {
268        if self.peek.len() > 0 {
269            // Cannot use `if let Some(.) = .` here because of a borrowchecker
270            // restriction. If the reference is created before the branch is
271            // entered, it will think it needs to be borrowed until the end
272            // of the function (and that will conflict with the mutable
273            // borrow we do for `self.peek.push_back` below).
274            // See also https://github.com/rust-lang/rust/issues/54663.
275            return Poll::Ready(Some(TransmitQueueRef { q: &mut self.peek }));
276        } else {
277            // The target size for the queue is 1, effectively acting as an
278            // Option<T>. In some cases, we need more than one, but that is
279            // always only a temporary burst (e.g. SM resumption
280            // retransmissions), so we release the memory as soon as possible
281            // after that.
282            // Even though the target size is 1, we don't want to be pedantic
283            // about this and we don't want to reallocate often. Some short
284            // bursts are ok, and given that the stanzas inside QueueEntry
285            // elements (the main use case for this type) are boxed anyway,
286            // the size of the elements is rather small.
287            if self.peek.capacity() > 32 {
288                // We do not use shrink_to here, because we are *certain* that
289                // we won't need a larger capacity any time soon, and
290                // allocators may avoid moving data around.
291                let mut new = VecDeque::new();
292                core::mem::swap(&mut self.peek, &mut new);
293            }
294        }
295        match ready!(self.inner.poll_recv(cx)) {
296            None => Poll::Ready(None),
297            Some(v) => {
298                self.peek.push_back(v);
299                Poll::Ready(Some(TransmitQueueRef { q: &mut self.peek }))
300            }
301        }
302    }
303
304    /// Requeue a sequence of items to the front of the queue.
305    ///
306    /// This function preserves ordering of the elements in `iter`, meaning
307    /// that the first item from `iter` is going to be the next item yielded
308    /// by `poll_take` or `poll_peek`.
309    pub fn requeue_all<I: IntoIterator<Item = T>>(&mut self, iter: I) {
310        let iter = iter.into_iter();
311        let to_reserve = iter.size_hint().1.unwrap_or(iter.size_hint().0);
312        self.peek.reserve(to_reserve);
313        let mut n = 0;
314        for item in iter {
315            self.peek.push_front(item);
316            n += 1;
317        }
318        // Now we need to revert the order: we pushed the elements to the
319        // front, so if we now read back from the front via poll_peek or
320        // poll_take, that will cause them to be read in reverse order. The
321        // following loop fixes that.
322        for i in 0..(n / 2) {
323            let j = n - (i + 1);
324            self.peek.swap(i, j);
325        }
326    }
327
328    /// Enqueues an item to be sent after all items in the *local* queue, but
329    /// *before* all items which are still inside the inner `mpsc` channel.
330    pub fn enqueue(&mut self, item: T) {
331        self.peek.push_back(item);
332    }
333
334    /// Return true if the sender side of the queue is closed.
335    ///
336    /// Note that there may still be items which can be retrieved from the
337    /// queue even though it has been closed.
338    pub fn is_closed(&self) -> bool {
339        self.inner.is_closed()
340    }
341}
342
343impl TransmitQueue<QueueEntry> {
344    /// Fail all currently queued items with the given error.
345    ///
346    /// Future items will not be affected.
347    pub fn fail(&mut self, error: &OpaqueIoError) {
348        for item in self.peek.drain(..) {
349            item.token.send_replace(StanzaState::Failed {
350                error: error.clone(),
351            });
352        }
353        while let Ok(item) = self.inner.try_recv() {
354            item.token.send_replace(StanzaState::Failed {
355                error: error.clone(),
356            });
357        }
358        self.peek.shrink_to(1);
359    }
360}