tokio_xmpp/stanzastream/negotiation.rs
1// Copyright (c) 2019 Emmanuel Gil Peyrot <linkmauve@linkmauve.fr>
2//
3// This Source Code Form is subject to the terms of the Mozilla Public
4// License, v. 2.0. If a copy of the MPL was not distributed with this
5// file, You can obtain one at http://mozilla.org/MPL/2.0/.
6
7use core::ops::ControlFlow::{self, Break, Continue};
8use core::pin::Pin;
9use core::task::{Context, Poll};
10use std::io;
11
12use futures::{ready, Sink, Stream};
13
14use xmpp_parsers::{
15 bind::{BindQuery, BindResponse},
16 iq::{Iq, IqType},
17 jid::{FullJid, Jid},
18 sm,
19 stream_error::{DefinedCondition, StreamError},
20 stream_features::StreamFeatures,
21};
22
23use crate::xmlstream::{ReadError, XmppStreamElement};
24use crate::Stanza;
25
26use super::queue::{QueueEntry, TransmitQueue};
27use super::stream_management::*;
28use super::worker::{parse_error_to_stream_error, XmppStream};
29
/// IQ stanza id used for the resource binding request; the bind response is
/// recognised by carrying this same id.
const BIND_REQ_ID: &str = "resource-binding";
31
/// State machine for negotiating resource binding and stream management on
/// a stream.
///
/// Each "send" state is followed by the matching "receive" state, which
/// waits for the peer's reply to the request that was just sent.
pub(super) enum NegotiationState {
    /// Send request to enable or resume stream management.
    SendSmRequest {
        /// Stream management state to use. If present, resumption will be
        /// attempted. Otherwise, a fresh session will be established.
        sm_state: Option<SmState>,

        /// If the stream has been freshly bound, we carry the bound JID along
        /// with us. This is `None` when we go straight to stream management
        /// without binding first (i.e. when a previous state is present).
        bound_jid: Option<FullJid>,
    },

    /// Await the response to the SM enable/resume request.
    ReceiveSmResponse {
        /// Stream management state which was used to build the request
        /// (carried over from [`Self::SendSmRequest`]).
        sm_state: Option<SmState>,

        /// If the stream has been freshly bound, we carry the bound JID along
        /// with us.
        bound_jid: Option<FullJid>,
    },

    /// Send a new request to bind to a resource.
    SendBindRequest {
        /// Whether stream management should be negotiated once the bind
        /// succeeded.
        sm_supported: bool,
    },

    /// Receive the bind response.
    ReceiveBindResponse {
        /// Whether stream management should be negotiated once the bind
        /// succeeded. If `false`, a successful bind completes the
        /// negotiation.
        sm_supported: bool,
    },
}
60
/// The ultimate result of a stream negotiation.
///
/// This is the `Break` value produced by `NegotiationState::advance` once
/// the negotiation cannot or need not proceed any further.
pub(super) enum NegotiationResult {
    /// An unplanned disconnect happened or a stream error was received from
    /// the remote party.
    Disconnect {
        /// Stream management state for a later resumption attempt, if any
        /// could be preserved.
        sm_state: Option<SmState>,

        /// I/O error which came along the disconnect.
        error: io::Error,
    },

    /// The negotiation completed successfully, but the stream was reset (i.e.
    /// stream management and all session state was lost).
    StreamReset {
        /// Stream management state. This may still be non-None if the new
        /// stream has successfully negotiated stream management.
        sm_state: Option<SmState>,

        /// The JID to which the stream is now bound.
        bound_jid: Jid,
    },

    /// The negotiation completed successfully and a previous session was
    /// resumed.
    StreamResumed {
        /// Negotiated stream management state.
        sm_state: SmState,
    },

