tokio_xmpp/stanzastream/connected.rs
// Copyright (c) 2019 Emmanuel Gil Peyrot <linkmauve@linkmauve.fr>
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.

use core::future::Future;
use core::ops::ControlFlow::{Break, Continue};
use core::pin::Pin;
use core::task::{Context, Poll};
use std::io;

use futures::{ready, Sink, Stream};

use xmpp_parsers::{
    jid::Jid,
    sm,
    stream_error::{DefinedCondition, SentStreamError, StreamError},
    stream_features::StreamFeatures,
};

use crate::xmlstream::{ReadError, XmppStreamElement};
use crate::Stanza;

use super::negotiation::{NegotiationResult, NegotiationState};
use super::queue::{QueueEntry, StanzaState, TransmitQueue};
use super::stream_management::*;
use super::worker::{WorkerEvent, XmppStream, LOCAL_SHUTDOWN_TIMEOUT};

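/// Progress of the receiving side during an orderly stream shutdown.
///
/// We first await the remote stream footer, then the EOF, and are done
/// afterwards.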
#[derive(PartialEq)]
pub(super) enum RxShutdownState {
    AwaitingFooter,
    AwaitingEof,
    Done,
}

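/// Select the I/O error to report locally after a stream error was (or was
/// supposed to be) sent: prefer an explicit I/O error, then wrap the stream
/// error itself, and synthesise a generic error if neither is available.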
fn local_error_for_stream_error(
    io_error: &mut Option<io::Error>,
    stream_error: &mut Option<StreamError>,
) -> io::Error {
    io_error
        .take()
        .or_else(|| {
            stream_error
                .take()
                .map(|x| io::Error::new(io::ErrorKind::InvalidData, SentStreamError(x)))
        })
        .unwrap_or_else(|| {
            io::Error::new(
                io::ErrorKind::InvalidData,
                "unknown local stream error generated",
            )
        })
}

/// Substate of the [`BackendStream::Connected`] state.
///
/// Having the substate and its logic in a separate type allows us to
/// circumvent problems with moving data out of `&mut _` when transitioning
/// between substates.
pub(super) enum ConnectedState {
    /// The stream is still being negotiated.
    Negotiating {
        /// Current state within the negotiations.
        substate: NegotiationState,
    },

    /// The stream is ready for transceiving.
    Ready {
        /// Stream management state, if any.
        sm_state: Option<SmState>,
    },

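    /// A stream error is being transmitted to the peer before the stream is
    /// torn down.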
    SendStreamError {
        /// Stream error to send.
        ///
        /// `None` implies that we now only need to flush.
        stream_error: Option<StreamError>,

        /// I/O error to return to the caller once the flush is done.
        ///
        /// If `None`, an error will be synthesised.
        io_error: Option<io::Error>,

        /// Deadline until which the error must've been sent and the stream
        /// must've been shut down.
        deadline: Pin<Box<tokio::time::Sleep>>,
    },

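    /// The stream failed; the error and any stream management state are held
    /// here until they are collected via [`Self::poll`] or
    /// [`Self::poll_close`].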
    Failed {
        error: Option<io::Error>,
        sm_state: Option<SmState>,
    },

    /// A stream shutdown was initiated locally and we are flushing RX and TX
    /// queues.
    LocalShutdown {
        /// Keep track of whether we have closed the TX side yet.
        tx_closed: bool,

        /// Keep track of how far the shutdown of the receiving side has
        /// progressed.
        rx_state: RxShutdownState,

        /// Deadline until which the graceful shutdown must complete; if the
        /// deadline is exceeded (i.e. the contained Sleep future returns
        /// ready), the streams will be dropped (and thus closed by the OS).
        deadline: Pin<Box<tokio::time::Sleep>>,
    },

    /// The remote side closed the stream.
    RemoteShutdown {
        /// Keep the SM state for later resumption.
        sm_state: Option<SmState>,
    },

    /// Local shutdown has completed; this is a final state, as local shutdown
    /// signals an intent to stop the stream forever.
    LocalShutdownComplete,
}

/// Enumeration of events which may happen once the stream has finished the
/// connection procedures and is established.
pub(super) enum ConnectedEvent {
    /// Event generated by the stream worker.
    Worker(WorkerEvent),

    /// The remote side closed the stream in an orderly fashion.
    RemoteShutdown { sm_state: Option<SmState> },

    /// We got disconnected through an error, either an I/O error
    /// or some kind of stream error.
    Disconnect {
        /// Stream management state for later resumption attempts.
        sm_state: Option<SmState>,

        /// The error which caused the disconnect. This is generally not
        /// `None`, but we cannot prove this at compile time because we have
        /// to take the error from a mutable (i.e. non-owned) place.
        error: Option<io::Error>,
    },

    /// A shutdown was requested by the local side of the stream.
    LocalShutdownRequested,
}

impl ConnectedState {
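    /// Transition into [`Self::SendStreamError`], scheduling `stream_error`
    /// for transmission with a fresh shutdown deadline.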
    fn to_stream_error_state(&mut self, stream_error: StreamError) {
        *self = Self::SendStreamError {
            stream_error: Some(stream_error),
            io_error: None,
            deadline: Box::pin(tokio::time::sleep(LOCAL_SHUTDOWN_TIMEOUT)),
        };
    }

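    /// Transition into [`Self::Failed`], preserving `sm_state` for a
    /// possible later resumption attempt.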
    fn to_failed_state(&mut self, error: io::Error, sm_state: Option<SmState>) {
        *self = Self::Failed {
            error: Some(error),
            sm_state,
        };
    }

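    /// Transmit a pending stream management `<r/>` request, if any.
    ///
    /// Returns `Poll::Ready(Ok(()))` once no request is pending anymore.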
    fn poll_write_sm_req(
        mut sm_state: Option<&mut SmState>,
        mut stream: Pin<&mut XmppStream>,
        cx: &mut Context<'_>,
    ) -> Poll<io::Result<()>> {
        if let Some(sm_state) = sm_state.as_mut() {
            // Request is pending.
            if sm_state.pending_req {
                match ready!(<XmppStream as Sink<&XmppStreamElement>>::poll_ready(
                    stream.as_mut(),
                    cx,
                )) {
                    Ok(()) => (),
                    Err(e) => return Poll::Ready(Err(e)),
                }
                match stream
                    .as_mut()
                    .start_send(&XmppStreamElement::SM(sm::Nonza::Req(sm::R)))
                {
                    Ok(()) => (),
                    Err(e) => {
                        // As the stream promised we would be able to
                        // send, this must be a problem with our
                        // (locally generated) nonza, i.e. this is
                        // fatal.
                        panic!("Failed to send SM Req nonza: {}", e);
                    }
                }
                sm_state.pending_req = false;
            }
        }
        Poll::Ready(Ok(()))
    }

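    /// Drain the transmit queue into the stream, interleaving stream
    /// management requests and acks as needed, and flush the stream.
    ///
    /// Returns `Poll::Ready(Ok(()))` only once the transmit queue has been
    /// closed and everything has been flushed; errors from the underlying
    /// stream are surfaced as `Poll::Ready(Err(_))`.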
    fn poll_writes_inner(
        mut sm_state: Option<&mut SmState>,
        mut stream: Pin<&mut XmppStream>,
        transmit_queue: &mut TransmitQueue<QueueEntry>,
        cx: &mut Context<'_>,
    ) -> Poll<io::Result<()>> {
        let mut depleted = false;

        // We prefer sending SM reqs before actual data.
        // SM requests are used in response to soft timeouts as a way to
        // trigger the remote side to send *something* to us. We may be
        // sending a lot of data in bulk right now without ever expecting a
        // response (or at least not anytime soon). In order to ensure that
        // the server will in fact send a message to us soon, we have to send
        // the SM request before anything else.
        //
        // Example scenario: Sending a bunch of MAM `<message/>`s over a slow,
        // but low-latency link. The MAM response only triggers a response
        // from the peer when everything has been transmitted, which may take
        // longer than the stream timeout.
        ready!(Self::poll_write_sm_req(
            match sm_state {
                None => None,
                Some(ref mut v) => Some(v),
            },
            stream.as_mut(),
            cx
        ))?;

        let mut transmitted = false;
        // We prefer sending actual data before stream-management ACKs.
        // While the other side may be waiting for our ACK, we are not obliged
        // to send it straight away (XEP-0198 explicitly allows us to delay it
        // for some implementation-defined time if we have stuff to send; our
        // implementation-defined time is "infinity"), so we try to make
        // progress on real data first.
        loop {
            // If either the queue has nothing for us or the stream isn't
            // ready to send, we break out of the loop. We don't use ready!
            // here because we may have SM ACKs to send.
            let next = match transmit_queue.poll_next(cx) {
                Poll::Ready(Some(v)) => v,
                Poll::Ready(None) => {
                    // The transmit_queue is empty, so we set `depleted` to
                    // true in order to ensure that we return Ready once all
                    // SM acks have also been transmitted.
                    depleted = true;
                    break;
                }
                Poll::Pending => break,
            };
            // If the stream isn't ready to send, none of the other things can
            // be sent either, so we can use ready!.
            match ready!(<XmppStream as Sink<&Stanza>>::poll_ready(
                stream.as_mut(),
                cx
            )) {
                Ok(()) => (),
                Err(e) => return Poll::Ready(Err(e)),
            }
            // We now either send the item or "die trying". It must be
            // removed from the queue, because even if it fails to serialise,
            // we don't want to reattempt sending it (unless through SM
            // resumption retransmission).
            let next = next.take();
            match stream.as_mut().start_send(&next.stanza) {
                Ok(()) => {
                    next.token.send_replace(StanzaState::Sent {});
                    if let Some(sm_state) = sm_state.as_mut() {
                        sm_state.enqueue(next);
                    }
                    transmitted = true;
                }
                // Serialisation error, report back to the queue item.
                Err(e) => {
                    next.token
                        .send_replace(StanzaState::Failed { error: e.into() });
                }
            }
        }

        if let Some(sm_state) = sm_state.as_mut() {
            // We can set it to `transmitted` directly, because it has been
            // cleared by the previous call to poll_write_sm_req.
            sm_state.pending_req = transmitted;
            ready!(Self::poll_write_sm_req(Some(sm_state), stream.as_mut(), cx))?;
        }

        // Now, if the stream will let us and we need to, we can tack
        // on some SM ACKs.
        if let Some(sm_state) = sm_state {
            while sm_state.pending_acks > 0 {
                match ready!(<XmppStream as Sink<&XmppStreamElement>>::poll_ready(
                    stream.as_mut(),
                    cx,
                )) {
                    Ok(()) => (),
                    Err(e) => return Poll::Ready(Err(e)),
                }
                match stream
                    .as_mut()
                    .start_send(&XmppStreamElement::SM(sm::Nonza::Ack(sm::A {
                        h: sm_state.inbound_ctr(),
                    }))) {
                    Ok(()) => (),
                    Err(e) => {
                        // As the stream promised we would be able to
                        // send, this must be a problem with our
                        // (locally generated) nonza, i.e. this is
                        // fatal.
                        panic!("Failed to send SM Ack nonza: {}", e);
                    }
                }
                sm_state.pending_acks -= 1;
            }
        }

        // If we haven't transmitted anything, we may also not have polled
        // the stream for readiness or flushing. We need to do that here to
        // ensure progress: even if our tx queue is empty, the tx buffer may
        // be nonempty.
        match ready!(<XmppStream as Sink<&Stanza>>::poll_flush(
            stream.as_mut(),
            cx
        )) {
            Ok(()) => (),
            Err(e) => return Poll::Ready(Err(e)),
        }

        // If we end up here, all data we currently have has been
        // transmitted via the stream and the stream's tx buffers have
        // been properly flushed.
        if depleted {
            // And here, we know that the transmit queue is closed,
            // too. We return with success.
            Poll::Ready(Ok(()))
        } else {
            // The transmit queue is still open, so more data could
            // pour in to be transmitted.
            Poll::Pending
        }
    }

    /// Drive the stream in transmit simplex mode.
    ///
    /// This will block forever (i.e. return [`Poll::Pending`] without
    /// installing a waker) if the stream is currently being negotiated.
    /// Otherwise, it will attempt to drain the `transmit_queue`. Once the
    /// queue has been closed and fully drained, `Poll::Ready(())` is
    /// returned. When write errors occur, this will also block forever.
    ///
    /// If nothing is to be sent, but the stream could be used for sending,
    /// this will drive the flush part of the inner stream.
    ///
    /// Any errors are reported on the next call to `poll`.
    pub fn poll_writes(
        &mut self,
        stream: Pin<&mut XmppStream>,
        transmit_queue: &mut TransmitQueue<QueueEntry>,
        cx: &mut Context<'_>,
    ) -> Poll<()> {
        match self {
            Self::Ready { sm_state, .. } => match ready!(Self::poll_writes_inner(
                sm_state.as_mut(),
                stream,
                transmit_queue,
                cx
            )) {
                Ok(()) => Poll::Ready(()),
                Err(e) => {
                    *self = Self::Failed {
                        error: Some(e),
                        sm_state: sm_state.take(),
                    };
                    Poll::Pending
                }
            },

            _ => Poll::Pending,
        }
    }

    /// Drive the stream in full-duplex mode.
    ///
    /// Stanzas from the `transmit_queue` are transmitted once the stream is
    /// ready.
    ///
    /// Returns:
    /// - `Poll::Pending` if it blocks on the inner stream,
    /// - `Poll::Ready(None)` if it needs to be called again to produce a
    ///   proper result,
    /// - `Poll::Ready(Some(_))` when it has a proper result.
    pub fn poll(
        &mut self,
        mut stream: Pin<&mut XmppStream>,
        jid: &Jid,
        features: &StreamFeatures,
        transmit_queue: &mut TransmitQueue<QueueEntry>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<ConnectedEvent>> {
        match self {
            Self::Negotiating { ref mut substate } => {
                match ready!(substate.advance(stream, jid, transmit_queue, cx)) {
                    Break(NegotiationResult::Disconnect { sm_state, error }) => {
                        self.to_failed_state(error, sm_state);
                        Poll::Ready(None)
                    }
                    Break(NegotiationResult::StreamReset {
                        sm_state,
                        bound_jid,
                    }) => {
                        *self = Self::Ready { sm_state };
                        Poll::Ready(Some(ConnectedEvent::Worker(WorkerEvent::Reset {
                            bound_jid,
                            features: features.clone(),
                        })))
                    }
                    Break(NegotiationResult::StreamResumed { sm_state }) => {
                        *self = Self::Ready {
                            sm_state: Some(sm_state),
                        };
                        Poll::Ready(Some(ConnectedEvent::Worker(WorkerEvent::Resumed)))
                    }
                    Break(NegotiationResult::StreamError { error }) => {
                        self.to_stream_error_state(error);
                        Poll::Ready(None)
                    }
                    Continue(Some(stanza)) => {
                        Poll::Ready(Some(ConnectedEvent::Worker(WorkerEvent::Stanza(stanza))))
                    }
                    Continue(None) => Poll::Ready(None),
                }
            }

            Self::SendStreamError {
                ref mut stream_error,
                ref mut io_error,
                ref mut deadline,
            } => {
                match stream.as_mut().poll_next(cx) {
                    Poll::Pending
                    | Poll::Ready(None)
                    | Poll::Ready(Some(Err(ReadError::StreamFooterReceived)))
                    | Poll::Ready(Some(Err(ReadError::SoftTimeout))) => (),
                    Poll::Ready(Some(Ok(ev))) => {
                        log::trace!("Discarding incoming data while sending stream error: {ev:?}")
                    }
                    Poll::Ready(Some(Err(ReadError::ParseError(e)))) => {
                        log::trace!("Ignoring parse error while sending stream error: {e}")
                    }
                    Poll::Ready(Some(Err(ReadError::HardError(e)))) => {
                        log::warn!("I/O error while sending stream error: {e}")
                    }
                }

                match deadline.as_mut().poll(cx) {
                    Poll::Pending => (),
                    Poll::Ready(()) => {
                        log::debug!("Timeout while sending stream error. Discarding state.");
                        let error = local_error_for_stream_error(io_error, stream_error);
                        self.to_failed_state(error, None);
                        return Poll::Ready(None);
                    }
                }

                // Cannot use ready! here because we have to consider the
                // case where the other side is refusing to accept data
                // because its outgoing buffer is too full.
                if stream_error.is_some() {
                    match ready!(<XmppStream as Sink<&StreamError>>::poll_ready(
                        stream.as_mut(),
                        cx
                    ))
                    .and_then(|()| {
                        // The take serves as transition to the next state.
                        let stream_error = stream_error.take().unwrap();
                        let result = stream.as_mut().start_send(&stream_error);
                        *io_error = Some(local_error_for_stream_error(
                            io_error,
                            &mut Some(stream_error),
                        ));
                        result
                    }) {
                        Ok(()) => (),
                        Err(e) => {
                            log::debug!("Got I/O error while sending stream error: {e}. Skipping error transmission.");
                            let error = local_error_for_stream_error(io_error, stream_error);
                            self.to_failed_state(error, None);
                            return Poll::Ready(None);
                        }
                    }
                }

                match ready!(<XmppStream as Sink<&StreamError>>::poll_flush(
                    stream.as_mut(),
                    cx
                )) {
                    Ok(()) => (),
                    Err(e) => {
                        log::debug!(
                            "Got I/O error while flushing stream error: {e}. Skipping flush.",
                        );
                    }
                }
                log::trace!("Stream error send complete, transitioning to Failed state");
                *self = Self::Failed {
                    error: Some(local_error_for_stream_error(io_error, stream_error)),
                    // Do *not* resume after we caused a stream error.
                    sm_state: None,
                };

                // Request the caller to call us again to get the
                // actual error message.
                Poll::Ready(None)
            }

            Self::Ready { ref mut sm_state } => {
                match Self::poll_writes_inner(
                    sm_state.as_mut(),
                    stream.as_mut(),
                    transmit_queue,
                    cx,
                ) {
                    Poll::Pending => (),
                    Poll::Ready(Ok(())) => {
                        *self = Self::LocalShutdown {
                            rx_state: RxShutdownState::AwaitingFooter,
                            tx_closed: false,
                            deadline: Box::pin(tokio::time::sleep(LOCAL_SHUTDOWN_TIMEOUT)),
                        };
                        return Poll::Ready(Some(ConnectedEvent::LocalShutdownRequested));
                    }
                    Poll::Ready(Err(e)) => {
                        *self = Self::Failed {
                            error: Some(e),
                            sm_state: sm_state.take(),
                        };
                        return Poll::Ready(None);
                    }
                }

                let item = ready!(stream.poll_next(cx));
                // We transition to RemoteShutdown (or another non-connected
                // state) when we receive the stream footer, so reading `None`
                // from the stream is always an unclean closure.
                let item = item.unwrap_or_else(|| {
                    Err(ReadError::HardError(io::Error::new(
                        io::ErrorKind::UnexpectedEof,
                        "eof before stream footer",
                    )))
                });
                match item {
                    // Easy case, we got some data.
                    Ok(XmppStreamElement::Stanza(data)) => {
                        Poll::Ready(Some(ConnectedEvent::Worker(WorkerEvent::Stanza(data))))
                    }

                    Ok(XmppStreamElement::SM(sm::Nonza::Ack(ack))) => {
                        if let Some(sm_state) = sm_state {
                            match sm_state.remote_acked(ack.h) {
                                Ok(()) => Poll::Ready(None),
                                Err(e) => {
                                    log::error!(
                                        "Failed to process <sm:a/> sent by the server: {e}",
                                    );
                                    self.to_stream_error_state(e.into());
                                    return Poll::Ready(None);
                                }
                            }
                        } else {
                            log::debug!("Received an <sm:a/> from the peer, but stream management is not enabled. Ignoring it.");
                            Poll::Ready(None)
                        }
                    }

                    Ok(XmppStreamElement::SM(sm::Nonza::Req(_))) => {
                        if let Some(sm_state) = sm_state {
                            match sm_state.pending_acks.checked_add(1) {
                                None => panic!("Too many pending ACKs, something is wrong."),
                                Some(v) => sm_state.pending_acks = v,
                            }
                        } else {
                            log::warn!("Got an <sm:r/> from the peer, but we don't have any stream management state. Terminating stream with an error.");
                            self.to_stream_error_state(StreamError {
                                condition: DefinedCondition::UnsupportedStanzaType,
                                text: Some((
                                    None,
                                    "received <sm:r/>, but stream management is not enabled"
                                        .to_owned(),
                                )),
                                application_specific: vec![],
                            });
                        }
                        // No matter whether we "enqueued" an ACK for sending
                        // or whether we just successfully read something from
                        // the stream, we have to request to be polled again
                        // right away.
                        Poll::Ready(None)
                    }

                    Ok(other) => {
                        log::warn!(
                            "Received unsupported stream element: {other:?}. Emitting stream error.",
                        );
                        self.to_stream_error_state(StreamError {
                            condition: DefinedCondition::UnsupportedStanzaType,
                            // TODO: figure out a good way to provide the
                            // sender with more information.
                            text: None,
                            application_specific: vec![],
                        });
                        Poll::Ready(None)
                    }

                    // Another easy case: Soft timeouts are passed through
                    // to the caller for handling.
                    Err(ReadError::SoftTimeout) => {
                        Poll::Ready(Some(ConnectedEvent::Worker(WorkerEvent::SoftTimeout)))
                    }

                    // Parse errors are also just passed through (and will
                    // likely cause us to send a stream error).
                    Err(ReadError::ParseError(e)) => {
                        Poll::Ready(Some(ConnectedEvent::Worker(WorkerEvent::ParseError(e))))
                    }

                    // I/O errors cause the stream to be considered broken;
                    // we drop it and send a Disconnect event with the error
                    // embedded.
                    Err(ReadError::HardError(e)) => {
                        let sm_state = sm_state.take();
                        Poll::Ready(Some(ConnectedEvent::Disconnect {
                            sm_state,
                            error: Some(e),
                        }))
                    }

                    // A stream footer indicates that the remote wants to shut
                    // this stream down. We transition into the RemoteShutdown
                    // state, which makes us emit a special event until the
                    // caller takes care of it.
                    Err(ReadError::StreamFooterReceived) => {
                        *self = Self::RemoteShutdown {
                            sm_state: sm_state.take(),
                        };

                        // Let us be called again immediately to emit the
                        // notification.
                        Poll::Ready(None)
                    }
                }
            }

            Self::Failed { sm_state, error } => Poll::Ready(Some(ConnectedEvent::Disconnect {
                error: error.take(),
                sm_state: sm_state.take(),
            })),

            Self::LocalShutdown { .. } | Self::LocalShutdownComplete => {
                panic!("poll_next called in local shutdown");
            }

            Self::RemoteShutdown { ref mut sm_state } => {
                Poll::Ready(Some(ConnectedEvent::RemoteShutdown {
                    sm_state: sm_state.take(),
                }))
            }
        }
    }

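    /// Drive an orderly local shutdown of the stream to completion.
    ///
    /// This closes the transmit side, drains the receive side until the
    /// remote footer and EOF have been seen (or the shutdown deadline
    /// expires), and finally closes the underlying stream.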
    pub(super) fn poll_close(
        &mut self,
        mut stream: Pin<&mut XmppStream>,
        cx: &mut Context<'_>,
    ) -> Poll<Result<(), io::Error>> {
        loop {
            match self {
                // User initiates shutdown by local choice.
                // The actual shutdown is driven by the poll_read function and
                // we only get woken up via the close_poller waker.
                Self::Ready { .. } | Self::RemoteShutdown { .. } | Self::Negotiating { .. } => {
                    *self = Self::LocalShutdown {
                        rx_state: RxShutdownState::AwaitingFooter,
                        tx_closed: false,
                        deadline: Box::pin(tokio::time::sleep(LOCAL_SHUTDOWN_TIMEOUT)),
                    };
                }

                Self::Failed { error, .. } => match error.take() {
                    Some(error) => return Poll::Ready(Err(error)),
                    None => return Poll::Ready(Ok(())),
                },

                // If close is called while an attempt is made to send the
                // stream error, we abort transmission.
                Self::SendStreamError { .. } => {
                    log::debug!("close() called while stream error was being sent. Aborting transmission of stream error.");
                    *self = Self::LocalShutdown {
                        rx_state: RxShutdownState::AwaitingFooter,
                        tx_closed: false,
                        deadline: Box::pin(tokio::time::sleep(LOCAL_SHUTDOWN_TIMEOUT)),
                    };
                }

                // Wait for local shutdown (driven by poll_read) to complete.
                Self::LocalShutdown {
                    ref mut deadline,
                    ref mut rx_state,
                    ref mut tx_closed,
                } => {
                    match deadline.as_mut().poll(cx) {
                        Poll::Ready(()) => {
                            log::debug!("Dropping stream after shutdown timeout was exceeded.");
                            *self = Self::LocalShutdownComplete;
                            return Poll::Ready(Ok(()));
                        }
                        Poll::Pending => (),
                    }

                    if !*tx_closed {
                        // We cannot use ready! here, because we want to poll
                        // the receiving side in parallel.
                        match stream.as_mut().poll_shutdown(cx) {
                            Poll::Pending => (),
                            Poll::Ready(Ok(())) => {
                                *tx_closed = true;
                            }
                            Poll::Ready(Err(e)) => {
                                log::debug!(
                                    "Ignoring write error during local stream shutdown: {e}"
                                );
                                *tx_closed = true;
                            }
                        }
                    }

                    match rx_state {
                        RxShutdownState::Done => {
                            if !*tx_closed {
                                // poll_shutdown() returned Poll::Pending, so
                                // we have to return that, too.
                                return Poll::Pending;
                            }
                        }
                        // We can use ready! here because the `poll_shutdown`
                        // has happened already; we don't want to poll
                        // anything else anymore.
                        _ => loop {
                            match ready!(stream.as_mut().poll_next(cx)) {
                                None => {
                                    if *rx_state != RxShutdownState::AwaitingEof {
                                        log::debug!("Ignoring early EOF during stream shutdown.");
                                    }
                                    *rx_state = RxShutdownState::Done;
                                    break;
                                }
                                Some(Ok(data)) => {
                                    log::debug!("Ignoring data received on stream during local shutdown: {data:?}");
                                }
                                Some(Err(ReadError::SoftTimeout)) => (),
                                Some(Err(ReadError::HardError(e))) => {
                                    *rx_state = RxShutdownState::Done;
                                    log::debug!("Ignoring read error during local shutdown: {e}");
                                    break;
                                }
                                Some(Err(ReadError::ParseError(e))) => {
                                    log::debug!("Ignoring parse error during local shutdown: {e}");
                                }
                                Some(Err(ReadError::StreamFooterReceived)) => match rx_state {
                                    RxShutdownState::AwaitingFooter => {
                                        *rx_state = RxShutdownState::AwaitingEof;
                                    }
                                    RxShutdownState::AwaitingEof => {
                                        unreachable!("multiple stream footers?!")
                                    }
                                    RxShutdownState::Done => unreachable!(),
                                },
                            }
                        },
                    }

                    if *tx_closed && *rx_state == RxShutdownState::Done {
                        // Now that everything is properly cleaned up on the
                        // xmlstream layer, we go through with closure.
                        ready!(<XmppStream as Sink<&Stanza>>::poll_close(
                            stream.as_mut(),
                            cx
                        ))?;
                        // And now that's done, we can finally call it a day.
                        *self = Self::LocalShutdownComplete;
                        return Poll::Ready(Ok(()));
                    } else {
                        return Poll::Pending;
                    }
                }

                Self::LocalShutdownComplete => return Poll::Ready(Ok(())),
            }
        }
    }

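    /// Begin transmitting a stream error, unless the stream is already
    /// shutting down, has failed, or is busy sending another stream error.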
    pub(super) fn start_send_stream_error(&mut self, error: StreamError) {
        match self {
            Self::LocalShutdownComplete
            | Self::LocalShutdown { .. }
            | Self::RemoteShutdown { .. }
            | Self::Failed { .. } => {
                log::debug!("Request to send stream error ({error}), but we are already shutting down or have already failed. Discarding.");
                return;
            }

            Self::Ready { .. } | Self::Negotiating { .. } => {}

            Self::SendStreamError { .. } => {
                log::debug!("Request to send stream error ({error}) while transmission of another stream error is already in progress. Discarding the new one.");
                return;
            }
        }

        *self = Self::SendStreamError {
            deadline: Box::pin(tokio::time::sleep(LOCAL_SHUTDOWN_TIMEOUT)),
            stream_error: Some(error),
            io_error: None,
        };
    }

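    /// Mark a stream management `<r/>` request as pending so that it is
    /// transmitted at the next write opportunity.
    ///
    /// Returns true if the request was queued, false if the stream is not
    /// ready or stream management is not enabled.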
    pub fn queue_sm_request(&mut self) -> bool {
        match self {
            Self::Ready { sm_state, .. } => {
                if let Some(sm_state) = sm_state {
                    sm_state.pending_req = true;
                    true
                } else {
                    false
                }
            }
            _ => false,
        }
    }
}