tokio_xmpp/stanzastream/
stream_management.rs

1// Copyright (c) 2019 Emmanuel Gil Peyrot <linkmauve@linkmauve.fr>
2//
3// This Source Code Form is subject to the terms of the Mozilla Public
4// License, v. 2.0. If a copy of the MPL was not distributed with this
5// file, You can obtain one at http://mozilla.org/MPL/2.0/.
6
7use core::fmt;
8use std::collections::{vec_deque, VecDeque};
9
10use xmpp_parsers::sm;
11
12use super::queue::{QueueEntry, StanzaState};
13
/// Whether (and how) this stream can be resumed after a disconnect.
#[derive(Debug)]
pub(super) enum SmResumeInfo {
    /// The peer did not grant resumption; the session state is lost on
    /// disconnect.
    NotResumable,
    /// The peer granted resumption (`<enabled resume='true' id='…'/>`).
    Resumable {
        /// XEP-0198 stream ID
        id: String,

        /// Preferred IP and port for resumption as indicated by the peer.
        // TODO: pass this to the reconnection logic.
        #[allow(dead_code)]
        location: Option<String>,
    },
}
27
/// State for stream management
///
/// Tracks the XEP-0198 counters in both directions, the queue of outbound
/// stanzas not yet acknowledged by the peer, and resumability information.
pub(super) struct SmState {
    /// Last value seen from the remote stanza counter.
    outbound_base: u32,

    /// Counter for received stanzas
    inbound_ctr: u32,

    /// Number of `<sm:a/>` we still need to send.
    ///
    /// Acks cannot always be sent right away (if our tx buffer is full), and
    /// instead of cluttering our outbound queue or something with them, we
    /// just keep a counter of unanswered `<sm:r/>`. The stream will process
    /// these in due time.
    pub(super) pending_acks: usize,

    /// Flag indicating that a `<sm:r/>` request should be sent.
    pub(super) pending_req: bool,

    /// Information about resumability of the stream
    resumption: SmResumeInfo,

