tokio_xmpp/stanzastream/
connected.rs

// Copyright (c) 2019 Emmanuel Gil Peyrot <linkmauve@linkmauve.fr>
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.

use core::future::Future;
use core::ops::ControlFlow::{Break, Continue};
use core::pin::Pin;
use core::task::{Context, Poll};
use std::io;

use futures::{ready, Sink, Stream};

use xmpp_parsers::{
    jid::Jid,
    sm,
    stream_error::{DefinedCondition, SentStreamError, StreamError},
    stream_features::StreamFeatures,
};

use crate::xmlstream::{ReadError, XmppStreamElement};
use crate::Stanza;

use super::negotiation::{NegotiationResult, NegotiationState};
use super::queue::{QueueEntry, StanzaState, TransmitQueue};
use super::stream_management::*;
use super::worker::{WorkerEvent, XmppStream, LOCAL_SHUTDOWN_TIMEOUT};
/// Progress of the receive side during a local stream shutdown.
#[derive(PartialEq)]
pub(super) enum RxShutdownState {
    /// Still waiting for the remote `</stream:stream>` footer.
    AwaitingFooter,
    /// Footer received; waiting for EOF on the underlying transport.
    AwaitingEof,
    /// The receive side is fully shut down.
    Done,
}

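/// Pick the most informative error to report for a locally generated
/// stream error: prefer the captured I/O error, fall back to wrapping the
/// stream error itself, and synthesise a generic error as a last resort.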
fn local_error_for_stream_error(
    io_error: &mut Option<io::Error>,
    stream_error: &mut Option<StreamError>,
) -> io::Error {
    io_error
        .take()
        .or_else(|| {
            stream_error
                .take()
                .map(|x| io::Error::new(io::ErrorKind::InvalidData, SentStreamError(x)))
        })
        .unwrap_or_else(|| {
            io::Error::new(
                io::ErrorKind::InvalidData,
                "unknown local stream error generated",
            )
        })
}

/// Substate of the [`BackendStream::Connected`] state.
///
/// Having the substate and its logic in a separate type allows us to
/// circumvent problems with moving data out of `&mut _` when transitioning
/// between substates.
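///
/// A minimal sketch of the transition pattern used throughout this type
/// (the function name is illustrative, not part of this module):
///
/// ```ignore
/// fn fail(state: &mut ConnectedState, error: std::io::Error) {
///     if let ConnectedState::Ready { sm_state } = state {
///         // `Option::take` moves the owned SM state out through the
///         // `&mut` borrow, so `*state` can be rebuilt wholesale.
///         let sm_state = sm_state.take();
///         *state = ConnectedState::Failed {
///             error: Some(error),
///             sm_state,
///         };
///     }
/// }
/// ```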
pub(super) enum ConnectedState {
    /// The stream is still being negotiated.
    Negotiating {
        /// Current state within the negotiations
        substate: NegotiationState,
    },

    /// The stream is ready for transceiving.
    Ready {
        /// Stream management state, if any.
        sm_state: Option<SmState>,
    },

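    /// A stream error is being sent to the peer before tearing the stream
    /// down.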
    SendStreamError {
        /// Stream error to send.
        ///
        /// `None` implies that we now only need to flush.
        stream_error: Option<StreamError>,

        /// I/O error to return to the caller once the flush is done.
        ///
        /// If `None`, an error will be synthesised.
        io_error: Option<io::Error>,

        /// Deadline until which the error must've been sent and the stream
        /// must've been shut down.
        deadline: Pin<Box<tokio::time::Sleep>>,
    },

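    /// The stream has failed; the error is waiting to be picked up by the
    /// next poll.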
    Failed {
        error: Option<io::Error>,
        sm_state: Option<SmState>,
    },

    /// A stream shutdown was initiated locally and we are flushing RX and TX
    /// queues.
    LocalShutdown {
        /// Keep track of whether we have closed the TX side yet.
        tx_closed: bool,

        /// Keep track of how far along the shutdown of the receiving side
        /// is.
        rx_state: RxShutdownState,

        /// Deadline until which graceful shutdown must complete; if the
        /// deadline is exceeded (i.e. the contained Sleep future returns
        /// ready), the streams will be dropped (and thus closed by the OS).
        deadline: Pin<Box<tokio::time::Sleep>>,
    },

    /// The remote side closed the stream.
    RemoteShutdown {
        /// Keep the SM state for later resumption.
        sm_state: Option<SmState>,
    },

    /// Local shutdown has completed; this is a final state, as a local
    /// shutdown signals the intent to stop the stream forever.
    LocalShutdownComplete,
}

/// Enumeration of events happening while the stream has finished the
/// connection procedures and is established.
pub(super) enum ConnectedEvent {
    /// Event generated by the stream worker.
    Worker(WorkerEvent),

    /// The remote side closed the stream in an orderly fashion.
    RemoteShutdown { sm_state: Option<SmState> },

    /// We got disconnected through an error, either an I/O error
    /// or some kind of stream error.
    Disconnect {
        /// Stream management state for later resumption attempts.
        sm_state: Option<SmState>,

        /// The error which caused the disconnect. This is generally not
        /// `None`, but we cannot prove this at compile time because we have
        /// to take the error from a mutable (i.e. non-owned) place.
        error: Option<io::Error>,
    },

    /// A shutdown was requested by the local side of the stream.
    LocalShutdownRequested,
}

impl ConnectedState {
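    /// Transition into [`Self::SendStreamError`], scheduling the given
    /// error for transmission under a fresh shutdown deadline.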
    fn to_stream_error_state(&mut self, stream_error: StreamError) {
        *self = Self::SendStreamError {
            stream_error: Some(stream_error),
            io_error: None,
            deadline: Box::pin(tokio::time::sleep(LOCAL_SHUTDOWN_TIMEOUT)),
        };
    }

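    /// Transition into [`Self::Failed`], keeping the SM state (if any) for
    /// a later resumption attempt.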
    fn to_failed_state(&mut self, error: io::Error, sm_state: Option<SmState>) {
        *self = Self::Failed {
            error: Some(error),
            sm_state,
        };
    }

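    /// Send a pending stream management `<r/>` request, if one is flagged
    /// in `sm_state`.
    ///
    /// Returns `Poll::Pending` only while waiting for the sink to become
    /// ready; a serialisation failure for this locally generated nonza is
    /// treated as a fatal bug and panics.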
    fn poll_write_sm_req(
        mut sm_state: Option<&mut SmState>,
        mut stream: Pin<&mut XmppStream>,
        cx: &mut Context<'_>,
    ) -> Poll<io::Result<()>> {
        if let Some(sm_state) = sm_state.as_mut() {
            // Request is pending.
            if sm_state.pending_req {
                match ready!(<XmppStream as Sink<&XmppStreamElement>>::poll_ready(
                    stream.as_mut(),
                    cx,
                )) {
                    Ok(()) => (),
                    Err(e) => return Poll::Ready(Err(e)),
                }
                match stream
                    .as_mut()
                    .start_send(&XmppStreamElement::SM(sm::Nonza::Req(sm::R)))
                {
                    Ok(()) => (),
                    Err(e) => {
                        // As the stream promised we would be able to
                        // send, this must be a problem with our
                        // (locally generated) nonza, i.e. this is
                        // fatal.
                        panic!("Failed to send SM Req nonza: {}", e);
                    }
                }
                sm_state.pending_req = false;
            }
        }
        Poll::Ready(Ok(()))
    }

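    /// Transmit, in order of priority: a pending SM request, queued
    /// stanzas, and pending SM ACKs; then flush the stream.
    ///
    /// Returns `Ok(())` only once the transmit queue has been closed and
    /// fully drained. While the queue is merely empty (but still open),
    /// this returns `Poll::Pending`.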
    fn poll_writes_inner(
        mut sm_state: Option<&mut SmState>,
        mut stream: Pin<&mut XmppStream>,
        transmit_queue: &mut TransmitQueue<QueueEntry>,
        cx: &mut Context<'_>,
    ) -> Poll<io::Result<()>> {
        let mut depleted = false;

        // We prefer sending SM reqs before actual data.
        // SM requests are used in response to soft timeouts as a way to
        // trigger the remote side to send *something* to us. We may be
        // sending a lot of data in bulk right now without ever expecting a
        // response (or at least not anytime soon). In order to ensure that
        // the server will in fact send a message to us soon, we have to send
        // the SM request before anything else.
        //
        // Example scenario: Sending a bunch of MAM `<message/>`s over a slow,
        // but low-latency link. The MAM response only triggers a response
        // from the peer when everything has been transmitted, which may be
        // longer than the stream timeout.
        ready!(Self::poll_write_sm_req(
            sm_state.as_mut().map(|x| x as &mut SmState),
            stream.as_mut(),
            cx
        ))?;

        let mut transmitted = false;
        // We prefer sending actual data before stream-management ACKs.
        // While the other side may be waiting for our ACK, we are not
        // obliged to send it straight away (XEP-0198 explicitly allows us
        // to delay it for some implementation-defined time if we have stuff
        // to send; our implementation-defined time is "infinity"), so we
        // try to make progress on real data.
        loop {
            // If either the queue has nothing for us or the stream isn't
            // ready to send, we break out of the loop. We don't use ready!
            // here because we may have SM ACKs to send.
            let next = match transmit_queue.poll_next(cx) {
                Poll::Ready(Some(v)) => v,
                Poll::Ready(None) => {
                    // The transmit_queue is empty, so we set `depleted` to
                    // true in order to ensure that we return Ready if all SM
                    // acks also have been transmitted.
                    depleted = true;
                    break;
                }
                Poll::Pending => break,
            };
            // If the stream isn't ready to send, none of the other things can
            // be sent either, so we can use ready!.
            match ready!(<XmppStream as Sink<&Stanza>>::poll_ready(
                stream.as_mut(),
                cx
            )) {
                Ok(()) => (),
                Err(e) => return Poll::Ready(Err(e)),
            }
            // We now either send the item or "die trying". It must be
            // removed from the queue, because even if it fails to serialise,
            // we don't want to reattempt sending it (unless via SM
            // resumption retransmission).
            let next = next.take();
            match stream.as_mut().start_send(&next.stanza) {
                Ok(()) => {
                    next.token.send_replace(StanzaState::Sent {});
                    if let Some(sm_state) = sm_state.as_mut() {
                        sm_state.enqueue(next);
                    }
                    transmitted = true;
                }
                // Serialisation error, report back to the queue item.
                Err(e) => {
                    next.token
                        .send_replace(StanzaState::Failed { error: e.into() });
                }
            }
        }

        if let Some(sm_state) = sm_state.as_mut() {
            // We can set it to transmitted directly, because it has been
            // cleared by the previous call to poll_write_sm_req.
            sm_state.pending_req = transmitted;
            ready!(Self::poll_write_sm_req(Some(sm_state), stream.as_mut(), cx))?;
        }

        // Now, if the stream will let us and we need to, we can tack
        // on some SM ACKs.
        if let Some(sm_state) = sm_state {
            while sm_state.pending_acks > 0 {
                match ready!(<XmppStream as Sink<&XmppStreamElement>>::poll_ready(
                    stream.as_mut(),
                    cx,
                )) {
                    Ok(()) => (),
                    Err(e) => return Poll::Ready(Err(e)),
                }
                match stream
                    .as_mut()
                    .start_send(&XmppStreamElement::SM(sm::Nonza::Ack(sm::A {
                        h: sm_state.inbound_ctr(),
                    }))) {
                    Ok(()) => (),
                    Err(e) => {
                        // As the stream promised we would be able to
                        // send, this must be a problem with our
                        // (locally generated) nonza, i.e. this is
                        // fatal.
                        panic!("Failed to send SM Ack nonza: {}", e);
                    }
                }
                sm_state.pending_acks -= 1;
            }
        }

        // If we haven't transmitted anything, we may also not have polled
        // the stream for readiness or flushing. We need to do that here to
        // ensure progress: even if our tx queue is empty, the tx buffer may
        // be nonempty.
        match ready!(<XmppStream as Sink<&Stanza>>::poll_flush(
            stream.as_mut(),
            cx
        )) {
            Ok(()) => (),
            Err(e) => return Poll::Ready(Err(e)),
        }

        // If we end up here, all data we currently have has been
        // transmitted via the stream and the stream's tx buffers have
        // been properly flushed.
        if depleted {
            // And here, we know that the transmit queue is closed,
            // too. We return with success.
            Poll::Ready(Ok(()))
        } else {
            // The transmit queue is still open, so more data could
            // pour in to be transmitted.
            Poll::Pending
        }
    }

    /// Drive the stream in transmit simplex mode.
    ///
    /// This will block forever (i.e. return [`Poll::Pending`] without
    /// installing a waker) if the stream is currently being negotiated.
    /// Otherwise, it will attempt to drain the `transmit_queue`. Once the
    /// queue has been closed and drained, `Poll::Ready(())` is returned.
    /// When write errors occur, this will also block forever.
    ///
    /// If nothing is to be sent, but the stream could be used for sending,
    /// this will drive the flush part of the inner stream.
    ///
    /// Any errors are reported on the next call to `poll`.
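    ///
    /// A rough usage sketch (the surrounding worker names are
    /// illustrative):
    ///
    /// ```ignore
    /// match connected.poll_writes(stream.as_mut(), &mut transmit_queue, cx) {
    ///     // The transmit queue was closed and fully flushed.
    ///     Poll::Ready(()) => {}
    ///     // Waiting for queue items or stream readiness; write errors
    ///     // also park here and surface on the next call to `poll`.
    ///     Poll::Pending => {}
    /// }
    /// ```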
    pub fn poll_writes(
        &mut self,
        stream: Pin<&mut XmppStream>,
        transmit_queue: &mut TransmitQueue<QueueEntry>,
        cx: &mut Context<'_>,
    ) -> Poll<()> {
        match self {
            Self::Ready { sm_state, .. } => match ready!(Self::poll_writes_inner(
                sm_state.as_mut(),
                stream,
                transmit_queue,
                cx
            )) {
                Ok(()) => Poll::Ready(()),
                Err(e) => {
                    *self = Self::Failed {
                        error: Some(e),
                        sm_state: sm_state.take(),
                    };
                    Poll::Pending
                }
            },

            _ => Poll::Pending,
        }
    }

    /// Drive the stream in full-duplex mode.
    ///
    /// Stanzas from the `transmit_queue` are transmitted once the stream is
    /// ready.
    ///
    /// Returns:
    /// - `Poll::Pending` if it blocks on the inner stream
    /// - `Poll::Ready(None)` if it needs to be called again for a proper
    ///   result
    /// - `Poll::Ready(Some(..))` when it has a proper result
    pub fn poll(
        &mut self,
        mut stream: Pin<&mut XmppStream>,
        jid: &Jid,
        features: &StreamFeatures,
        transmit_queue: &mut TransmitQueue<QueueEntry>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<ConnectedEvent>> {
        match self {
            Self::Negotiating { ref mut substate } => {
                match ready!(substate.advance(stream, jid, transmit_queue, cx)) {
                    Break(NegotiationResult::Disconnect { sm_state, error }) => {
                        self.to_failed_state(error, sm_state);
                        Poll::Ready(None)
                    }
                    Break(NegotiationResult::StreamReset {
                        sm_state,
                        bound_jid,
                    }) => {
                        *self = Self::Ready { sm_state };
                        Poll::Ready(Some(ConnectedEvent::Worker(WorkerEvent::Reset {
                            bound_jid,
                            features: features.clone(),
                        })))
                    }
                    Break(NegotiationResult::StreamResumed { sm_state }) => {
                        *self = Self::Ready {
                            sm_state: Some(sm_state),
                        };
                        Poll::Ready(Some(ConnectedEvent::Worker(WorkerEvent::Resumed)))
                    }
                    Break(NegotiationResult::StreamError { error }) => {
                        self.to_stream_error_state(error);
                        Poll::Ready(None)
                    }
                    Continue(Some(stanza)) => {
                        Poll::Ready(Some(ConnectedEvent::Worker(WorkerEvent::Stanza(stanza))))
                    }
                    Continue(None) => Poll::Ready(None),
                }
            }

            Self::SendStreamError {
                ref mut stream_error,
                ref mut io_error,
                ref mut deadline,
            } => {
                match stream.as_mut().poll_next(cx) {
                    Poll::Pending
                    | Poll::Ready(None)
                    | Poll::Ready(Some(Err(ReadError::StreamFooterReceived)))
                    | Poll::Ready(Some(Err(ReadError::SoftTimeout))) => (),
                    Poll::Ready(Some(Ok(ev))) => {
                        log::trace!("Discarding incoming data while sending stream error: {ev:?}")
                    }
                    Poll::Ready(Some(Err(ReadError::ParseError(e)))) => {
                        log::trace!("Ignoring parse error while sending stream error: {e}")
                    }
                    Poll::Ready(Some(Err(ReadError::HardError(e)))) => {
                        log::warn!("I/O error while sending stream error: {e}")
                    }
                }

                match deadline.as_mut().poll(cx) {
                    Poll::Pending => (),
                    Poll::Ready(()) => {
                        log::debug!("Timeout while sending stream error. Discarding state.");
                        let error = local_error_for_stream_error(io_error, stream_error);
                        self.to_failed_state(error, None);
                        return Poll::Ready(None);
                    }
                }

                // Cannot use ready! here because we have to consider the
                // case where the other side is refusing to accept data
                // because its outgoing buffer is too full.
                if stream_error.is_some() {
                    match ready!(<XmppStream as Sink<&StreamError>>::poll_ready(
                        stream.as_mut(),
                        cx
                    ))
                    .and_then(|()| {
                        // The take serves as transition to the next state.
                        let stream_error = stream_error.take().unwrap();
                        let result = stream.as_mut().start_send(&stream_error);
                        *io_error = Some(local_error_for_stream_error(
                            io_error,
                            &mut Some(stream_error),
                        ));
                        result
                    }) {
                        Ok(()) => (),
                        Err(e) => {
                            log::debug!("Got I/O error while sending stream error: {e}. Skipping error transmission.");
                            let error = local_error_for_stream_error(io_error, stream_error);
                            self.to_failed_state(error, None);
                            return Poll::Ready(None);
                        }
                    }
                }

                match ready!(<XmppStream as Sink<&StreamError>>::poll_flush(
                    stream.as_mut(),
                    cx
                )) {
                    Ok(()) => (),
                    Err(e) => {
                        log::debug!(
                            "Got I/O error while flushing stream error: {e}. Skipping flush.",
                        );
                    }
                }
                log::trace!("Stream error send complete, transitioning to Failed state");
                *self = Self::Failed {
                    error: Some(local_error_for_stream_error(io_error, stream_error)),
                    // Do *not* resume after we caused a stream error.
                    sm_state: None,
                };

                // Request the caller to call us again to get the
                // actual error message.
                Poll::Ready(None)
            }

            Self::Ready { ref mut sm_state } => {
                match Self::poll_writes_inner(
                    sm_state.as_mut(),
                    stream.as_mut(),
                    transmit_queue,
                    cx,
                ) {
                    Poll::Pending => (),
                    Poll::Ready(Ok(())) => {
                        *self = Self::LocalShutdown {
                            rx_state: RxShutdownState::AwaitingFooter,
                            tx_closed: false,
                            deadline: Box::pin(tokio::time::sleep(LOCAL_SHUTDOWN_TIMEOUT)),
                        };
                        return Poll::Ready(Some(ConnectedEvent::LocalShutdownRequested));
                    }
                    Poll::Ready(Err(e)) => {
                        *self = Self::Failed {
                            error: Some(e),
                            sm_state: sm_state.take(),
                        };
                        return Poll::Ready(None);
                    }
                }

                let item = ready!(stream.poll_next(cx));
                // We switch to a TxOpen or non-connected state when we
                // receive the stream footer, so reading `None` from the
                // stream is always an unclean closure.
                let item = item.unwrap_or_else(|| {
                    Err(ReadError::HardError(io::Error::new(
                        io::ErrorKind::UnexpectedEof,
                        "eof before stream footer",
                    )))
                });
                match item {
                    // Easy case, we got some data.
                    Ok(XmppStreamElement::Stanza(data)) => {
                        Poll::Ready(Some(ConnectedEvent::Worker(WorkerEvent::Stanza(data))))
                    }

                    Ok(XmppStreamElement::SM(sm::Nonza::Ack(ack))) => {
                        if let Some(sm_state) = sm_state {
                            match sm_state.remote_acked(ack.h) {
                                Ok(()) => Poll::Ready(None),
                                Err(e) => {
                                    log::error!(
                                        "Failed to process <sm:a/> sent by the server: {e}",
                                    );
                                    self.to_stream_error_state(e.into());
                                    Poll::Ready(None)
                                }
                            }
                        } else {
                            log::debug!("Hmm... I got an <sm:a/> from the peer, but I don't have a stream management state. I'm gonna ignore that...");
                            Poll::Ready(None)
                        }
                    }

                    Ok(XmppStreamElement::SM(sm::Nonza::Req(_))) => {
                        if let Some(sm_state) = sm_state {
                            match sm_state.pending_acks.checked_add(1) {
                                None => panic!("Too many pending ACKs, something is wrong."),
                                Some(v) => sm_state.pending_acks = v,
                            }
                        } else {
                            log::warn!("Got an <sm:r/> from the peer, but we don't have any stream management state. Terminating stream with an error.");
                            self.to_stream_error_state(StreamError::new(
                                DefinedCondition::UnsupportedStanzaType,
                                "en",
                                "received <sm:r/>, but stream management is not enabled".to_owned(),
                            ));
                        }
                        // No matter whether we "enqueued" an ACK for send or
                        // whether we just successfully read something from
                        // the stream, we have to request to be polled again
                        // right away.
                        Poll::Ready(None)
                    }

                    Ok(other) => {
                        log::warn!(
                            "Received unsupported stream element: {other:?}. Emitting stream error.",
                        );
                        // TODO: figure out a good way to provide the sender
                        // with more information.
                        self.to_stream_error_state(StreamError::new(
                            DefinedCondition::UnsupportedStanzaType,
                            "en",
                            format!("Unsupported stream element: {other:?}"),
                        ));
                        Poll::Ready(None)
                    }

                    // Another easy case: Soft timeouts are passed through
                    // to the caller for handling.
                    Err(ReadError::SoftTimeout) => {
                        Poll::Ready(Some(ConnectedEvent::Worker(WorkerEvent::SoftTimeout)))
                    }

                    // Parse errors are also just passed through (and will
                    // likely cause us to send a stream error).
                    Err(ReadError::ParseError(e)) => {
                        Poll::Ready(Some(ConnectedEvent::Worker(WorkerEvent::ParseError(e))))
                    }

                    // I/O errors cause the stream to be considered
                    // broken; we drop it and send a Disconnect event with
                    // the error embedded.
                    Err(ReadError::HardError(e)) => {
                        let sm_state = sm_state.take();
                        Poll::Ready(Some(ConnectedEvent::Disconnect {
                            sm_state,
                            error: Some(e),
                        }))
                    }

                    // Stream footer indicates the remote wants to shut this
                    // stream down.
                    // We transition into RemoteShutdown state which makes us
                    // emit a special event until the caller takes care of it.
                    Err(ReadError::StreamFooterReceived) => {
                        *self = Self::RemoteShutdown {
                            sm_state: sm_state.take(),
                        };

                        // Let us be called again immediately to emit the
                        // notification.
                        Poll::Ready(None)
                    }
                }
            }

            Self::Failed { sm_state, error } => Poll::Ready(Some(ConnectedEvent::Disconnect {
                error: error.take(),
                sm_state: sm_state.take(),
            })),

            Self::LocalShutdown { .. } | Self::LocalShutdownComplete => {
                panic!("poll_next called in local shutdown");
            }

            Self::RemoteShutdown { ref mut sm_state } => {
                Poll::Ready(Some(ConnectedEvent::RemoteShutdown {
                    sm_state: sm_state.take(),
                }))
            }
        }
    }

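    /// Drive an orderly local shutdown of the stream to completion.
    ///
    /// This transitions into [`Self::LocalShutdown`] if necessary, then
    /// closes the TX side and drains the RX side until the remote footer
    /// and EOF have been seen, or until the shutdown deadline expires.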
    pub(super) fn poll_close(
        &mut self,
        mut stream: Pin<&mut XmppStream>,
        cx: &mut Context<'_>,
    ) -> Poll<Result<(), io::Error>> {
        loop {
            match self {
                // User initiates shutdown by local choice.
                // The actual shutdown is driven by the poll_read function and we
                // only get woken up via the close_poller waker.
                Self::Ready { .. } | Self::RemoteShutdown { .. } | Self::Negotiating { .. } => {
                    *self = Self::LocalShutdown {
                        rx_state: RxShutdownState::AwaitingFooter,
                        tx_closed: false,
                        deadline: Box::pin(tokio::time::sleep(LOCAL_SHUTDOWN_TIMEOUT)),
                    };
                }

                Self::Failed { error, .. } => match error.take() {
                    Some(error) => return Poll::Ready(Err(error)),
                    None => return Poll::Ready(Ok(())),
                },

                // If close is called while an attempt is made to send the
                // stream error, we abort transmission.
                Self::SendStreamError { .. } => {
                    log::debug!("close() called while stream error was being sent. Aborting transmission of stream error.");
                    *self = Self::LocalShutdown {
                        rx_state: RxShutdownState::AwaitingFooter,
                        tx_closed: false,
                        deadline: Box::pin(tokio::time::sleep(LOCAL_SHUTDOWN_TIMEOUT)),
                    };
                }

                // Wait for local shutdown (driven by poll_read) to complete.
                Self::LocalShutdown {
                    ref mut deadline,
                    ref mut rx_state,
                    ref mut tx_closed,
                } => {
                    match deadline.as_mut().poll(cx) {
                        Poll::Ready(()) => {
                            log::debug!("Dropping stream after shutdown timeout was exceeded.");
                            *self = Self::LocalShutdownComplete;
                            return Poll::Ready(Ok(()));
                        }
                        Poll::Pending => (),
                    }

                    if !*tx_closed {
                        // We cannot use ready! here, because we want to poll the
                        // receiving side in parallel.
                        match stream.as_mut().poll_shutdown(cx) {
                            Poll::Pending => (),
                            Poll::Ready(Ok(())) => {
                                *tx_closed = true;
                            }
                            Poll::Ready(Err(e)) => {
                                log::debug!(
                                    "Ignoring write error during local stream shutdown: {e}"
                                );
                                *tx_closed = true;
                            }
                        }
                    }

                    match rx_state {
                        RxShutdownState::Done => {
                            if !*tx_closed {
                                // poll_shutdown() returned Poll::Pending, so
                                // we have to return that, too.
                                return Poll::Pending;
                            }
                        }
                        // We can use ready! here because the `poll_shutdown`
                        // has happened already; we don't want to poll
                        // anything else anymore.
                        _ => loop {
                            match ready!(stream.as_mut().poll_next(cx)) {
                                None => {
                                    if *rx_state != RxShutdownState::AwaitingEof {
                                        log::debug!("Ignoring early EOF during stream shutdown.");
                                    }
                                    *rx_state = RxShutdownState::Done;
                                    break;
                                }
                                Some(Ok(data)) => {
                                    log::debug!("Ignoring data received on stream during local shutdown: {data:?}");
                                }
                                Some(Err(ReadError::SoftTimeout)) => (),
                                Some(Err(ReadError::HardError(e))) => {
                                    *rx_state = RxShutdownState::Done;
                                    log::debug!("Ignoring read error during local shutdown: {e}");
                                    break;
                                }
                                Some(Err(ReadError::ParseError(e))) => {
                                    log::debug!(
                                        "Ignoring parse error during local shutdown: {e}"
                                    );
                                }
                                Some(Err(ReadError::StreamFooterReceived)) => match rx_state {
                                    RxShutdownState::AwaitingFooter => {
                                        *rx_state = RxShutdownState::AwaitingEof;
                                    }
                                    RxShutdownState::AwaitingEof => {
                                        unreachable!("multiple stream footers?!")
                                    }
                                    RxShutdownState::Done => unreachable!(),
                                },
                            }
                        },
                    }

                    if *tx_closed && *rx_state == RxShutdownState::Done {
                        // Now that everything is properly cleaned up on the
                        // xmlstream layer, we go through with closure.
                        ready!(<XmppStream as Sink<&Stanza>>::poll_close(
                            stream.as_mut(),
                            cx
                        ))?;
                        // And now that's done, we can finally call it a day.
                        *self = Self::LocalShutdownComplete;
                        return Poll::Ready(Ok(()));
                    } else {
                        return Poll::Pending;
                    }
                }

                Self::LocalShutdownComplete => return Poll::Ready(Ok(())),
            }
        }
    }

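    /// Queue a stream error for transmission, unless the stream is already
    /// shutting down, has failed, or is busy sending another stream error.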
    pub(super) fn start_send_stream_error(&mut self, error: StreamError) {
        match self {
            Self::LocalShutdownComplete
            | Self::LocalShutdown { .. }
            | Self::RemoteShutdown { .. }
            | Self::Failed { .. } => {
                log::debug!("Request to send stream error ({error}), but we are already shutting down or have already failed. Discarding.");
                return;
            }

            Self::Ready { .. } | Self::Negotiating { .. } => {}

            Self::SendStreamError { .. } => {
                log::debug!("Request to send stream error ({error}) while transmission of another stream error is already in progress. Discarding the new one.");
                return;
            }
        }

        *self = Self::SendStreamError {
            deadline: Box::pin(tokio::time::sleep(LOCAL_SHUTDOWN_TIMEOUT)),
            stream_error: Some(error),
            io_error: None,
        };
    }

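    /// Flag a stream management `<r/>` request for transmission on the
    /// next write poll.
    ///
    /// Returns true if the request was queued, i.e. if the stream is ready
    /// and stream management is enabled.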
    pub fn queue_sm_request(&mut self) -> bool {
        match self {
            Self::Ready {
                sm_state: Some(sm_state),
                ..
            } => {
                sm_state.pending_req = true;
                true
            }
            _ => false,
        }
    }
}