tokio_xmpp/stanzastream/negotiation.rs
1// Copyright (c) 2019 Emmanuel Gil Peyrot <linkmauve@linkmauve.fr>
2//
3// This Source Code Form is subject to the terms of the Mozilla Public
4// License, v. 2.0. If a copy of the MPL was not distributed with this
5// file, You can obtain one at http://mozilla.org/MPL/2.0/.
6
7use core::ops::ControlFlow::{self, Break, Continue};
8use core::pin::Pin;
9use core::task::{Context, Poll};
10use std::io;
11
12use futures::{ready, Sink, Stream};
13
14use xmpp_parsers::{
15 bind::{BindQuery, BindResponse},
16 iq::Iq,
17 jid::{FullJid, Jid},
18 sm,
19 stream_error::{DefinedCondition, StreamError},
20 stream_features::StreamFeatures,
21};
22
23use crate::xmlstream::{ReadError, XmppStreamElement};
24use crate::Stanza;
25
26use super::queue::{QueueEntry, TransmitQueue};
27use super::stream_management::*;
28use super::worker::{parse_error_to_stream_error, XmppStream};
29
30static BIND_REQ_ID: &str = "resource-binding";
31
32pub(super) enum NegotiationState {
33 /// Send request to enable or resume stream management.
34 SendSmRequest {
35 /// Stream management state to use. If present, resumption will be
36 /// attempted. Otherwise, a fresh session will be established.
37 sm_state: Option<SmState>,
38
39 /// If the stream has been freshly bound, we carry the bound JID along
40 /// with us.
41 bound_jid: Option<FullJid>,
42 },
43
44 /// Await the response to the SM enable/resume request.
45 ReceiveSmResponse {
46 /// State to use.
47 sm_state: Option<SmState>,
48
49 /// If the stream has been freshly bound, we carry the bound JID along
50 /// with us.
51 bound_jid: Option<FullJid>,
52 },
53
54 /// Send a new request to bind to a resource.
55 SendBindRequest { sm_supported: bool },
56
57 /// Receive the bind response.
58 ReceiveBindResponse { sm_supported: bool },
59}
60
61/// The ultimate result of a stream negotiation.
62pub(super) enum NegotiationResult {
63 /// An unplanned disconnect happened or a stream error was received from
64 /// the remote party.
65 Disconnect {
66 /// Stream management state for a later resumption attempt.
67 sm_state: Option<SmState>,
68
69 /// I/O error which came along the disconnect.
70 error: io::Error,
71 },
72
73 /// The negotiation completed successfully, but the stream was reset (i.e.
74 /// stream management and all session state was lost).
75 StreamReset {
76 /// Stream management state. This may still be non-None if the new
77 /// stream has successfully negotiated stream management.
78 sm_state: Option<SmState>,
79
80 /// The JID to which the stream is now bound.
81 bound_jid: Jid,
82 },
83
84 /// The negotiation completed successfully and a previous session was
85 /// resumed.
86 StreamResumed {
87 /// Negotiated stream management state.
88 sm_state: SmState,
89 },
90
91 /// The negotiation failed and we need to emit a stream error.
92 ///
93 /// **Note:** Stream errors *received* from the peer are signalled using
94 /// [`Self::Disconnect`] instead, with an I/O error of kind `Other`.
95 StreamError {
96 /// Stream error to send to the remote party with details about the
97 /// failure.
98 error: StreamError,
99 },
100}
101
102impl NegotiationState {
103 pub fn new(features: &StreamFeatures, sm_state: Option<SmState>) -> io::Result<Self> {
104 if let Some(sm_state) = sm_state {
105 if features.stream_management.is_some() {
106 return Ok(Self::SendSmRequest {
107 sm_state: Some(sm_state),
108 bound_jid: None,
109 });
110 } else {
111 log::warn!("Peer is not offering stream management anymore. Dropping state.");
112 }
113 }
114
115 if !features.can_bind() {
116 return Err(io::Error::new(
117 io::ErrorKind::InvalidData,
118 "Peer is not offering the bind feature. Cannot proceed with stream negotiation.",
119 ));
120 }
121
122 Ok(Self::SendBindRequest {
123 sm_supported: features.stream_management.is_some(),
124 })
125 }
126
127 fn flush(stream: Pin<&mut XmppStream>, cx: &mut Context) -> ControlFlow<io::Error, ()> {
128 match <XmppStream as Sink<&XmppStreamElement>>::poll_flush(stream, cx) {
129 Poll::Pending | Poll::Ready(Ok(())) => Continue(()),
130 Poll::Ready(Err(error)) => Break(error),
131 }
132 }
133
134 pub fn advance(
135 &mut self,
136 mut stream: Pin<&mut XmppStream>,
137 jid: &Jid,
138 transmit_queue: &mut TransmitQueue<QueueEntry>,
139 cx: &mut Context<'_>,
140 ) -> Poll<ControlFlow<NegotiationResult, Option<Stanza>>> {
141 // When sending requests, we need to wait for the stream to become
142 // ready to send and then send the corresponding request.
143 // Note that if this wasn't a fresh stream (which it always is!),
144 // doing it in this kind of simplex fashion could lead to deadlocks
145 // (because we are blockingly sending without attempting to receive: a
146 // peer could stop receiving from our side if their tx buffer was too
147 // full or smth). However, because this stream is fresh, we know that
148 // our tx buffers are empty enough that this will succeed quickly, so
149 // that we can proceed.
150 // TODO: define a deadline for negotiation.
151 match self {
152 Self::SendBindRequest { sm_supported } => {
153 match ready!(<XmppStream as Sink<&Stanza>>::poll_ready(
154 stream.as_mut(),
155 cx
156 )) {
157 // We can send.
158 Ok(()) => (),
159
160 // Stream broke horribly.
161 Err(error) => {
162 return Poll::Ready(Break(NegotiationResult::Disconnect {
163 sm_state: None,
164 error,
165 }))
166 }
167 };
168
169 let resource = jid.resource().map(|x| x.as_str().to_owned());
170 let stanza = Iq::from_set(BIND_REQ_ID, BindQuery::new(resource));
171 match stream.start_send(&stanza) {
172 Ok(()) => (),
173 Err(e) => panic!("failed to serialize BindQuery: {}", e),
174 };
175
176 *self = Self::ReceiveBindResponse {
177 sm_supported: *sm_supported,
178 };
179 Poll::Ready(Continue(None))
180 }
181
182 Self::ReceiveBindResponse { sm_supported } => {
183 match Self::flush(stream.as_mut(), cx) {
184 Break(error) => {
185 return Poll::Ready(Break(NegotiationResult::Disconnect {
186 sm_state: None,
187 error,
188 }))
189 }
190 Continue(()) => (),
191 }
192
193 let item = ready!(stream.poll_next(cx));
194 let item = item.unwrap_or_else(|| {
195 Err(ReadError::HardError(io::Error::new(
196 io::ErrorKind::UnexpectedEof,
197 "eof before stream footer",
198 )))
199 });
200 let item = item.map(|v| v.into_read_error()).flatten();
201
202 match item {
203 Ok(XmppStreamElement::Stanza(data)) => match data {
204 Stanza::Iq(iq) if iq.id() == BIND_REQ_ID => {
205 let error = match iq {
206 Iq::Result {
207 payload: Some(payload),
208 ..
209 } => match BindResponse::try_from(payload) {
210 Ok(v) => {
211 let bound_jid = v.into();
212 if *sm_supported {
213 *self = Self::SendSmRequest {
214 sm_state: None,
215 bound_jid: Some(bound_jid),
216 };
217 return Poll::Ready(Continue(None));
218 } else {
219 return Poll::Ready(Break(
220 NegotiationResult::StreamReset {
221 sm_state: None,
222 bound_jid: Jid::from(bound_jid),
223 },
224 ));
225 }
226 }
227 Err(e) => e.to_string(),
228 },
229 Iq::Result { payload: None, .. } => {
230 "Bind response has no payload".to_owned()
231 }
232 _ => "Unexpected IQ type in response to bind request".to_owned(),
233 };
234 log::warn!("Received IQ matching the bind request, but parsing failed ({error})! Emitting stream error.");
235 Poll::Ready(Break(NegotiationResult::StreamError {
236 error: StreamError::new(
237 DefinedCondition::UndefinedCondition,
238 "en",
239 error,
240 )
241 .with_application_specific(vec![super::error::ParseError.into()]),
242 }))
243 }
244 st => {
245 log::warn!("Received unexpected stanza before response to bind request: {st:?}. Dropping.");
246 Poll::Ready(Continue(None))
247 }
248 },
249
250 Ok(XmppStreamElement::StreamError(error)) => {
251 log::debug!("Received stream:error, failing stream and discarding any stream management state.");
252 let error = io::Error::other(error);
253 transmit_queue.fail(&(&error).into());
254 Poll::Ready(Break(NegotiationResult::Disconnect {
255 error,
256 sm_state: None,
257 }))
258 }
259
260 Ok(other) => {
261 log::warn!("Received unsupported stream element during bind: {other:?}. Emitting stream error.");
262 Poll::Ready(Break(NegotiationResult::StreamError {
263 error: StreamError::new(
264 DefinedCondition::UnsupportedStanzaType,
265 "en",
266 format!("Unsupported stream element during bind: {other:?}"),
267 ),
268 }))
269 }
270
271 // Soft timeouts during negotiation are a bad sign
272 // (because we already prompted the server to send
273 // something and are waiting for it), but also nothing
274 // to write home about.
275 Err(ReadError::SoftTimeout) => Poll::Ready(Continue(None)),
276
277 // Parse errors during negotiation cause an unconditional
278 // stream error.
279 Err(ReadError::ParseError(e)) => {
280 Poll::Ready(Break(NegotiationResult::StreamError {
281 error: parse_error_to_stream_error(e),
282 }))
283 }
284
285 // I/O errors cause the stream to be considered
286 // broken; we drop it and send a Disconnect event with
287 // the error embedded.
288 Err(ReadError::HardError(error)) => {
289 Poll::Ready(Break(NegotiationResult::Disconnect {
290 sm_state: None,
291 error,
292 }))
293 }
294
295 // Stream footer during negotiation is really weird.
296 // We kill the stream immediately with an error
297 // (but allow preservation of the SM state).
298 Err(ReadError::StreamFooterReceived) => {
299 Poll::Ready(Break(NegotiationResult::Disconnect {
300 sm_state: None,
301 error: io::Error::new(
302 io::ErrorKind::InvalidData,
303 "stream footer received during negotiation",
304 ),
305 }))
306 }
307 }
308 }
309
310 Self::SendSmRequest {
311 sm_state,
312 bound_jid,
313 } => {
314 match ready!(<XmppStream as Sink<&XmppStreamElement>>::poll_ready(
315 stream.as_mut(),
316 cx
317 )) {
318 // We can send.
319 Ok(()) => (),
320
321 // Stream broke horribly.
322 Err(error) => {
323 return Poll::Ready(Break(NegotiationResult::Disconnect {
324 sm_state: sm_state.take(),
325 error,
326 }))
327 }
328 };
329
330 let nonza = if let Some((id, inbound_ctr)) =
331 sm_state.as_ref().and_then(|x| x.resume_info())
332 {
333 // Attempt resumption
334 sm::Nonza::Resume(sm::Resume {
335 h: inbound_ctr,
336 previd: sm::StreamId(id.to_owned()),
337 })
338 } else {
339 // Attempt enabling
340 sm::Nonza::Enable(sm::Enable {
341 max: None,
342 resume: true,
343 })
344 };
345 match stream.start_send(&XmppStreamElement::SM(nonza)) {
346 Ok(()) => (),
347 Err(e) => {
348 // We panic here, instead of returning an
349 // error, because after we confirmed via
350 // poll_ready that the stream is ready to
351 // send, the only error returned by start_send
352 // can be caused by our data.
353 panic!("Failed to send SM nonza: {}", e);
354 }
355 }
356
357 *self = Self::ReceiveSmResponse {
358 sm_state: sm_state.take(),
359 bound_jid: bound_jid.take(),
360 };
361 // Ask caller to poll us again immediately in order to
362 // start flushing the stream.
363 Poll::Ready(Continue(None))
364 }
365
366 Self::ReceiveSmResponse {
367 sm_state,
368 bound_jid,
369 } => {
370 match Self::flush(stream.as_mut(), cx) {
371 Break(error) => {
372 return Poll::Ready(Break(NegotiationResult::Disconnect {
373 sm_state: sm_state.take(),
374 error,
375 }))
376 }
377 Continue(()) => (),
378 }
379
380 // So the difficulty here is that there's a possibility
381 // that we receive non-SM data while the SM negotiation
382 // is going on.
383
384 let item = ready!(stream.poll_next(cx));
385 let item = item.unwrap_or_else(|| {
386 Err(ReadError::HardError(io::Error::new(
387 io::ErrorKind::UnexpectedEof,
388 "eof before stream footer",
389 )))
390 });
391 let item = item.map(|v| v.into_read_error()).flatten();
392
393 match item {
394 // Pre-SM data. Note that we mustn't count this while we
395 // are still in negotiating state: we transit to
396 // [`Self::Ready`] immediately after we got the
397 // `<resumed/>` or `<enabled/>`, and before we got that,
398 // counting inbound stanzas is definitely wrong (see e.g.
399 // aioxmpp commit 796aa32).
400 Ok(XmppStreamElement::Stanza(data)) => Poll::Ready(Continue(Some(data))),
401
402 Ok(XmppStreamElement::SM(sm::Nonza::Enabled(enabled))) => {
403 if sm_state.is_some() {
404 // Okay, the peer violated the stream management
405 // protocol here (or we have a bug).
406 log::warn!(
407 "Received <enabled/>, but we also have previous SM state. One of us has a bug here (us or the peer) and I'm not sure which it is. If you can reproduce this, please re-run with trace loglevel and provide the logs. Attempting to proceed with a fresh session.",
408 );
409 }
410 // We must emit Reset here because this is a
411 // fresh stream and we did not resume.
412 Poll::Ready(Break(NegotiationResult::StreamReset {
413 sm_state: Some(enabled.into()),
414 bound_jid: bound_jid.take().expect("State machine error: no bound_jid available in SM negotiation.").into(),
415 }))
416 }
417
418 Ok(XmppStreamElement::SM(sm::Nonza::Resumed(resumed))) => match sm_state.take()
419 {
420 Some(mut sm_state) => {
421 // Yay!
422 match sm_state.resume(resumed.h) {
423 Ok(to_retransmit) => transmit_queue.requeue_all(to_retransmit),
424 Err(e) => {
425 // We kill the stream with an error
426 log::error!("Resumption failed: {e}");
427 return Poll::Ready(Break(NegotiationResult::StreamError {
428 error: e.into(),
429 }));
430 }
431 }
432 Poll::Ready(Break(NegotiationResult::StreamResumed { sm_state }))
433 }
434 None => {
435 // Okay, the peer violated the stream management
436 // protocol here (or we have a bug).
437 // Unlike the
438 // received-enabled-but-attempted-to-resume
439 // situation, we do not have enough information to
440 // proceed without having the stream break soon.
441 // (If we proceeded without a SM state, we would
442 // have the stream die as soon as the peer
443 // requests our counters).
444 // We thus terminate the stream with an error.
445 // We must emit Reset here because this is a fresh
446 // stream and we did not resume.
447 Poll::Ready(Break(NegotiationResult::Disconnect {
448 sm_state: None,
449 error: io::Error::new(io::ErrorKind::InvalidData, "Peer replied to <sm:enable/> request with <sm:resumed/> response"),
450 }))
451 }
452 },
453
454 Ok(XmppStreamElement::SM(sm::Nonza::Failed(failed))) => match sm_state {
455 Some(sm_state) => {
456 log::debug!("Received <sm:failed/> in response to resumption request. Discarding SM data and attempting to renegotiate.");
457 if let Some(h) = failed.h {
458 // This is only an optimization anyway, so
459 // we can also just ignore this.
460 let _: Result<_, _> = sm_state.remote_acked(h);
461 }
462 *self = Self::SendBindRequest { sm_supported: true };
463 Poll::Ready(Continue(None))
464 }
465 None => {
466 log::warn!("Received <sm:failed/> in response to enable request. Proceeding without stream management.");
467
468 // We must emit Reset here because this is a
469 // fresh stream and we did not resume.
470 Poll::Ready(Break(NegotiationResult::StreamReset {
471 bound_jid: bound_jid.take().expect("State machine error: no bound_jid available in SM negotiation.").into(),
472 sm_state: None,
473 }))
474 }
475 },
476
477 Ok(XmppStreamElement::StreamError(error)) => {
478 log::debug!("Received stream error, failing stream and discarding any stream management state.");
479 let error = io::Error::other(error);
480 transmit_queue.fail(&(&error).into());
481 Poll::Ready(Break(NegotiationResult::Disconnect {
482 error,
483 sm_state: None,
484 }))
485 }
486
487 Ok(other) => {
488 log::warn!("Received unsupported stream element during negotiation: {other:?}. Emitting stream error.");
489 Poll::Ready(Break(NegotiationResult::StreamError {
490 error: StreamError::new(
491 DefinedCondition::UnsupportedStanzaType,
492 "en",
493 format!("Unsupported stream element during negotiation: {other:?}"),
494 ),
495 }))
496 }
497
498 // Soft timeouts during negotiation are a bad sign
499 // (because we already prompted the server to send
500 // something and are waiting for it), but also nothing
501 // to write home about.
502 Err(ReadError::SoftTimeout) => Poll::Ready(Continue(None)),
503
504 // Parse errors during negotiation cause an unconditional
505 // stream error.
506 Err(ReadError::ParseError(e)) => {
507 Poll::Ready(Break(NegotiationResult::StreamError {
508 error: parse_error_to_stream_error(e),
509 }))
510 }
511
512 // I/O errors cause the stream to be considered
513 // broken; we drop it and send a Disconnect event with
514 // the error embedded.
515 Err(ReadError::HardError(error)) => {
516 Poll::Ready(Break(NegotiationResult::Disconnect {
517 sm_state: sm_state.take(),
518 error,
519 }))
520 }
521
522 // Stream footer during negotiation is really weird.
523 // We kill the stream immediately with an error
524 // (but allow preservation of the SM state).
525 Err(ReadError::StreamFooterReceived) => {
526 Poll::Ready(Break(NegotiationResult::Disconnect {
527 sm_state: sm_state.take(),
528 error: io::Error::new(
529 io::ErrorKind::InvalidData,
530 "stream footer received during negotiation",
531 ),
532 }))
533 }
534 }
535 }
536 }
537 }
538}