tokio_xmpp/stanzastream/
negotiation.rs

1// Copyright (c) 2019 Emmanuel Gil Peyrot <linkmauve@linkmauve.fr>
2//
3// This Source Code Form is subject to the terms of the Mozilla Public
4// License, v. 2.0. If a copy of the MPL was not distributed with this
5// file, You can obtain one at http://mozilla.org/MPL/2.0/.
6
7use core::ops::ControlFlow::{self, Break, Continue};
8use core::pin::Pin;
9use core::task::{Context, Poll};
10use std::io;
11
12use futures::{ready, Sink, Stream};
13
14use xmpp_parsers::{
15    bind::{BindQuery, BindResponse},
16    iq::Iq,
17    jid::{FullJid, Jid},
18    sm,
19    stream_error::{DefinedCondition, StreamError},
20    stream_features::StreamFeatures,
21};
22
23use crate::xmlstream::{ReadError, XmppStreamElement};
24use crate::Stanza;
25
26use super::queue::{QueueEntry, TransmitQueue};
27use super::stream_management::*;
28use super::worker::{parse_error_to_stream_error, XmppStream};
29
30static BIND_REQ_ID: &str = "resource-binding";
31
32pub(super) enum NegotiationState {
33    /// Send request to enable or resume stream management.
34    SendSmRequest {
35        /// Stream management state to use. If present, resumption will be
36        /// attempted. Otherwise, a fresh session will be established.
37        sm_state: Option<SmState>,
38
39        /// If the stream has been freshly bound, we carry the bound JID along
40        /// with us.
41        bound_jid: Option<FullJid>,
42    },
43
44    /// Await the response to the SM enable/resume request.
45    ReceiveSmResponse {
46        /// State to use.
47        sm_state: Option<SmState>,
48
49        /// If the stream has been freshly bound, we carry the bound JID along
50        /// with us.
51        bound_jid: Option<FullJid>,
52    },
53
54    /// Send a new request to bind to a resource.
55    SendBindRequest { sm_supported: bool },
56
57    /// Receive the bind response.
58    ReceiveBindResponse { sm_supported: bool },
59}
60
61/// The ultimate result of a stream negotiation.
62pub(super) enum NegotiationResult {
63    /// An unplanned disconnect happened or a stream error was received from
64    /// the remote party.
65    Disconnect {
66        /// Stream management state for a later resumption attempt.
67        sm_state: Option<SmState>,
68
69        /// I/O error which came along the disconnect.
70        error: io::Error,
71    },
72
73    /// The negotiation completed successfully, but the stream was reset (i.e.
74    /// stream management and all session state was lost).
75    StreamReset {
76        /// Stream management state. This may still be non-None if the new
77        /// stream has successfully negotiated stream management.
78        sm_state: Option<SmState>,
79
80        /// The JID to which the stream is now bound.
81        bound_jid: Jid,
82    },
83
84    /// The negotiation completed successfully and a previous session was
85    /// resumed.
86    StreamResumed {
87        /// Negotiated stream management state.
88        sm_state: SmState,
89    },
90
91    /// The negotiation failed and we need to emit a stream error.
92    ///
93    /// **Note:** Stream errors *received* from the peer are signalled using
94    /// [`Self::Disconnect`] instead, with an I/O error of kind `Other`.
95    StreamError {
96        /// Stream error to send to the remote party with details about the
97        /// failure.
98        error: StreamError,
99    },
100}
101
102impl NegotiationState {
103    pub fn new(features: &StreamFeatures, sm_state: Option<SmState>) -> io::Result<Self> {
104        if let Some(sm_state) = sm_state {
105            if features.stream_management.is_some() {
106                return Ok(Self::SendSmRequest {
107                    sm_state: Some(sm_state),
108                    bound_jid: None,
109                });
110            } else {
111                log::warn!("Peer is not offering stream management anymore. Dropping state.");
112            }
113        }
114
115        if !features.can_bind() {
116            return Err(io::Error::new(
117                io::ErrorKind::InvalidData,
118                "Peer is not offering the bind feature. Cannot proceed with stream negotiation.",
119            ));
120        }
121
122        Ok(Self::SendBindRequest {
123            sm_supported: features.stream_management.is_some(),
124        })
125    }
126
127    fn flush(stream: Pin<&mut XmppStream>, cx: &mut Context) -> ControlFlow<io::Error, ()> {
128        match <XmppStream as Sink<&XmppStreamElement>>::poll_flush(stream, cx) {
129            Poll::Pending | Poll::Ready(Ok(())) => Continue(()),
130            Poll::Ready(Err(error)) => Break(error),
131        }
132    }
133
134    pub fn advance(
135        &mut self,
136        mut stream: Pin<&mut XmppStream>,
137        jid: &Jid,
138        transmit_queue: &mut TransmitQueue<QueueEntry>,
139        cx: &mut Context<'_>,
140    ) -> Poll<ControlFlow<NegotiationResult, Option<Stanza>>> {
141        // When sending requests, we need to wait for the stream to become
142        // ready to send and then send the corresponding request.
143        // Note that if this wasn't a fresh stream (which it always is!),
144        // doing it in this kind of simplex fashion could lead to deadlocks
145        // (because we are blockingly sending without attempting to receive: a
146        // peer could stop receiving from our side if their tx buffer was too
147        // full or smth). However, because this stream is fresh, we know that
148        // our tx buffers are empty enough that this will succeed quickly, so
149        // that we can proceed.
150        // TODO: define a deadline for negotiation.
151        match self {
152            Self::SendBindRequest { sm_supported } => {
153                match ready!(<XmppStream as Sink<&Stanza>>::poll_ready(
154                    stream.as_mut(),
155                    cx
156                )) {
157                    // We can send.
158                    Ok(()) => (),
159
160                    // Stream broke horribly.
161                    Err(error) => {
162                        return Poll::Ready(Break(NegotiationResult::Disconnect {
163                            sm_state: None,
164                            error,
165                        }))
166                    }
167                };
168
169                let resource = jid.resource().map(|x| x.as_str().to_owned());
170                let stanza = Iq::from_set(BIND_REQ_ID, BindQuery::new(resource));
171                match stream.start_send(&stanza) {
172                    Ok(()) => (),
173                    Err(e) => panic!("failed to serialize BindQuery: {}", e),
174                };
175
176                *self = Self::ReceiveBindResponse {
177                    sm_supported: *sm_supported,
178                };
179                Poll::Ready(Continue(None))
180            }
181
182            Self::ReceiveBindResponse { sm_supported } => {
183                match Self::flush(stream.as_mut(), cx) {
184                    Break(error) => {
185                        return Poll::Ready(Break(NegotiationResult::Disconnect {
186                            sm_state: None,
187                            error,
188                        }))
189                    }
190                    Continue(()) => (),
191                }
192
193                let item = ready!(stream.poll_next(cx));
194                let item = item.unwrap_or_else(|| {
195                    Err(ReadError::HardError(io::Error::new(
196                        io::ErrorKind::UnexpectedEof,
197                        "eof before stream footer",
198                    )))
199                });
200
201                match item {
202                    Ok(XmppStreamElement::Stanza(data)) => match data {
203                        Stanza::Iq(iq) if iq.id() == BIND_REQ_ID => {
204                            let error = match iq {
205                                Iq::Result {
206                                    payload: Some(payload),
207                                    ..
208                                } => match BindResponse::try_from(payload) {
209                                    Ok(v) => {
210                                        let bound_jid = v.into();
211                                        if *sm_supported {
212                                            *self = Self::SendSmRequest {
213                                                sm_state: None,
214                                                bound_jid: Some(bound_jid),
215                                            };
216                                            return Poll::Ready(Continue(None));
217                                        } else {
218                                            return Poll::Ready(Break(
219                                                NegotiationResult::StreamReset {
220                                                    sm_state: None,
221                                                    bound_jid: Jid::from(bound_jid),
222                                                },
223                                            ));
224                                        }
225                                    }
226                                    Err(e) => e.to_string(),
227                                },
228                                Iq::Result { payload: None, .. } => {
229                                    "Bind response has no payload".to_owned()
230                                }
231                                _ => "Unexpected IQ type in response to bind request".to_owned(),
232                            };
233                            log::warn!("Received IQ matching the bind request, but parsing failed ({error})! Emitting stream error.");
234                            Poll::Ready(Break(NegotiationResult::StreamError {
235                                error: StreamError::new(
236                                    DefinedCondition::UndefinedCondition,
237                                    "en",
238                                    error,
239                                )
240                                .with_application_specific(vec![super::error::ParseError.into()]),
241                            }))
242                        }
243                        st => {
244                            log::warn!("Received unexpected stanza before response to bind request: {st:?}. Dropping.");
245                            Poll::Ready(Continue(None))
246                        }
247                    },
248
249                    Ok(XmppStreamElement::StreamError(error)) => {
250                        log::debug!("Received stream:error, failing stream and discarding any stream management state.");
251                        let error = io::Error::other(error);
252                        transmit_queue.fail(&(&error).into());
253                        Poll::Ready(Break(NegotiationResult::Disconnect {
254                            error,
255                            sm_state: None,
256                        }))
257                    }
258
259                    Ok(other) => {
260                        log::warn!("Received unsupported stream element during bind: {other:?}. Emitting stream error.");
261                        Poll::Ready(Break(NegotiationResult::StreamError {
262                            error: StreamError::new(
263                                DefinedCondition::UnsupportedStanzaType,
264                                "en",
265                                format!("Unsupported stream element during bind: {other:?}"),
266                            ),
267                        }))
268                    }
269
270                    // Soft timeouts during negotiation are a bad sign
271                    // (because we already prompted the server to send
272                    // something and are waiting for it), but also nothing
273                    // to write home about.
274                    Err(ReadError::SoftTimeout) => Poll::Ready(Continue(None)),
275
276                    // Parse errors during negotiation cause an unconditional
277                    // stream error.
278                    Err(ReadError::ParseError(e)) => {
279                        Poll::Ready(Break(NegotiationResult::StreamError {
280                            error: parse_error_to_stream_error(e),
281                        }))
282                    }
283
284                    // I/O errors cause the stream to be considered
285                    // broken; we drop it and send a Disconnect event with
286                    // the error embedded.
287                    Err(ReadError::HardError(error)) => {
288                        Poll::Ready(Break(NegotiationResult::Disconnect {
289                            sm_state: None,
290                            error,
291                        }))
292                    }
293
294                    // Stream footer during negotation is really weird.
295                    // We kill the stream immediately with an error
296                    // (but allow preservation of the SM state).
297                    Err(ReadError::StreamFooterReceived) => {
298                        Poll::Ready(Break(NegotiationResult::Disconnect {
299                            sm_state: None,
300                            error: io::Error::new(
301                                io::ErrorKind::InvalidData,
302                                "stream footer received during negotation",
303                            ),
304                        }))
305                    }
306                }
307            }
308
309            Self::SendSmRequest {
310                sm_state,
311                bound_jid,
312            } => {
313                match ready!(<XmppStream as Sink<&XmppStreamElement>>::poll_ready(
314                    stream.as_mut(),
315                    cx
316                )) {
317                    // We can send.
318                    Ok(()) => (),
319
320                    // Stream broke horribly.
321                    Err(error) => {
322                        return Poll::Ready(Break(NegotiationResult::Disconnect {
323                            sm_state: sm_state.take(),
324                            error,
325                        }))
326                    }
327                };
328
329                let nonza = if let Some((id, inbound_ctr)) =
330                    sm_state.as_ref().and_then(|x| x.resume_info())
331                {
332                    // Attempt resumption
333                    sm::Nonza::Resume(sm::Resume {
334                        h: inbound_ctr,
335                        previd: sm::StreamId(id.to_owned()),
336                    })
337                } else {
338                    // Attempt enabling
339                    sm::Nonza::Enable(sm::Enable {
340                        max: None,
341                        resume: true,
342                    })
343                };
344                match stream.start_send(&XmppStreamElement::SM(nonza)) {
345                    Ok(()) => (),
346                    Err(e) => {
347                        // We panic here, instead of returning an
348                        // error, because after we confirmed via
349                        // poll_ready that the stream is ready to
350                        // send, the only error returned by start_send
351                        // can be caused by our data.
352                        panic!("Failed to send SM nonza: {}", e);
353                    }
354                }
355
356                *self = Self::ReceiveSmResponse {
357                    sm_state: sm_state.take(),
358                    bound_jid: bound_jid.take(),
359                };
360                // Ask caller to poll us again immediately in order to
361                // start flushing the stream.
362                Poll::Ready(Continue(None))
363            }
364
365            Self::ReceiveSmResponse {
366                sm_state,
367                bound_jid,
368            } => {
369                match Self::flush(stream.as_mut(), cx) {
370                    Break(error) => {
371                        return Poll::Ready(Break(NegotiationResult::Disconnect {
372                            sm_state: sm_state.take(),
373                            error,
374                        }))
375                    }
376                    Continue(()) => (),
377                }
378
379                // So the difficulty here is that there's a possibility
380                // that we receive non-SM data while the SM negotiation
381                // is going on.
382
383                let item = ready!(stream.poll_next(cx));
384                let item = item.unwrap_or_else(|| {
385                    Err(ReadError::HardError(io::Error::new(
386                        io::ErrorKind::UnexpectedEof,
387                        "eof before stream footer",
388                    )))
389                });
390                match item {
391                    // Pre-SM data. Note that we mustn't count this while we
392                    // are still in negotiating state: we transit to
393                    // [`Self::Ready`] immediately after we got the
394                    // `<resumed/>` or `<enabled/>`, and before we got that,
395                    // counting inbound stanzas is definitely wrong (see e.g.
396                    // aioxmpp commit 796aa32).
397                    Ok(XmppStreamElement::Stanza(data)) => Poll::Ready(Continue(Some(data))),
398
399                    Ok(XmppStreamElement::SM(sm::Nonza::Enabled(enabled))) => {
400                        if sm_state.is_some() {
401                            // Okay, the peer violated the stream management
402                            // protocol here (or we have a bug).
403                            log::warn!(
404                                "Received <enabled/>, but we also have previous SM state. One of us has a bug here (us or the peer) and I'm not sure which it is. If you can reproduce this, please re-run with trace loglevel and provide the logs. Attempting to proceed with a fresh session.",
405                            );
406                        }
407                        // We must emit Reset here because this is a
408                        // fresh stream and we did not resume.
409                        Poll::Ready(Break(NegotiationResult::StreamReset {
410                            sm_state: Some(enabled.into()),
411                            bound_jid: bound_jid.take().expect("State machine error: no bound_jid available in SM negotiation.").into(),
412                        }))
413                    }
414
415                    Ok(XmppStreamElement::SM(sm::Nonza::Resumed(resumed))) => match sm_state.take()
416                    {
417                        Some(mut sm_state) => {
418                            // Yay!
419                            match sm_state.resume(resumed.h) {
420                                Ok(to_retransmit) => transmit_queue.requeue_all(to_retransmit),
421                                Err(e) => {
422                                    // We kill the stream with an error
423                                    log::error!("Resumption failed: {e}");
424                                    return Poll::Ready(Break(NegotiationResult::StreamError {
425                                        error: e.into(),
426                                    }));
427                                }
428                            }
429                            Poll::Ready(Break(NegotiationResult::StreamResumed { sm_state }))
430                        }
431                        None => {
432                            // Okay, the peer violated the stream management
433                            // protocol here (or we have a bug).
434                            // Unlike the
435                            // received-enabled-but-attempted-to-resume
436                            // situation, we do not have enough information to
437                            // proceed without having the stream break soon.
438                            // (If we proceeded without a SM state, we would
439                            // have the stream die as soon as the peer
440                            // requests our counters).
441                            // We thus terminate the stream with an error.
442                            // We must emit Reset here because this is a fresh
443                            // stream and we did not resume.
444                            Poll::Ready(Break(NegotiationResult::Disconnect {
445                                sm_state: None,
446                                error: io::Error::new(io::ErrorKind::InvalidData, "Peer replied to <sm:enable/> request with <sm:resumed/> response"),
447                            }))
448                        }
449                    },
450
451                    Ok(XmppStreamElement::SM(sm::Nonza::Failed(failed))) => match sm_state {
452                        Some(sm_state) => {
453                            log::debug!("Received <sm:failed/> in response to resumption request. Discarding SM data and attempting to renegotiate.");
454                            if let Some(h) = failed.h {
455                                // This is only an optimization anyway, so
456                                // we can also just ignore this.
457                                let _: Result<_, _> = sm_state.remote_acked(h);
458                            }
459                            *self = Self::SendBindRequest { sm_supported: true };
460                            Poll::Ready(Continue(None))
461                        }
462                        None => {
463                            log::warn!("Received <sm:failed/> in response to enable request. Proceeding without stream management.");
464
465                            // We must emit Reset here because this is a
466                            // fresh stream and we did not resume.
467                            Poll::Ready(Break(NegotiationResult::StreamReset {
468                                bound_jid: bound_jid.take().expect("State machine error: no bound_jid available in SM negotiation.").into(),
469                                sm_state: None,
470                            }))
471                        }
472                    },
473
474                    Ok(XmppStreamElement::StreamError(error)) => {
475                        log::debug!("Received stream error, failing stream and discarding any stream management state.");
476                        let error = io::Error::other(error);
477                        transmit_queue.fail(&(&error).into());
478                        Poll::Ready(Break(NegotiationResult::Disconnect {
479                            error,
480                            sm_state: None,
481                        }))
482                    }
483
484                    Ok(other) => {
485                        log::warn!("Received unsupported stream element during negotiation: {other:?}. Emitting stream error.");
486                        Poll::Ready(Break(NegotiationResult::StreamError {
487                            error: StreamError::new(
488                                DefinedCondition::UnsupportedStanzaType,
489                                "en",
490                                format!("Unsupported stream element during negotiation: {other:?}"),
491                            ),
492                        }))
493                    }
494
495                    // Soft timeouts during negotiation are a bad sign
496                    // (because we already prompted the server to send
497                    // something and are waiting for it), but also nothing
498                    // to write home about.
499                    Err(ReadError::SoftTimeout) => Poll::Ready(Continue(None)),
500
501                    // Parse errors during negotiation cause an unconditional
502                    // stream error.
503                    Err(ReadError::ParseError(e)) => {
504                        Poll::Ready(Break(NegotiationResult::StreamError {
505                            error: parse_error_to_stream_error(e),
506                        }))
507                    }
508
509                    // I/O errors cause the stream to be considered
510                    // broken; we drop it and send a Disconnect event with
511                    // the error embedded.
512                    Err(ReadError::HardError(error)) => {
513                        Poll::Ready(Break(NegotiationResult::Disconnect {
514                            sm_state: sm_state.take(),
515                            error,
516                        }))
517                    }
518
519                    // Stream footer during negotation is really weird.
520                    // We kill the stream immediately with an error
521                    // (but allow preservation of the SM state).
522                    Err(ReadError::StreamFooterReceived) => {
523                        Poll::Ready(Break(NegotiationResult::Disconnect {
524                            sm_state: sm_state.take(),
525                            error: io::Error::new(
526                                io::ErrorKind::InvalidData,
527                                "stream footer received during negotation",
528                            ),
529                        }))
530                    }
531                }
532            }
533        }
534    }
535}