tokio_xmpp/stanzastream/queue.rs
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360
// Copyright (c) 2019 Emmanuel Gil Peyrot <linkmauve@linkmauve.fr>
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
use core::cmp::Ordering;
use core::fmt;
use core::task::{Context, Poll};
use std::collections::VecDeque;
use std::io;
use futures::ready;
use tokio::sync::{mpsc, watch};
use crate::Stanza;
#[derive(Debug, Clone)]
pub struct OpaqueIoError {
kind: io::ErrorKind,
message: String,
}
impl OpaqueIoError {
pub fn kind(&self) -> io::ErrorKind {
self.kind
}
pub fn into_io_error(self) -> io::Error {
io::Error::new(self.kind, self.message)
}
pub fn to_io_error(&self) -> io::Error {
io::Error::new(self.kind, self.message.clone())
}
}
impl From<io::Error> for OpaqueIoError {
fn from(other: io::Error) -> Self {
<Self as From<&io::Error>>::from(&other)
}
}
impl From<&io::Error> for OpaqueIoError {
fn from(other: &io::Error) -> Self {
Self {
kind: other.kind(),
message: other.to_string(),
}
}
}
impl fmt::Display for OpaqueIoError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.write_str(&self.message)
}
}
impl core::error::Error for OpaqueIoError {}
/// The five stages of stanza transmission.
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Copy)]
pub enum StanzaStage {
/// The stanza is in the transmit queue, but has not been serialised or
/// sent to the stream yet.
Queued,
/// The stanza was successfully serialised and put into the transmit
/// buffers.
Sent,
/// The stanza has been acked by the peer using XEP-0198 or comparable
/// means.
///
/// **Note:** This state is only ever reached on streams where XEP-0198
/// was succesfully negotiated.
Acked,
/// Stanza transmission or serialisation failed.
Failed,
/// The stanza was dropped from the transmit queue before it could be
/// sent.
///
/// This may happen if the stream breaks in a fatal, panick-y way.
Dropped,
}
impl From<&StanzaState> for StanzaStage {
fn from(other: &StanzaState) -> Self {
match other {
StanzaState::Queued => Self::Queued,
StanzaState::Sent { .. } => Self::Sent,
StanzaState::Acked { .. } => Self::Acked,
StanzaState::Failed { .. } => Self::Failed,
StanzaState::Dropped => Self::Dropped,
}
}
}
impl PartialEq<StanzaStage> for StanzaState {
fn eq(&self, other: &StanzaStage) -> bool {
StanzaStage::from(self).eq(other)
}
}
impl PartialEq<StanzaState> for StanzaStage {
fn eq(&self, other: &StanzaState) -> bool {
self.eq(&Self::from(other))
}
}
impl PartialOrd<StanzaStage> for StanzaState {
fn partial_cmp(&self, other: &StanzaStage) -> Option<Ordering> {
StanzaStage::from(self).partial_cmp(other)
}
}
impl PartialOrd<StanzaState> for StanzaStage {
fn partial_cmp(&self, other: &StanzaState) -> Option<Ordering> {
self.partial_cmp(&Self::from(other))
}
}
/// State of a stanza in transit to the peer.
#[derive(Debug, Clone)]
pub enum StanzaState {
/// The stanza has been enqueued in the local queue but not sent yet.
Queued,
/// The stanza has been sent to the server, but there is no proof that it
/// has been received by the server yet.
Sent {
/*
/// The time from when the stanza was enqueued until the time it was
/// sent on the stream.
queue_delay: Duration,
*/
},
/// Confirmation that the stanza has been seen by the server has been
/// received.
Acked {
/*
/// The time from when the stanza was enqueued until the time it was
/// sent on the stream.
queue_delay: Duration,
/// The time between sending the stanza on the stream and receiving
/// confirmation from the server.
ack_delay: Duration,
*/
},
/// Sending the stanza has failed in a non-recoverable manner.
Failed {
/// The error which caused the sending to fail.
error: OpaqueIoError,
},
/// The stanza was dropped out of the queue for unspecified reasons,
/// such as the stream breaking in a fatal, panick-y way.
Dropped,
}
/// Track stanza transmission through the
/// [`StanzaStream`][`super::StanzaStream`] up to the peer.
#[derive(Clone)]
pub struct StanzaToken {
inner: watch::Receiver<StanzaState>,
}
impl StanzaToken {
/// Wait for the stanza transmission to reach the given state.
///
/// If the stanza is removed from tracking before that state is reached,
/// `None` is returned.
pub async fn wait_for(&mut self, state: StanzaStage) -> Option<StanzaState> {
self.inner
.wait_for(|st| *st >= state)
.await
.map(|x| x.clone())
.ok()
}
pub(crate) fn into_stream(self) -> tokio_stream::wrappers::WatchStream<StanzaState> {
tokio_stream::wrappers::WatchStream::new(self.inner)
}
/// Read the current transmission state.
pub fn state(&self) -> StanzaState {
self.inner.borrow().clone()
}
}
pub(super) struct QueueEntry {
pub stanza: Box<Stanza>,
pub token: watch::Sender<StanzaState>,
}
impl QueueEntry {
pub fn untracked(st: Box<Stanza>) -> Self {
Self::tracked(st).0
}
pub fn tracked(st: Box<Stanza>) -> (Self, StanzaToken) {
let (tx, rx) = watch::channel(StanzaState::Queued);
let token = StanzaToken { inner: rx };
(
QueueEntry {
stanza: st,
token: tx,
},
token,
)
}
}
/// Reference to a transmit queue entry.
///
/// On drop, the entry is returned to the queue.
pub(super) struct TransmitQueueRef<'x, T> {
q: &'x mut VecDeque<T>,
}
impl<'x, T> TransmitQueueRef<'x, T> {
/// Take the item out of the queue.
pub fn take(self) -> T {
// Unwrap: when this type is created, a check is made that the queue
// actually has a front item and because we borrow, that also cannot
// change.
self.q.pop_front().unwrap()
}
}
/// A transmit queue coupled to an [`mpsc::Receiver`].
///
/// The transmit queue will by default only allow one element to reside in the
/// queue outside the inner `Receiver`: the main queueing happens inside the
/// receiver and is governed by its queue depth and associated backpressure.
///
/// However, the queue does allow prepending elements to the front, which is
/// useful for retransmitting items.
pub(super) struct TransmitQueue<T: Unpin> {
inner: mpsc::Receiver<T>,
peek: VecDeque<T>,
}
impl<T: Unpin> TransmitQueue<T> {
/// Create a new transmission queue around an existing mpsc receiver.
pub fn wrap(ch: mpsc::Receiver<T>) -> Self {
Self {
inner: ch,
peek: VecDeque::with_capacity(1),
}
}
/// Create a new mpsc channel and wrap the receiving side in a
/// transmission queue
pub fn channel(depth: usize) -> (mpsc::Sender<T>, Self) {
let (tx, rx) = mpsc::channel(depth);
(tx, Self::wrap(rx))
}
/// Poll the queue for the next item to transmit.
pub fn poll_next(&mut self, cx: &mut Context) -> Poll<Option<TransmitQueueRef<'_, T>>> {
if self.peek.len() > 0 {
// Cannot use `if let Some(.) = .` here because of a borrowchecker
// restriction. If the reference is created before the branch is
// entered, it will think it needs to be borrowed until the end
// of the function (and that will conflict with the mutable
// borrow we do for `self.peek.push_back` below).
// See also https://github.com/rust-lang/rust/issues/54663.
return Poll::Ready(Some(TransmitQueueRef { q: &mut self.peek }));
} else {
// The target size for the queue is 1, effectively acting as an
// Option<T>. In some cases, we need more than one, but that is
// always only a temporary burst (e.g. SM resumption
// retransmissions), so we release the memory as soon as possible
// after that.
// Even though the target size is 1, we don't want to be pedantic
// about this and we don't want to reallocate often. Some short
// bursts are ok, and given that the stanzas inside QueueEntry
// elements (the main use case for this type) are boxed anyway,
// the size of the elements is rather small.
if self.peek.capacity() > 32 {
// We do not use shrink_to here, because we are *certain* that
// we won't need a larger capacity any time soon, and
// allocators may avoid moving data around.
let mut new = VecDeque::new();
core::mem::swap(&mut self.peek, &mut new);
}
}
match ready!(self.inner.poll_recv(cx)) {
None => Poll::Ready(None),
Some(v) => {
self.peek.push_back(v);
Poll::Ready(Some(TransmitQueueRef { q: &mut self.peek }))
}
}
}
/// Requeue a sequence of items to the front of the queue.
///
/// This function preserves ordering of the elements in `iter`, meaning
/// that the first item from `iter` is going to be the next item yielded
/// by `poll_take` or `poll_peek`.
pub fn requeue_all<I: IntoIterator<Item = T>>(&mut self, iter: I) {
let iter = iter.into_iter();
let to_reserve = iter.size_hint().1.unwrap_or(iter.size_hint().0);
self.peek.reserve(to_reserve);
let mut n = 0;
for item in iter {
self.peek.push_front(item);
n += 1;
}
// Now we need to revert the order: we pushed the elements to the
// front, so if we now read back from the front via poll_peek or
// poll_take, that will cause them to be read in reverse order. The
// following loop fixes that.
for i in 0..(n / 2) {
let j = n - (i + 1);
self.peek.swap(i, j);
}
}
/// Enqueues an item to be sent after all items in the *local* queue, but
/// *before* all items which are still inside the inner `mpsc` channel.
pub fn enqueue(&mut self, item: T) {
self.peek.push_back(item);
}
/// Return true if the sender side of the queue is closed.
///
/// Note that there may still be items which can be retrieved from the
/// queue even though it has been closed.
pub fn is_closed(&self) -> bool {
self.inner.is_closed()
}
}
impl TransmitQueue<QueueEntry> {
/// Fail all currently queued items with the given error.
///
/// Future items will not be affected.
pub fn fail(&mut self, error: &OpaqueIoError) {
for item in self.peek.drain(..) {
item.token.send_replace(StanzaState::Failed {
error: error.clone(),
});
}
while let Ok(item) = self.inner.try_recv() {
item.token.send_replace(StanzaState::Failed {
error: error.clone(),
});
}
self.peek.shrink_to(1);
}
}