Skip to main content

tokio_xmpp/client/
mod.rs

1// Copyright (c) 2019 Emmanuel Gil Peyrot <linkmauve@linkmauve.fr>
2//
3// This Source Code Form is subject to the terms of the Mozilla Public
4// License, v. 2.0. If a copy of the MPL was not distributed with this
5// file, You can obtain one at http://mozilla.org/MPL/2.0/.
6
7use crate::client::{receiver::ClientReceiver, sender::ClientSender};
8use crate::connect::ServerConnector;
9use crate::error::Error;
10use crate::event::Event;
11use crate::stanzastream::{self, StanzaStage, StanzaState, StanzaStream, StanzaToken};
12use crate::xmlstream::Timeouts;
13use crate::Stanza;
14use std::io;
15use std::sync::Arc;
16use tokio::sync::{mpsc, oneshot, Mutex};
17use tokio::task::JoinHandle;
18use xmpp_parsers::{jid::Jid, stream_features::StreamFeatures};
19
20#[cfg(feature = "direct-tls")]
21use crate::connect::DirectTlsServerConnector;
22#[cfg(any(feature = "direct-tls", feature = "starttls", feature = "insecure-tcp"))]
23use crate::connect::DnsConfig;
24#[cfg(feature = "starttls")]
25use crate::connect::StartTlsServerConnector;
26#[cfg(feature = "insecure-tcp")]
27use crate::connect::TcpServerConnector;
28
29mod iq;
30pub(crate) mod login;
31pub(crate) mod receiver;
32pub(crate) mod sender;
33mod stream;
34mod worker;
35
36pub use iq::{IqFailure, IqRequest, IqResponse, IqResponseToken};
37pub use login::auth;
38
39/// XMPP client connection and state
40///
41/// This implements the `futures` crate's [`Stream`](#impl-Stream) to receive
42/// stream state changes as well as stanzas received via the stream.
43///
44/// To send stanzas, the [`send_stanza`][`Client::send_stanza`] method can be
45/// used.
46#[derive(Debug)]
47pub struct Client {
48    // Stanza receiver from the client worker
49    stanza_rx: mpsc::Receiver<Event>,
50    // Stanza sender to the StanzaStream
51    stream_tx: stanzastream::StanzaSender,
52    // Shutdown handle for the client worker
53    shutdown_tx: oneshot::Sender<()>,
54    // Client worker task
55    worker: JoinHandle<stanzastream::StanzaReceiver>,
56    // JID of the logged-in client
57    bound_jid: Option<Jid>,
58    // Stream features of the currently connected stream
59    features: Option<StreamFeatures>,
60    // Response tracker for IQs
61    iq_response_tracker: iq::IqResponseTracker,
62}
63
64impl Client {
65    /// Get the client's bound JID (the one reported by the XMPP
66    /// server).
67    pub fn bound_jid(&self) -> Option<&Jid> {
68        self.bound_jid.as_ref()
69    }
70
71    /// Send a stanza.
72    ///
73    /// This will automatically allocate an ID if the stanza has no ID set.
74    /// The returned `StanzaToken` is awaited up to the [`StanzaStage::Sent`]
75    /// stage, which means that this coroutine only returns once the stanza
76    /// has actually been written to the XMPP transport.
77    ///
78    /// Note that this does not imply that it has been *reeceived* by the
79    /// peer, nor that it has been successfully processed. To confirm that a
80    /// stanza has been received by a peer, the [`StanzaToken::wait_for`]
81    /// method can be called with [`StanzaStage::Acked`], but that stage will
82    /// only ever be reached if the server supports XEP-0198 and it has been
83    /// negotiated successfully (this may change in the future).
84    ///
85    /// For sending Iq request stanzas, it is recommended to use
86    /// [`send_iq`][`Self::send_iq`], which allows awaiting the response.
87    pub async fn send_stanza(&mut self, mut stanza: Stanza) -> Result<StanzaToken, io::Error> {
88        stanza.ensure_id();
89        let mut token = self.stream_tx.send(Box::new(stanza)).await;
90
91        match token.wait_for(StanzaStage::Sent).await {
92            // Queued < Sent, so it cannot be reached.
93            Some(StanzaState::Queued) => unreachable!(),
94
95            None | Some(StanzaState::Dropped) => Err(io::Error::new(
96                io::ErrorKind::NotConnected,
97                "stream disconnected fatally before stanza could be sent",
98            )),
99            Some(StanzaState::Failed { error }) => Err(error.into_io_error()),
100            Some(StanzaState::Sent { .. }) | Some(StanzaState::Acked { .. }) => Ok(token),
101        }
102    }
103
104    /// Send an IQ request and return a token to retrieve the response.
105    ///
106    /// This coroutine method will complete once the Iq has been sent to the
107    /// server. The returned `IqResponseToken` can be used to await the
108    /// response. See also the documentation of [`IqResponseToken`] for more
109    /// information on the behaviour of these tokens.
110    ///
111    /// **Note**: If an IQ response arrives after the `token` has been
112    /// dropped (e.g. due to a timeout), it will be delivered through the
113    /// `Stream` like any other stanza.
114    pub async fn send_iq(&mut self, to: Option<Jid>, req: IqRequest) -> IqResponseToken {
115        let (iq, mut token) = self.iq_response_tracker.allocate_iq_handle(
116            // from is always None for a client
117            None, to, req,
118        );
119        let stanza_token = self.stream_tx.send(Box::new(iq.into())).await;
120
121        token.set_stanza_token(stanza_token);
122        token
123    }
124
125    /// Get the stream features (`<stream:features/>`) of the underlying
126    /// stream.
127    ///
128    /// If the stream has not completed negotiation yet, this will return
129    /// `None`. Note that stream features may change at any point due to a
130    /// transparent reconnect.
131    pub fn get_stream_features(&self) -> Option<&StreamFeatures> {
132        self.features.as_ref()
133    }
134
135    /// Close the client cleanly.
136    ///
137    /// This performs an orderly stream shutdown, ensuring that all resources
138    /// are correctly cleaned up.
139    pub async fn send_end(self) -> Result<(), Error> {
140        self.shutdown_tx.send(()).expect("ClientWorker crashed.");
141
142        let stream_rx = self.worker.await.unwrap();
143        let stream = StanzaStream::reunite(self.stream_tx, stream_rx);
144        stream.close().await;
145
146        Ok(())
147    }
148
149    /// Split the client into [`ClientSender`] and [`ClientReceiver`].
150    pub fn split(self) -> (ClientSender, ClientReceiver) {
151        let client = Arc::new(Mutex::new(self));
152
153        let sender = ClientSender(client.clone());
154        let receiver = ClientReceiver(client);
155
156        (sender, receiver)
157    }
158
159    /// Reunite a [`ClientSender`] and [`ClientReceiver`].
160    ///
161    /// # Panics
162    ///
163    /// This functions returns an error if the [`ClientSender`] and
164    /// [`ClientReceiver`] don't come from the same [`Client`].
165    pub fn reunite(sender: ClientSender, receiver: ClientReceiver) -> Self {
166        assert!(
167            Arc::ptr_eq(&sender.0, &receiver.0),
168            "Unrelated ClientSender and ClientReceiver passed to reunite."
169        );
170
171        drop(sender);
172
173        let inner = Arc::try_unwrap(receiver.0).expect("Failed to unwrap ClientReceiver Arc");
174        inner.into_inner()
175    }
176}
177
178#[cfg(feature = "direct-tls")]
179impl Client {
180    /// Start a new XMPP client using DirectTLS transport and autoreconnect
181    ///
182    /// It use RFC 7590 _xmpps-client._tcp lookup for connector details.
183    pub fn new_direct_tls<J: Into<Jid>, P: Into<String>>(jid: J, password: P) -> Self {
184        let jid_ref = jid.into();
185        let dns_config = DnsConfig::srv_xmpps(jid_ref.domain().as_ref());
186        Self::new_with_connector(
187            jid_ref,
188            password,
189            DirectTlsServerConnector::from(dns_config),
190            Timeouts::default(),
191        )
192    }
193
194    /// Start a new XMPP client with direct TLS transport, useful for testing or
195    /// when one does not want to rely on dns lookups
196    pub fn new_direct_tls_with_config<J: Into<Jid>, P: Into<String>>(
197        jid: J,
198        password: P,
199        dns_config: DnsConfig,
200        timeouts: Timeouts,
201    ) -> Self {
202        Self::new_with_connector(
203            jid,
204            password,
205            DirectTlsServerConnector::from(dns_config),
206            timeouts,
207        )
208    }
209}
210
211#[cfg(feature = "starttls")]
212impl Client {
213    /// Start a new XMPP client using StartTLS transport and autoreconnect
214    ///
215    /// Start polling the returned instance so that it will connect
216    /// and yield events.
217    pub fn new<J: Into<Jid>, P: Into<String>>(jid: J, password: P) -> Self {
218        let jid = jid.into();
219        let dns_config = DnsConfig::srv_default_client(jid.domain().as_ref());
220        Self::new_starttls(jid, password, dns_config, Timeouts::default())
221    }
222
223    /// Start a new XMPP client with StartTLS transport and specific DNS config
224    pub fn new_starttls<J: Into<Jid>, P: Into<String>>(
225        jid: J,
226        password: P,
227        dns_config: DnsConfig,
228        timeouts: Timeouts,
229    ) -> Self {
230        Self::new_with_connector(
231            jid,
232            password,
233            StartTlsServerConnector::from(dns_config),
234            timeouts,
235        )
236    }
237}
238
239#[cfg(feature = "insecure-tcp")]
240impl Client {
241    /// Start a new XMPP client with plaintext insecure connection and specific DNS config
242    pub fn new_plaintext<J: Into<Jid>, P: Into<String>>(
243        jid: J,
244        password: P,
245        dns_config: DnsConfig,
246        timeouts: Timeouts,
247    ) -> Self {
248        Self::new_with_connector(
249            jid,
250            password,
251            TcpServerConnector::from(dns_config),
252            timeouts,
253        )
254    }
255}
256
257impl Client {
258    /// Start a new client given that the JID is already parsed.
259    pub fn new_with_connector<J: Into<Jid>, P: Into<String>, C: ServerConnector>(
260        jid: J,
261        password: P,
262        connector: C,
263        timeouts: Timeouts,
264    ) -> Self {
265        let stream = StanzaStream::new_c2s(connector, jid.into(), password.into(), timeouts, 16);
266        let (stream_tx, stream_rx) = stream.split();
267
268        let iq_response_tracker = iq::IqResponseTracker::new();
269        let (worker, shutdown_tx, stanza_rx) =
270            worker::ClientWorker::new(stream_rx, iq_response_tracker.clone(), 16);
271
272        let worker = tokio::task::spawn(async move { worker.run().await });
273
274        Self {
275            stream_tx,
276            stanza_rx,
277            worker,
278            shutdown_tx,
279            iq_response_tracker,
280            bound_jid: None,
281            features: None,
282        }
283    }
284}