// Copyright (c) 2024 Jonas Schäfer <jonas@zombofant.net>
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
//! Small helper struct to capture data read from an AsyncBufRead.
use core::pin::Pin;
use core::task::{Context, Poll};
use std::io::{self, IoSlice};
use futures::ready;
use tokio::io::{AsyncBufRead, AsyncRead, AsyncWrite, ReadBuf};
use super::LogXsoBuf;
pin_project_lite::pin_project! {
/// Wrapper around [`AsyncBufRead`] which stores bytes which have been
/// read in an internal vector for later inspection.
///
/// This struct implements [`AsyncRead`] and [`AsyncBufRead`] and passes
/// read requests down to the wrapped [`AsyncBufRead`].
///
/// After capturing has been enabled using [`Self::enable_capture`], any
/// data which is read via the struct will be stored in an internal buffer
/// and can be extracted with [`Self::take_capture`] or discarded using
/// [`Self::discard_capture`].
///
/// This can be used to log data which is being read from a source.
///
/// In addition, this struct implements [`AsyncWrite`] if and only if `T`
/// implements [`AsyncWrite`]. Writing is unaffected by capturing and is
/// implemented solely for convenience purposes (to allow duplex usage
/// of a wrapped I/O object).
pub(super) struct CaptureBufRead<T> {
// The wrapped I/O object. It is marked `#[pin]` so that the projection
// hands out a pinned reference on which the `poll_*` methods are called.
#[pin]
inner: T,
// Capture state; `None` while capturing is disabled.
//
// When enabled, the vector holds the captured bytes and the `usize` is
// the number of leading bytes in that vector which the reader has
// actually consumed. Bytes beyond that offset were only made visible via
// `poll_fill_buf` and are replaced by the next fill or read.
buf: Option<(Vec<u8>, usize)>,
}
}
impl<T> CaptureBufRead<T> {
    /// Wrap a given [`AsyncBufRead`].
    ///
    /// Note that capturing of data which is being read is disabled by default
    /// and needs to be enabled using [`Self::enable_capture`].
    pub fn wrap(inner: T) -> Self {
        Self { inner, buf: None }
    }

    /// Extract the inner [`AsyncBufRead`] and discard the capture buffer.
    pub fn into_inner(self) -> T {
        self.inner
    }

    /// Obtain a reference to the inner [`AsyncBufRead`].
    pub fn inner(&self) -> &T {
        &self.inner
    }

    /// Enable capturing of read data into the inner buffer.
    ///
    /// Any data which is read from now on will be copied into the internal
    /// buffer. That buffer will grow indefinitely until calls to
    /// [`Self::take_capture`] or [`Self::discard_capture`].
    ///
    /// Calling this while capturing is already enabled replaces the current
    /// buffer with a fresh, empty one.
    pub fn enable_capture(&mut self) {
        self.buf = Some((Vec::new(), 0));
    }

    /// Discard the current buffer data, if any.
    ///
    /// Only the bytes which have already been consumed by the reader are
    /// dropped. Further data which is read will be captured again.
    ///
    /// This is a no-op if capturing has not been enabled.
    pub(super) fn discard_capture(self: Pin<&mut Self>) {
        let this = self.project();
        if let Some((buf, consumed_up_to)) = this.buf.as_mut() {
            buf.drain(..*consumed_up_to);
            *consumed_up_to = 0;
        }
    }