    /// Unacked stanzas in the order they were sent
    // We use a VecDeque here because that has better performance
    // characteristics with the ringbuffer-type usage we're seeing here:
    // we push stuff to the back, and then drain it from the front. Vec would
    // have to move all the data around all the time, while VecDeque will just
    // move some pointers around.
    unacked_stanzas: VecDeque<QueueEntry>,
}
58
59impl fmt::Debug for SmState {
60    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
61        f.debug_struct("SmState")
62            .field("outbound_base", &self.outbound_base)
63            .field("inbound_ctr", &self.inbound_ctr)
64            .field("resumption", &self.resumption)
65            .field("len(unacked_stanzas)", &self.unacked_stanzas.len())
66            .finish()
67    }
68}
69
/// Errors caused by an invalid remote `h` counter value (received via
/// `<a/>` or on resumption).
#[derive(Debug)]
pub(super) enum SmError {
    /// The remote counter acknowledged more stanzas than we have in our
    /// unacked queue, i.e. stanzas we never sent.
    RemoteAckedMoreStanzas {
        /// Stanza counter value corresponding to the front of our queue.
        local_base: u32,
        /// Number of stanzas currently in the unacked queue.
        queue_len: u32,
        /// The offending counter value received from the peer.
        remote_ctr: u32,
    },
    /// The remote counter moved backwards compared to a previously seen
    /// value.
    RemoteAckWentBackwards {
        /// Stanza counter value corresponding to the front of our queue.
        local_base: u32,
        // NOTE: this is not needed to fully specify the error, but it's
        // needed to generate a `<handled-count-too-high/>` from Self.
        queue_len: u32,
        /// The offending counter value received from the peer.
        remote_ctr: u32,
    },
}
85
86impl From<SmError> for xmpp_parsers::stream_error::StreamError {
87    fn from(other: SmError) -> Self {
88        let (h, send_count) = match other {
89            SmError::RemoteAckedMoreStanzas {
90                local_base,
91                queue_len,
92                remote_ctr,
93            } => (remote_ctr, local_base.wrapping_add(queue_len)),
94            SmError::RemoteAckWentBackwards {
95                local_base,
96                queue_len,
97                remote_ctr,
98            } => (remote_ctr, local_base.wrapping_add(queue_len)),
99        };
100        xmpp_parsers::sm::HandledCountTooHigh { h, send_count }.into()
101    }
102}
103
104impl fmt::Display for SmError {
105    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
106        match self {
107            Self::RemoteAckedMoreStanzas {
108                local_base,
109                queue_len,
110                remote_ctr,
111            } => {
112                let local_tip = local_base.wrapping_add(*queue_len);
113                write!(f, "remote acked more stanzas than we sent: remote counter = {}. queue covers range {}..<{}", remote_ctr, local_base, local_tip)
114            }
115            Self::RemoteAckWentBackwards {
116                local_base,
117                remote_ctr,
118                ..
119            } => {
120                write!(f, "remote acked less stanzas than before: remote counter = {}, local queue starts at {}", remote_ctr, local_base)
121            }
122        }
123    }
124}
125
126impl SmState {
127    /// Mark a stanza as sent and keep it in the stream management queue.
128    pub fn enqueue(&mut self, entry: QueueEntry) {
129        // This may seem like an arbitrary limit, but there's some thought
130        // in this.
131        // First, the SM counters go up to u32 at most and then wrap around.
132        // That means that any queue size larger than u32 would immediately
133        // cause ambiguities when resuming.
134        // Second, there's RFC 1982 "Serial Number Arithmetic". It is used for
135        // example in DNS for the serial number and it has thoughts on how to
136        // use counters which wrap around at some point. The document proposes
137        // that if the (wrapped) difference between two numbers is larger than
138        // half the number space, you should consider it as a negative
139        // difference.
140        //
141        // Hence the ambiguity already starts at u32::MAX / 2, so we limit the
142        // queue to one less than that.
143        const MAX_QUEUE_SIZE: usize = (u32::MAX / 2 - 1) as usize;
144        if self.unacked_stanzas.len() >= MAX_QUEUE_SIZE {
145            // We don't bother with an error return here. u32::MAX / 2 stanzas
146            // in the queue is fatal in any circumstance I can fathom (also,
147            // we have no way to return this error to the
148            // [`StanzaStream::send`] call anyway).
149            panic!("Too many pending stanzas.");
150        }
151
152        self.unacked_stanzas.push_back(entry);
153        log::trace!(
154            "Stored stanza in SmState. We are now at {} unacked stanzas.",
155            self.unacked_stanzas.len()
156        );
157    }
158
159    /// Process resumption.
160    ///
161    /// Updates the internal state according to the received remote counter.
162    /// Returns an iterator which yields the queue entries which need to be
163    /// retransmitted.
164    pub fn resume(&mut self, h: u32) -> Result<vec_deque::Drain<'_, QueueEntry>, SmError> {
165        self.remote_acked(h)?;
166        // Return the entire leftover queue. We cannot receive acks for them,
167        // unless they are retransmitted, because the peer has not seen them
168        // yet (they got lost in the previous unclean disconnect).
169        Ok(self.unacked_stanzas.drain(..))
170    }
171
172    /// Process remote `<a/>`
173    pub fn remote_acked(&mut self, h: u32) -> Result<(), SmError> {
174        log::debug!("remote_acked: {self:?}::remote_acked({h})");
175        // XEP-0198 specifies that counters are mod 2^32, which is handy when
176        // you use u32 data types :-).
177        let to_drop = h.wrapping_sub(self.outbound_base) as usize;
178        if to_drop > 0 {
179            log::trace!("remote_acked: need to drop {to_drop} stanzas");
180            if to_drop as usize > self.unacked_stanzas.len() {
181                if to_drop as u32 > u32::MAX / 2 {
182                    // If we look at the stanza counter values as RFC 1982
183                    // values, a wrapping difference greater than half the
184                    // number space indicates a negative difference, i.e.
185                    // h went backwards.
186                    return Err(SmError::RemoteAckWentBackwards {
187                        local_base: self.outbound_base,
188                        queue_len: self.unacked_stanzas.len() as u32,
189                        remote_ctr: h,
190                    });
191                } else {
192                    return Err(SmError::RemoteAckedMoreStanzas {
193                        local_base: self.outbound_base,
194                        queue_len: self.unacked_stanzas.len() as u32,
195                        remote_ctr: h,
196                    });
197                }
198            }
199            for entry in self.unacked_stanzas.drain(..to_drop) {
200                entry.token.send_replace(StanzaState::Acked {});
201            }
202            self.outbound_base = h;
203            log::debug!("remote_acked: remote acked {to_drop} stanzas");
204            Ok(())
205        } else {
206            log::trace!("remote_acked: no stanzas to drop");
207            Ok(())
208        }
209    }
210
211    /// Get the current inbound counter.
212    #[inline(always)]
213    pub fn inbound_ctr(&self) -> u32 {
214        self.inbound_ctr
215    }
216
217    /// Get the info necessary for resumption.
218    ///
219    /// Returns the stream ID and the current inbound counter if resumption is
220    /// available and None otherwise.
221    pub fn resume_info(&self) -> Option<(&str, u32)> {
222        match self.resumption {
223            SmResumeInfo::Resumable { ref id, .. } => Some((id, self.inbound_ctr)),
224            SmResumeInfo::NotResumable => None,
225        }
226    }
227}
228
229/// Initialize stream management state
230impl From<sm::Enabled> for SmState {
231    fn from(other: sm::Enabled) -> Self {
232        let resumption = if other.resume {
233            match other.id {
234                Some(id) => SmResumeInfo::Resumable {
235                    location: other.location,
236                    id: id.0,
237                },
238                None => {
239                    log::warn!("peer replied with <enable resume='true'/>, but without an ID! cannot make this stream resumable.");
240                    SmResumeInfo::NotResumable
241                }
242            }
243        } else {
244            SmResumeInfo::NotResumable
245        };
246
247        Self {
248            outbound_base: 0,
249            inbound_ctr: 0,
250            pending_acks: 0,
251            pending_req: false,
252            resumption,
253            unacked_stanzas: VecDeque::new(),
254        }
255    }
256}