tokio_xmpp/stanzastream/negotiation.rs
1// Copyright (c) 2019 Emmanuel Gil Peyrot <linkmauve@linkmauve.fr>
2//
3// This Source Code Form is subject to the terms of the Mozilla Public
4// License, v. 2.0. If a copy of the MPL was not distributed with this
5// file, You can obtain one at http://mozilla.org/MPL/2.0/.
6
7use core::ops::ControlFlow::{self, Break, Continue};
8use core::pin::Pin;
9use core::task::{Context, Poll};
10use std::io;
11
12use futures::{ready, Sink, Stream};
13
14use xmpp_parsers::{
15 bind::{BindQuery, BindResponse},
16 iq::Iq,
17 jid::{FullJid, Jid},
18 sm,
19 stream_error::{DefinedCondition, StreamError},
20 stream_features::StreamFeatures,
21};
22
23use crate::xmlstream::{ReadError, XmppStreamElement};
24use crate::Stanza;
25
26use super::queue::{QueueEntry, TransmitQueue};
27use super::stream_management::*;
28use super::worker::{parse_error_to_stream_error, XmppStream};
29
/// The `id` attribute used for the resource binding IQ request; the matching
/// response is recognised by carrying the same `id`.
const BIND_REQ_ID: &str = "resource-binding";
31
32pub(super) enum NegotiationState {
33 /// Send request to enable or resume stream management.
34 SendSmRequest {
35 /// Stream management state to use. If present, resumption will be
36 /// attempted. Otherwise, a fresh session will be established.
37 sm_state: Option<SmState>,
38
39 /// If the stream has been freshly bound, we carry the bound JID along
40 /// with us.
41 bound_jid: Option<FullJid>,
42 },
43
44 /// Await the response to the SM enable/resume request.
45 ReceiveSmResponse {
46 /// State to use.
47 sm_state: Option<SmState>,
48
49 /// If the stream has been freshly bound, we carry the bound JID along
50 /// with us.
51 bound_jid: Option<FullJid>,
52 },
53
54 /// Send a new request to bind to a resource.
55 SendBindRequest { sm_supported: bool },
56
57 /// Receive the bind response.
58 ReceiveBindResponse { sm_supported: bool },
59}
60
61/// The ultimate result of a stream negotiation.
62pub(super) enum NegotiationResult {
63 /// An unplanned disconnect happened or a stream error was received from
64 /// the remote party.
65 Disconnect {
66 /// Stream management state for a later resumption attempt.
67 sm_state: Option<SmState>,
68
69 /// I/O error which came along the disconnect.
70 error: io::Error,
71 },
72
73 /// The negotiation completed successfully, but the stream was reset (i.e.
74 /// stream management and all session state was lost).
75 StreamReset {
76 /// Stream management state. This may still be non-None if the new
77 /// stream has successfully negotiated stream management.
78 sm_state: Option<SmState>,
79
80 /// The JID to which the stream is now bound.
81 bound_jid: Jid,
82 },
83
84 /// The negotiation completed successfully and a previous session was
85 /// resumed.
86 StreamResumed {
87 /// Negotiated stream management state.
88 sm_state: SmState,
89 },
90
91 /// The negotiation failed and we need to emit a stream error.
92 ///
93 /// **Note:** Stream errors *received* from the peer are signalled using
94 /// [`Self::Disconnect`] instead, with an I/O error of kind `Other`.
95 StreamError {
96 /// Stream error to send to the remote party with details about the
97 /// failure.
98 error: StreamError,
99 },
100}
101
102impl NegotiationState {
103 pub fn new(features: &StreamFeatures, sm_state: Option<SmState>) -> io::Result<Self> {
104 if let Some(sm_state) = sm_state {
105 if features.stream_management.is_some() {
106 return Ok(Self::SendSmRequest {
107 sm_state: Some(sm_state),
108 bound_jid: None,
109 });
110 } else {
111 log::warn!("Peer is not offering stream management anymore. Dropping state.");
112 }
113 }
114
115 if !features.can_bind() {
116 return Err(io::Error::new(
117 io::ErrorKind::InvalidData,
118 "Peer is not offering the bind feature. Cannot proceed with stream negotiation.",
119 ));
120 }
121
122 Ok(Self::SendBindRequest {
123 sm_supported: features.stream_management.is_some(),
124 })
125 }
126
127 fn flush(stream: Pin<&mut XmppStream>, cx: &mut Context) -> ControlFlow<io::Error, ()> {
128 match <XmppStream as Sink<&XmppStreamElement>>::poll_flush(stream, cx) {
129 Poll::Pending | Poll::Ready(Ok(())) => Continue(()),
130 Poll::Ready(Err(error)) => Break(error),
131 }
132 }
133
134 pub fn advance(
135 &mut self,
136 mut stream: Pin<&mut XmppStream>,
137 jid: &Jid,
138 transmit_queue: &mut TransmitQueue<QueueEntry>,
139 cx: &mut Context<'_>,
140 ) -> Poll<ControlFlow<NegotiationResult, Option<Stanza>>> {
141 // When sending requests, we need to wait for the stream to become
142 // ready to send and then send the corresponding request.
143 // Note that if this wasn't a fresh stream (which it always is!),
144 // doing it in this kind of simplex fashion could lead to deadlocks
145 // (because we are blockingly sending without attempting to receive: a
146 // peer could stop receiving from our side if their tx buffer was too
147 // full or smth). However, because this stream is fresh, we know that
148 // our tx buffers are empty enough that this will succeed quickly, so
149 // that we can proceed.
150 // TODO: define a deadline for negotiation.
151 match self {
152 Self::SendBindRequest { sm_supported } => {
153 match ready!(<XmppStream as Sink<&Stanza>>::poll_ready(
154 stream.as_mut(),
155 cx
156 )) {
157 // We can send.
158 Ok(()) => (),
159
160 // Stream broke horribly.
161 Err(error) => {
162 return Poll::Ready(Break(NegotiationResult::Disconnect {
163 sm_state: None,
164 error,
165 }))
166 }
167 };
168
169 let resource = jid.resource().map(|x| x.as_str().to_owned());
170 let stanza = Iq::from_set(BIND_REQ_ID, BindQuery::new(resource));
171 match stream.start_send(&stanza) {
172 Ok(()) => (),
173 Err(e) => panic!("failed to serialize BindQuery: {}", e),
174 };
175
176 *self = Self::ReceiveBindResponse {
177 sm_supported: *sm_supported,
178 };
179 Poll::Ready(Continue(None))
180 }
181
182 Self::ReceiveBindResponse { sm_supported } => {
183 match Self::flush(stream.as_mut(), cx) {
184 Break(error) => {
185 return Poll::Ready(Break(NegotiationResult::Disconnect {
186 sm_state: None,
187 error,
188 }))
189 }
190 Continue(()) => (),
191 }
192
193 let item = ready!(stream.poll_next(cx));
194 let item = item.unwrap_or_else(|| {
195 Err(ReadError::HardError(io::Error::new(
196 io::ErrorKind::UnexpectedEof,
197 "eof before stream footer",
198 )))
199 });
200
201 match item {
202 Ok(XmppStreamElement::Stanza(data)) => match data {
203 Stanza::Iq(iq) if iq.id() == BIND_REQ_ID => {
204 let error = match iq {
205 Iq::Result {
206 payload: Some(payload),
207 ..
208 } => match BindResponse::try_from(payload) {
209 Ok(v) => {
210 let bound_jid = v.into();
211 if *sm_supported {
212 *self = Self::SendSmRequest {
213 sm_state: None,
214 bound_jid: Some(bound_jid),
215 };
216 return Poll::Ready(Continue(None));
217 } else {
218 return Poll::Ready(Break(
219 NegotiationResult::StreamReset {
220 sm_state: None,
221 bound_jid: Jid::from(bound_jid),
222 },
223 ));
224 }
225 }
226 Err(e) => e.to_string(),
227 },
228 Iq::Result { payload: None, .. } => {
229 "Bind response has no payload".to_owned()
230 }
231 _ => "Unexpected IQ type in response to bind request".to_owned(),
232 };
233 log::warn!("Received IQ matching the bind request, but parsing failed ({error})! Emitting stream error.");
234 Poll::Ready(Break(NegotiationResult::StreamError {
235 error: StreamError::new(
236 DefinedCondition::UndefinedCondition,
237 "en",
238 error,
239 )
240 .with_application_specific(vec![super::error::ParseError.into()]),
241 }))
242 }
243 st => {
244 log::warn!("Received unexpected stanza before response to bind request: {st:?}. Dropping.");
245 Poll::Ready(Continue(None))
246 }
247 },
248
249 Ok(XmppStreamElement::StreamError(error)) => {
250 log::debug!("Received stream:error, failing stream and discarding any stream management state.");
251 let error = io::Error::other(error);
252 transmit_queue.fail(&(&error).into());
253 Poll::Ready(Break(NegotiationResult::Disconnect {
254 error,
255 sm_state: None,
256 }))
257 }
258
259 Ok(other) => {
260 log::warn!("Received unsupported stream element during bind: {other:?}. Emitting stream error.");
261 Poll::Ready(Break(NegotiationResult::StreamError {
262 error: StreamError::new(
263 DefinedCondition::UnsupportedStanzaType,
264 "en",
265 format!("Unsupported stream element during bind: {other:?}"),
266 ),
267 }))
268 }
269
270 // Soft timeouts during negotiation are a bad sign
271 // (because we already prompted the server to send
272 // something and are waiting for it), but also nothing
273 // to write home about.
274 Err(ReadError::SoftTimeout) => Poll::Ready(Continue(None)),
275
276 // Parse errors during negotiation cause an unconditional
277 // stream error.
278 Err(ReadError::ParseError(e)) => {
279 Poll::Ready(Break(NegotiationResult::StreamError {
280 error: parse_error_to_stream_error(e),
281 }))
282 }
283
284 // I/O errors cause the stream to be considered
285 // broken; we drop it and send a Disconnect event with
286 // the error embedded.
287 Err(ReadError::HardError(error)) => {
288 Poll::Ready(Break(NegotiationResult::Disconnect {
289 sm_state: None,
290 error,
291 }))
292 }
293
294 // Stream footer during negotation is really weird.
295 // We kill the stream immediately with an error
296 // (but allow preservation of the SM state).
297 Err(ReadError::StreamFooterReceived) => {
298 Poll::Ready(Break(NegotiationResult::Disconnect {
299 sm_state: None,
300 error: io::Error::new(
301 io::ErrorKind::InvalidData,
302 "stream footer received during negotation",
303 ),
304 }))
305 }
306 }
307 }
308
309 Self::SendSmRequest {
310 sm_state,
311 bound_jid,
312 } => {
313 match ready!(<XmppStream as Sink<&XmppStreamElement>>::poll_ready(
314 stream.as_mut(),
315 cx
316 )) {
317 // We can send.
318 Ok(()) => (),
319
320 // Stream broke horribly.
321 Err(error) => {
322 return Poll::Ready(Break(NegotiationResult::Disconnect {
323 sm_state: sm_state.take(),
324 error,
325 }))
326 }
327 };
328
329 let nonza = if let Some((id, inbound_ctr)) =
330 sm_state.as_ref().and_then(|x| x.resume_info())
331 {
332 // Attempt resumption
333 sm::Nonza::Resume(sm::Resume {
334 h: inbound_ctr,
335 previd: sm::StreamId(id.to_owned()),
336 })
337 } else {
338 // Attempt enabling
339 sm::Nonza::Enable(sm::Enable {
340 max: None,
341 resume: true,
342 })
343 };
344 match stream.start_send(&XmppStreamElement::SM(nonza)) {
345 Ok(()) => (),
346 Err(e) => {
347 // We panic here, instead of returning an
348 // error, because after we confirmed via
349 // poll_ready that the stream is ready to
350 // send, the only error returned by start_send
351 // can be caused by our data.
352 panic!("Failed to send SM nonza: {}", e);
353 }
354 }
355
356 *self = Self::ReceiveSmResponse {
357 sm_state: sm_state.take(),
358 bound_jid: bound_jid.take(),
359 };
360 // Ask caller to poll us again immediately in order to
361 // start flushing the stream.
362 Poll::Ready(Continue(None))
363 }
364
365 Self::ReceiveSmResponse {
366 sm_state,
367 bound_jid,
368 } => {
369 match Self::flush(stream.as_mut(), cx) {
370 Break(error) => {
371 return Poll::Ready(Break(NegotiationResult::Disconnect {
372 sm_state: sm_state.take(),
373 error,
374 }))
375 }
376 Continue(()) => (),
377 }
378
379 // So the difficulty here is that there's a possibility
380 // that we receive non-SM data while the SM negotiation
381 // is going on.
382
383 let item = ready!(stream.poll_next(cx));
384 let item = item.unwrap_or_else(|| {
385 Err(ReadError::HardError(io::Error::new(
386 io::ErrorKind::UnexpectedEof,
387 "eof before stream footer",
388 )))
389 });
390 match item {
391 // Pre-SM data. Note that we mustn't count this while we
392 // are still in negotiating state: we transit to
393 // [`Self::Ready`] immediately after we got the
394 // `<resumed/>` or `<enabled/>`, and before we got that,
395 // counting inbound stanzas is definitely wrong (see e.g.
396 // aioxmpp commit 796aa32).
397 Ok(XmppStreamElement::Stanza(data)) => Poll::Ready(Continue(Some(data))),
398
399 Ok(XmppStreamElement::SM(sm::Nonza::Enabled(enabled))) => {
400 if sm_state.is_some() {
401 // Okay, the peer violated the stream management
402 // protocol here (or we have a bug).
403 log::warn!(
404 "Received <enabled/>, but we also have previous SM state. One of us has a bug here (us or the peer) and I'm not sure which it is. If you can reproduce this, please re-run with trace loglevel and provide the logs. Attempting to proceed with a fresh session.",
405 );
406 }
407 // We must emit Reset here because this is a
408 // fresh stream and we did not resume.
409 Poll::Ready(Break(NegotiationResult::StreamReset {
410 sm_state: Some(enabled.into()),
411 bound_jid: bound_jid.take().expect("State machine error: no bound_jid available in SM negotiation.").into(),
412 }))
413 }
414
415 Ok(XmppStreamElement::SM(sm::Nonza::Resumed(resumed))) => match sm_state.take()
416 {
417 Some(mut sm_state) => {
418 // Yay!
419 match sm_state.resume(resumed.h) {
420 Ok(to_retransmit) => transmit_queue.requeue_all(to_retransmit),
421 Err(e) => {
422 // We kill the stream with an error
423 log::error!("Resumption failed: {e}");
424 return Poll::Ready(Break(NegotiationResult::StreamError {
425 error: e.into(),
426 }));
427 }
428 }
429 Poll::Ready(Break(NegotiationResult::StreamResumed { sm_state }))
430 }
431 None => {
432 // Okay, the peer violated the stream management
433 // protocol here (or we have a bug).
434 // Unlike the
435 // received-enabled-but-attempted-to-resume
436 // situation, we do not have enough information to
437 // proceed without having the stream break soon.
438 // (If we proceeded without a SM state, we would
439 // have the stream die as soon as the peer
440 // requests our counters).
441 // We thus terminate the stream with an error.
442 // We must emit Reset here because this is a fresh
443 // stream and we did not resume.
444 Poll::Ready(Break(NegotiationResult::Disconnect {
445 sm_state: None,
446 error: io::Error::new(io::ErrorKind::InvalidData, "Peer replied to <sm:enable/> request with <sm:resumed/> response"),
447 }))
448 }
449 },
450
451 Ok(XmppStreamElement::SM(sm::Nonza::Failed(failed))) => match sm_state {
452 Some(sm_state) => {
453 log::debug!("Received <sm:failed/> in response to resumption request. Discarding SM data and attempting to renegotiate.");
454 if let Some(h) = failed.h {
455 // This is only an optimization anyway, so
456 // we can also just ignore this.
457 let _: Result<_, _> = sm_state.remote_acked(h);
458 }
459 *self = Self::SendBindRequest { sm_supported: true };
460 Poll::Ready(Continue(None))
461 }
462 None => {
463 log::warn!("Received <sm:failed/> in response to enable request. Proceeding without stream management.");
464
465 // We must emit Reset here because this is a
466 // fresh stream and we did not resume.
467 Poll::Ready(Break(NegotiationResult::StreamReset {
468 bound_jid: bound_jid.take().expect("State machine error: no bound_jid available in SM negotiation.").into(),
469 sm_state: None,
470 }))
471 }
472 },
473
474 Ok(XmppStreamElement::StreamError(error)) => {
475 log::debug!("Received stream error, failing stream and discarding any stream management state.");
476 let error = io::Error::other(error);
477 transmit_queue.fail(&(&error).into());
478 Poll::Ready(Break(NegotiationResult::Disconnect {
479 error,
480 sm_state: None,
481 }))
482 }
483
484 Ok(other) => {
485 log::warn!("Received unsupported stream element during negotiation: {other:?}. Emitting stream error.");
486 Poll::Ready(Break(NegotiationResult::StreamError {
487 error: StreamError::new(
488 DefinedCondition::UnsupportedStanzaType,
489 "en",
490 format!("Unsupported stream element during negotiation: {other:?}"),
491 ),
492 }))
493 }
494
495 // Soft timeouts during negotiation are a bad sign
496 // (because we already prompted the server to send
497 // something and are waiting for it), but also nothing
498 // to write home about.
499 Err(ReadError::SoftTimeout) => Poll::Ready(Continue(None)),
500
501 // Parse errors during negotiation cause an unconditional
502 // stream error.
503 Err(ReadError::ParseError(e)) => {
504 Poll::Ready(Break(NegotiationResult::StreamError {
505 error: parse_error_to_stream_error(e),
506 }))
507 }
508
509 // I/O errors cause the stream to be considered
510 // broken; we drop it and send a Disconnect event with
511 // the error embedded.
512 Err(ReadError::HardError(error)) => {
513 Poll::Ready(Break(NegotiationResult::Disconnect {
514 sm_state: sm_state.take(),
515 error,
516 }))
517 }
518
519 // Stream footer during negotation is really weird.
520 // We kill the stream immediately with an error
521 // (but allow preservation of the SM state).
522 Err(ReadError::StreamFooterReceived) => {
523 Poll::Ready(Break(NegotiationResult::Disconnect {
524 sm_state: sm_state.take(),
525 error: io::Error::new(
526 io::ErrorKind::InvalidData,
527 "stream footer received during negotation",
528 ),
529 }))
530 }
531 }
532 }
533 }
534 }
535}