1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
use futures::{sink::SinkExt, Sink, Stream};
use minidom::Element;
use std::pin::Pin;
use std::str::FromStr;
use std::task::{Context, Poll};
use tokio_stream::StreamExt;
use xmpp_parsers::{jid::Jid, ns, stream_features::StreamFeatures};

use crate::connect::ServerConnector;
use crate::starttls::ServerConfig;
use crate::xmpp_codec::Packet;
use crate::xmpp_stream::{add_stanza_id, XMPPStream};
use crate::Error;

use super::connect::client_login;

/// A simple XMPP client connection
///
/// The client owns one [`XMPPStream`] over the transport type chosen by
/// the connector `C`. It implements the `futures` crate's
/// [`Stream`](#impl-Stream) and [`Sink`](#impl-Sink<Packet>) traits.
pub struct Client<C>
where
    C: ServerConnector,
{
    // The logged-in XMPP stream that all reads and writes are forwarded to.
    stream: XMPPStream<C::Stream>,
}

impl Client<ServerConfig> {
    /// Parse `jid`, then start a new XMPP client and wait for a usable
    /// session
    ///
    /// `password` is converted into a `String` before being handed to
    /// [`Client::new_with_jid`].
    ///
    /// # Errors
    ///
    /// Fails if `jid` cannot be parsed as a [`Jid`], or if
    /// [`Client::new_with_jid`] returns an error.
    pub async fn new<P: Into<String>>(jid: &str, password: P) -> Result<Self, Error> {
        // Arguments are evaluated left to right, so the JID is parsed (and
        // may bail out) before the password is converted.
        Self::new_with_jid(Jid::from_str(jid)?, password.into()).await
    }

    /// Start a new client for an already parsed JID, connecting through
    /// [`ServerConfig::UseSrv`].
    ///
    /// # Errors
    ///
    /// Forwards any error from [`Client::new_with_jid_connector`].
    pub async fn new_with_jid(jid: Jid, password: String) -> Result<Self, Error> {
        let connector = ServerConfig::UseSrv;
        Self::new_with_jid_connector(connector, jid, password).await
    }
}

impl<C: ServerConnector> Client<C> {
    /// Start a new client for an already parsed JID, using the supplied
    /// `connector` to reach the server.
    ///
    /// # Errors
    ///
    /// Forwards any error returned by `client_login`.
    pub async fn new_with_jid_connector(
        connector: C,
        jid: Jid,
        password: String,
    ) -> Result<Self, Error> {
        let login = client_login(connector, jid, password);
        let stream = login.await?;
        Ok(Self { stream })
    }

    /// Consume the client and hand out the inner XMPP stream
    pub fn into_inner(self) -> XMPPStream<C::Stream> {
        let Self { stream } = self;
        stream
    }

    /// Borrow the client's bound JID (the one reported by the XMPP
    /// server), as stored on the inner stream.
    pub fn bound_jid(&self) -> &Jid {
        let inner = &self.stream;
        &inner.jid
    }

    /// Send a stanza
    ///
    /// The stanza is converted into an [`Element`], passed through
    /// `add_stanza_id` with the `jabber:client` namespace constant, and
    /// then sent as a [`Packet::Stanza`].
    ///
    /// # Errors
    ///
    /// Returns whatever error the underlying sink reports while sending.
    pub async fn send_stanza<E>(&mut self, stanza: E) -> Result<(), Error>
    where
        E: Into<Element>,
    {
        let element = add_stanza_id(stanza.into(), ns::JABBER_CLIENT);
        let packet = Packet::Stanza(element);
        self.send(packet).await
    }

    /// Borrow the stream features (`<stream:features/>`) of the underlying
    /// stream
    pub fn get_stream_features(&self) -> &StreamFeatures {
        let inner = &self.stream;
        &inner.stream_features
    }

    /// End the connection by sending `</stream:stream>`
    ///
    /// After [`Packet::StreamEnd`] has been sent, incoming stanzas are read
    /// and discarded until the incoming side stops yielding `Some(Ok(_))`.
    /// You may expect the server to respond with its own stream end. The
    /// client is taken by value, so it is dropped when this returns.
    ///
    /// # Errors
    ///
    /// Only a failure to send the stream end is reported; whatever stops
    /// the draining loop is not surfaced.
    pub async fn end(mut self) -> Result<(), Error> {
        self.send(Packet::StreamEnd).await?;

        // Drain remaining stanzas until the server closes its side.
        while matches!(self.next().await, Some(Ok(_))) {}

        Ok(())
    }
}

/// Incoming XMPP events
///
/// Each item is a received stanza. In an `async fn` you may want to use
/// this with `use futures::stream::StreamExt;`
impl<C: ServerConnector> Stream for Client<C> {
    type Item = Result<Element, Error>;

    /// Low-level read on the XMPP stream
    ///
    /// Stanzas are yielded as `Some(Ok(..))`, text packets are skipped, and
    /// anything else (other packet kinds, errors, end of the inner stream)
    /// terminates this stream with `None`.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
        loop {
            // First settle readiness, then decide what to do with the packet.
            let received = match Pin::new(&mut self.stream).poll_next(cx) {
                Poll::Ready(received) => received,
                Poll::Pending => return Poll::Pending,
            };

            match received {
                Some(Ok(Packet::Stanza(stanza))) => return Poll::Ready(Some(Ok(stanza))),
                // Text between stanzas is ignored; poll the inner stream again.
                Some(Ok(Packet::Text(_))) => continue,
                // Unexpected packets and errors: just end the stream.
                _ => return Poll::Ready(None),
            }
        }
    }
}

/// Outgoing XMPP packets
///
/// See `send_stanza()` for an `async fn`
impl<C: ServerConnector> Sink<Packet> for Client<C> {
    type Error = Error;

    fn start_send(mut self: Pin<&mut Self>, item: Packet) -> Result<(), Self::Error> {
        Pin::new(&mut self.stream).start_send(item)
    }

    fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
        Pin::new(&mut self.stream).poll_ready(cx)
    }

    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
        Pin::new(&mut self.stream).poll_flush(cx)
    }

    fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
        Pin::new(&mut self.stream).poll_close(cx)
    }
}