    /// Take the currently captured data out of the inner buffer.
    ///
    /// The returned vector contains exactly the bytes which have been
    /// consumed by the reader since the last call to [`Self::take_capture`]
    /// or [`Self::discard_capture`]. Capturing stays enabled afterwards.
    ///
    /// Returns `None` unless capturing has been enabled using
    /// [`Self::enable_capture`].
    pub(super) fn take_capture(self: Pin<&mut Self>) -> Option<Vec<u8>> {
        let this = self.project();
        let (buf, consumed_up_to) = this.buf.as_mut()?;
        // Drain the consumed prefix exactly once: after this call the vector
        // has already shrunk by `consumed_up_to` bytes, so draining the same
        // range a second time would panic (range end beyond the new length)
        // or remove bytes which have not been consumed yet.
        let captured: Vec<u8> = buf.drain(..*consumed_up_to).collect();
        *consumed_up_to = 0;
        Some(captured)
    }
}
impl<T: AsyncRead> AsyncRead for CaptureBufRead<T> {
    /// Forward the read to the wrapped reader and, if capturing is enabled,
    /// append the bytes which this call added to `read_buf` to the capture
    /// buffer. All of those bytes count as consumed.
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context,
        read_buf: &mut ReadBuf,
    ) -> Poll<io::Result<()>> {
        let this = self.project();
        // Anything in `read_buf` before this offset was filled by an earlier
        // call and must not be captured again.
        let filled_before = read_buf.filled().len();
        let outcome = ready!(this.inner.poll_read(cx, read_buf));
        if let Some((captured, consumed_up_to)) = this.buf.as_mut() {
            let fresh = &read_buf.filled()[filled_before..];
            // Drop bytes which were only peeked via `poll_fill_buf`, then
            // record the newly read bytes.
            captured.truncate(*consumed_up_to);
            captured.extend_from_slice(fresh);
            *consumed_up_to = captured.len();
        }
        Poll::Ready(outcome)
    }
}
impl<T: AsyncBufRead> AsyncBufRead for CaptureBufRead<T> {
    /// Forward to the wrapped reader's `poll_fill_buf` and, if capturing is
    /// enabled, mirror the returned bytes into the capture buffer behind the
    /// already consumed prefix.
    fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<&[u8]>> {
        let this = self.project();
        let available = ready!(this.inner.poll_fill_buf(cx))?;
        if let Some((captured, consumed_up_to)) = this.buf.as_mut() {
            // Replace whatever was mirrored by a previous fill; only the
            // consumed prefix is kept.
            captured.truncate(*consumed_up_to);
            captured.extend_from_slice(available);
        }
        Poll::Ready(Ok(available))
    }

    /// Forward to the wrapped reader's `consume` and, if capturing is
    /// enabled, mark `amt` further bytes of the capture buffer as consumed.
    fn consume(self: Pin<&mut Self>, amt: usize) {
        let this = self.project();
        this.inner.consume(amt);
        match this.buf.as_mut() {
            // Extend the prefix of the capture buffer which is preserved.
            Some((_, consumed_up_to)) => *consumed_up_to += amt,
            None => {}
        }
    }
}
// Every method below delegates straight to the wrapped object; the capture
// buffer is never touched by writes.
impl<T: AsyncWrite> AsyncWrite for CaptureBufRead<T> {
    /// Delegate to the wrapped writer's `poll_write`.
    fn poll_write(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<io::Result<usize>> {
        let inner = self.project().inner;
        inner.poll_write(cx, buf)
    }

    /// Delegate to the wrapped writer's `poll_shutdown`.
    fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        let inner = self.project().inner;
        inner.poll_shutdown(cx)
    }

    /// Delegate to the wrapped writer's `poll_flush`.
    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        let inner = self.project().inner;
        inner.poll_flush(cx)
    }

    /// Report whatever the wrapped writer reports.
    fn is_write_vectored(&self) -> bool {
        let inner = &self.inner;
        inner.is_write_vectored()
    }

    /// Delegate to the wrapped writer's `poll_write_vectored`.
    fn poll_write_vectored(
        self: Pin<&mut Self>,
        cx: &mut Context,
        bufs: &[IoSlice],
    ) -> Poll<io::Result<usize>> {
        let inner = self.project().inner;
        inner.poll_write_vectored(cx, bufs)
    }
}
/// Check whether output produced by [`log_recv`] or [`log_send`] might be
/// visible to the user.
///
/// This is the case if the `log` crate reports the trace level as enabled.
pub(super) fn log_enabled() -> bool {
    let level = log::Level::Trace;
    log::log_enabled!(level)
}
/// Log received data at trace level.
///
/// `err` is an optional error which is logged alongside the received data.
/// `capture` is the received data which should be logged, if any was
/// captured. If built with the `syntax-highlighting` feature, `capture` data
/// will be logged with XML syntax highlighting.
///
/// If `err` is set but `capture` is `None`, the error is logged together with
/// a note that data capture is disabled. If both `err` and `capture` are
/// `None`, nothing will be logged.
pub(super) fn log_recv(err: Option<&xmpp_parsers::Error>, capture: Option<Vec<u8>>) {
    match (err, capture) {
        (Some(err), Some(capture)) => {
            log::trace!("RECV (error: {}) {}", err, LogXsoBuf(&capture));
        }
        (Some(err), None) => {
            log::trace!("RECV (error: {}) [data capture disabled]", err);
        }
        (None, Some(capture)) => {
            log::trace!("RECV (ok) {}", LogXsoBuf(&capture));
        }
        // Neither an error nor data: there is nothing worth logging.
        (None, None) => (),
    }
}
/// Log data which has been sent, at trace level.
///
/// If built with the `syntax-highlighting` feature, `data` will be logged
/// with XML syntax highlighting.
pub(super) fn log_send(data: &[u8]) {
    log::log!(log::Level::Trace, "SEND {}", LogXsoBuf(data));
}