tokio_xmpp/stanzastream/queue.rs
1// Copyright (c) 2019 Emmanuel Gil Peyrot <linkmauve@linkmauve.fr>
2//
3// This Source Code Form is subject to the terms of the Mozilla Public
4// License, v. 2.0. If a copy of the MPL was not distributed with this
5// file, You can obtain one at http://mozilla.org/MPL/2.0/.
6
7use core::cmp::Ordering;
8use core::fmt;
9use core::task::{Context, Poll};
10use std::collections::VecDeque;
11use std::io;
12
13use futures::ready;
14
15use tokio::sync::{mpsc, watch};
16
17use crate::Stanza;
18
/// Cloneable snapshot of an [`io::Error`].
///
/// Only the [`io::ErrorKind`] and the rendered message of the source error
/// are stored; the original error value itself is not retained.
#[derive(Clone, Debug)]
pub struct OpaqueIoError {
    /// Kind of the error this value was created from.
    kind: io::ErrorKind,
    /// Text obtained by formatting the source error with `to_string()`.
    message: String,
}

impl OpaqueIoError {
    /// Return the [`io::ErrorKind`] captured from the source error.
    pub fn kind(&self) -> io::ErrorKind {
        self.kind
    }

    /// Consume this value and build a fresh [`io::Error`] carrying the same
    /// kind and message.
    pub fn into_io_error(self) -> io::Error {
        let Self { kind, message } = self;
        io::Error::new(kind, message)
    }

    /// Build a fresh [`io::Error`] carrying the same kind and a copy of the
    /// message, leaving `self` usable.
    pub fn to_io_error(&self) -> io::Error {
        self.clone().into_io_error()
    }
}

impl From<io::Error> for OpaqueIoError {
    /// Capture kind and message of an owned error; delegates to the
    /// by-reference conversion.
    fn from(other: io::Error) -> Self {
        Self::from(&other)
    }
}

impl From<&io::Error> for OpaqueIoError {
    /// Capture kind and message of a borrowed error.
    fn from(other: &io::Error) -> Self {
        let kind = other.kind();
        let message = other.to_string();
        Self { kind, message }
    }
}

impl fmt::Display for OpaqueIoError {
    /// Write the captured message verbatim.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        // `write_str` is used on purpose: the message is emitted as-is,
        // without applying width/padding flags from the formatter.
        f.write_str(self.message.as_str())
    }
}

