tokio_xmpp/stanzastream/stream_management.rs
1// Copyright (c) 2019 Emmanuel Gil Peyrot <linkmauve@linkmauve.fr>
2//
3// This Source Code Form is subject to the terms of the Mozilla Public
4// License, v. 2.0. If a copy of the MPL was not distributed with this
5// file, You can obtain one at http://mozilla.org/MPL/2.0/.
6
7use core::fmt;
8use std::collections::{vec_deque, VecDeque};
9
10use xmpp_parsers::sm;
11
12use super::queue::{QueueEntry, StanzaState};
13
14#[derive(Debug)]
15pub(super) enum SmResumeInfo {
16 NotResumable,
17 Resumable {
18 /// XEP-0198 stream ID
19 id: String,
20
21 /// Preferred IP and port for resumption as indicated by the peer.
22 // TODO: pass this to the reconnection logic.
23 #[allow(dead_code)]
24 location: Option<String>,
25 },
26}
27
28/// State for stream management
29pub(super) struct SmState {
30 /// Last value seen from the remote stanza counter.
31 outbound_base: u32,
32
33 /// Counter for received stanzas
34 inbound_ctr: u32,
35
36 /// Number of `<sm:a/>` we still need to send.
37 ///
38 /// Acks cannot always be sent right away (if our tx buffer is full), and
39 /// instead of cluttering our outbound queue or something with them, we
40 /// just keep a counter of unsanswered `<sm:r/>`. The stream will process
41 /// these in due time.
42 pub(super) pending_acks: usize,
43
44 /// Flag indicating that a `<sm:r/>` request should be sent.
45 pub(super) pending_req: bool,
46
47 /// Information about resumability of the stream
48 resumption: SmResumeInfo,
49
50 /// Unacked stanzas in the order they were sent
51 // We use a VecDeque here because that has better performance
52 // characteristics with the ringbuffer-type usage we're seeing here:
53 // we push stuff to the back, and then drain it from the front. Vec would
54 // have to move all the data around all the time, while VecDeque will just
55 // move some pointers around.
56 unacked_stanzas: VecDeque<QueueEntry>,
57}
58
59impl fmt::Debug for SmState {
60 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
61 f.debug_struct("SmState")
62 .field("outbound_base", &self.outbound_base)
63 .field("inbound_ctr", &self.inbound_ctr)
64 .field("resumption", &self.resumption)
65 .field("len(unacked_stanzas)", &self.unacked_stanzas.len())
66 .finish()
67 }
68}
69
70#[derive(Debug)]
71pub(super) enum SmError {
72 RemoteAckedMoreStanzas {
73 local_base: u32,
74 queue_len: u32,
75 remote_ctr: u32,
76 },
77 RemoteAckWentBackwards {
78 local_base: u32,
79 // NOTE: this is not needed to fully specify the error, but it's
80 // needed to generate a `<handled-count-too-high/>` from Self.
81 queue_len: u32,
82 remote_ctr: u32,
83 },
84}
85
86impl From<SmError> for xmpp_parsers::stream_error::StreamError {
87 fn from(other: SmError) -> Self {
88 let (h, send_count) = match other {
89 SmError::RemoteAckedMoreStanzas {
90 local_base,
91 queue_len,
92 remote_ctr,
93 } => (remote_ctr, local_base.wrapping_add(queue_len)),
94 SmError::RemoteAckWentBackwards {
95 local_base,
96 queue_len,
97 remote_ctr,
98 } => (remote_ctr, local_base.wrapping_add(queue_len)),
99 };
100 xmpp_parsers::sm::HandledCountTooHigh { h, send_count }.into()
101 }
102}
103
104impl fmt::Display for SmError {
105 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
106 match self {
107 Self::RemoteAckedMoreStanzas {
108 local_base,
109 queue_len,
110 remote_ctr,
111 } => {
112 let local_tip = local_base.wrapping_add(*queue_len);
113 write!(f, "remote acked more stanzas than we sent: remote counter = {}. queue covers range {}..<{}", remote_ctr, local_base, local_tip)
114 }
115 Self::RemoteAckWentBackwards {
116 local_base,
117 remote_ctr,
118 ..
119 } => {
120 write!(f, "remote acked less stanzas than before: remote counter = {}, local queue starts at {}", remote_ctr, local_base)
121 }
122 }
123 }
124}
125
126impl SmState {
127 /// Mark a stanza as sent and keep it in the stream management queue.
128 pub fn enqueue(&mut self, entry: QueueEntry) {
129 // This may seem like an arbitrary limit, but there's some thought
130 // in this.
131 // First, the SM counters go up to u32 at most and then wrap around.
132 // That means that any queue size larger than u32 would immediately
133 // cause ambiguities when resuming.
134 // Second, there's RFC 1982 "Serial Number Arithmetic". It is used for
135 // example in DNS for the serial number and it has thoughts on how to
136 // use counters which wrap around at some point. The document proposes
137 // that if the (wrapped) difference between two numbers is larger than
138 // half the number space, you should consider it as a negative
139 // difference.
140 //
141 // Hence the ambiguity already starts at u32::MAX / 2, so we limit the
142 // queue to one less than that.
143 const MAX_QUEUE_SIZE: usize = (u32::MAX / 2 - 1) as usize;
144 if self.unacked_stanzas.len() >= MAX_QUEUE_SIZE {
145 // We don't bother with an error return here. u32::MAX / 2 stanzas
146 // in the queue is fatal in any circumstance I can fathom (also,
147 // we have no way to return this error to the
148 // [`StanzaStream::send`] call anyway).
149 panic!("Too many pending stanzas.");
150 }
151
152 self.unacked_stanzas.push_back(entry);
153 log::trace!(
154 "Stored stanza in SmState. We are now at {} unacked stanzas.",
155 self.unacked_stanzas.len()
156 );
157 }
158
159 /// Process resumption.
160 ///
161 /// Updates the internal state according to the received remote counter.
162 /// Returns an iterator which yields the queue entries which need to be
163 /// retransmitted.
164 pub fn resume(&mut self, h: u32) -> Result<vec_deque::Drain<'_, QueueEntry>, SmError> {
165 self.remote_acked(h)?;
166 // Return the entire leftover queue. We cannot receive acks for them,
167 // unless they are retransmitted, because the peer has not seen them
168 // yet (they got lost in the previous unclean disconnect).
169 Ok(self.unacked_stanzas.drain(..))
170 }
171
172 /// Process remote `<a/>`
173 pub fn remote_acked(&mut self, h: u32) -> Result<(), SmError> {
174 log::debug!("remote_acked: {self:?}::remote_acked({h})");
175 // XEP-0198 specifies that counters are mod 2^32, which is handy when
176 // you use u32 data types :-).
177 let to_drop = h.wrapping_sub(self.outbound_base) as usize;
178 if to_drop > 0 {
179 log::trace!("remote_acked: need to drop {to_drop} stanzas");
180 if to_drop as usize > self.unacked_stanzas.len() {
181 if to_drop as u32 > u32::MAX / 2 {
182 // If we look at the stanza counter values as RFC 1982
183 // values, a wrapping difference greater than half the
184 // number space indicates a negative difference, i.e.
185 // h went backwards.
186 return Err(SmError::RemoteAckWentBackwards {
187 local_base: self.outbound_base,
188 queue_len: self.unacked_stanzas.len() as u32,
189 remote_ctr: h,
190 });
191 } else {
192 return Err(SmError::RemoteAckedMoreStanzas {
193 local_base: self.outbound_base,
194 queue_len: self.unacked_stanzas.len() as u32,
195 remote_ctr: h,
196 });
197 }
198 }
199 for entry in self.unacked_stanzas.drain(..to_drop) {
200 entry.token.send_replace(StanzaState::Acked {});
201 }
202 self.outbound_base = h;
203 log::debug!("remote_acked: remote acked {to_drop} stanzas");
204 Ok(())
205 } else {
206 log::trace!("remote_acked: no stanzas to drop");
207 Ok(())
208 }
209 }
210
211 /// Get the current inbound counter.
212 #[inline(always)]
213 pub fn inbound_ctr(&self) -> u32 {
214 self.inbound_ctr
215 }
216
217 /// Get the info necessary for resumption.
218 ///
219 /// Returns the stream ID and the current inbound counter if resumption is
220 /// available and None otherwise.
221 pub fn resume_info(&self) -> Option<(&str, u32)> {
222 match self.resumption {
223 SmResumeInfo::Resumable { ref id, .. } => Some((id, self.inbound_ctr)),
224 SmResumeInfo::NotResumable => None,
225 }
226 }
227}
228
229/// Initialize stream management state
230impl From<sm::Enabled> for SmState {
231 fn from(other: sm::Enabled) -> Self {
232 let resumption = if other.resume {
233 match other.id {
234 Some(id) => SmResumeInfo::Resumable {
235 location: other.location,
236 id: id.0,
237 },
238 None => {
239 log::warn!("peer replied with <enable resume='true'/>, but without an ID! cannot make this stream resumable.");
240 SmResumeInfo::NotResumable
241 }
242 }
243 } else {
244 SmResumeInfo::NotResumable
245 };
246
247 Self {
248 outbound_base: 0,
249 inbound_ctr: 0,
250 pending_acks: 0,
251 pending_req: false,
252 resumption,
253 unacked_stanzas: VecDeque::new(),
254 }
255 }
256}