Skip to main content

tokio_xmpp/stanzastream/
negotiation.rs

1// Copyright (c) 2019 Emmanuel Gil Peyrot <linkmauve@linkmauve.fr>
2//
3// This Source Code Form is subject to the terms of the Mozilla Public
4// License, v. 2.0. If a copy of the MPL was not distributed with this
5// file, You can obtain one at http://mozilla.org/MPL/2.0/.
6
7use core::ops::ControlFlow::{self, Break, Continue};
8use core::pin::Pin;
9use core::task::{Context, Poll};
10use std::io;
11
12use futures::{ready, Sink, Stream};
13
14use xmpp_parsers::{
15    bind::{BindQuery, BindResponse},
16    iq::Iq,
17    jid::{FullJid, Jid},
18    sm,
19    stream_error::{DefinedCondition, StreamError},
20    stream_features::StreamFeatures,
21};
22
23use crate::xmlstream::{ReadError, XmppStreamElement};
24use crate::Stanza;
25
26use super::queue::{QueueEntry, TransmitQueue};
27use super::stream_management::*;
28use super::worker::{parse_error_to_stream_error, XmppStream};
29
/// IQ id used for the resource binding request sent during negotiation.
///
/// Incoming IQs are compared against this id in order to recognise the
/// response to the bind request.
static BIND_REQ_ID: &str = "resource-binding";
31
32pub(super) enum NegotiationState {
33    /// Send request to enable or resume stream management.
34    SendSmRequest {
35        /// Stream management state to use. If present, resumption will be
36        /// attempted. Otherwise, a fresh session will be established.
37        sm_state: Option<SmState>,
38
39        /// If the stream has been freshly bound, we carry the bound JID along
40        /// with us.
41        bound_jid: Option<FullJid>,
42    },
43
44    /// Await the response to the SM enable/resume request.
45    ReceiveSmResponse {
46        /// State to use.
47        sm_state: Option<SmState>,
48
49        /// If the stream has been freshly bound, we carry the bound JID along
50        /// with us.
51        bound_jid: Option<FullJid>,
52    },
53
54    /// Send a new request to bind to a resource.
55    SendBindRequest { sm_supported: bool },
56
57    /// Receive the bind response.
58    ReceiveBindResponse { sm_supported: bool },
59}
60
61/// The ultimate result of a stream negotiation.
62pub(super) enum NegotiationResult {
63    /// An unplanned disconnect happened or a stream error was received from
64    /// the remote party.
65    Disconnect {
66        /// Stream management state for a later resumption attempt.
67        sm_state: Option<SmState>,
68
69        /// I/O error which came along the disconnect.
70        error: io::Error,
71    },
72
73    /// The negotiation completed successfully, but the stream was reset (i.e.
74    /// stream management and all session state was lost).
75    StreamReset {
76        /// Stream management state. This may still be non-None if the new
77        /// stream has successfully negotiated stream management.
78        sm_state: Option<SmState>,
79
80        /// The JID to which the stream is now bound.
81        bound_jid: Jid,
82    },
83
84    /// The negotiation completed successfully and a previous session was
85    /// resumed.
86    StreamResumed {
87        /// Negotiated stream management state.
88        sm_state: SmState,
89    },
90
91    /// The negotiation failed and we need to emit a stream error.
92    ///
93    /// **Note:** Stream errors *received* from the peer are signalled using
94    /// [`Self::Disconnect`] instead, with an I/O error of kind `Other`.
95    StreamError {
96        /// Stream error to send to the remote party with details about the
97        /// failure.
98        error: StreamError,
99    },
100}
101
102impl NegotiationState {
103    pub fn new(features: &StreamFeatures, sm_state: Option<SmState>) -> io::Result<Self> {
104        if let Some(sm_state) = sm_state {
105            if features.stream_management.is_some() {
106                return Ok(Self::SendSmRequest {
107                    sm_state: Some(sm_state),
108                    bound_jid: None,
109                });
110            } else {
111                log::warn!("Peer is not offering stream management anymore. Dropping state.");
112            }
113        }
114
115        if !features.can_bind() {
116            return Err(io::Error::new(
117                io::ErrorKind::InvalidData,
118                "Peer is not offering the bind feature. Cannot proceed with stream negotiation.",
119            ));
120        }
121
122        Ok(Self::SendBindRequest {
123            sm_supported: features.stream_management.is_some(),
124        })
125    }
126
127    fn flush(stream: Pin<&mut XmppStream>, cx: &mut Context) -> ControlFlow<io::Error, ()> {
128        match <XmppStream as Sink<&XmppStreamElement>>::poll_flush(stream, cx) {
129            Poll::Pending | Poll::Ready(Ok(())) => Continue(()),
130            Poll::Ready(Err(error)) => Break(error),
131        }
132    }
133
134    pub fn advance(
135        &mut self,
136        mut stream: Pin<&mut XmppStream>,
137        jid: &Jid,
138        transmit_queue: &mut TransmitQueue<QueueEntry>,
139        cx: &mut Context<'_>,
140    ) -> Poll<ControlFlow<NegotiationResult, Option<Stanza>>> {
141        // When sending requests, we need to wait for the stream to become
142        // ready to send and then send the corresponding request.
143        // Note that if this wasn't a fresh stream (which it always is!),
144        // doing it in this kind of simplex fashion could lead to deadlocks
145        // (because we are blockingly sending without attempting to receive: a
146        // peer could stop receiving from our side if their tx buffer was too
147        // full or smth). However, because this stream is fresh, we know that
148        // our tx buffers are empty enough that this will succeed quickly, so
149        // that we can proceed.
150        // TODO: define a deadline for negotiation.
151        match self {
152            Self::SendBindRequest { sm_supported } => {
153                match ready!(<XmppStream as Sink<&Stanza>>::poll_ready(
154                    stream.as_mut(),
155                    cx
156                )) {
157                    // We can send.
158                    Ok(()) => (),
159
160                    // Stream broke horribly.
161                    Err(error) => {
162                        return Poll::Ready(Break(NegotiationResult::Disconnect {
163                            sm_state: None,
164                            error,
165                        }))
166                    }
167                };
168
169                let resource = jid.resource().map(|x| x.as_str().to_owned());
170                let stanza = Iq::from_set(BIND_REQ_ID, BindQuery::new(resource));
171                match stream.start_send(&stanza) {
172                    Ok(()) => (),
173                    Err(e) => panic!("failed to serialize BindQuery: {}", e),
174                };
175
176                *self = Self::ReceiveBindResponse {
177                    sm_supported: *sm_supported,
178                };
179                Poll::Ready(Continue(None))
180            }
181
182            Self::ReceiveBindResponse { sm_supported } => {
183                match Self::flush(stream.as_mut(), cx) {
184                    Break(error) => {
185                        return Poll::Ready(Break(NegotiationResult::Disconnect {
186                            sm_state: None,
187                            error,
188                        }))
189                    }
190                    Continue(()) => (),
191                }
192
193                let item = ready!(stream.poll_next(cx));
194                let item = item.unwrap_or_else(|| {
195                    Err(ReadError::HardError(io::Error::new(
196                        io::ErrorKind::UnexpectedEof,
197                        "eof before stream footer",
198                    )))
199                });
200                let item = item.map(|v| v.into_read_error()).flatten();
201
202                match item {
203                    Ok(XmppStreamElement::Stanza(data)) => match data {
204                        Stanza::Iq(iq) if iq.id() == BIND_REQ_ID => {
205                            let error = match iq {
206                                Iq::Result {
207                                    payload: Some(payload),
208                                    ..
209                                } => match BindResponse::try_from(payload) {
210                                    Ok(v) => {
211                                        let bound_jid = v.into();
212                                        if *sm_supported {
213                                            *self = Self::SendSmRequest {
214                                                sm_state: None,
215                                                bound_jid: Some(bound_jid),
216                                            };
217                                            return Poll::Ready(Continue(None));
218                                        } else {
219                                            return Poll::Ready(Break(
220                                                NegotiationResult::StreamReset {
221                                                    sm_state: None,
222                                                    bound_jid: Jid::from(bound_jid),
223                                                },
224                                            ));
225                                        }
226                                    }
227                                    Err(e) => e.to_string(),
228                                },
229                                Iq::Result { payload: None, .. } => {
230                                    "Bind response has no payload".to_owned()
231                                }
232                                _ => "Unexpected IQ type in response to bind request".to_owned(),
233                            };
234                            log::warn!("Received IQ matching the bind request, but parsing failed ({error})! Emitting stream error.");
235                            Poll::Ready(Break(NegotiationResult::StreamError {
236                                error: StreamError::new(
237                                    DefinedCondition::UndefinedCondition,
238                                    "en",
239                                    error,
240                                )
241                                .with_application_specific(vec![super::error::ParseError.into()]),
242                            }))
243                        }
244                        st => {
245                            log::warn!("Received unexpected stanza before response to bind request: {st:?}. Dropping.");
246                            Poll::Ready(Continue(None))
247                        }
248                    },
249
250                    Ok(XmppStreamElement::StreamError(error)) => {
251                        log::debug!("Received stream:error, failing stream and discarding any stream management state.");
252                        let error = io::Error::other(error);
253                        transmit_queue.fail(&(&error).into());
254                        Poll::Ready(Break(NegotiationResult::Disconnect {
255                            error,
256                            sm_state: None,
257                        }))
258                    }
259
260                    Ok(other) => {
261                        log::warn!("Received unsupported stream element during bind: {other:?}. Emitting stream error.");
262                        Poll::Ready(Break(NegotiationResult::StreamError {
263                            error: StreamError::new(
264                                DefinedCondition::UnsupportedStanzaType,
265                                "en",
266                                format!("Unsupported stream element during bind: {other:?}"),
267                            ),
268                        }))
269                    }
270
271                    // Soft timeouts during negotiation are a bad sign
272                    // (because we already prompted the server to send
273                    // something and are waiting for it), but also nothing
274                    // to write home about.
275                    Err(ReadError::SoftTimeout) => Poll::Ready(Continue(None)),
276
277                    // Parse errors during negotiation cause an unconditional
278                    // stream error.
279                    Err(ReadError::ParseError(e)) => {
280                        Poll::Ready(Break(NegotiationResult::StreamError {
281                            error: parse_error_to_stream_error(e),
282                        }))
283                    }
284
285                    // I/O errors cause the stream to be considered
286                    // broken; we drop it and send a Disconnect event with
287                    // the error embedded.
288                    Err(ReadError::HardError(error)) => {
289                        Poll::Ready(Break(NegotiationResult::Disconnect {
290                            sm_state: None,
291                            error,
292                        }))
293                    }
294
295                    // Stream footer during negotiation is really weird.
296                    // We kill the stream immediately with an error
297                    // (but allow preservation of the SM state).
298                    Err(ReadError::StreamFooterReceived) => {
299                        Poll::Ready(Break(NegotiationResult::Disconnect {
300                            sm_state: None,
301                            error: io::Error::new(
302                                io::ErrorKind::InvalidData,
303                                "stream footer received during negotiation",
304                            ),
305                        }))
306                    }
307                }
308            }
309
310            Self::SendSmRequest {
311                sm_state,
312                bound_jid,
313            } => {
314                match ready!(<XmppStream as Sink<&XmppStreamElement>>::poll_ready(
315                    stream.as_mut(),
316                    cx
317                )) {
318                    // We can send.
319                    Ok(()) => (),
320
321                    // Stream broke horribly.
322                    Err(error) => {
323                        return Poll::Ready(Break(NegotiationResult::Disconnect {
324                            sm_state: sm_state.take(),
325                            error,
326                        }))
327                    }
328                };
329
330                let nonza = if let Some((id, inbound_ctr)) =
331                    sm_state.as_ref().and_then(|x| x.resume_info())
332                {
333                    // Attempt resumption
334                    sm::Nonza::Resume(sm::Resume {
335                        h: inbound_ctr,
336                        previd: sm::StreamId(id.to_owned()),
337                    })
338                } else {
339                    // Attempt enabling
340                    sm::Nonza::Enable(sm::Enable {
341                        max: None,
342                        resume: true,
343                    })
344                };
345                match stream.start_send(&XmppStreamElement::SM(nonza)) {
346                    Ok(()) => (),
347                    Err(e) => {
348                        // We panic here, instead of returning an
349                        // error, because after we confirmed via
350                        // poll_ready that the stream is ready to
351                        // send, the only error returned by start_send
352                        // can be caused by our data.
353                        panic!("Failed to send SM nonza: {}", e);
354                    }
355                }
356
357                *self = Self::ReceiveSmResponse {
358                    sm_state: sm_state.take(),
359                    bound_jid: bound_jid.take(),
360                };
361                // Ask caller to poll us again immediately in order to
362                // start flushing the stream.
363                Poll::Ready(Continue(None))
364            }
365
366            Self::ReceiveSmResponse {
367                sm_state,
368                bound_jid,
369            } => {
370                match Self::flush(stream.as_mut(), cx) {
371                    Break(error) => {
372                        return Poll::Ready(Break(NegotiationResult::Disconnect {
373                            sm_state: sm_state.take(),
374                            error,
375                        }))
376                    }
377                    Continue(()) => (),
378                }
379
380                // So the difficulty here is that there's a possibility
381                // that we receive non-SM data while the SM negotiation
382                // is going on.
383
384                let item = ready!(stream.poll_next(cx));
385                let item = item.unwrap_or_else(|| {
386                    Err(ReadError::HardError(io::Error::new(
387                        io::ErrorKind::UnexpectedEof,
388                        "eof before stream footer",
389                    )))
390                });
391                let item = item.map(|v| v.into_read_error()).flatten();
392
393                match item {
394                    // Pre-SM data. Note that we mustn't count this while we
395                    // are still in negotiating state: we transit to
396                    // [`Self::Ready`] immediately after we got the
397                    // `<resumed/>` or `<enabled/>`, and before we got that,
398                    // counting inbound stanzas is definitely wrong (see e.g.
399                    // aioxmpp commit 796aa32).
400                    Ok(XmppStreamElement::Stanza(data)) => Poll::Ready(Continue(Some(data))),
401
402                    Ok(XmppStreamElement::SM(sm::Nonza::Enabled(enabled))) => {
403                        if sm_state.is_some() {
404                            // Okay, the peer violated the stream management
405                            // protocol here (or we have a bug).
406                            log::warn!(
407                                "Received <enabled/>, but we also have previous SM state. One of us has a bug here (us or the peer) and I'm not sure which it is. If you can reproduce this, please re-run with trace loglevel and provide the logs. Attempting to proceed with a fresh session.",
408                            );
409                        }
410                        // We must emit Reset here because this is a
411                        // fresh stream and we did not resume.
412                        Poll::Ready(Break(NegotiationResult::StreamReset {
413                            sm_state: Some(enabled.into()),
414                            bound_jid: bound_jid.take().expect("State machine error: no bound_jid available in SM negotiation.").into(),
415                        }))
416                    }
417
418                    Ok(XmppStreamElement::SM(sm::Nonza::Resumed(resumed))) => match sm_state.take()
419                    {
420                        Some(mut sm_state) => {
421                            // Yay!
422                            match sm_state.resume(resumed.h) {
423                                Ok(to_retransmit) => transmit_queue.requeue_all(to_retransmit),
424                                Err(e) => {
425                                    // We kill the stream with an error
426                                    log::error!("Resumption failed: {e}");
427                                    return Poll::Ready(Break(NegotiationResult::StreamError {
428                                        error: e.into(),
429                                    }));
430                                }
431                            }
432                            Poll::Ready(Break(NegotiationResult::StreamResumed { sm_state }))
433                        }
434                        None => {
435                            // Okay, the peer violated the stream management
436                            // protocol here (or we have a bug).
437                            // Unlike the
438                            // received-enabled-but-attempted-to-resume
439                            // situation, we do not have enough information to
440                            // proceed without having the stream break soon.
441                            // (If we proceeded without a SM state, we would
442                            // have the stream die as soon as the peer
443                            // requests our counters).
444                            // We thus terminate the stream with an error.
445                            // We must emit Reset here because this is a fresh
446                            // stream and we did not resume.
447                            Poll::Ready(Break(NegotiationResult::Disconnect {
448                                sm_state: None,
449                                error: io::Error::new(io::ErrorKind::InvalidData, "Peer replied to <sm:enable/> request with <sm:resumed/> response"),
450                            }))
451                        }
452                    },
453
454                    Ok(XmppStreamElement::SM(sm::Nonza::Failed(failed))) => match sm_state {
455                        Some(sm_state) => {
456                            log::debug!("Received <sm:failed/> in response to resumption request. Discarding SM data and attempting to renegotiate.");
457                            if let Some(h) = failed.h {
458                                // This is only an optimization anyway, so
459                                // we can also just ignore this.
460                                let _: Result<_, _> = sm_state.remote_acked(h);
461                            }
462                            *self = Self::SendBindRequest { sm_supported: true };
463                            Poll::Ready(Continue(None))
464                        }
465                        None => {
466                            log::warn!("Received <sm:failed/> in response to enable request. Proceeding without stream management.");
467
468                            // We must emit Reset here because this is a
469                            // fresh stream and we did not resume.
470                            Poll::Ready(Break(NegotiationResult::StreamReset {
471                                bound_jid: bound_jid.take().expect("State machine error: no bound_jid available in SM negotiation.").into(),
472                                sm_state: None,
473                            }))
474                        }
475                    },
476
477                    Ok(XmppStreamElement::StreamError(error)) => {
478                        log::debug!("Received stream error, failing stream and discarding any stream management state.");
479                        let error = io::Error::other(error);
480                        transmit_queue.fail(&(&error).into());
481                        Poll::Ready(Break(NegotiationResult::Disconnect {
482                            error,
483                            sm_state: None,
484                        }))
485                    }
486
487                    Ok(other) => {
488                        log::warn!("Received unsupported stream element during negotiation: {other:?}. Emitting stream error.");
489                        Poll::Ready(Break(NegotiationResult::StreamError {
490                            error: StreamError::new(
491                                DefinedCondition::UnsupportedStanzaType,
492                                "en",
493                                format!("Unsupported stream element during negotiation: {other:?}"),
494                            ),
495                        }))
496                    }
497
498                    // Soft timeouts during negotiation are a bad sign
499                    // (because we already prompted the server to send
500                    // something and are waiting for it), but also nothing
501                    // to write home about.
502                    Err(ReadError::SoftTimeout) => Poll::Ready(Continue(None)),
503
504                    // Parse errors during negotiation cause an unconditional
505                    // stream error.
506                    Err(ReadError::ParseError(e)) => {
507                        Poll::Ready(Break(NegotiationResult::StreamError {
508                            error: parse_error_to_stream_error(e),
509                        }))
510                    }
511
512                    // I/O errors cause the stream to be considered
513                    // broken; we drop it and send a Disconnect event with
514                    // the error embedded.
515                    Err(ReadError::HardError(error)) => {
516                        Poll::Ready(Break(NegotiationResult::Disconnect {
517                            sm_state: sm_state.take(),
518                            error,
519                        }))
520                    }
521
522                    // Stream footer during negotiation is really weird.
523                    // We kill the stream immediately with an error
524                    // (but allow preservation of the SM state).
525                    Err(ReadError::StreamFooterReceived) => {
526                        Poll::Ready(Break(NegotiationResult::Disconnect {
527                            sm_state: sm_state.take(),
528                            error: io::Error::new(
529                                io::ErrorKind::InvalidData,
530                                "stream footer received during negotiation",
531                            ),
532                        }))
533                    }
534                }
535            }
536        }
537    }
538}