tokio_xmpp/stanzastream/
negotiation.rs

1// Copyright (c) 2019 Emmanuel Gil Peyrot <linkmauve@linkmauve.fr>
2//
3// This Source Code Form is subject to the terms of the Mozilla Public
4// License, v. 2.0. If a copy of the MPL was not distributed with this
5// file, You can obtain one at http://mozilla.org/MPL/2.0/.
6
7use core::ops::ControlFlow::{self, Break, Continue};
8use core::pin::Pin;
9use core::task::{Context, Poll};
10use std::io;
11
12use futures::{ready, Sink, Stream};
13
14use xmpp_parsers::{
15    bind::{BindQuery, BindResponse},
16    iq::{Iq, IqType},
17    jid::{FullJid, Jid},
18    sm,
19    stream_error::{DefinedCondition, StreamError},
20    stream_features::StreamFeatures,
21};
22
23use crate::xmlstream::{ReadError, XmppStreamElement};
24use crate::Stanza;
25
26use super::queue::{QueueEntry, TransmitQueue};
27use super::stream_management::*;
28use super::worker::{parse_error_to_stream_error, XmppStream};
29
/// Value of the `id` attribute of the resource binding IQ request. The
/// response from the peer is recognised by carrying the same id.
static BIND_REQ_ID: &'static str = "resource-binding";
31
32pub(super) enum NegotiationState {
33    /// Send request to enable or resume stream management.
34    SendSmRequest {
35        /// Stream management state to use. If present, resumption will be
36        /// attempted. Otherwise, a fresh session will be established.
37        sm_state: Option<SmState>,
38
39        /// If the stream has been freshly bound, we carry the bound JID along
40        /// with us.
41        bound_jid: Option<FullJid>,
42    },
43
44    /// Await the response to the SM enable/resume request.
45    ReceiveSmResponse {
46        /// State to use.
47        sm_state: Option<SmState>,
48
49        /// If the stream has been freshly bound, we carry the bound JID along
50        /// with us.
51        bound_jid: Option<FullJid>,
52    },
53
54    /// Send a new request to bind to a resource.
55    SendBindRequest { sm_supported: bool },
56
57    /// Receive the bind response.
58    ReceiveBindResponse { sm_supported: bool },
59}
60
61/// The ultimate result of a stream negotiation.
62pub(super) enum NegotiationResult {
63    /// An unplanned disconnect happened or a stream error was received from
64    /// the remote party.
65    Disconnect {
66        /// Stream management state for a later resumption attempt.
67        sm_state: Option<SmState>,
68
69        /// I/O error which came along the disconnect.
70        error: io::Error,
71    },
72
73    /// The negotiation completed successfully, but the stream was reset (i.e.
74    /// stream management and all session state was lost).
75    StreamReset {
76        /// Stream management state. This may still be non-None if the new
77        /// stream has successfully negotiated stream management.
78        sm_state: Option<SmState>,
79
80        /// The JID to which the stream is now bound.
81        bound_jid: Jid,
82    },
83
84    /// The negotiation completed successfully and a previous session was
85    /// resumed.
86    StreamResumed {
87        /// Negotiated stream management state.
88        sm_state: SmState,
89    },
90
91    /// The negotiation failed and we need to emit a stream error.
92    ///
93    /// **Note:** Stream errors *received* from the peer are signalled using
94    /// [`Self::Disconnect`] instead, with an I/O error of kind `Other`.
95    StreamError {
96        /// Stream error to send to the remote party with details about the
97        /// failure.
98        error: StreamError,
99    },
100}
101
102impl NegotiationState {
103    pub fn new(features: &StreamFeatures, sm_state: Option<SmState>) -> io::Result<Self> {
104        match sm_state {
105            Some(sm_state) => {
106                if features.stream_management.is_some() {
107                    return Ok(Self::SendSmRequest {
108                        sm_state: Some(sm_state),
109                        bound_jid: None,
110                    });
111                } else {
112                    log::warn!("Peer is not offering stream management anymore. Dropping state.");
113                }
114            }
115            None => (),
116        }
117
118        if !features.can_bind() {
119            return Err(io::Error::new(
120                io::ErrorKind::InvalidData,
121                "Peer is not offering the bind feature. Cannot proceed with stream negotiation.",
122            ));
123        }
124
125        Ok(Self::SendBindRequest {
126            sm_supported: features.stream_management.is_some(),
127        })
128    }
129
130    fn flush(stream: Pin<&mut XmppStream>, cx: &mut Context) -> ControlFlow<io::Error, ()> {
131        match <XmppStream as Sink<&XmppStreamElement>>::poll_flush(stream, cx) {
132            Poll::Pending | Poll::Ready(Ok(())) => Continue(()),
133            Poll::Ready(Err(error)) => Break(error),
134        }
135    }
136
137    pub fn advance(
138        &mut self,
139        mut stream: Pin<&mut XmppStream>,
140        jid: &Jid,
141        transmit_queue: &mut TransmitQueue<QueueEntry>,
142        cx: &mut Context<'_>,
143    ) -> Poll<ControlFlow<NegotiationResult, Option<Stanza>>> {
144        // When sending requests, we need to wait for the stream to become
145        // ready to send and then send the corresponding request.
146        // Note that if this wasn't a fresh stream (which it always is!),
147        // doing it in this kind of simplex fashion could lead to deadlocks
148        // (because we are blockingly sending without attempting to receive: a
149        // peer could stop receiving from our side if their tx buffer was too
150        // full or smth). However, because this stream is fresh, we know that
151        // our tx buffers are empty enough that this will succeed quickly, so
152        // that we can proceed.
153        // TODO: define a deadline for negotiation.
154        match self {
155            Self::SendBindRequest { sm_supported } => {
156                match ready!(<XmppStream as Sink<&Stanza>>::poll_ready(
157                    stream.as_mut(),
158                    cx
159                )) {
160                    // We can send.
161                    Ok(()) => (),
162
163                    // Stream broke horribly.
164                    Err(error) => {
165                        return Poll::Ready(Break(NegotiationResult::Disconnect {
166                            sm_state: None,
167                            error,
168                        }))
169                    }
170                };
171
172                let resource = jid.resource().map(|x| x.as_str().to_owned());
173                let stanza = Iq::from_set(BIND_REQ_ID, BindQuery::new(resource));
174                match stream.start_send(&stanza) {
175                    Ok(()) => (),
176                    Err(e) => panic!("failed to serialize BindQuery: {}", e),
177                };
178
179                *self = Self::ReceiveBindResponse {
180                    sm_supported: *sm_supported,
181                };
182                Poll::Ready(Continue(None))
183            }
184
185            Self::ReceiveBindResponse { sm_supported } => {
186                match Self::flush(stream.as_mut(), cx) {
187                    Break(error) => {
188                        return Poll::Ready(Break(NegotiationResult::Disconnect {
189                            sm_state: None,
190                            error,
191                        }))
192                    }
193                    Continue(()) => (),
194                }
195
196                let item = ready!(stream.poll_next(cx));
197                let item = item.unwrap_or_else(|| {
198                    Err(ReadError::HardError(io::Error::new(
199                        io::ErrorKind::UnexpectedEof,
200                        "eof before stream footer",
201                    )))
202                });
203
204                match item {
205                    Ok(XmppStreamElement::Stanza(data)) => match data {
206                        Stanza::Iq(iq) if iq.id == BIND_REQ_ID => {
207                            let error = match iq.payload {
208                                IqType::Result(Some(payload)) => {
209                                    match BindResponse::try_from(payload) {
210                                        Ok(v) => {
211                                            let bound_jid = v.into();
212                                            if *sm_supported {
213                                                *self = Self::SendSmRequest {
214                                                    sm_state: None,
215                                                    bound_jid: Some(bound_jid),
216                                                };
217                                                return Poll::Ready(Continue(None));
218                                            } else {
219                                                return Poll::Ready(Break(
220                                                    NegotiationResult::StreamReset {
221                                                        sm_state: None,
222                                                        bound_jid: Jid::from(bound_jid),
223                                                    },
224                                                ));
225                                            }
226                                        }
227                                        Err(e) => e.to_string(),
228                                    }
229                                }
230                                IqType::Result(None) => "Bind response has no payload".to_owned(),
231                                _ => "Unexpected IQ type in response to bind request".to_owned(),
232                            };
233                            log::warn!("Received IQ matching the bind request, but parsing failed ({error})! Emitting stream error.");
234                            Poll::Ready(Break(NegotiationResult::StreamError {
235                                error: StreamError {
236                                    condition: DefinedCondition::UndefinedCondition,
237                                    text: Some((None, error)),
238                                    application_specific: vec![super::error::ParseError.into()],
239                                },
240                            }))
241                        }
242                        st => {
243                            log::warn!("Received unexpected stanza before response to bind request: {st:?}. Dropping.");
244                            Poll::Ready(Continue(None))
245                        }
246                    },
247
248                    Ok(XmppStreamElement::StreamError(error)) => {
249                        log::debug!("Received stream:error, failing stream and discarding any stream management state.");
250                        let error = io::Error::new(io::ErrorKind::Other, error);
251                        transmit_queue.fail(&(&error).into());
252                        Poll::Ready(Break(NegotiationResult::Disconnect {
253                            error,
254                            sm_state: None,
255                        }))
256                    }
257
258                    Ok(other) => {
259                        log::warn!("Received unsupported stream element during bind: {other:?}. Emitting stream error.");
260                        Poll::Ready(Break(NegotiationResult::StreamError {
261                            error: StreamError {
262                                condition: DefinedCondition::UnsupportedStanzaType,
263                                text: None,
264                                application_specific: vec![],
265                            },
266                        }))
267                    }
268
269                    // Soft timeouts during negotiation are a bad sign
270                    // (because we already prompted the server to send
271                    // something and are waiting for it), but also nothing
272                    // to write home about.
273                    Err(ReadError::SoftTimeout) => Poll::Ready(Continue(None)),
274
275                    // Parse errors during negotiation cause an unconditional
276                    // stream error.
277                    Err(ReadError::ParseError(e)) => {
278                        Poll::Ready(Break(NegotiationResult::StreamError {
279                            error: parse_error_to_stream_error(e),
280                        }))
281                    }
282
283                    // I/O errors cause the stream to be considered
284                    // broken; we drop it and send a Disconnect event with
285                    // the error embedded.
286                    Err(ReadError::HardError(error)) => {
287                        Poll::Ready(Break(NegotiationResult::Disconnect {
288                            sm_state: None,
289                            error,
290                        }))
291                    }
292
293                    // Stream footer during negotation is really weird.
294                    // We kill the stream immediately with an error
295                    // (but allow preservation of the SM state).
296                    Err(ReadError::StreamFooterReceived) => {
297                        Poll::Ready(Break(NegotiationResult::Disconnect {
298                            sm_state: None,
299                            error: io::Error::new(
300                                io::ErrorKind::InvalidData,
301                                "stream footer received during negotation",
302                            ),
303                        }))
304                    }
305                }
306            }
307
308            Self::SendSmRequest {
309                sm_state,
310                bound_jid,
311            } => {
312                match ready!(<XmppStream as Sink<&XmppStreamElement>>::poll_ready(
313                    stream.as_mut(),
314                    cx
315                )) {
316                    // We can send.
317                    Ok(()) => (),
318
319                    // Stream broke horribly.
320                    Err(error) => {
321                        return Poll::Ready(Break(NegotiationResult::Disconnect {
322                            sm_state: sm_state.take(),
323                            error,
324                        }))
325                    }
326                };
327
328                let nonza = if let Some((id, inbound_ctr)) =
329                    sm_state.as_ref().and_then(|x| x.resume_info())
330                {
331                    // Attempt resumption
332                    sm::Nonza::Resume(sm::Resume {
333                        h: inbound_ctr,
334                        previd: sm::StreamId(id.to_owned()),
335                    })
336                } else {
337                    // Attempt enabling
338                    sm::Nonza::Enable(sm::Enable {
339                        max: None,
340                        resume: true,
341                    })
342                };
343                match stream.start_send(&XmppStreamElement::SM(nonza)) {
344                    Ok(()) => (),
345                    Err(e) => {
346                        // We panic here, instead of returning an
347                        // error, because after we confirmed via
348                        // poll_ready that the stream is ready to
349                        // send, the only error returned by start_send
350                        // can be caused by our data.
351                        panic!("Failed to send SM nonza: {}", e);
352                    }
353                }
354
355                *self = Self::ReceiveSmResponse {
356                    sm_state: sm_state.take(),
357                    bound_jid: bound_jid.take(),
358                };
359                // Ask caller to poll us again immediately in order to
360                // start flushing the stream.
361                Poll::Ready(Continue(None))
362            }
363
364            Self::ReceiveSmResponse {
365                sm_state,
366                bound_jid,
367            } => {
368                match Self::flush(stream.as_mut(), cx) {
369                    Break(error) => {
370                        return Poll::Ready(Break(NegotiationResult::Disconnect {
371                            sm_state: sm_state.take(),
372                            error,
373                        }))
374                    }
375                    Continue(()) => (),
376                }
377
378                // So the difficulty here is that there's a possibility
379                // that we receive non-SM data while the SM negotiation
380                // is going on.
381
382                let item = ready!(stream.poll_next(cx));
383                let item = item.unwrap_or_else(|| {
384                    Err(ReadError::HardError(io::Error::new(
385                        io::ErrorKind::UnexpectedEof,
386                        "eof before stream footer",
387                    )))
388                });
389                match item {
390                    // Pre-SM data. Note that we mustn't count this while we
391                    // are still in negotiating state: we transit to
392                    // [`Self::Ready`] immediately after we got the
393                    // `<resumed/>` or `<enabled/>`, and before we got that,
394                    // counting inbound stanzas is definitely wrong (see e.g.
395                    // aioxmpp commit 796aa32).
396                    Ok(XmppStreamElement::Stanza(data)) => Poll::Ready(Continue(Some(data))),
397
398                    Ok(XmppStreamElement::SM(sm::Nonza::Enabled(enabled))) => {
399                        if sm_state.is_some() {
400                            // Okay, the peer violated the stream management
401                            // protocol here (or we have a bug).
402                            log::warn!(
403                                "Received <enabled/>, but we also have previous SM state. One of us has a bug here (us or the peer) and I'm not sure which it is. If you can reproduce this, please re-run with trace loglevel and provide the logs. Attempting to proceed with a fresh session.",
404                            );
405                        }
406                        // We must emit Reset here because this is a
407                        // fresh stream and we did not resume.
408                        Poll::Ready(Break(NegotiationResult::StreamReset {
409                            sm_state: Some(enabled.into()),
410                            bound_jid: bound_jid.take().expect("State machine error: no bound_jid available in SM negotiation.").into(),
411                        }))
412                    }
413
414                    Ok(XmppStreamElement::SM(sm::Nonza::Resumed(resumed))) => match sm_state.take()
415                    {
416                        Some(mut sm_state) => {
417                            // Yay!
418                            match sm_state.resume(resumed.h) {
419                                Ok(to_retransmit) => transmit_queue.requeue_all(to_retransmit),
420                                Err(e) => {
421                                    // We kill the stream with an error
422                                    log::error!("Resumption failed: {e}");
423                                    return Poll::Ready(Break(NegotiationResult::StreamError {
424                                        error: e.into(),
425                                    }));
426                                }
427                            }
428                            Poll::Ready(Break(NegotiationResult::StreamResumed { sm_state }))
429                        }
430                        None => {
431                            // Okay, the peer violated the stream management
432                            // protocol here (or we have a bug).
433                            // Unlike the
434                            // received-enabled-but-attempted-to-resume
435                            // situation, we do not have enough information to
436                            // proceed without having the stream break soon.
437                            // (If we proceeded without a SM state, we would
438                            // have the stream die as soon as the peer
439                            // requests our counters).
440                            // We thus terminate the stream with an error.
441                            // We must emit Reset here because this is a fresh
442                            // stream and we did not resume.
443                            Poll::Ready(Break(NegotiationResult::Disconnect {
444                                sm_state: None,
445                                error: io::Error::new(io::ErrorKind::InvalidData, "Peer replied to <sm:enable/> request with <sm:resumed/> response"),
446                            }))
447                        }
448                    },
449
450                    Ok(XmppStreamElement::SM(sm::Nonza::Failed(failed))) => match sm_state {
451                        Some(sm_state) => {
452                            log::debug!("Received <sm:failed/> in response to resumption request. Discarding SM data and attempting to renegotiate.");
453                            if let Some(h) = failed.h {
454                                // This is only an optimization anyway, so
455                                // we can also just ignore this.
456                                let _: Result<_, _> = sm_state.remote_acked(h);
457                            }
458                            *self = Self::SendBindRequest { sm_supported: true };
459                            Poll::Ready(Continue(None))
460                        }
461                        None => {
462                            log::warn!("Received <sm:failed/> in response to enable request. Proceeding without stream management.");
463
464                            // We must emit Reset here because this is a
465                            // fresh stream and we did not resume.
466                            Poll::Ready(Break(NegotiationResult::StreamReset {
467                                bound_jid: bound_jid.take().expect("State machine error: no bound_jid available in SM negotiation.").into(),
468                                sm_state: None,
469                            }))
470                        }
471                    },
472
473                    Ok(XmppStreamElement::StreamError(error)) => {
474                        log::debug!("Received stream error, failing stream and discarding any stream management state.");
475                        let error = io::Error::new(io::ErrorKind::Other, error);
476                        transmit_queue.fail(&(&error).into());
477                        Poll::Ready(Break(NegotiationResult::Disconnect {
478                            error,
479                            sm_state: None,
480                        }))
481                    }
482
483                    Ok(other) => {
484                        log::warn!("Received unsupported stream element during negotiation: {other:?}. Emitting stream error.");
485                        Poll::Ready(Break(NegotiationResult::StreamError {
486                            error: StreamError {
487                                condition: DefinedCondition::UnsupportedStanzaType,
488                                text: None,
489                                application_specific: vec![],
490                            },
491                        }))
492                    }
493
494                    // Soft timeouts during negotiation are a bad sign
495                    // (because we already prompted the server to send
496                    // something and are waiting for it), but also nothing
497                    // to write home about.
498                    Err(ReadError::SoftTimeout) => Poll::Ready(Continue(None)),
499
500                    // Parse errors during negotiation cause an unconditional
501                    // stream error.
502                    Err(ReadError::ParseError(e)) => {
503                        Poll::Ready(Break(NegotiationResult::StreamError {
504                            error: parse_error_to_stream_error(e),
505                        }))
506                    }
507
508                    // I/O errors cause the stream to be considered
509                    // broken; we drop it and send a Disconnect event with
510                    // the error embedded.
511                    Err(ReadError::HardError(error)) => {
512                        Poll::Ready(Break(NegotiationResult::Disconnect {
513                            sm_state: sm_state.take(),
514                            error,
515                        }))
516                    }
517
518                    // Stream footer during negotation is really weird.
519                    // We kill the stream immediately with an error
520                    // (but allow preservation of the SM state).
521                    Err(ReadError::StreamFooterReceived) => {
522                        Poll::Ready(Break(NegotiationResult::Disconnect {
523                            sm_state: sm_state.take(),
524                            error: io::Error::new(
525                                io::ErrorKind::InvalidData,
526                                "stream footer received during negotation",
527                            ),
528                        }))
529                    }
530                }
531            }
532        }
533    }
534}