tokio_xmpp/stanzastream/connected.rs
// Copyright (c) 2019 Emmanuel Gil Peyrot <linkmauve@linkmauve.fr>
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.

use core::future::Future;
use core::ops::ControlFlow::{Break, Continue};
use core::pin::Pin;
use core::task::{Context, Poll};
use std::io;

use futures::{ready, Sink, Stream};

use xmpp_parsers::{
    jid::Jid,
    sm,
    stream_error::{DefinedCondition, SentStreamError, StreamError},
    stream_features::StreamFeatures,
};

use crate::xmlstream::{ReadError, XmppStreamElement};
use crate::Stanza;

use super::negotiation::{NegotiationResult, NegotiationState};
use super::queue::{QueueEntry, StanzaState, TransmitQueue};
use super::stream_management::*;
use super::worker::{WorkerEvent, XmppStream, LOCAL_SHUTDOWN_TIMEOUT};

/// Progress of the receive side during an orderly stream shutdown: the
/// peer's closing stream footer is expected first, then EOF.
#[derive(PartialEq)]
pub(super) enum RxShutdownState {
    AwaitingFooter,
    AwaitingEof,
    Done,
}

/// Construct the `io::Error` to report locally after a stream error was
/// (or should have been) sent.
///
/// Prefers a pending I/O error; otherwise wraps the stream error we sent;
/// otherwise synthesises a placeholder error.
fn local_error_for_stream_error(
    io_error: &mut Option<io::Error>,
    stream_error: &mut Option<StreamError>,
) -> io::Error {
    io_error
        .take()
        .or_else(|| {
            stream_error
                .take()
                .map(|x| io::Error::new(io::ErrorKind::InvalidData, SentStreamError(x)))
        })
        .unwrap_or_else(|| {
            io::Error::new(
                io::ErrorKind::InvalidData,
                "unknown local stream error generated",
            )
        })
}

/// Substate of the [`BackendStream::Connected`] state.
///
/// Having the substate and its logic in a separate type allows us to
/// circumvent problems with moving data out of `&mut _` when transitioning
/// between substates.
pub(super) enum ConnectedState {
    /// The stream is still being negotiated.
    Negotiating {
        /// Current state within the negotiations.
        substate: NegotiationState,
    },

    /// The stream is ready for transceiving.
    Ready {
        /// Stream management state, if any.
        sm_state: Option<SmState>,
    },

    /// A stream error is being sent to the peer.
    SendStreamError {
        /// Stream error to send.
        ///
        /// `None` implies that we now only need to flush.
        stream_error: Option<StreamError>,

        /// I/O error to return to the caller once the flush is done.
        ///
        /// If `None`, an error will be synthesised.
        io_error: Option<io::Error>,

        /// Deadline by which the error must have been sent and the stream
        /// must have been shut down.
        deadline: Pin<Box<tokio::time::Sleep>>,
    },

    /// The stream has failed; the contained error waits to be reported to
    /// the caller.
    Failed {
        error: Option<io::Error>,
        sm_state: Option<SmState>,
    },

    /// A stream shutdown was initiated locally and we are flushing the RX
    /// and TX queues.
    LocalShutdown {
        /// Keeps track of whether we have closed the TX side yet.
        tx_closed: bool,

        /// Keeps track of how far the receiving side has shut down.
        rx_state: RxShutdownState,

        /// Deadline by which the graceful shutdown must complete; if the
        /// deadline is exceeded (i.e. the contained Sleep future returns
        /// Ready), the streams will be dropped (and thus closed by the OS).
        deadline: Pin<Box<tokio::time::Sleep>>,
    },

    /// The remote side closed the stream.
    RemoteShutdown {
        /// Keep the SM state for later resumption.
        sm_state: Option<SmState>,
    },

    /// Local shutdown has completed; this is a final state, as a local
    /// shutdown signals the intent to stop the stream forever.
    LocalShutdownComplete,
}
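
// Informal sketch of the `ConnectedState` transitions implemented by
// `poll`, `poll_writes` and `poll_close` below:
//
//   Negotiating --negotiated------------------> Ready
//   Negotiating / Ready --stream error--------> SendStreamError --sent/timeout--> Failed
//   Negotiating / Ready --I/O error-----------> Failed
//   Ready --stream footer received------------> RemoteShutdown
//   Ready --transmit queue closed & drained---> LocalShutdown
//   Negotiating / Ready / RemoteShutdown / SendStreamError
//       --poll_close()------------------------> LocalShutdown
//   LocalShutdown --closed or timed out-------> LocalShutdownComplete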

/// Enumeration of events which can happen once the stream has finished the
/// connection procedures and is established.
pub(super) enum ConnectedEvent {
    /// Event generated by the stream worker.
    Worker(WorkerEvent),

    /// The remote side closed the stream in an orderly fashion.
    RemoteShutdown { sm_state: Option<SmState> },

    /// We got disconnected through an error, either an I/O error or some
    /// kind of stream error.
    Disconnect {
        /// Stream management state for later resumption attempts.
        sm_state: Option<SmState>,

        /// The error which caused the disconnect. This is generally not
        /// `None`, but we cannot prove this at compile time because we
        /// have to take the error from a mutable (i.e. non-owned) place.
        error: Option<io::Error>,
    },

    /// A shutdown was requested by the local side of the stream.
    LocalShutdownRequested,
}

impl ConnectedState {
    /// Transition into the [`Self::SendStreamError`] state.
    fn to_stream_error_state(&mut self, stream_error: StreamError) {
        *self = Self::SendStreamError {
            stream_error: Some(stream_error),
            io_error: None,
            deadline: Box::pin(tokio::time::sleep(LOCAL_SHUTDOWN_TIMEOUT)),
        };
    }

    /// Transition into the [`Self::Failed`] state.
    fn to_failed_state(&mut self, error: io::Error, sm_state: Option<SmState>) {
        *self = Self::Failed {
            error: Some(error),
            sm_state,
        };
    }

    /// If an SM request is pending, attempt to send it.
    fn poll_write_sm_req(
        mut sm_state: Option<&mut SmState>,
        mut stream: Pin<&mut XmppStream>,
        cx: &mut Context<'_>,
    ) -> Poll<io::Result<()>> {
        if let Some(sm_state) = sm_state.as_mut() {
            // Request is pending.
            if sm_state.pending_req {
                match ready!(<XmppStream as Sink<&XmppStreamElement>>::poll_ready(
                    stream.as_mut(),
                    cx,
                )) {
                    Ok(()) => (),
                    Err(e) => return Poll::Ready(Err(e)),
                }
                match stream
                    .as_mut()
                    .start_send(&XmppStreamElement::SM(sm::Nonza::Req(sm::R)))
                {
                    Ok(()) => (),
                    Err(e) => {
                        // As the stream promised we would be able to send,
                        // this must be a problem with our (locally
                        // generated) nonza, i.e. this is fatal.
                        panic!("Failed to send SM Req nonza: {}", e);
                    }
                }
                sm_state.pending_req = false;
            }
        }
        Poll::Ready(Ok(()))
    }
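
    // Wire-format note (per XEP-0198): the request nonza sent by
    // `poll_write_sm_req` above is `<r xmlns='urn:xmpp:sm:3'/>`, and the
    // acks emitted by `poll_writes_inner` below are
    // `<a xmlns='urn:xmpp:sm:3' h='N'/>`, where `h` counts the stanzas
    // handled so far, modulo 2^32.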

    /// Make as much transmit progress as possible.
    ///
    /// Sends a pending SM request first, then stanzas from the
    /// `transmit_queue`, then any pending SM acks, and finally drives the
    /// flush of the underlying stream.
    fn poll_writes_inner(
        mut sm_state: Option<&mut SmState>,
        mut stream: Pin<&mut XmppStream>,
        transmit_queue: &mut TransmitQueue<QueueEntry>,
        cx: &mut Context<'_>,
    ) -> Poll<io::Result<()>> {
        let mut depleted = false;

        // We prefer sending SM reqs before actual data.
        // SM requests are used in response to soft timeouts as a way to
        // trigger the remote side to send *something* to us. We may be
        // sending a lot of data in bulk right now without ever expecting a
        // response (or at least not anytime soon). In order to ensure that
        // the server will in fact send a message to us soon, we have to
        // send the SM request before anything else.
        //
        // Example scenario: Sending a bunch of MAM `<message/>`s over a
        // slow, but low-latency link. The MAM response only triggers a
        // response from the peer when everything has been transmitted,
        // which may take longer than the stream timeout.
        ready!(Self::poll_write_sm_req(
            sm_state.as_mut().map(|x| x as &mut SmState),
            stream.as_mut(),
            cx
        ))?;

        let mut transmitted = false;
        // We prefer sending actual data before stream-management ACKs.
        // While the other side may be waiting for our ACK, we are not
        // obliged to send it straight away (XEP-0198 explicitly allows us
        // to delay it for some implementation-defined time if we have
        // stuff to send; our implementation-defined time is "infinity"),
        // so we try to make progress on real data.
        loop {
            // If either the queue has nothing for us or the stream isn't
            // ready to send, we break out of the loop. We don't use ready!
            // here because we may have SM ACKs to send.
            let next = match transmit_queue.poll_next(cx) {
                Poll::Ready(Some(v)) => v,
                Poll::Ready(None) => {
                    // The transmit_queue is empty, so we set `depleted` to
                    // true in order to ensure that we return Ready if all
                    // SM acks also have been transmitted.
                    depleted = true;
                    break;
                }
                Poll::Pending => break,
            };
            // If the stream isn't ready to send, none of the other things
            // can be sent either, so we can use ready!.
            match ready!(<XmppStream as Sink<&Stanza>>::poll_ready(
                stream.as_mut(),
                cx
            )) {
                Ok(()) => (),
                Err(e) => return Poll::Ready(Err(e)),
            }
            // We now either send the item or "die trying". It must be
            // removed from the queue, because even if it fails to
            // serialise, we don't want to reattempt sending it (except via
            // SM resumption retransmission).
            let next = next.take();
            match stream.as_mut().start_send(&next.stanza) {
                Ok(()) => {
                    next.token.send_replace(StanzaState::Sent {});
                    if let Some(sm_state) = sm_state.as_mut() {
                        sm_state.enqueue(next);
                    }
                    transmitted = true;
                }
                // Serialisation error, report back to the queue item.
                Err(e) => {
                    next.token
                        .send_replace(StanzaState::Failed { error: e.into() });
                }
            }
        }

        if let Some(sm_state) = sm_state.as_mut() {
            // We can assign `transmitted` directly, because any previously
            // pending request was cleared by the earlier call to
            // poll_write_sm_req.
            sm_state.pending_req = transmitted;
            ready!(Self::poll_write_sm_req(Some(sm_state), stream.as_mut(), cx))?;
        }

        // Now, if the stream will let us and we need to, we can tack on
        // some SM ACKs.
        if let Some(sm_state) = sm_state {
            while sm_state.pending_acks > 0 {
                match ready!(<XmppStream as Sink<&XmppStreamElement>>::poll_ready(
                    stream.as_mut(),
                    cx,
                )) {
                    Ok(()) => (),
                    Err(e) => return Poll::Ready(Err(e)),
                }
                match stream
                    .as_mut()
                    .start_send(&XmppStreamElement::SM(sm::Nonza::Ack(sm::A {
                        h: sm_state.inbound_ctr(),
                    }))) {
                    Ok(()) => (),
                    Err(e) => {
                        // As the stream promised we would be able to send,
                        // this must be a problem with our (locally
                        // generated) nonza, i.e. this is fatal.
                        panic!("Failed to send SM Ack nonza: {}", e);
                    }
                }
                sm_state.pending_acks -= 1;
            }
        }

        // If we haven't transmitted, we may also not have polled the
        // stream for readiness or flushing. We need to do that here to
        // ensure progress: even if our tx queue is empty, the tx buffer
        // may be nonempty.
        match ready!(<XmppStream as Sink<&Stanza>>::poll_flush(
            stream.as_mut(),
            cx
        )) {
            Ok(()) => (),
            Err(e) => return Poll::Ready(Err(e)),
        }

        // If we end up here, all data we currently have has been
        // transmitted via the stream and the stream's tx buffers have been
        // properly flushed.
        if depleted {
            // And here, we know that the transmit queue is closed, too. We
            // return with success.
            Poll::Ready(Ok(()))
        } else {
            // The transmit queue is still open, so more data could pour in
            // to be transmitted.
            Poll::Pending
        }
    }

    /// Drive the stream in transmit-only (simplex) mode.
    ///
    /// This will block forever (i.e. return [`Poll::Pending`] without
    /// installing a waker) if the stream is currently being negotiated.
    /// Otherwise, it will attempt to drain the `transmit_queue`. Once the
    /// queue has been closed and fully drained, `Poll::Ready(())` is
    /// returned. When write errors occur, this will also block forever.
    ///
    /// If nothing is to be sent, but the stream could be used for sending,
    /// this will drive the flush part of the inner stream.
    ///
    /// Any errors are reported on the next call to `poll`.
    pub fn poll_writes(
        &mut self,
        stream: Pin<&mut XmppStream>,
        transmit_queue: &mut TransmitQueue<QueueEntry>,
        cx: &mut Context<'_>,
    ) -> Poll<()> {
        match self {
            Self::Ready { sm_state, .. } => match ready!(Self::poll_writes_inner(
                sm_state.as_mut(),
                stream,
                transmit_queue,
                cx
            )) {
                Ok(()) => Poll::Ready(()),
                Err(e) => {
                    *self = Self::Failed {
                        error: Some(e),
                        sm_state: sm_state.take(),
                    };
                    Poll::Pending
                }
            },

            _ => Poll::Pending,
        }
    }

    /// Drive the stream in full-duplex mode.
    ///
    /// Stanzas from the `transmit_queue` are transmitted once the stream
    /// is ready.
    ///
    /// Returns:
    /// - `Poll::Pending` if it blocks on the inner stream,
    /// - `Poll::Ready(None)` if it needs to be called again to obtain a
    ///   proper result,
    /// - `Poll::Ready(Some(_))` when it has a proper result.
    pub fn poll(
        &mut self,
        mut stream: Pin<&mut XmppStream>,
        jid: &Jid,
        features: &StreamFeatures,
        transmit_queue: &mut TransmitQueue<QueueEntry>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<ConnectedEvent>> {
        match self {
            Self::Negotiating { ref mut substate } => {
                match ready!(substate.advance(stream, jid, transmit_queue, cx)) {
                    Break(NegotiationResult::Disconnect { sm_state, error }) => {
                        self.to_failed_state(error, sm_state);
                        Poll::Ready(None)
                    }
                    Break(NegotiationResult::StreamReset {
                        sm_state,
                        bound_jid,
                    }) => {
                        *self = Self::Ready { sm_state };
                        Poll::Ready(Some(ConnectedEvent::Worker(WorkerEvent::Reset {
                            bound_jid,
                            features: features.clone(),
                        })))
                    }
                    Break(NegotiationResult::StreamResumed { sm_state }) => {
                        *self = Self::Ready {
                            sm_state: Some(sm_state),
                        };
                        Poll::Ready(Some(ConnectedEvent::Worker(WorkerEvent::Resumed)))
                    }
                    Break(NegotiationResult::StreamError { error }) => {
                        self.to_stream_error_state(error);
                        Poll::Ready(None)
                    }
                    Continue(Some(stanza)) => {
                        Poll::Ready(Some(ConnectedEvent::Worker(WorkerEvent::Stanza(stanza))))
                    }
                    Continue(None) => Poll::Ready(None),
                }
            }

            Self::SendStreamError {
                ref mut stream_error,
                ref mut io_error,
                ref mut deadline,
            } => {
                match stream.as_mut().poll_next(cx) {
                    Poll::Pending
                    | Poll::Ready(None)
                    | Poll::Ready(Some(Err(ReadError::StreamFooterReceived)))
                    | Poll::Ready(Some(Err(ReadError::SoftTimeout))) => (),
                    Poll::Ready(Some(Ok(ev))) => {
                        log::trace!("Discarding incoming data while sending stream error: {ev:?}")
                    }
                    Poll::Ready(Some(Err(ReadError::ParseError(e)))) => {
                        log::trace!("Ignoring parse error while sending stream error: {e}")
                    }
                    Poll::Ready(Some(Err(ReadError::HardError(e)))) => {
                        log::warn!("I/O error while sending stream error: {e}")
                    }
                }

                match deadline.as_mut().poll(cx) {
                    Poll::Pending => (),
                    Poll::Ready(()) => {
                        log::debug!("Timeout while sending stream error. Discarding state.");
                        let error = local_error_for_stream_error(io_error, stream_error);
                        self.to_failed_state(error, None);
                        return Poll::Ready(None);
                    }
                }

                // We polled the deadline above before blocking on ready!
                // below: the other side may be refusing to accept data
                // (e.g. because its receive buffer is full), in which case
                // only the deadline guarantees progress.
                if stream_error.is_some() {
                    match ready!(<XmppStream as Sink<&StreamError>>::poll_ready(
                        stream.as_mut(),
                        cx
                    ))
                    .and_then(|()| {
                        // The take serves as the transition to the next
                        // state.
                        let stream_error = stream_error.take().unwrap();
                        let result = stream.as_mut().start_send(&stream_error);
                        *io_error = Some(local_error_for_stream_error(
                            io_error,
                            &mut Some(stream_error),
                        ));
                        result
                    }) {
                        Ok(()) => (),
                        Err(e) => {
                            log::debug!("Got I/O error while sending stream error: {e}. Skipping error transmission.");
                            let error = local_error_for_stream_error(io_error, stream_error);
                            self.to_failed_state(error, None);
                            return Poll::Ready(None);
                        }
                    }
                }

                match ready!(<XmppStream as Sink<&StreamError>>::poll_flush(
                    stream.as_mut(),
                    cx
                )) {
                    Ok(()) => (),
                    Err(e) => {
                        log::debug!(
                            "Got I/O error while flushing stream error: {e}. Skipping flush.",
                        );
                    }
                }
                log::trace!("Stream error send complete, transitioning to Failed state");
                *self = Self::Failed {
                    error: Some(local_error_for_stream_error(io_error, stream_error)),
                    // Do *not* resume after we caused a stream error.
                    sm_state: None,
                };

                // Request the caller to call us again to get the actual
                // error message.
                Poll::Ready(None)
            }

            Self::Ready { ref mut sm_state } => {
                match Self::poll_writes_inner(
                    sm_state.as_mut(),
                    stream.as_mut(),
                    transmit_queue,
                    cx,
                ) {
                    Poll::Pending => (),
                    Poll::Ready(Ok(())) => {
                        *self = Self::LocalShutdown {
                            rx_state: RxShutdownState::AwaitingFooter,
                            tx_closed: false,
                            deadline: Box::pin(tokio::time::sleep(LOCAL_SHUTDOWN_TIMEOUT)),
                        };
                        return Poll::Ready(Some(ConnectedEvent::LocalShutdownRequested));
                    }
                    Poll::Ready(Err(e)) => {
                        *self = Self::Failed {
                            error: Some(e),
                            sm_state: sm_state.take(),
                        };
                        return Poll::Ready(None);
                    }
                }

                let item = ready!(stream.poll_next(cx));
                // We switch to a TxOpen or non-connected state when we
                // receive the stream footer, so reading `None` from the
                // stream is always an unclean closure.
                let item = item.unwrap_or_else(|| {
                    Err(ReadError::HardError(io::Error::new(
                        io::ErrorKind::UnexpectedEof,
                        "eof before stream footer",
                    )))
                });
                match item {
                    // Easy case, we got some data.
                    Ok(XmppStreamElement::Stanza(data)) => {
                        if let Some(sm_state) = sm_state.as_mut() {
                            sm_state.received();
                        }
                        Poll::Ready(Some(ConnectedEvent::Worker(WorkerEvent::Stanza(data))))
                    }

                    Ok(XmppStreamElement::SM(sm::Nonza::Ack(ack))) => {
                        if let Some(sm_state) = sm_state {
                            match sm_state.remote_acked(ack.h) {
                                Ok(()) => Poll::Ready(None),
                                Err(e) => {
                                    log::error!(
                                        "Failed to process <sm:a/> sent by the server: {e}",
                                    );
                                    self.to_stream_error_state(e.into());
                                    Poll::Ready(None)
                                }
                            }
                        } else {
                            log::debug!("Received an <sm:a/> from the peer, but we have no stream management state. Ignoring it.");
                            Poll::Ready(None)
                        }
                    }

                    Ok(XmppStreamElement::SM(sm::Nonza::Req(_))) => {
                        if let Some(sm_state) = sm_state {
                            match sm_state.pending_acks.checked_add(1) {
                                None => panic!("Too many pending ACKs, something is wrong."),
                                Some(v) => sm_state.pending_acks = v,
                            }
                        } else {
                            log::warn!("Got an <sm:r/> from the peer, but we don't have any stream management state. Terminating stream with an error.");
                            self.to_stream_error_state(StreamError::new(
                                DefinedCondition::UnsupportedStanzaType,
                                "en",
                                "received <sm:r/>, but stream management is not enabled".to_owned(),
                            ));
                        }
                        // No matter whether we "enqueued" an ACK for
                        // sending or whether we just successfully read
                        // something from the stream, we have to request to
                        // be polled again right away.
                        Poll::Ready(None)
                    }

                    Ok(other) => {
                        log::warn!(
                            "Received unsupported stream element: {other:?}. Emitting stream error.",
                        );
                        // TODO: figure out a good way to provide the
                        // sender with more information.
                        self.to_stream_error_state(StreamError::new(
                            DefinedCondition::UnsupportedStanzaType,
                            "en",
                            format!("Unsupported stream element: {other:?}"),
                        ));
                        Poll::Ready(None)
                    }

                    // Another easy case: Soft timeouts are passed through
                    // to the caller for handling.
                    Err(ReadError::SoftTimeout) => {
                        Poll::Ready(Some(ConnectedEvent::Worker(WorkerEvent::SoftTimeout)))
                    }

                    // Parse errors are also just passed through (and will
                    // likely cause us to send a stream error).
                    Err(ReadError::ParseError(e)) => {
                        Poll::Ready(Some(ConnectedEvent::Worker(WorkerEvent::ParseError(e))))
                    }

                    // I/O errors cause the stream to be considered broken;
                    // we drop it and send a Disconnect event with the
                    // error embedded.
                    Err(ReadError::HardError(e)) => {
                        let sm_state = sm_state.take();
                        Poll::Ready(Some(ConnectedEvent::Disconnect {
                            sm_state,
                            error: Some(e),
                        }))
                    }

                    // A stream footer indicates that the remote side wants
                    // to shut this stream down.
                    // We transition into the RemoteShutdown state, which
                    // makes us emit a special event until the caller takes
                    // care of it.
                    Err(ReadError::StreamFooterReceived) => {
                        *self = Self::RemoteShutdown {
                            sm_state: sm_state.take(),
                        };

                        // Let us be called again immediately to emit the
                        // notification.
                        Poll::Ready(None)
                    }
                }
            }

            Self::Failed { sm_state, error } => Poll::Ready(Some(ConnectedEvent::Disconnect {
                error: error.take(),
                sm_state: sm_state.take(),
            })),

            Self::LocalShutdown { .. } | Self::LocalShutdownComplete => {
                panic!("poll_next called in local shutdown");
            }

            Self::RemoteShutdown { ref mut sm_state } => {
                Poll::Ready(Some(ConnectedEvent::RemoteShutdown {
                    sm_state: sm_state.take(),
                }))
            }
        }
    }

    /// Drive an orderly local shutdown of the stream to completion.
    pub(super) fn poll_close(
        &mut self,
        mut stream: Pin<&mut XmppStream>,
        cx: &mut Context<'_>,
    ) -> Poll<Result<(), io::Error>> {
        loop {
            match self {
                // The user initiates a shutdown by local choice. The
                // actual shutdown is driven by the poll_read function and
                // we only get woken up via the close_poller waker.
                Self::Ready { .. } | Self::RemoteShutdown { .. } | Self::Negotiating { .. } => {
                    *self = Self::LocalShutdown {
                        rx_state: RxShutdownState::AwaitingFooter,
                        tx_closed: false,
                        deadline: Box::pin(tokio::time::sleep(LOCAL_SHUTDOWN_TIMEOUT)),
                    };
                }

                Self::Failed { error, .. } => match error.take() {
                    Some(error) => return Poll::Ready(Err(error)),
                    None => return Poll::Ready(Ok(())),
                },

                // If close is called while an attempt is made to send a
                // stream error, we abort the transmission.
                Self::SendStreamError { .. } => {
                    log::debug!("close() called while stream error was being sent. Aborting transmission of stream error.");
                    *self = Self::LocalShutdown {
                        rx_state: RxShutdownState::AwaitingFooter,
                        tx_closed: false,
                        deadline: Box::pin(tokio::time::sleep(LOCAL_SHUTDOWN_TIMEOUT)),
                    };
                }

                // Wait for the local shutdown (driven by poll_read) to
                // complete.
                Self::LocalShutdown {
                    ref mut deadline,
                    ref mut rx_state,
                    ref mut tx_closed,
                } => {
                    match deadline.as_mut().poll(cx) {
                        Poll::Ready(()) => {
                            log::debug!("Dropping stream after shutdown timeout was exceeded.");
                            *self = Self::LocalShutdownComplete;
                            return Poll::Ready(Ok(()));
                        }
                        Poll::Pending => (),
                    }

                    if !*tx_closed {
                        // We cannot use ready! here, because we want to
                        // poll the receiving side in parallel.
                        match stream.as_mut().poll_shutdown(cx) {
                            Poll::Pending => (),
                            Poll::Ready(Ok(())) => {
                                *tx_closed = true;
                            }
                            Poll::Ready(Err(e)) => {
                                log::debug!(
                                    "Ignoring write error during local stream shutdown: {e}"
                                );
                                *tx_closed = true;
                            }
                        }
                    }

                    match rx_state {
                        RxShutdownState::Done => {
                            if !*tx_closed {
                                // poll_shutdown() returned Poll::Pending
                                // above, so we have to return that, too.
                                return Poll::Pending;
                            }
                        }
                        // We can use ready! here because the poll_shutdown
                        // call has already happened; we don't want to poll
                        // anything else anymore.
                        _ => loop {
                            match ready!(stream.as_mut().poll_next(cx)) {
                                None => {
                                    if *rx_state != RxShutdownState::AwaitingEof {
                                        log::debug!("Ignoring early EOF during stream shutdown.");
                                    }
                                    *rx_state = RxShutdownState::Done;
                                    break;
                                }
                                Some(Ok(data)) => {
                                    log::debug!("Ignoring data received on stream during local shutdown: {data:?}");
                                }
                                Some(Err(ReadError::SoftTimeout)) => (),
                                Some(Err(ReadError::HardError(e))) => {
                                    *rx_state = RxShutdownState::Done;
                                    log::debug!("Ignoring read error during local shutdown: {e}");
                                    break;
                                }
                                Some(Err(ReadError::ParseError(e))) => {
                                    log::debug!("Ignoring parse error during local shutdown: {e}");
                                }
                                Some(Err(ReadError::StreamFooterReceived)) => match rx_state {
                                    RxShutdownState::AwaitingFooter => {
                                        *rx_state = RxShutdownState::AwaitingEof;
                                    }
                                    RxShutdownState::AwaitingEof => {
                                        unreachable!("multiple stream footers?!")
                                    }
                                    RxShutdownState::Done => unreachable!(),
                                },
                            }
                        },
                    }

                    if *tx_closed && *rx_state == RxShutdownState::Done {
                        // Now that everything is properly cleaned up on
                        // the xmlstream layer, we go through with the
                        // closure.
                        ready!(<XmppStream as Sink<&Stanza>>::poll_close(
                            stream.as_mut(),
                            cx
                        ))?;
                        // And now that's done, we can finally call it a
                        // day.
                        *self = Self::LocalShutdownComplete;
                        return Poll::Ready(Ok(()));
                    } else {
                        return Poll::Pending;
                    }
                }

                Self::LocalShutdownComplete => return Poll::Ready(Ok(())),
            }
        }
    }

    /// Begin transmission of a stream error, unless the stream is already
    /// shutting down, has failed, or is already sending another stream
    /// error.
    pub(super) fn start_send_stream_error(&mut self, error: StreamError) {
        match self {
            Self::LocalShutdownComplete
            | Self::LocalShutdown { .. }
            | Self::RemoteShutdown { .. }
            | Self::Failed { .. } => {
                log::debug!("Request to send stream error ({error}), but we are already shutting down or have already failed. Discarding.");
                return;
            }

            Self::Ready { .. } | Self::Negotiating { .. } => {}

            Self::SendStreamError { .. } => {
                log::debug!("Request to send stream error ({error}) while transmission of another stream error is already in progress. Discarding the new one.");
                return;
            }
        }

        *self = Self::SendStreamError {
            deadline: Box::pin(tokio::time::sleep(LOCAL_SHUTDOWN_TIMEOUT)),
            stream_error: Some(error),
            io_error: None,
        };
    }

    /// Mark a stream management request (`<r/>`) as pending for
    /// transmission.
    ///
    /// Returns true if stream management is enabled and the request was
    /// queued, and false otherwise.
    pub fn queue_sm_request(&mut self) -> bool {
        match self {
            Self::Ready {
                sm_state: Some(sm_state),
                ..
            } => {
                sm_state.pending_req = true;
                true
            }
            _ => false,
        }
    }
}