1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
//! Components in XMPP are services/gateways that are logged into an
//! XMPP server under a JID consisting of just a domain name. They are
//! allowed to use any user and resource identifiers in their stanzas.
use futures::{sink::SinkExt, task::Poll, Sink, Stream};
use minidom::Element;
use std::pin::Pin;
use std::str::FromStr;
use std::task::Context;
use xmpp_parsers::{jid::Jid, ns};

use self::connect::component_login;

use crate::connect::ServerConnector;
use crate::proto::{add_stanza_id, Packet, XmppStream};
use crate::Error;

#[cfg(any(feature = "starttls", feature = "insecure-tcp"))]
use crate::connect::DnsConfig;
#[cfg(feature = "insecure-tcp")]
use crate::connect::TcpServerConnector;

mod auth;

pub(crate) mod connect;

/// Component connection to an XMPP server
///
/// This simplifies the `XmppStream` to a `Stream`/`Sink` of `Element`
/// (stanzas). Connection handling however is up to the user.
pub struct Component<C>
where
    C: ServerConnector,
{
    /// JID this component was created with (parsed from the string
    /// passed to the constructor).
    pub jid: Jid,
    /// Underlying `XmppStream` over the connector's stream type; the
    /// `Stream`/`Sink` implementations delegate to it.
    stream: XmppStream<C::Stream>,
}

#[cfg(feature = "insecure-tcp")]
impl Component<TcpServerConnector> {
    /// Connect as a component over plaintext TCP using the default
    /// address `127.0.0.1:5347`.
    ///
    /// Shorthand for [`Component::new_plaintext`] with that fixed address.
    pub async fn new(jid: &str, password: &str) -> Result<Self, Error> {
        let default_addr = DnsConfig::addr("127.0.0.1:5347");
        Self::new_plaintext(jid, password, default_addr).await
    }

    /// Connect as a component over plaintext TCP, locating the server
    /// through the supplied `dns_config`.
    ///
    /// The DNS configuration is converted into a `TcpServerConnector`
    /// and handed to [`Component::new_with_connector`].
    pub async fn new_plaintext(
        jid: &str,
        password: &str,
        dns_config: DnsConfig,
    ) -> Result<Self, Error> {
        let connector = TcpServerConnector::from(dns_config);
        Self::new_with_connector(jid, password, connector).await
    }
}

impl<C> Component<C>
where
    C: ServerConnector,
{
    /// Log in as a component through the given `connector`.
    ///
    /// `jid` is parsed into a [`Jid`]; the connector, the parsed JID and
    /// an owned copy of `password` are passed to `component_login`, and
    /// the stream it yields is wrapped in a `Component`.
    ///
    /// Unfortunately [`StartTlsServerConnector`](crate::connect::StartTlsServerConnector) is not supported yet.
    /// The tracking issue is [#143](https://gitlab.com/xmpp-rs/xmpp-rs/-/issues/143).
    ///
    /// # Errors
    ///
    /// Returns an error if `jid` cannot be parsed or if `component_login`
    /// fails.
    pub async fn new_with_connector(
        jid: &str,
        password: &str,
        connector: C,
    ) -> Result<Self, Error> {
        let jid = Jid::from_str(jid)?;
        // The login consumes its own copy of the JID; the parsed value is
        // kept for the public `jid` field.
        let stream = component_login(connector, jid.clone(), password.to_owned()).await?;
        Ok(Self { jid, stream })
    }

    /// Send a single stanza.
    ///
    /// The stanza is first passed through `add_stanza_id` with the
    /// `ns::COMPONENT_ACCEPT` namespace and then sent via this
    /// component's `Sink` implementation.
    pub async fn send_stanza(&mut self, stanza: Element) -> Result<(), Error> {
        let prepared = add_stanza_id(stanza, ns::COMPONENT_ACCEPT);
        self.send(prepared).await
    }

    /// End the connection by closing this component's `Sink`.
    pub async fn send_end(&mut self) -> Result<(), Error> {
        self.close().await
    }
}

impl<C> Stream for Component<C>
where
    C: ServerConnector,
{
    type Item = Element;

    /// Yield the next stanza received on the underlying stream.
    ///
    /// Text packets are skipped. Any other non-stanza packet, a stream
    /// error, or the end of the underlying stream terminates this stream
    /// (`None`); the error value itself is discarded because `Item`
    /// cannot carry it.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
        let this = self.get_mut();
        loop {
            // Stage one: bail out while the inner stream has nothing ready.
            let next = match Pin::new(&mut this.stream).poll_next(cx) {
                Poll::Pending => return Poll::Pending,
                Poll::Ready(next) => next,
            };
            // Stage two: classify whatever the inner stream produced.
            match next {
                Some(Ok(Packet::Stanza(stanza))) => return Poll::Ready(Some(stanza)),
                // Text between stanzas is ignored; poll again.
                Some(Ok(Packet::Text(_))) => continue,
                // Unexpected packet, error, or end of stream: stop.
                Some(Ok(_)) | Some(Err(_)) | None => return Poll::Ready(None),
            }
        }
    }
}

impl<C: ServerConnector> Sink<Element> for Component<C> {
    type Error = Error;

    fn start_send(mut self: Pin<&mut Self>, item: Element) -> Result<(), Self::Error> {
        Pin::new(&mut self.stream)
            .start_send(Packet::Stanza(item))
            .map_err(|e| e.into())
    }

    fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
        Pin::new(&mut self.stream)
            .poll_ready(cx)
            .map_err(|e| e.into())
    }

    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
        Pin::new(&mut self.stream)
            .poll_flush(cx)
            .map_err(|e| e.into())
    }

    fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
        Pin::new(&mut self.stream)
            .poll_close(cx)
            .map_err(|e| e.into())
    }
}