tokio_xmpp/stanzastream/connected.rs
1// Copyright (c) 2019 Emmanuel Gil Peyrot <linkmauve@linkmauve.fr>
2//
3// This Source Code Form is subject to the terms of the Mozilla Public
4// License, v. 2.0. If a copy of the MPL was not distributed with this
5// file, You can obtain one at http://mozilla.org/MPL/2.0/.
6
7use core::future::Future;
8use core::ops::ControlFlow::{Break, Continue};
9use core::pin::Pin;
10use core::task::{Context, Poll};
11use std::io;
12
13use futures::{ready, Sink, Stream};
14
15use xmpp_parsers::{
16 jid::Jid,
17 sm,
18 stream_error::{DefinedCondition, SentStreamError, StreamError},
19 stream_features::StreamFeatures,
20};
21
22use crate::xmlstream::{ReadError, XmppStreamElement};
23use crate::Stanza;
24
25use super::negotiation::{NegotiationResult, NegotiationState};
26use super::queue::{QueueEntry, StanzaState, TransmitQueue};
27use super::stream_management::*;
28use super::worker::{WorkerEvent, XmppStream, LOCAL_SHUTDOWN_TIMEOUT};
29
30#[derive(PartialEq)]
31pub(super) enum RxShutdownState {
32 AwaitingFooter,
33 AwaitingEof,
34 Done,
35}
36
37fn local_error_for_stream_error(
38 io_error: &mut Option<io::Error>,
39 stream_error: &mut Option<StreamError>,
40) -> io::Error {
41 io_error
42 .take()
43 .or_else(|| {
44 stream_error
45 .take()
46 .map(|x| io::Error::new(io::ErrorKind::InvalidData, SentStreamError(x)))
47 })
48 .unwrap_or_else(|| {
49 io::Error::new(
50 io::ErrorKind::InvalidData,
51 "unknown local stream error generated",
52 )
53 })
54}
55
56/// Substate of the [`BackendStream::Connected`] state.
57///
58/// Having the substate and its logic in a separate type allows us to
59/// circumvent problemns with moving data out of `&mut _` when transitioning
60/// between substates.
61pub(super) enum ConnectedState {
62 /// The stream is still being negotiated.
63 Negotiating {
64 /// Current state within the negotiations
65 substate: NegotiationState,
66 },
67
68 /// The stream is ready for transceiving.
69 Ready {
70 /// Stream management state, if any.
71 sm_state: Option<SmState>,
72 },
73
74 SendStreamError {
75 /// Stream error to send.
76 ///
77 /// `None` implies that we now only need to flush.
78 stream_error: Option<StreamError>,
79
80 /// I/O error to return to the caller once the flush is done.
81 ///
82 /// If `None`, an error will be synthesised.
83 io_error: Option<io::Error>,
84
85 /// Deadline until which the error must've been sent and the stream
86 /// must've been shut down.
87 deadline: Pin<Box<tokio::time::Sleep>>,
88 },
89
90 Failed {
91 error: Option<io::Error>,
92 sm_state: Option<SmState>,
93 },
94
95 /// A stream shutdown was initiated locally and we are flushing RX and TX
96 /// queues.
97 LocalShutdown {
98 /// Keep track on whether we have closed the TX side yet.
99 tx_closed: bool,
100
101 /// Keep track on how shut down the receiving side is.
102 rx_state: RxShutdownState,
103
104 /// Deadline until which graceful shutdown must complete; if the
105 /// deadline is exceeded (i.e. the contained Sleep future returns
106 /// ready), the streams will be dropped (and thus closed by the OS).
107 deadline: Pin<Box<tokio::time::Sleep>>,
108 },
109
110 /// The remote side closed the stream.
111 RemoteShutdown {
112 /// Keep the SM state for later resumption.
113 sm_state: Option<SmState>,
114 },
115
116 /// Local shutdown has completed; this is a final state, as local shutdown
117 /// signals an intent of stopping the stream forever.
118 LocalShutdownComplete,
119}
120
121/// Enumeration of events happening while the stream has finished the
122/// connection procedures and is established.
123pub(super) enum ConnectedEvent {
124 /// Event generated by the stream worker.
125 Worker(WorkerEvent),
126
127 /// The remote closed the stream orderly.
128 RemoteShutdown { sm_state: Option<SmState> },
129
130 /// We got disconnected through an error, either an I/O error
131 /// or some kind of stream error.
132 Disconnect {
133 /// Stream management state for later resumption attempts.
134 sm_state: Option<SmState>,
135
136 /// The error which caused the disconnect. This is generally not none,
137 /// but we cannot prove this at compile time because we have to take
138 /// the error from a mutable (i.e. non-owned) place.
139 error: Option<io::Error>,
140 },
141
142 /// A shutdown was requested by the local side of the stream.
143 LocalShutdownRequested,
144}
145
146impl ConnectedState {
147 fn to_stream_error_state(&mut self, stream_error: StreamError) {
148 *self = Self::SendStreamError {
149 stream_error: Some(stream_error),
150 io_error: None,
151 deadline: Box::pin(tokio::time::sleep(LOCAL_SHUTDOWN_TIMEOUT)),
152 };
153 }
154
155 fn to_failed_state(&mut self, error: io::Error, sm_state: Option<SmState>) {
156 *self = Self::Failed {
157 error: Some(error),
158 sm_state,
159 };
160 }
161
162 fn poll_write_sm_req(
163 mut sm_state: Option<&mut SmState>,
164 mut stream: Pin<&mut XmppStream>,
165 cx: &mut Context<'_>,
166 ) -> Poll<io::Result<()>> {
167 if let Some(sm_state) = sm_state.as_mut() {
168 // Request is pending.
169 if sm_state.pending_req {
170 match ready!(<XmppStream as Sink<&XmppStreamElement>>::poll_ready(
171 stream.as_mut(),
172 cx,
173 )) {
174 Ok(()) => (),
175 Err(e) => return Poll::Ready(Err(e)),
176 }
177 match stream
178 .as_mut()
179 .start_send(&XmppStreamElement::SM(sm::Nonza::Req(sm::R)))
180 {
181 Ok(()) => (),
182 Err(e) => {
183 // As the stream promised we would be able to
184 // send, this must be a problem with our
185 // (locally generated) nonza, i.e. this is
186 // fatal.
187 panic!("Failed to send SM Req nonza: {}", e);
188 }
189 }
190 sm_state.pending_req = false;
191 }
192 }
193 Poll::Ready(Ok(()))
194 }
195
196 fn poll_writes_inner(
197 mut sm_state: Option<&mut SmState>,
198 mut stream: Pin<&mut XmppStream>,
199 transmit_queue: &mut TransmitQueue<QueueEntry>,
200 cx: &mut Context<'_>,
201 ) -> Poll<io::Result<()>> {
202 let mut depleted = false;
203
204 // We prefer sending SM reqs before actual data.
205 // SM requests are used in response to soft timeouts as a way to
206 // trigger the remote side to send *something* to us. We may be
207 // sending a lot of data in bulk right now without ever expecting a
208 // response (or at least not anytime soon). In order to ensure that
209 // the server will in fact send a message to us soon, we have to send
210 // the SM request before anything else.
211 //
212 // Example scenario: Sending a bunch of MAM `<message/>`s over a slow,
213 // but low-latency link. The MAM response only triggers a response
214 // from the peer when everything has been transmitted, which may be
215 // longer than the stream timeout.
216 ready!(Self::poll_write_sm_req(
217 sm_state.as_mut().map(|x| x as &mut SmState),
218 stream.as_mut(),
219 cx
220 ))?;
221
222 let mut transmitted = false;
223 // We prefer sending actual data before stream-management ACKs.
224 // While the other side may be waiting for our ACK, we are not obliged
225 // to send it straight away (XEP-0198 explicitly allows us to delay it
226 // for some implementation-defined time if we have stuff to send. Our
227 // implementation-defined time is "infinity"), so we try to make
228 // progress on real data.
229 loop {
230 // If either the queue has nothing for us or the stream isn't
231 // ready to send, we break out of the loop. We don't use ready!
232 // here because we may have SM ACKs to send.
233 let next = match transmit_queue.poll_next(cx) {
234 Poll::Ready(Some(v)) => v,
235 Poll::Ready(None) => {
236 // The transmit_queue is empty, so we set `depleted` to
237 // true in order to ensure that we return Ready if all SM
238 // acks also have been transmitted.
239 depleted = true;
240 break;
241 }
242 Poll::Pending => break,
243 };
244 // If the stream isn't ready to send, none of the other things can
245 // be sent either, so we can use ready!.
246 match ready!(<XmppStream as Sink<&Stanza>>::poll_ready(
247 stream.as_mut(),
248 cx
249 )) {
250 Ok(()) => (),
251 Err(e) => return Poll::Ready(Err(e)),
252 }
253 // We now either send the item or "die trying". It must
254 // be removed from the queue, because even if it fails to
255 // serialise, we don't want to reattempt sending it (
256 // unless by SM resumption retransmission).
257 let next = next.take();
258 match stream.as_mut().start_send(&next.stanza) {
259 Ok(()) => {
260 next.token.send_replace(StanzaState::Sent {});
261 if let Some(sm_state) = sm_state.as_mut() {
262 sm_state.enqueue(next);
263 }
264 transmitted = true;
265 }
266 // Serialisation error, report back to the queue item.
267 Err(e) => {
268 next.token
269 .send_replace(StanzaState::Failed { error: e.into() });
270 }
271 }
272 }
273
274 if let Some(sm_state) = sm_state.as_mut() {
275 // We can set it to transmitted directly, because it has been
276 // cleared by the previous call to poll_write_sm_req.
277 sm_state.pending_req = transmitted;
278 ready!(Self::poll_write_sm_req(Some(sm_state), stream.as_mut(), cx))?;
279 }
280
281 // Now, if the stream will let us and we need to, we can tack
282 // on some SM ACKs.
283 if let Some(sm_state) = sm_state {
284 while sm_state.pending_acks > 0 {
285 match ready!(<XmppStream as Sink<&XmppStreamElement>>::poll_ready(
286 stream.as_mut(),
287 cx,
288 )) {
289 Ok(()) => (),
290 Err(e) => return Poll::Ready(Err(e)),
291 }
292 match stream
293 .as_mut()
294 .start_send(&XmppStreamElement::SM(sm::Nonza::Ack(sm::A {
295 h: sm_state.inbound_ctr(),
296 }))) {
297 Ok(()) => (),
298 Err(e) => {
299 // As the stream promised we would be able to
300 // send, this must be a problem with our
301 // (locally generated) nonza, i.e. this is
302 // fatal.
303 panic!("Failed to send SM Ack nonza: {}", e);
304 }
305 }
306 sm_state.pending_acks -= 1;
307 }
308 }
309
310 // If we haven't transmitted, we may also not have polled
311 // the stream for readiness or flushing. We need to do
312 // that here to ensure progress. Even if our tx queue is
313 // empty, the tx buffer may be nonempty
314 match ready!(<XmppStream as Sink<&Stanza>>::poll_flush(
315 stream.as_mut(),
316 cx
317 )) {
318 Ok(()) => (),
319 Err(e) => return Poll::Ready(Err(e)),
320 }
321
322 // If we end up here, all data we currently have has been
323 // transmitted via the stream and the stream's tx buffers have
324 // been properly flushed.
325 if depleted {
326 // And here, we know that the transmit queue is closed,
327 // too. We return with success.
328 Poll::Ready(Ok(()))
329 } else {
330 // The transmit queue is still open, so more data could
331 // pour in to be transmitted.
332 Poll::Pending
333 }
334 }
335
336 /// Drive the stream in transmit simplex mode.
337 ///
338 /// This will block forever (i.e. return [`Poll::Pending`] without
339 /// installing a waker) if the stream is currently being negotiated.
340 /// Otherwise, it will attempt to drain the `transmit_queue`. When the
341 /// queue is empty, `Ok(())` is returned. When write errors occur, this
342 /// will also block forever.
343 ///
344 /// If nothing is to be sent, but the stream could be used for sending,
345 /// this will drive the flush part of the inner stream.
346 ///
347 /// Any errors are reported on the next call to `poll`.
348 pub fn poll_writes(
349 &mut self,
350 stream: Pin<&mut XmppStream>,
351 transmit_queue: &mut TransmitQueue<QueueEntry>,
352 cx: &mut Context<'_>,
353 ) -> Poll<()> {
354 match self {
355 Self::Ready { sm_state, .. } => match ready!(Self::poll_writes_inner(
356 sm_state.as_mut(),
357 stream,
358 transmit_queue,
359 cx
360 )) {
361 Ok(()) => Poll::Ready(()),
362 Err(e) => {
363 *self = Self::Failed {
364 error: Some(e),
365 sm_state: sm_state.take(),
366 };
367 Poll::Pending
368 }
369 },
370
371 _ => Poll::Pending,
372 }
373 }
374
375 /// Drive the stream in full-duplex mode.
376 ///
377 /// Stanzas from the `transmit_queue` are transmitted once the stream is
378 /// ready.
379 ///
380 /// Returns:
381 /// - Poll::Pending if it blocks on the inner stream
382 /// - Poll::Ready(None) if it needs to be called again for a proper result
383 /// - Poll::Ready(Some(.)) when it has proper result
384 pub fn poll(
385 &mut self,
386 mut stream: Pin<&mut XmppStream>,
387 jid: &Jid,
388 features: &StreamFeatures,
389 transmit_queue: &mut TransmitQueue<QueueEntry>,
390 cx: &mut Context<'_>,
391 ) -> Poll<Option<ConnectedEvent>> {
392 match self {
393 Self::Negotiating { ref mut substate } => {
394 match ready!(substate.advance(stream, jid, transmit_queue, cx)) {
395 Break(NegotiationResult::Disconnect { sm_state, error }) => {
396 self.to_failed_state(error, sm_state);
397 Poll::Ready(None)
398 }
399 Break(NegotiationResult::StreamReset {
400 sm_state,
401 bound_jid,
402 }) => {
403 *self = Self::Ready { sm_state };
404 Poll::Ready(Some(ConnectedEvent::Worker(WorkerEvent::Reset {
405 bound_jid,
406 features: features.clone(),
407 })))
408 }
409 Break(NegotiationResult::StreamResumed { sm_state }) => {
410 *self = Self::Ready {
411 sm_state: Some(sm_state),
412 };
413 Poll::Ready(Some(ConnectedEvent::Worker(WorkerEvent::Resumed)))
414 }
415 Break(NegotiationResult::StreamError { error }) => {
416 self.to_stream_error_state(error);
417 Poll::Ready(None)
418 }
419 Continue(Some(stanza)) => {
420 Poll::Ready(Some(ConnectedEvent::Worker(WorkerEvent::Stanza(stanza))))
421 }
422 Continue(None) => Poll::Ready(None),
423 }
424 }
425
426 Self::SendStreamError {
427 ref mut stream_error,
428 ref mut io_error,
429 ref mut deadline,
430 } => {
431 match stream.as_mut().poll_next(cx) {
432 Poll::Pending
433 | Poll::Ready(None)
434 | Poll::Ready(Some(Err(ReadError::StreamFooterReceived)))
435 | Poll::Ready(Some(Err(ReadError::SoftTimeout))) => (),
436 Poll::Ready(Some(Ok(ev))) => {
437 log::trace!("Discarding incoming data while sending stream error: {ev:?}")
438 }
439 Poll::Ready(Some(Err(ReadError::ParseError(e)))) => {
440 log::trace!("Ignoring parse error while sending stream error: {e}")
441 }
442 Poll::Ready(Some(Err(ReadError::HardError(e)))) => {
443 log::warn!("I/O error while sending stream error: {e}")
444 }
445 }
446
447 match deadline.as_mut().poll(cx) {
448 Poll::Pending => (),
449 Poll::Ready(()) => {
450 log::debug!("Timeout while sending stream error. Discarding state.");
451 let error = local_error_for_stream_error(io_error, stream_error);
452 self.to_failed_state(error, None);
453 return Poll::Ready(None);
454 }
455 }
456
457 // Cannot use ready! here because we have to consider the
458 // case where the other side is refusing to accept data
459 // because its outgoing buffer is too full.
460 if stream_error.is_some() {
461 match ready!(<XmppStream as Sink<&StreamError>>::poll_ready(
462 stream.as_mut(),
463 cx
464 ))
465 .and_then(|()| {
466 // The take serves as transition to the next state.
467 let stream_error = stream_error.take().unwrap();
468 let result = stream.as_mut().start_send(&stream_error);
469 *io_error = Some(local_error_for_stream_error(
470 io_error,
471 &mut Some(stream_error),
472 ));
473 result
474 }) {
475 Ok(()) => (),
476 Err(e) => {
477 log::debug!("Got I/O error while sending stream error: {e}. Skipping error transmission.");
478 let error = local_error_for_stream_error(io_error, stream_error);
479 self.to_failed_state(error, None);
480 return Poll::Ready(None);
481 }
482 }
483 }
484
485 match ready!(<XmppStream as Sink<&StreamError>>::poll_flush(
486 stream.as_mut(),
487 cx
488 )) {
489 Ok(()) => (),
490 Err(e) => {
491 log::debug!(
492 "Got I/O error while flushing stream error: {e}. Skipping flush.",
493 );
494 }
495 }
496 log::trace!("Stream error send complete, transitioning to Failed state");
497 *self = Self::Failed {
498 error: Some(local_error_for_stream_error(io_error, stream_error)),
499 // Do *not* resume after we caused a stream error.
500 sm_state: None,
501 };
502
503 // Request the caller to call us again to get the
504 // actual error message.
505 Poll::Ready(None)
506 }
507
508 Self::Ready { ref mut sm_state } => {
509 match Self::poll_writes_inner(
510 sm_state.as_mut(),
511 stream.as_mut(),
512 transmit_queue,
513 cx,
514 ) {
515 Poll::Pending => (),
516 Poll::Ready(Ok(())) => {
517 *self = Self::LocalShutdown {
518 rx_state: RxShutdownState::AwaitingFooter,
519 tx_closed: false,
520 deadline: Box::pin(tokio::time::sleep(LOCAL_SHUTDOWN_TIMEOUT)),
521 };
522 return Poll::Ready(Some(ConnectedEvent::LocalShutdownRequested));
523 }
524 Poll::Ready(Err(e)) => {
525 *self = Self::Failed {
526 error: Some(e),
527 sm_state: sm_state.take(),
528 };
529 return Poll::Ready(None);
530 }
531 }
532
533 let item = ready!(stream.poll_next(cx));
534 // We switch to a TxOpen or non-connected state when we
535 // receive the stream footer, so reading `None` from the
536 // stream is always an unclean closure.
537 let item = item.unwrap_or_else(|| {
538 Err(ReadError::HardError(io::Error::new(
539 io::ErrorKind::UnexpectedEof,
540 "eof before stream footer",
541 )))
542 });
543 match item {
544 // Easy case, we got some data.
545 Ok(XmppStreamElement::Stanza(data)) => {
546 Poll::Ready(Some(ConnectedEvent::Worker(WorkerEvent::Stanza(data))))
547 }
548
549 Ok(XmppStreamElement::SM(sm::Nonza::Ack(ack))) => {
550 if let Some(sm_state) = sm_state {
551 match sm_state.remote_acked(ack.h) {
552 Ok(()) => Poll::Ready(None),
553 Err(e) => {
554 log::error!(
555 "Failed to process <sm:a/> sent by the server: {e}",
556 );
557 self.to_stream_error_state(e.into());
558 Poll::Ready(None)
559 }
560 }
561 } else {
562 log::debug!("Hmm... I got an <sm:a/> from the peer, but I don't have a stream management state. I'm gonna ignore that...");
563 Poll::Ready(None)
564 }
565 }
566
567 Ok(XmppStreamElement::SM(sm::Nonza::Req(_))) => {
568 if let Some(sm_state) = sm_state {
569 match sm_state.pending_acks.checked_add(1) {
570 None => panic!("Too many pending ACKs, something is wrong."),
571 Some(v) => sm_state.pending_acks = v,
572 }
573 } else {
574 log::warn!("Got an <sm:r/> from the peer, but we don't have any stream management state. Terminating stream with an error.");
575 self.to_stream_error_state(StreamError::new(
576 DefinedCondition::UnsupportedStanzaType,
577 "en",
578 "received <sm:r/>, but stream management is not enabled".to_owned(),
579 ));
580 }
581 // No matter whether we "enqueued" an ACK for send or
582 // whether we just successfully read something from
583 // the stream, we have to request to be polled again
584 // right away.
585 Poll::Ready(None)
586 }
587
588 Ok(other) => {
589 log::warn!(
590 "Received unsupported stream element: {other:?}. Emitting stream error.",
591 );
592 // TODO: figure out a good way to provide the sender
593 // with more information.
594 self.to_stream_error_state(StreamError::new(
595 DefinedCondition::UnsupportedStanzaType,
596 "en",
597 format!("Unsupported stream element: {other:?}"),
598 ));
599 Poll::Ready(None)
600 }
601
602 // Another easy case: Soft timeouts are passed through
603 // to the caller for handling.
604 Err(ReadError::SoftTimeout) => {
605 Poll::Ready(Some(ConnectedEvent::Worker(WorkerEvent::SoftTimeout)))
606 }
607
608 // Parse errors are also just passed through (and will
609 // likely cause us to send a stream error).
610 Err(ReadError::ParseError(e)) => {
611 Poll::Ready(Some(ConnectedEvent::Worker(WorkerEvent::ParseError(e))))
612 }
613
614 // I/O errors cause the stream to be considerde
615 // broken; we drop it and send a Disconnect event with
616 // the error embedded.
617 Err(ReadError::HardError(e)) => {
618 let sm_state = sm_state.take();
619 Poll::Ready(Some(ConnectedEvent::Disconnect {
620 sm_state,
621 error: Some(e),
622 }))
623 }
624
625 // Stream footer indicates the remote wants to shut this
626 // stream down.
627 // We transition into RemoteShutdown state which makes us
628 // emit a special event until the caller takes care of it.
629 Err(ReadError::StreamFooterReceived) => {
630 *self = Self::RemoteShutdown {
631 sm_state: sm_state.take(),
632 };
633
634 // Let us be called again immediately to emit the
635 // notification.
636 Poll::Ready(None)
637 }
638 }
639 }
640
641 Self::Failed { sm_state, error } => Poll::Ready(Some(ConnectedEvent::Disconnect {
642 error: error.take(),
643 sm_state: sm_state.take(),
644 })),
645
646 Self::LocalShutdown { .. } | Self::LocalShutdownComplete => {
647 panic!("poll_next called in local shutdown");
648 }
649
650 Self::RemoteShutdown { ref mut sm_state } => {
651 Poll::Ready(Some(ConnectedEvent::RemoteShutdown {
652 sm_state: sm_state.take(),
653 }))
654 }
655 }
656 }
657
658 pub(super) fn poll_close(
659 &mut self,
660 mut stream: Pin<&mut XmppStream>,
661 cx: &mut Context<'_>,
662 ) -> Poll<Result<(), io::Error>> {
663 loop {
664 match self {
665 // User initiates shutdown by local choice.
666 // The actual shutdown is driven by the poll_read function and we
667 // only get woken up via the close_poller waker.
668 Self::Ready { .. } | Self::RemoteShutdown { .. } | Self::Negotiating { .. } => {
669 *self = Self::LocalShutdown {
670 rx_state: RxShutdownState::AwaitingFooter,
671 tx_closed: false,
672 deadline: Box::pin(tokio::time::sleep(LOCAL_SHUTDOWN_TIMEOUT)),
673 };
674 }
675
676 Self::Failed { error, .. } => match error.take() {
677 Some(error) => return Poll::Ready(Err(error)),
678 None => return Poll::Ready(Ok(())),
679 },
680
681 // If close is called while an attempt is made to send the
682 // stream error, we abort transmission.
683 Self::SendStreamError { .. } => {
684 log::debug!("close() called while stream error was being sent. Aborting transmission of stream error.");
685 *self = Self::LocalShutdown {
686 rx_state: RxShutdownState::AwaitingFooter,
687 tx_closed: false,
688 deadline: Box::pin(tokio::time::sleep(LOCAL_SHUTDOWN_TIMEOUT)),
689 };
690 }
691
692 // Wait for local shutdown (driven by poll_read) to complete.
693 Self::LocalShutdown {
694 ref mut deadline,
695 ref mut rx_state,
696 ref mut tx_closed,
697 } => {
698 match deadline.as_mut().poll(cx) {
699 Poll::Ready(()) => {
700 log::debug!("Dropping stream after shutdown timeout was exceeded.");
701 *self = Self::LocalShutdownComplete;
702 return Poll::Ready(Ok(()));
703 }
704 Poll::Pending => (),
705 }
706
707 if !*tx_closed {
708 // We cannot use ready! here, because we want to poll the
709 // receiving side in parallel.
710 match stream.as_mut().poll_shutdown(cx) {
711 Poll::Pending => (),
712 Poll::Ready(Ok(())) => {
713 *tx_closed = true;
714 }
715 Poll::Ready(Err(e)) => {
716 log::debug!(
717 "Ignoring write error during local stream shutdown: {e}"
718 );
719 *tx_closed = true;
720 }
721 }
722 }
723
724 match rx_state {
725 RxShutdownState::Done => {
726 if !*tx_closed {
727 // poll_close() returned Poll::Pending, so we have to
728 // return that, too.
729 return Poll::Pending;
730 }
731 }
732 // We can use ready! here because the `poll_close` has
733 // happened already; we don't want to poll anything else
734 // anymore.
735 _ => loop {
736 match ready!(stream.as_mut().poll_next(cx)) {
737 None => {
738 if *rx_state != RxShutdownState::AwaitingEof {
739 log::debug!("Ignoring early EOF during stream shutdown.");
740 }
741 *rx_state = RxShutdownState::Done;
742 break;
743 }
744 Some(Ok(data)) => {
745 log::debug!("Ignoring data received on stream during local shutdown: {data:?}");
746 }
747 Some(Err(ReadError::SoftTimeout)) => (),
748 Some(Err(ReadError::HardError(e))) => {
749 *rx_state = RxShutdownState::Done;
750 log::debug!("Ignoring read error during local shutdown: {e}");
751 break;
752 }
753 Some(Err(ReadError::ParseError(e))) => {
754 log::debug!(
755 "Ignoring parse error during local shutdown: {}",
756 e
757 );
758 }
759 Some(Err(ReadError::StreamFooterReceived)) => match rx_state {
760 RxShutdownState::AwaitingFooter => {
761 *rx_state = RxShutdownState::AwaitingEof;
762 }
763 RxShutdownState::AwaitingEof => {
764 unreachable!("multiple stream footers?!")
765 }
766 RxShutdownState::Done => unreachable!(),
767 },
768 }
769 },
770 }
771
772 if *tx_closed && *rx_state == RxShutdownState::Done {
773 // Now that everything is properly cleaned up on the
774 // xmlstream layer, we go through with closure.
775 ready!(<XmppStream as Sink<&Stanza>>::poll_close(
776 stream.as_mut(),
777 cx
778 ))?;
779 // And now that's done, we can finally call it a day.
780 *self = Self::LocalShutdownComplete;
781 return Poll::Ready(Ok(()));
782 } else {
783 return Poll::Pending;
784 }
785 }
786
787 Self::LocalShutdownComplete => return Poll::Ready(Ok(())),
788 }
789 }
790 }
791
792 pub(super) fn start_send_stream_error(&mut self, error: StreamError) {
793 match self {
794 Self::LocalShutdownComplete
795 | Self::LocalShutdown { .. }
796 | Self::RemoteShutdown { .. }
797 | Self::Failed { .. } => {
798 log::debug!("Request to send stream error ({error}), but we are already shutting down or have already failed. Discarding.");
799 return;
800 }
801
802 Self::Ready { .. } | Self::Negotiating { .. } => {}
803
804 Self::SendStreamError { .. } => {
805 log::debug!("Request to send stream error ({error}) while transmission of another stream error is already in progress. Discarding the new one.");
806 return;
807 }
808 }
809
810 *self = Self::SendStreamError {
811 deadline: Box::pin(tokio::time::sleep(LOCAL_SHUTDOWN_TIMEOUT)),
812 stream_error: Some(error),
813 io_error: None,
814 };
815 }
816
817 pub fn queue_sm_request(&mut self) -> bool {
818 match self {
819 Self::Ready {
820 sm_state: Some(sm_state),
821 ..
822 } => {
823 sm_state.pending_req = true;
824 true
825 }
826 _ => false,
827 }
828 }
829}