impl core::error::Error for OpaqueIoError {}
61
/// The five stages a stanza can be in while it is being transmitted.
///
/// `Ord` is derived, so the stages compare in declaration order:
/// `Queued < Sent < Acked < Failed < Dropped`.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub enum StanzaStage {
    /// The stanza sits in the transmit queue; it has neither been serialised
    /// nor handed to the stream yet.
    Queued,

    /// The stanza was serialised successfully and placed into the transmit
    /// buffers.
    Sent,

    /// The peer has acknowledged the stanza via XEP-0198 or a comparable
    /// mechanism.
    ///
    /// **Note:** This stage is only ever reached on streams where XEP-0198
    /// was successfully negotiated.
    Acked,

    /// Serialising or transmitting the stanza failed.
    Failed,

    /// The stanza was removed from the transmit queue before it could be
    /// sent.
    ///
    /// This may happen if the stream breaks in a fatal, panic-like way.
    Dropped,
}
89
90impl From<&StanzaState> for StanzaStage {
91 fn from(other: &StanzaState) -> Self {
92 match other {
93 StanzaState::Queued => Self::Queued,
94 StanzaState::Sent { .. } => Self::Sent,
95 StanzaState::Acked { .. } => Self::Acked,
96 StanzaState::Failed { .. } => Self::Failed,
97 StanzaState::Dropped => Self::Dropped,
98 }
99 }
100}
101
102impl PartialEq<StanzaStage> for StanzaState {
103 fn eq(&self, other: &StanzaStage) -> bool {
104 StanzaStage::from(self).eq(other)
105 }
106}
107
108impl PartialEq<StanzaState> for StanzaStage {
109 fn eq(&self, other: &StanzaState) -> bool {
110 self.eq(&Self::from(other))
111 }
112}
113
114impl PartialOrd<StanzaStage> for StanzaState {
115 fn partial_cmp(&self, other: &StanzaStage) -> Option<Ordering> {
116 StanzaStage::from(self).partial_cmp(other)
117 }
118}
119
120impl PartialOrd<StanzaState> for StanzaStage {
121 fn partial_cmp(&self, other: &StanzaState) -> Option<Ordering> {
122 self.partial_cmp(&Self::from(other))
123 }
124}
125
126/// State of a stanza in transit to the peer.
127#[derive(Debug, Clone)]
128pub enum StanzaState {
129 /// The stanza has been enqueued in the local queue but not sent yet.
130 Queued,
131
132 /// The stanza has been sent to the server, but there is no proof that it
133 /// has been received by the server yet.
134 Sent {
135 /*
136 /// The time from when the stanza was enqueued until the time it was
137 /// sent on the stream.
138 queue_delay: Duration,
139 */
140 },
141
142 /// Confirmation that the stanza has been seen by the server has been
143 /// received.
144 Acked {
145 /*
146 /// The time from when the stanza was enqueued until the time it was
147 /// sent on the stream.
148 queue_delay: Duration,
149
150 /// The time between sending the stanza on the stream and receiving
151 /// confirmation from the server.
152 ack_delay: Duration,
153 */
154 },
155
156 /// Sending the stanza has failed in a non-recoverable manner.
157 Failed {
158 /// The error which caused the sending to fail.
159 error: OpaqueIoError,
160 },
161
162 /// The stanza was dropped out of the queue for unspecified reasons,
163 /// such as the stream breaking in a fatal, panick-y way.
164 Dropped,
165}
166
167/// Track stanza transmission through the
168/// [`StanzaStream`][`super::StanzaStream`] up to the peer.
169#[derive(Clone)]
170pub struct StanzaToken {
171 inner: watch::Receiver<StanzaState>,
172}
173
174impl StanzaToken {
175 /// Wait for the stanza transmission to reach the given state.
176 ///
177 /// If the stanza is removed from tracking before that state is reached,
178 /// `None` is returned.
179 pub async fn wait_for(&mut self, state: StanzaStage) -> Option<StanzaState> {
180 self.inner
181 .wait_for(|st| *st >= state)
182 .await
183 .map(|x| x.clone())
184 .ok()
185 }
186
187 pub(crate) fn into_stream(self) -> tokio_stream::wrappers::WatchStream<StanzaState> {
188 tokio_stream::wrappers::WatchStream::new(self.inner)
189 }
190
191 /// Read the current transmission state.
192 pub fn state(&self) -> StanzaState {
193 self.inner.borrow().clone()
194 }
195}
196
197pub(super) struct QueueEntry {
198 pub stanza: Box<Stanza>,
199 pub token: watch::Sender<StanzaState>,
200}
201
202impl QueueEntry {
203 pub fn untracked(st: Box<Stanza>) -> Self {
204 Self::tracked(st).0
205 }
206
207 pub fn tracked(st: Box<Stanza>) -> (Self, StanzaToken) {
208 let (tx, rx) = watch::channel(StanzaState::Queued);
209 let token = StanzaToken { inner: rx };
210 (
211 QueueEntry {
212 stanza: st,
213 token: tx,
214 },
215 token,
216 )
217 }
218}
219
220/// Reference to a transmit queue entry.
221///
222/// On drop, the entry is returned to the queue.
223pub(super) struct TransmitQueueRef<'x, T> {
224 q: &'x mut VecDeque<T>,
225}
226
227impl<'x, T> TransmitQueueRef<'x, T> {
228 /// Take the item out of the queue.
229 pub fn take(self) -> T {
230 // Unwrap: when this type is created, a check is made that the queue
231 // actually has a front item and because we borrow, that also cannot
232 // change.
233 self.q.pop_front().unwrap()
234 }
235}
236
237/// A transmit queue coupled to an [`mpsc::Receiver`].
238///
239/// The transmit queue will by default only allow one element to reside in the
240/// queue outside the inner `Receiver`: the main queueing happens inside the
241/// receiver and is governed by its queue depth and associated backpressure.
242///
243/// However, the queue does allow prepending elements to the front, which is
244/// useful for retransmitting items.
245pub(super) struct TransmitQueue<T: Unpin> {
246 inner: mpsc::Receiver<T>,
247 peek: VecDeque<T>,
248}
249
250impl<T: Unpin> TransmitQueue<T> {
251 /// Create a new transmission queue around an existing mpsc receiver.
252 pub fn wrap(ch: mpsc::Receiver<T>) -> Self {
253 Self {
254 inner: ch,
255 peek: VecDeque::with_capacity(1),
256 }
257 }
258
259 /// Create a new mpsc channel and wrap the receiving side in a
260 /// transmission queue
261 pub fn channel(depth: usize) -> (mpsc::Sender<T>, Self) {
262 let (tx, rx) = mpsc::channel(depth);
263 (tx, Self::wrap(rx))
264 }
265
266 /// Poll the queue for the next item to transmit.
267 pub fn poll_next(&mut self, cx: &mut Context) -> Poll<Option<TransmitQueueRef<'_, T>>> {
268 if self.peek.len() > 0 {
269 // Cannot use `if let Some(.) = .` here because of a borrowchecker
270 // restriction. If the reference is created before the branch is
271 // entered, it will think it needs to be borrowed until the end
272 // of the function (and that will conflict with the mutable
273 // borrow we do for `self.peek.push_back` below).
274 // See also https://github.com/rust-lang/rust/issues/54663.
275 return Poll::Ready(Some(TransmitQueueRef { q: &mut self.peek }));
276 } else {
277 // The target size for the queue is 1, effectively acting as an
278 // Option<T>. In some cases, we need more than one, but that is
279 // always only a temporary burst (e.g. SM resumption
280 // retransmissions), so we release the memory as soon as possible
281 // after that.
282 // Even though the target size is 1, we don't want to be pedantic
283 // about this and we don't want to reallocate often. Some short
284 // bursts are ok, and given that the stanzas inside QueueEntry
285 // elements (the main use case for this type) are boxed anyway,
286 // the size of the elements is rather small.
287 if self.peek.capacity() > 32 {
288 // We do not use shrink_to here, because we are *certain* that
289 // we won't need a larger capacity any time soon, and
290 // allocators may avoid moving data around.
291 let mut new = VecDeque::new();
292 core::mem::swap(&mut self.peek, &mut new);
293 }
294 }
295 match ready!(self.inner.poll_recv(cx)) {
296 None => Poll::Ready(None),
297 Some(v) => {
298 self.peek.push_back(v);
299 Poll::Ready(Some(TransmitQueueRef { q: &mut self.peek }))
300 }
301 }
302 }
303
304 /// Requeue a sequence of items to the front of the queue.
305 ///
306 /// This function preserves ordering of the elements in `iter`, meaning
307 /// that the first item from `iter` is going to be the next item yielded
308 /// by `poll_take` or `poll_peek`.
309 pub fn requeue_all<I: IntoIterator<Item = T>>(&mut self, iter: I) {
310 let iter = iter.into_iter();
311 let to_reserve = iter.size_hint().1.unwrap_or(iter.size_hint().0);
312 self.peek.reserve(to_reserve);
313 let mut n = 0;
314 for item in iter {
315 self.peek.push_front(item);
316 n += 1;
317 }
318 // Now we need to revert the order: we pushed the elements to the
319 // front, so if we now read back from the front via poll_peek or
320 // poll_take, that will cause them to be read in reverse order. The
321 // following loop fixes that.
322 for i in 0..(n / 2) {
323 let j = n - (i + 1);
324 self.peek.swap(i, j);
325 }
326 }
327
328 /// Enqueues an item to be sent after all items in the *local* queue, but
329 /// *before* all items which are still inside the inner `mpsc` channel.
330 pub fn enqueue(&mut self, item: T) {
331 self.peek.push_back(item);
332 }
333
334 /// Return true if the sender side of the queue is closed.
335 ///
336 /// Note that there may still be items which can be retrieved from the
337 /// queue even though it has been closed.
338 pub fn is_closed(&self) -> bool {
339 self.inner.is_closed()
340 }
341}
342
343impl TransmitQueue<QueueEntry> {
344 /// Fail all currently queued items with the given error.
345 ///
346 /// Future items will not be affected.
347 pub fn fail(&mut self, error: &OpaqueIoError) {
348 for item in self.peek.drain(..) {
349 item.token.send_replace(StanzaState::Failed {
350 error: error.clone(),
351 });
352 }
353 while let Ok(item) = self.inner.try_recv() {
354 item.token.send_replace(StanzaState::Failed {
355 error: error.clone(),
356 });
357 }
358 self.peek.shrink_to(1);
359 }
360}