tokio_xmpp/stanzastream/
connected.rs

1// Copyright (c) 2019 Emmanuel Gil Peyrot <linkmauve@linkmauve.fr>
2//
3// This Source Code Form is subject to the terms of the Mozilla Public
4// License, v. 2.0. If a copy of the MPL was not distributed with this
5// file, You can obtain one at http://mozilla.org/MPL/2.0/.
6
7use core::future::Future;
8use core::ops::ControlFlow::{Break, Continue};
9use core::pin::Pin;
10use core::task::{Context, Poll};
11use std::io;
12
13use futures::{ready, Sink, Stream};
14
15use xmpp_parsers::{
16    jid::Jid,
17    sm,
18    stream_error::{DefinedCondition, SentStreamError, StreamError},
19    stream_features::StreamFeatures,
20};
21
22use crate::xmlstream::{ReadError, XmppStreamElement};
23use crate::Stanza;
24
25use super::negotiation::{NegotiationResult, NegotiationState};
26use super::queue::{QueueEntry, StanzaState, TransmitQueue};
27use super::stream_management::*;
28use super::worker::{WorkerEvent, XmppStream, LOCAL_SHUTDOWN_TIMEOUT};
29
30#[derive(PartialEq)]
31pub(super) enum RxShutdownState {
32    AwaitingFooter,
33    AwaitingEof,
34    Done,
35}
36
37fn local_error_for_stream_error(
38    io_error: &mut Option<io::Error>,
39    stream_error: &mut Option<StreamError>,
40) -> io::Error {
41    io_error
42        .take()
43        .or_else(|| {
44            stream_error
45                .take()
46                .map(|x| io::Error::new(io::ErrorKind::InvalidData, SentStreamError(x)))
47        })
48        .unwrap_or_else(|| {
49            io::Error::new(
50                io::ErrorKind::InvalidData,
51                "unknown local stream error generated",
52            )
53        })
54}
55
56/// Substate of the [`BackendStream::Connected`] state.
57///
58/// Having the substate and its logic in a separate type allows us to
59/// circumvent problemns with moving data out of `&mut _` when transitioning
60/// between substates.
61pub(super) enum ConnectedState {
62    /// The stream is still being negotiated.
63    Negotiating {
64        /// Current state within the negotiations
65        substate: NegotiationState,
66    },
67
68    /// The stream is ready for transceiving.
69    Ready {
70        /// Stream management state, if any.
71        sm_state: Option<SmState>,
72    },
73
74    SendStreamError {
75        /// Stream error to send.
76        ///
77        /// `None` implies that we now only need to flush.
78        stream_error: Option<StreamError>,
79
80        /// I/O error to return to the caller once the flush is done.
81        ///
82        /// If `None`, an error will be synthesised.
83        io_error: Option<io::Error>,
84
85        /// Deadline until which the error must've been sent and the stream
86        /// must've been shut down.
87        deadline: Pin<Box<tokio::time::Sleep>>,
88    },
89
90    Failed {
91        error: Option<io::Error>,
92        sm_state: Option<SmState>,
93    },
94
95    /// A stream shutdown was initiated locally and we are flushing RX and TX
96    /// queues.
97    LocalShutdown {
98        /// Keep track on whether we have closed the TX side yet.
99        tx_closed: bool,
100
101        /// Keep track on how shut down the receiving side is.
102        rx_state: RxShutdownState,
103
104        /// Deadline until which graceful shutdown must complete; if the
105        /// deadline is exceeded (i.e. the contained Sleep future returns
106        /// ready), the streams will be dropped (and thus closed by the OS).
107        deadline: Pin<Box<tokio::time::Sleep>>,
108    },
109
110    /// The remote side closed the stream.
111    RemoteShutdown {
112        /// Keep the SM state for later resumption.
113        sm_state: Option<SmState>,
114    },
115
116    /// Local shutdown has completed; this is a final state, as local shutdown
117    /// signals an intent of stopping the stream forever.
118    LocalShutdownComplete,
119}
120
121/// Enumeration of events happening while the stream has finished the
122/// connection procedures and is established.
123pub(super) enum ConnectedEvent {
124    /// Event generated by the stream worker.
125    Worker(WorkerEvent),
126
127    /// The remote closed the stream orderly.
128    RemoteShutdown { sm_state: Option<SmState> },
129
130    /// We got disconnected through an error, either an I/O error
131    /// or some kind of stream error.
132    Disconnect {
133        /// Stream management state for later resumption attempts.
134        sm_state: Option<SmState>,
135
136        /// The error which caused the disconnect. This is generally not none,
137        /// but we cannot prove this at compile time because we have to take
138        /// the error from a mutable (i.e. non-owned) place.
139        error: Option<io::Error>,
140    },
141
142    /// A shutdown was requested by the local side of the stream.
143    LocalShutdownRequested,
144}
145
146impl ConnectedState {
147    fn to_stream_error_state(&mut self, stream_error: StreamError) {
148        *self = Self::SendStreamError {
149            stream_error: Some(stream_error),
150            io_error: None,
151            deadline: Box::pin(tokio::time::sleep(LOCAL_SHUTDOWN_TIMEOUT)),
152        };
153    }
154
155    fn to_failed_state(&mut self, error: io::Error, sm_state: Option<SmState>) {
156        *self = Self::Failed {
157            error: Some(error),
158            sm_state,
159        };
160    }
161
162    fn poll_write_sm_req(
163        mut sm_state: Option<&mut SmState>,
164        mut stream: Pin<&mut XmppStream>,
165        cx: &mut Context<'_>,
166    ) -> Poll<io::Result<()>> {
167        if let Some(sm_state) = sm_state.as_mut() {
168            // Request is pending.
169            if sm_state.pending_req {
170                match ready!(<XmppStream as Sink<&XmppStreamElement>>::poll_ready(
171                    stream.as_mut(),
172                    cx,
173                )) {
174                    Ok(()) => (),
175                    Err(e) => return Poll::Ready(Err(e)),
176                }
177                match stream
178                    .as_mut()
179                    .start_send(&XmppStreamElement::SM(sm::Nonza::Req(sm::R)))
180                {
181                    Ok(()) => (),
182                    Err(e) => {
183                        // As the stream promised we would be able to
184                        // send, this must be a problem with our
185                        // (locally generated) nonza, i.e. this is
186                        // fatal.
187                        panic!("Failed to send SM Req nonza: {}", e);
188                    }
189                }
190                sm_state.pending_req = false;
191            }
192        }
193        Poll::Ready(Ok(()))
194    }
195
196    fn poll_writes_inner(
197        mut sm_state: Option<&mut SmState>,
198        mut stream: Pin<&mut XmppStream>,
199        transmit_queue: &mut TransmitQueue<QueueEntry>,
200        cx: &mut Context<'_>,
201    ) -> Poll<io::Result<()>> {
202        let mut depleted = false;
203
204        // We prefer sending SM reqs before actual data.
205        // SM requests are used in response to soft timeouts as a way to
206        // trigger the remote side to send *something* to us. We may be
207        // sending a lot of data in bulk right now without ever expecting a
208        // response (or at least not anytime soon). In order to ensure that
209        // the server will in fact send a message to us soon, we have to send
210        // the SM request before anything else.
211        //
212        // Example scenario: Sending a bunch of MAM `<message/>`s over a slow,
213        // but low-latency link. The MAM response only triggers a response
214        // from the peer when everything has been transmitted, which may be
215        // longer than the stream timeout.
216        ready!(Self::poll_write_sm_req(
217            match sm_state {
218                None => None,
219                Some(ref mut v) => Some(v),
220            },
221            stream.as_mut(),
222            cx
223        ))?;
224
225        let mut transmitted = false;
226        // We prefer sending actual data before stream-management ACKs.
227        // While the other side may be waiting for our ACK, we are not obliged
228        // to send it straight away (XEP-0198 explicitly allows us to delay it
229        // for some implementation-defined time if we have stuff to send. Our
230        // implementation-defined time is "infinity"), so we try to make
231        // progress on real data.
232        loop {
233            // If either the queue has nothing for us or the stream isn't
234            // ready to send, we break out of the loop. We don't use ready!
235            // here because we may have SM ACKs to send.
236            let next = match transmit_queue.poll_next(cx) {
237                Poll::Ready(Some(v)) => v,
238                Poll::Ready(None) => {
239                    // The transmit_queue is empty, so we set `depleted` to
240                    // true in order to ensure that we return Ready if all SM
241                    // acks also have been transmitted.
242                    depleted = true;
243                    break;
244                }
245                Poll::Pending => break,
246            };
247            // If the stream isn't ready to send, none of the other things can
248            // be sent either, so we can use ready!.
249            match ready!(<XmppStream as Sink<&Stanza>>::poll_ready(
250                stream.as_mut(),
251                cx
252            )) {
253                Ok(()) => (),
254                Err(e) => return Poll::Ready(Err(e)),
255            }
256            // We now either send the item or "die trying". It must
257            // be removed from the queue, because even if it fails to
258            // serialise, we don't want to reattempt sending it (
259            // unless by SM resumption retransmission).
260            let next = next.take();
261            match stream.as_mut().start_send(&next.stanza) {
262                Ok(()) => {
263                    next.token.send_replace(StanzaState::Sent {});
264                    if let Some(sm_state) = sm_state.as_mut() {
265                        sm_state.enqueue(next);
266                    }
267                    transmitted = true;
268                }
269                // Serialisation error, report back to the queue item.
270                Err(e) => {
271                    next.token
272                        .send_replace(StanzaState::Failed { error: e.into() });
273                }
274            }
275        }
276
277        if let Some(sm_state) = sm_state.as_mut() {
278            // We can set it to transmitted directly, because it has been
279            // cleared by the previous call to poll_write_sm_req.
280            sm_state.pending_req = transmitted;
281            ready!(Self::poll_write_sm_req(Some(sm_state), stream.as_mut(), cx))?;
282        }
283
284        // Now, if the stream will let us and we need to, we can tack
285        // on some SM ACKs.
286        if let Some(sm_state) = sm_state {
287            while sm_state.pending_acks > 0 {
288                match ready!(<XmppStream as Sink<&XmppStreamElement>>::poll_ready(
289                    stream.as_mut(),
290                    cx,
291                )) {
292                    Ok(()) => (),
293                    Err(e) => return Poll::Ready(Err(e)),
294                }
295                match stream
296                    .as_mut()
297                    .start_send(&XmppStreamElement::SM(sm::Nonza::Ack(sm::A {
298                        h: sm_state.inbound_ctr(),
299                    }))) {
300                    Ok(()) => (),
301                    Err(e) => {
302                        // As the stream promised we would be able to
303                        // send, this must be a problem with our
304                        // (locally generated) nonza, i.e. this is
305                        // fatal.
306                        panic!("Failed to send SM Ack nonza: {}", e);
307                    }
308                }
309                sm_state.pending_acks -= 1;
310            }
311        }
312
313        // If we haven't transmitted, we may also not have polled
314        // the stream for readiness or flushing. We need to do
315        // that here to ensure progress. Even if our tx queue is
316        // empty, the tx buffer may be nonempty
317        match ready!(<XmppStream as Sink<&Stanza>>::poll_flush(
318            stream.as_mut(),
319            cx
320        )) {
321            Ok(()) => (),
322            Err(e) => return Poll::Ready(Err(e)),
323        }
324
325        // If we end up here, all data we currently have has been
326        // transmitted via the stream and the stream's tx buffers have
327        // been properly flushed.
328        if depleted {
329            // And here, we know that the transmit queue is closed,
330            // too. We return with success.
331            Poll::Ready(Ok(()))
332        } else {
333            // The transmit queue is still open, so more data could
334            // pour in to be transmitted.
335            Poll::Pending
336        }
337    }
338
339    /// Drive the stream in transmit simplex mode.
340    ///
341    /// This will block forever (i.e. return [`Poll::Pending`] without
342    /// installing a waker) if the stream is currently being negotiated.
343    /// Otherwise, it will attempt to drain the `transmit_queue`. When the
344    /// queue is empty, `Ok(())` is returned. When write errors occur, this
345    /// will also block forever.
346    ///
347    /// If nothing is to be sent, but the stream could be used for sending,
348    /// this will drive the flush part of the inner stream.
349    ///
350    /// Any errors are reported on the next call to `poll`.
351    pub fn poll_writes(
352        &mut self,
353        stream: Pin<&mut XmppStream>,
354        transmit_queue: &mut TransmitQueue<QueueEntry>,
355        cx: &mut Context<'_>,
356    ) -> Poll<()> {
357        match self {
358            Self::Ready { sm_state, .. } => match ready!(Self::poll_writes_inner(
359                sm_state.as_mut(),
360                stream,
361                transmit_queue,
362                cx
363            )) {
364                Ok(()) => Poll::Ready(()),
365                Err(e) => {
366                    *self = Self::Failed {
367                        error: Some(e),
368                        sm_state: sm_state.take(),
369                    };
370                    Poll::Pending
371                }
372            },
373
374            _ => Poll::Pending,
375        }
376    }
377
378    /// Drive the stream in full-duplex mode.
379    ///
380    /// Stanzas from the `transmit_queue` are transmitted once the stream is
381    /// ready.
382    ///
383    /// Returns:
384    /// - Poll::Pending if it blocks on the inner stream
385    /// - Poll::Ready(None) if it needs to be called again for a proper result
386    /// - Poll::Ready(Some(.)) when it has proper result
387    pub fn poll(
388        &mut self,
389        mut stream: Pin<&mut XmppStream>,
390        jid: &Jid,
391        features: &StreamFeatures,
392        transmit_queue: &mut TransmitQueue<QueueEntry>,
393        cx: &mut Context<'_>,
394    ) -> Poll<Option<ConnectedEvent>> {
395        match self {
396            Self::Negotiating { ref mut substate } => {
397                match ready!(substate.advance(stream, jid, transmit_queue, cx)) {
398                    Break(NegotiationResult::Disconnect { sm_state, error }) => {
399                        self.to_failed_state(error, sm_state);
400                        Poll::Ready(None)
401                    }
402                    Break(NegotiationResult::StreamReset {
403                        sm_state,
404                        bound_jid,
405                    }) => {
406                        *self = Self::Ready { sm_state };
407                        Poll::Ready(Some(ConnectedEvent::Worker(WorkerEvent::Reset {
408                            bound_jid,
409                            features: features.clone(),
410                        })))
411                    }
412                    Break(NegotiationResult::StreamResumed { sm_state }) => {
413                        *self = Self::Ready {
414                            sm_state: Some(sm_state),
415                        };
416                        Poll::Ready(Some(ConnectedEvent::Worker(WorkerEvent::Resumed)))
417                    }
418                    Break(NegotiationResult::StreamError { error }) => {
419                        self.to_stream_error_state(error);
420                        Poll::Ready(None)
421                    }
422                    Continue(Some(stanza)) => {
423                        Poll::Ready(Some(ConnectedEvent::Worker(WorkerEvent::Stanza(stanza))))
424                    }
425                    Continue(None) => Poll::Ready(None),
426                }
427            }
428
429            Self::SendStreamError {
430                ref mut stream_error,
431                ref mut io_error,
432                ref mut deadline,
433            } => {
434                match stream.as_mut().poll_next(cx) {
435                    Poll::Pending
436                    | Poll::Ready(None)
437                    | Poll::Ready(Some(Err(ReadError::StreamFooterReceived)))
438                    | Poll::Ready(Some(Err(ReadError::SoftTimeout))) => (),
439                    Poll::Ready(Some(Ok(ev))) => {
440                        log::trace!("Discarding incoming data while sending stream error: {ev:?}")
441                    }
442                    Poll::Ready(Some(Err(ReadError::ParseError(e)))) => {
443                        log::trace!("Ignoring parse error while sending stream error: {e}")
444                    }
445                    Poll::Ready(Some(Err(ReadError::HardError(e)))) => {
446                        log::warn!("I/O error while sending stream error: {e}")
447                    }
448                }
449
450                match deadline.as_mut().poll(cx) {
451                    Poll::Pending => (),
452                    Poll::Ready(()) => {
453                        log::debug!("Timeout while sending stream error. Discarding state.");
454                        let error = local_error_for_stream_error(io_error, stream_error);
455                        self.to_failed_state(error, None);
456                        return Poll::Ready(None);
457                    }
458                }
459
460                // Cannot use ready! here because we have to consider the
461                // case where the other side is refusing to accept data
462                // because its outgoing buffer is too full.
463                if stream_error.is_some() {
464                    match ready!(<XmppStream as Sink<&StreamError>>::poll_ready(
465                        stream.as_mut(),
466                        cx
467                    ))
468                    .and_then(|()| {
469                        // The take serves as transition to the next state.
470                        let stream_error = stream_error.take().unwrap();
471                        let result = stream.as_mut().start_send(&stream_error);
472                        *io_error = Some(local_error_for_stream_error(
473                            io_error,
474                            &mut Some(stream_error),
475                        ));
476                        result
477                    }) {
478                        Ok(()) => (),
479                        Err(e) => {
480                            log::debug!("Got I/O error while sending stream error: {e}. Skipping error transmission.");
481                            let error = local_error_for_stream_error(io_error, stream_error);
482                            self.to_failed_state(error, None);
483                            return Poll::Ready(None);
484                        }
485                    }
486                }
487
488                match ready!(<XmppStream as Sink<&StreamError>>::poll_flush(
489                    stream.as_mut(),
490                    cx
491                )) {
492                    Ok(()) => (),
493                    Err(e) => {
494                        log::debug!(
495                            "Got I/O error while flushing stream error: {e}. Skipping flush.",
496                        );
497                    }
498                }
499                log::trace!("Stream error send complete, transitioning to Failed state");
500                *self = Self::Failed {
501                    error: Some(local_error_for_stream_error(io_error, stream_error)),
502                    // Do *not* resume after we caused a stream error.
503                    sm_state: None,
504                };
505
506                // Request the caller to call us again to get the
507                // actual error message.
508                Poll::Ready(None)
509            }
510
511            Self::Ready { ref mut sm_state } => {
512                match Self::poll_writes_inner(
513                    sm_state.as_mut(),
514                    stream.as_mut(),
515                    transmit_queue,
516                    cx,
517                ) {
518                    Poll::Pending => (),
519                    Poll::Ready(Ok(())) => {
520                        *self = Self::LocalShutdown {
521                            rx_state: RxShutdownState::AwaitingFooter,
522                            tx_closed: false,
523                            deadline: Box::pin(tokio::time::sleep(LOCAL_SHUTDOWN_TIMEOUT)),
524                        };
525                        return Poll::Ready(Some(ConnectedEvent::LocalShutdownRequested));
526                    }
527                    Poll::Ready(Err(e)) => {
528                        *self = Self::Failed {
529                            error: Some(e),
530                            sm_state: sm_state.take(),
531                        };
532                        return Poll::Ready(None);
533                    }
534                }
535
536                let item = ready!(stream.poll_next(cx));
537                // We switch to a TxOpen or non-connected state when we
538                // receive the stream footer, so reading `None` from the
539                // stream is always an unclean closure.
540                let item = item.unwrap_or_else(|| {
541                    Err(ReadError::HardError(io::Error::new(
542                        io::ErrorKind::UnexpectedEof,
543                        "eof before stream footer",
544                    )))
545                });
546                match item {
547                    // Easy case, we got some data.
548                    Ok(XmppStreamElement::Stanza(data)) => {
549                        Poll::Ready(Some(ConnectedEvent::Worker(WorkerEvent::Stanza(data))))
550                    }
551
552                    Ok(XmppStreamElement::SM(sm::Nonza::Ack(ack))) => {
553                        if let Some(sm_state) = sm_state {
554                            match sm_state.remote_acked(ack.h) {
555                                Ok(()) => Poll::Ready(None),
556                                Err(e) => {
557                                    log::error!(
558                                        "Failed to process <sm:a/> sent by the server: {e}",
559                                    );
560                                    self.to_stream_error_state(e.into());
561                                    return Poll::Ready(None);
562                                }
563                            }
564                        } else {
565                            log::debug!("Hmm... I got an <sm:a/> from the peer, but I don't have a stream management state. I'm gonna ignore that...");
566                            Poll::Ready(None)
567                        }
568                    }
569
570                    Ok(XmppStreamElement::SM(sm::Nonza::Req(_))) => {
571                        if let Some(sm_state) = sm_state {
572                            match sm_state.pending_acks.checked_add(1) {
573                                None => panic!("Too many pending ACKs, something is wrong."),
574                                Some(v) => sm_state.pending_acks = v,
575                            }
576                        } else {
577                            log::warn!("Got an <sm:r/> from the peer, but we don't have any stream management state. Terminating stream with an error.");
578                            self.to_stream_error_state(StreamError {
579                                condition: DefinedCondition::UnsupportedStanzaType,
580                                text: Some((
581                                    None,
582                                    "received <sm:r/>, but stream management is not enabled"
583                                        .to_owned(),
584                                )),
585                                application_specific: vec![],
586                            });
587                        }
588                        // No matter whether we "enqueued" an ACK for send or
589                        // whether we just successfully read something from
590                        // the stream, we have to request to be polled again
591                        // right away.
592                        Poll::Ready(None)
593                    }
594
595                    Ok(other) => {
596                        log::warn!(
597                            "Received unsupported stream element: {other:?}. Emitting stream error.",
598                        );
599                        self.to_stream_error_state(StreamError {
600                            condition: DefinedCondition::UnsupportedStanzaType,
601                            // TODO: figure out a good way to provide the
602                            // sender with more information.
603                            text: None,
604                            application_specific: vec![],
605                        });
606                        Poll::Ready(None)
607                    }
608
609                    // Another easy case: Soft timeouts are passed through
610                    // to the caller for handling.
611                    Err(ReadError::SoftTimeout) => {
612                        Poll::Ready(Some(ConnectedEvent::Worker(WorkerEvent::SoftTimeout)))
613                    }
614
615                    // Parse errors are also just passed through (and will
616                    // likely cause us to send a stream error).
617                    Err(ReadError::ParseError(e)) => {
618                        Poll::Ready(Some(ConnectedEvent::Worker(WorkerEvent::ParseError(e))))
619                    }
620
621                    // I/O errors cause the stream to be considerde
622                    // broken; we drop it and send a Disconnect event with
623                    // the error embedded.
624                    Err(ReadError::HardError(e)) => {
625                        let sm_state = sm_state.take();
626                        Poll::Ready(Some(ConnectedEvent::Disconnect {
627                            sm_state,
628                            error: Some(e),
629                        }))
630                    }
631
632                    // Stream footer indicates the remote wants to shut this
633                    // stream down.
634                    // We transition into RemoteShutdown state which makes us
635                    // emit a special event until the caller takes care of it.
636                    Err(ReadError::StreamFooterReceived) => {
637                        *self = Self::RemoteShutdown {
638                            sm_state: sm_state.take(),
639                        };
640
641                        // Let us be called again immediately to emit the
642                        // notification.
643                        Poll::Ready(None)
644                    }
645                }
646            }
647
648            Self::Failed { sm_state, error } => Poll::Ready(Some(ConnectedEvent::Disconnect {
649                error: error.take(),
650                sm_state: sm_state.take(),
651            })),
652
653            Self::LocalShutdown { .. } | Self::LocalShutdownComplete => {
654                panic!("poll_next called in local shutdown");
655            }
656
657            Self::RemoteShutdown { ref mut sm_state } => {
658                Poll::Ready(Some(ConnectedEvent::RemoteShutdown {
659                    sm_state: sm_state.take(),
660                }))
661            }
662        }
663    }
664
665    pub(super) fn poll_close(
666        &mut self,
667        mut stream: Pin<&mut XmppStream>,
668        cx: &mut Context<'_>,
669    ) -> Poll<Result<(), io::Error>> {
670        loop {
671            match self {
672                // User initiates shutdown by local choice.
673                // The actual shutdown is driven by the poll_read function and we
674                // only get woken up via the close_poller waker.
675                Self::Ready { .. } | Self::RemoteShutdown { .. } | Self::Negotiating { .. } => {
676                    *self = Self::LocalShutdown {
677                        rx_state: RxShutdownState::AwaitingFooter,
678                        tx_closed: false,
679                        deadline: Box::pin(tokio::time::sleep(LOCAL_SHUTDOWN_TIMEOUT)),
680                    };
681                }
682
683                Self::Failed { error, .. } => match error.take() {
684                    Some(error) => return Poll::Ready(Err(error)),
685                    None => return Poll::Ready(Ok(())),
686                },
687
688                // If close is called while an attempt is made to send the
689                // stream error, we abort transmission.
690                Self::SendStreamError { .. } => {
691                    log::debug!("close() called while stream error was being sent. Aborting transmission of stream error.");
692                    *self = Self::LocalShutdown {
693                        rx_state: RxShutdownState::AwaitingFooter,
694                        tx_closed: false,
695                        deadline: Box::pin(tokio::time::sleep(LOCAL_SHUTDOWN_TIMEOUT)),
696                    };
697                }
698
699                // Wait for local shutdown (driven by poll_read) to complete.
700                Self::LocalShutdown {
701                    ref mut deadline,
702                    ref mut rx_state,
703                    ref mut tx_closed,
704                } => {
705                    match deadline.as_mut().poll(cx) {
706                        Poll::Ready(()) => {
707                            log::debug!("Dropping stream after shutdown timeout was exceeded.");
708                            *self = Self::LocalShutdownComplete;
709                            return Poll::Ready(Ok(()));
710                        }
711                        Poll::Pending => (),
712                    }
713
714                    if !*tx_closed {
715                        // We cannot use ready! here, because we want to poll the
716                        // receiving side in parallel.
717                        match stream.as_mut().poll_shutdown(cx) {
718                            Poll::Pending => (),
719                            Poll::Ready(Ok(())) => {
720                                *tx_closed = true;
721                            }
722                            Poll::Ready(Err(e)) => {
723                                log::debug!(
724                                    "Ignoring write error during local stream shutdown: {e}"
725                                );
726                                *tx_closed = true;
727                            }
728                        }
729                    }
730
731                    match rx_state {
732                        RxShutdownState::Done => {
733                            if !*tx_closed {
734                                // poll_close() returned Poll::Pending, so we have to
735                                // return that, too.
736                                return Poll::Pending;
737                            }
738                        }
739                        // We can use ready! here because the `poll_close` has
740                        // happened already; we don't want to poll anything else
741                        // anymore.
742                        _ => loop {
743                            match ready!(stream.as_mut().poll_next(cx)) {
744                                None => {
745                                    if *rx_state != RxShutdownState::AwaitingEof {
746                                        log::debug!("Ignoring early EOF during stream shutdown.");
747                                    }
748                                    *rx_state = RxShutdownState::Done;
749                                    break;
750                                }
751                                Some(Ok(data)) => {
752                                    log::debug!("Ignoring data received on stream during local shutdown: {data:?}");
753                                }
754                                Some(Err(ReadError::SoftTimeout)) => (),
755                                Some(Err(ReadError::HardError(e))) => {
756                                    *rx_state = RxShutdownState::Done;
757                                    log::debug!("Ignoring read error during local shutdown: {e}");
758                                    break;
759                                }
760                                Some(Err(ReadError::ParseError(e))) => {
761                                    log::debug!(
762                                        "Ignoring parse error during local shutdown: {}",
763                                        e
764                                    );
765                                }
766                                Some(Err(ReadError::StreamFooterReceived)) => match rx_state {
767                                    RxShutdownState::AwaitingFooter => {
768                                        *rx_state = RxShutdownState::AwaitingEof;
769                                    }
770                                    RxShutdownState::AwaitingEof => {
771                                        unreachable!("multiple stream footers?!")
772                                    }
773                                    RxShutdownState::Done => unreachable!(),
774                                },
775                            }
776                        },
777                    }
778
779                    if *tx_closed && *rx_state == RxShutdownState::Done {
780                        // Now that everything is properly cleaned up on the
781                        // xmlstream layer, we go through with closure.
782                        ready!(<XmppStream as Sink<&Stanza>>::poll_close(
783                            stream.as_mut(),
784                            cx
785                        ))?;
786                        // And now that's done, we can finally call it a day.
787                        *self = Self::LocalShutdownComplete;
788                        return Poll::Ready(Ok(()));
789                    } else {
790                        return Poll::Pending;
791                    }
792                }
793
794                Self::LocalShutdownComplete => return Poll::Ready(Ok(())),
795            }
796        }
797    }
798
799    pub(super) fn start_send_stream_error(&mut self, error: StreamError) {
800        match self {
801            Self::LocalShutdownComplete
802            | Self::LocalShutdown { .. }
803            | Self::RemoteShutdown { .. }
804            | Self::Failed { .. } => {
805                log::debug!("Request to send stream error ({error}), but we are already shutting down or have already failed. Discarding.");
806                return;
807            }
808
809            Self::Ready { .. } | Self::Negotiating { .. } => {}
810
811            Self::SendStreamError { .. } => {
812                log::debug!("Request to send stream error ({error}) while transmission of another stream error is already in progress. Discarding the new one.");
813                return;
814            }
815        }
816
817        *self = Self::SendStreamError {
818            deadline: Box::pin(tokio::time::sleep(LOCAL_SHUTDOWN_TIMEOUT)),
819            stream_error: Some(error),
820            io_error: None,
821        };
822    }
823
824    pub fn queue_sm_request(&mut self) -> bool {
825        match self {
826            Self::Ready { sm_state, .. } => {
827                if let Some(sm_state) = sm_state {
828                    sm_state.pending_req = true;
829                    true
830                } else {
831                    false
832                }
833            }
834            _ => false,
835        }
836    }
837}