tokio_xmpp/client/
mod.rs

1// Copyright (c) 2019 Emmanuel Gil Peyrot <linkmauve@linkmauve.fr>
2//
3// This Source Code Form is subject to the terms of the Mozilla Public
4// License, v. 2.0. If a copy of the MPL was not distributed with this
5// file, You can obtain one at http://mozilla.org/MPL/2.0/.
6
7use std::io;
8
9use xmpp_parsers::{jid::Jid, stream_features::StreamFeatures};
10
11use crate::{
12    connect::ServerConnector,
13    error::Error,
14    stanzastream::{StanzaStage, StanzaState, StanzaStream, StanzaToken},
15    xmlstream::Timeouts,
16    Stanza,
17};
18
19#[cfg(any(feature = "starttls", feature = "insecure-tcp"))]
20use crate::connect::DnsConfig;
21#[cfg(feature = "starttls")]
22use crate::connect::StartTlsServerConnector;
23#[cfg(feature = "insecure-tcp")]
24use crate::connect::TcpServerConnector;
25
26mod iq;
27pub(crate) mod login;
28mod stream;
29
30pub use iq::{IqFailure, IqRequest, IqResponse, IqResponseToken};
31
32/// XMPP client connection and state
33///
34/// This implements the `futures` crate's [`Stream`](#impl-Stream) to receive
35/// stream state changes as well as stanzas received via the stream.
36///
37/// To send stanzas, the [`send_stanza`][`Client::send_stanza`] method can be
38/// used.
39pub struct Client {
40    stream: StanzaStream,
41    bound_jid: Option<Jid>,
42    features: Option<StreamFeatures>,
43    iq_response_tracker: iq::IqResponseTracker,
44}
45
46impl Client {
47    /// Get the client's bound JID (the one reported by the XMPP
48    /// server).
49    pub fn bound_jid(&self) -> Option<&Jid> {
50        self.bound_jid.as_ref()
51    }
52
53    /// Send a stanza.
54    ///
55    /// This will automatically allocate an ID if the stanza has no ID set.
56    /// The returned `StanzaToken` is awaited up to the [`StanzaStage::Sent`]
57    /// stage, which means that this coroutine only returns once the stanza
58    /// has actually been written to the XMPP transport.
59    ///
60    /// Note that this does not imply that it has been *reeceived* by the
61    /// peer, nor that it has been successfully processed. To confirm that a
62    /// stanza has been received by a peer, the [`StanzaToken::wait_for`]
63    /// method can be called with [`StanzaStage::Acked`], but that stage will
64    /// only ever be reached if the server supports XEP-0198 and it has been
65    /// negotiated successfully (this may change in the future).
66    ///
67    /// For sending Iq request stanzas, it is recommended to use
68    /// [`send_iq`][`Self::send_iq`], which allows awaiting the response.
69    pub async fn send_stanza(&mut self, mut stanza: Stanza) -> Result<StanzaToken, io::Error> {
70        stanza.ensure_id();
71        let mut token = self.stream.send(Box::new(stanza)).await;
72        match token.wait_for(StanzaStage::Sent).await {
73            // Queued < Sent, so it cannot be reached.
74            Some(StanzaState::Queued) => unreachable!(),
75
76            None | Some(StanzaState::Dropped) => Err(io::Error::new(
77                io::ErrorKind::NotConnected,
78                "stream disconnected fatally before stanza could be sent",
79            )),
80            Some(StanzaState::Failed { error }) => Err(error.into_io_error()),
81            Some(StanzaState::Sent { .. }) | Some(StanzaState::Acked { .. }) => Ok(token),
82        }
83    }
84
85    /// Send an IQ request and return a token to retrieve the response.
86    ///
87    /// This coroutine method will complete once the Iq has been sent to the
88    /// server. The returned `IqResponseToken` can be used to await the
89    /// response. See also the documentation of [`IqResponseToken`] for more
90    /// information on the behaviour of these tokens.
91    ///
92    /// **Important**: Even though IQ responses are delivered through the
93    /// returned token (and never through the `Stream`), the
94    /// [`Stream`][`futures::Stream`]
95    /// implementation of the [`Client`] **must be polled** to make progress
96    /// on the stream and to process incoming stanzas and thus to deliver them
97    /// to the returned token.
98    ///
99    /// **Note**: If an IQ response arrives after the `token` has been
100    /// dropped (e.g. due to a timeout), it will be delivered through the
101    /// `Stream` like any other stanza.
102    pub async fn send_iq(&mut self, to: Option<Jid>, req: IqRequest) -> IqResponseToken {
103        let (iq, mut token) = self.iq_response_tracker.allocate_iq_handle(
104            // from is always None for a client
105            None, to, req,
106        );
107        let stanza_token = self.stream.send(Box::new(iq.into())).await;
108        token.set_stanza_token(stanza_token);
109        token
110    }
111
112    /// Get the stream features (`<stream:features/>`) of the underlying
113    /// stream.
114    ///
115    /// If the stream has not completed negotiation yet, this will return
116    /// `None`. Note that stream features may change at any point due to a
117    /// transparent reconnect.
118    pub fn get_stream_features(&self) -> Option<&StreamFeatures> {
119        self.features.as_ref()
120    }
121
122    /// Close the client cleanly.
123    ///
124    /// This performs an orderly stream shutdown, ensuring that all resources
125    /// are correctly cleaned up.
126    pub async fn send_end(self) -> Result<(), Error> {
127        self.stream.close().await;
128        Ok(())
129    }
130}
131
132#[cfg(feature = "starttls")]
133impl Client {
134    /// Start a new XMPP client using StartTLS transport and autoreconnect
135    ///
136    /// Start polling the returned instance so that it will connect
137    /// and yield events.
138    pub fn new<J: Into<Jid>, P: Into<String>>(jid: J, password: P) -> Self {
139        let jid = jid.into();
140        let dns_config = DnsConfig::srv(&jid.domain().to_string(), "_xmpp-client._tcp", 5222);
141        Self::new_starttls(jid, password, dns_config, Timeouts::default())
142    }
143
144    /// Start a new XMPP client with StartTLS transport and specific DNS config
145    pub fn new_starttls<J: Into<Jid>, P: Into<String>>(
146        jid: J,
147        password: P,
148        dns_config: DnsConfig,
149        timeouts: Timeouts,
150    ) -> Self {
151        Self::new_with_connector(
152            jid,
153            password,
154            StartTlsServerConnector::from(dns_config),
155            timeouts,
156        )
157    }
158}
159
160#[cfg(feature = "insecure-tcp")]
161impl Client {
162    /// Start a new XMPP client with plaintext insecure connection and specific DNS config
163    pub fn new_plaintext<J: Into<Jid>, P: Into<String>>(
164        jid: J,
165        password: P,
166        dns_config: DnsConfig,
167        timeouts: Timeouts,
168    ) -> Self {
169        Self::new_with_connector(
170            jid,
171            password,
172            TcpServerConnector::from(dns_config),
173            timeouts,
174        )
175    }
176}
177
178impl Client {
179    /// Start a new client given that the JID is already parsed.
180    pub fn new_with_connector<J: Into<Jid>, P: Into<String>, C: ServerConnector>(
181        jid: J,
182        password: P,
183        connector: C,
184        timeouts: Timeouts,
185    ) -> Self {
186        Self {
187            stream: StanzaStream::new_c2s(connector, jid.into(), password.into(), timeouts, 16),
188            bound_jid: None,
189            features: None,
190            iq_response_tracker: iq::IqResponseTracker::new(),
191        }
192    }
193}