    /// The negotiation failed and we need to emit a stream error.
    ///
    /// **Note:** Stream errors *received* from the peer are signalled using
    /// [`Self::Disconnect`] instead, with an I/O error of kind `Other`.
    StreamError {
        /// Stream error to send to the remote party with details about the
        /// failure.
        error: StreamError,
    },
}
101
102impl NegotiationState {
103 pub fn new(features: &StreamFeatures, sm_state: Option<SmState>) -> io::Result<Self> {
104 match sm_state {
105 Some(sm_state) => {
106 if features.stream_management.is_some() {
107 return Ok(Self::SendSmRequest {
108 sm_state: Some(sm_state),
109 bound_jid: None,
110 });
111 } else {
112 log::warn!("Peer is not offering stream management anymore. Dropping state.");
113 }
114 }
115 None => (),
116 }
117
118 if !features.can_bind() {
119 return Err(io::Error::new(
120 io::ErrorKind::InvalidData,
121 "Peer is not offering the bind feature. Cannot proceed with stream negotiation.",
122 ));
123 }
124
125 Ok(Self::SendBindRequest {
126 sm_supported: features.stream_management.is_some(),
127 })
128 }
129
130 fn flush(stream: Pin<&mut XmppStream>, cx: &mut Context) -> ControlFlow<io::Error, ()> {
131 match <XmppStream as Sink<&XmppStreamElement>>::poll_flush(stream, cx) {
132 Poll::Pending | Poll::Ready(Ok(())) => Continue(()),
133 Poll::Ready(Err(error)) => Break(error),
134 }
135 }
136
137 pub fn advance(
138 &mut self,
139 mut stream: Pin<&mut XmppStream>,
140 jid: &Jid,
141 transmit_queue: &mut TransmitQueue<QueueEntry>,
142 cx: &mut Context<'_>,
143 ) -> Poll<ControlFlow<NegotiationResult, Option<Stanza>>> {
144 // When sending requests, we need to wait for the stream to become
145 // ready to send and then send the corresponding request.
146 // Note that if this wasn't a fresh stream (which it always is!),
147 // doing it in this kind of simplex fashion could lead to deadlocks
148 // (because we are blockingly sending without attempting to receive: a
149 // peer could stop receiving from our side if their tx buffer was too
150 // full or smth). However, because this stream is fresh, we know that
151 // our tx buffers are empty enough that this will succeed quickly, so
152 // that we can proceed.
153 // TODO: define a deadline for negotiation.
154 match self {
155 Self::SendBindRequest { sm_supported } => {
156 match ready!(<XmppStream as Sink<&Stanza>>::poll_ready(
157 stream.as_mut(),
158 cx
159 )) {
160 // We can send.
161 Ok(()) => (),
162
163 // Stream broke horribly.
164 Err(error) => {
165 return Poll::Ready(Break(NegotiationResult::Disconnect {
166 sm_state: None,
167 error,
168 }))
169 }
170 };
171
172 let resource = jid.resource().map(|x| x.as_str().to_owned());
173 let stanza = Iq::from_set(BIND_REQ_ID, BindQuery::new(resource));
174 match stream.start_send(&stanza) {
175 Ok(()) => (),
176 Err(e) => panic!("failed to serialize BindQuery: {}", e),
177 };
178
179 *self = Self::ReceiveBindResponse {
180 sm_supported: *sm_supported,
181 };
182 Poll::Ready(Continue(None))
183 }
184
185 Self::ReceiveBindResponse { sm_supported } => {
186 match Self::flush(stream.as_mut(), cx) {
187 Break(error) => {
188 return Poll::Ready(Break(NegotiationResult::Disconnect {
189 sm_state: None,
190 error,
191 }))
192 }
193 Continue(()) => (),
194 }
195
196 let item = ready!(stream.poll_next(cx));
197 let item = item.unwrap_or_else(|| {
198 Err(ReadError::HardError(io::Error::new(
199 io::ErrorKind::UnexpectedEof,
200 "eof before stream footer",
201 )))
202 });
203
204 match item {
205 Ok(XmppStreamElement::Stanza(data)) => match data {
206 Stanza::Iq(iq) if iq.id == BIND_REQ_ID => {
207 let error = match iq.payload {
208 IqType::Result(Some(payload)) => {
209 match BindResponse::try_from(payload) {
210 Ok(v) => {
211 let bound_jid = v.into();
212 if *sm_supported {
213 *self = Self::SendSmRequest {
214 sm_state: None,
215 bound_jid: Some(bound_jid),
216 };
217 return Poll::Ready(Continue(None));
218 } else {
219 return Poll::Ready(Break(
220 NegotiationResult::StreamReset {
221 sm_state: None,
222 bound_jid: Jid::from(bound_jid),
223 },
224 ));
225 }
226 }
227 Err(e) => e.to_string(),
228 }
229 }
230 IqType::Result(None) => "Bind response has no payload".to_owned(),
231 _ => "Unexpected IQ type in response to bind request".to_owned(),
232 };
233 log::warn!("Received IQ matching the bind request, but parsing failed ({error})! Emitting stream error.");
234 Poll::Ready(Break(NegotiationResult::StreamError {
235 error: StreamError {
236 condition: DefinedCondition::UndefinedCondition,
237 text: Some((None, error)),
238 application_specific: vec![super::error::ParseError.into()],
239 },
240 }))
241 }
242 st => {
243 log::warn!("Received unexpected stanza before response to bind request: {st:?}. Dropping.");
244 Poll::Ready(Continue(None))
245 }
246 },
247
248 Ok(XmppStreamElement::StreamError(error)) => {
249 log::debug!("Received stream:error, failing stream and discarding any stream management state.");
250 let error = io::Error::new(io::ErrorKind::Other, error);
251 transmit_queue.fail(&(&error).into());
252 Poll::Ready(Break(NegotiationResult::Disconnect {
253 error,
254 sm_state: None,
255 }))
256 }
257
258 Ok(other) => {
259 log::warn!("Received unsupported stream element during bind: {other:?}. Emitting stream error.");
260 Poll::Ready(Break(NegotiationResult::StreamError {
261 error: StreamError {
262 condition: DefinedCondition::UnsupportedStanzaType,
263 text: None,
264 application_specific: vec![],
265 },
266 }))
267 }
268
269 // Soft timeouts during negotiation are a bad sign
270 // (because we already prompted the server to send
271 // something and are waiting for it), but also nothing
272 // to write home about.
273 Err(ReadError::SoftTimeout) => Poll::Ready(Continue(None)),
274
275 // Parse errors during negotiation cause an unconditional
276 // stream error.
277 Err(ReadError::ParseError(e)) => {
278 Poll::Ready(Break(NegotiationResult::StreamError {
279 error: parse_error_to_stream_error(e),
280 }))
281 }
282
283 // I/O errors cause the stream to be considered
284 // broken; we drop it and send a Disconnect event with
285 // the error embedded.
286 Err(ReadError::HardError(error)) => {
287 Poll::Ready(Break(NegotiationResult::Disconnect {
288 sm_state: None,
289 error,
290 }))
291 }
292
293 // Stream footer during negotation is really weird.
294 // We kill the stream immediately with an error
295 // (but allow preservation of the SM state).
296 Err(ReadError::StreamFooterReceived) => {
297 Poll::Ready(Break(NegotiationResult::Disconnect {
298 sm_state: None,
299 error: io::Error::new(
300 io::ErrorKind::InvalidData,
301 "stream footer received during negotation",
302 ),
303 }))
304 }
305 }
306 }
307
308 Self::SendSmRequest {
309 sm_state,
310 bound_jid,
311 } => {
312 match ready!(<XmppStream as Sink<&XmppStreamElement>>::poll_ready(
313 stream.as_mut(),
314 cx
315 )) {
316 // We can send.
317 Ok(()) => (),
318
319 // Stream broke horribly.
320 Err(error) => {
321 return Poll::Ready(Break(NegotiationResult::Disconnect {
322 sm_state: sm_state.take(),
323 error,
324 }))
325 }
326 };
327
328 let nonza = if let Some((id, inbound_ctr)) =
329 sm_state.as_ref().and_then(|x| x.resume_info())
330 {
331 // Attempt resumption
332 sm::Nonza::Resume(sm::Resume {
333 h: inbound_ctr,
334 previd: sm::StreamId(id.to_owned()),
335 })
336 } else {
337 // Attempt enabling
338 sm::Nonza::Enable(sm::Enable {
339 max: None,
340 resume: true,
341 })
342 };
343 match stream.start_send(&XmppStreamElement::SM(nonza)) {
344 Ok(()) => (),
345 Err(e) => {
346 // We panic here, instead of returning an
347 // error, because after we confirmed via
348 // poll_ready that the stream is ready to
349 // send, the only error returned by start_send
350 // can be caused by our data.
351 panic!("Failed to send SM nonza: {}", e);
352 }
353 }
354
355 *self = Self::ReceiveSmResponse {
356 sm_state: sm_state.take(),
357 bound_jid: bound_jid.take(),
358 };
359 // Ask caller to poll us again immediately in order to
360 // start flushing the stream.
361 Poll::Ready(Continue(None))
362 }
363
364 Self::ReceiveSmResponse {
365 sm_state,
366 bound_jid,
367 } => {
368 match Self::flush(stream.as_mut(), cx) {
369 Break(error) => {
370 return Poll::Ready(Break(NegotiationResult::Disconnect {
371 sm_state: sm_state.take(),
372 error,
373 }))
374 }
375 Continue(()) => (),
376 }
377
378 // So the difficulty here is that there's a possibility
379 // that we receive non-SM data while the SM negotiation
380 // is going on.
381
382 let item = ready!(stream.poll_next(cx));
383 let item = item.unwrap_or_else(|| {
384 Err(ReadError::HardError(io::Error::new(
385 io::ErrorKind::UnexpectedEof,
386 "eof before stream footer",
387 )))
388 });
389 match item {
390 // Pre-SM data. Note that we mustn't count this while we
391 // are still in negotiating state: we transit to
392 // [`Self::Ready`] immediately after we got the
393 // `<resumed/>` or `<enabled/>`, and before we got that,
394 // counting inbound stanzas is definitely wrong (see e.g.
395 // aioxmpp commit 796aa32).
396 Ok(XmppStreamElement::Stanza(data)) => Poll::Ready(Continue(Some(data))),
397
398 Ok(XmppStreamElement::SM(sm::Nonza::Enabled(enabled))) => {
399 if sm_state.is_some() {
400 // Okay, the peer violated the stream management
401 // protocol here (or we have a bug).
402 log::warn!(
403 "Received <enabled/>, but we also have previous SM state. One of us has a bug here (us or the peer) and I'm not sure which it is. If you can reproduce this, please re-run with trace loglevel and provide the logs. Attempting to proceed with a fresh session.",
404 );
405 }
406 // We must emit Reset here because this is a
407 // fresh stream and we did not resume.
408 Poll::Ready(Break(NegotiationResult::StreamReset {
409 sm_state: Some(enabled.into()),
410 bound_jid: bound_jid.take().expect("State machine error: no bound_jid available in SM negotiation.").into(),
411 }))
412 }
413
414 Ok(XmppStreamElement::SM(sm::Nonza::Resumed(resumed))) => match sm_state.take()
415 {
416 Some(mut sm_state) => {
417 // Yay!
418 match sm_state.resume(resumed.h) {
419 Ok(to_retransmit) => transmit_queue.requeue_all(to_retransmit),
420 Err(e) => {
421 // We kill the stream with an error
422 log::error!("Resumption failed: {e}");
423 return Poll::Ready(Break(NegotiationResult::StreamError {
424 error: e.into(),
425 }));
426 }
427 }
428 Poll::Ready(Break(NegotiationResult::StreamResumed { sm_state }))
429 }
430 None => {
431 // Okay, the peer violated the stream management
432 // protocol here (or we have a bug).
433 // Unlike the
434 // received-enabled-but-attempted-to-resume
435 // situation, we do not have enough information to
436 // proceed without having the stream break soon.
437 // (If we proceeded without a SM state, we would
438 // have the stream die as soon as the peer
439 // requests our counters).
440 // We thus terminate the stream with an error.
441 // We must emit Reset here because this is a fresh
442 // stream and we did not resume.
443 Poll::Ready(Break(NegotiationResult::Disconnect {
444 sm_state: None,
445 error: io::Error::new(io::ErrorKind::InvalidData, "Peer replied to <sm:enable/> request with <sm:resumed/> response"),
446 }))
447 }
448 },
449
450 Ok(XmppStreamElement::SM(sm::Nonza::Failed(failed))) => match sm_state {
451 Some(sm_state) => {
452 log::debug!("Received <sm:failed/> in response to resumption request. Discarding SM data and attempting to renegotiate.");
453 if let Some(h) = failed.h {
454 // This is only an optimization anyway, so
455 // we can also just ignore this.
456 let _: Result<_, _> = sm_state.remote_acked(h);
457 }
458 *self = Self::SendBindRequest { sm_supported: true };
459 Poll::Ready(Continue(None))
460 }
461 None => {
462 log::warn!("Received <sm:failed/> in response to enable request. Proceeding without stream management.");
463
464 // We must emit Reset here because this is a
465 // fresh stream and we did not resume.
466 Poll::Ready(Break(NegotiationResult::StreamReset {
467 bound_jid: bound_jid.take().expect("State machine error: no bound_jid available in SM negotiation.").into(),
468 sm_state: None,
469 }))
470 }
471 },
472
473 Ok(XmppStreamElement::StreamError(error)) => {
474 log::debug!("Received stream error, failing stream and discarding any stream management state.");
475 let error = io::Error::new(io::ErrorKind::Other, error);
476 transmit_queue.fail(&(&error).into());
477 Poll::Ready(Break(NegotiationResult::Disconnect {
478 error,
479 sm_state: None,
480 }))
481 }
482
483 Ok(other) => {
484 log::warn!("Received unsupported stream element during negotiation: {other:?}. Emitting stream error.");
485 Poll::Ready(Break(NegotiationResult::StreamError {
486 error: StreamError {
487 condition: DefinedCondition::UnsupportedStanzaType,
488 text: None,
489 application_specific: vec![],
490 },
491 }))
492 }
493
494 // Soft timeouts during negotiation are a bad sign
495 // (because we already prompted the server to send
496 // something and are waiting for it), but also nothing
497 // to write home about.
498 Err(ReadError::SoftTimeout) => Poll::Ready(Continue(None)),
499
500 // Parse errors during negotiation cause an unconditional
501 // stream error.
502 Err(ReadError::ParseError(e)) => {
503 Poll::Ready(Break(NegotiationResult::StreamError {
504 error: parse_error_to_stream_error(e),
505 }))
506 }
507
508 // I/O errors cause the stream to be considered
509 // broken; we drop it and send a Disconnect event with
510 // the error embedded.
511 Err(ReadError::HardError(error)) => {
512 Poll::Ready(Break(NegotiationResult::Disconnect {
513 sm_state: sm_state.take(),
514 error,
515 }))
516 }
517
518 // Stream footer during negotation is really weird.
519 // We kill the stream immediately with an error
520 // (but allow preservation of the SM state).
521 Err(ReadError::StreamFooterReceived) => {
522 Poll::Ready(Break(NegotiationResult::Disconnect {
523 sm_state: sm_state.take(),
524 error: io::Error::new(
525 io::ErrorKind::InvalidData,
526 "stream footer received during negotation",
527 ),
528 }))
529 }
530 }
531 }
532 }
533 }
534}