Skip to main content

tokio_xmpp/client/
mod.rs

1// Copyright (c) 2019 Emmanuel Gil Peyrot <linkmauve@linkmauve.fr>
2//
3// This Source Code Form is subject to the terms of the Mozilla Public
4// License, v. 2.0. If a copy of the MPL was not distributed with this
5// file, You can obtain one at http://mozilla.org/MPL/2.0/.
6
7use crate::client::{receiver::ClientReceiver, sender::ClientSender};
8use crate::connect::ServerConnector;
9use crate::error::Error;
10use crate::event::Event;
11use crate::stanzastream::{self, StanzaStage, StanzaState, StanzaStream, StanzaToken};
12use crate::xmlstream::Timeouts;
13use crate::Stanza;
14use std::io;
15use std::sync::Arc;
16use tokio::sync::{mpsc, oneshot, Mutex};
17use tokio::task::JoinHandle;
18use xmpp_parsers::{jid::Jid, stream_features::StreamFeatures};
19
20#[cfg(feature = "direct-tls")]
21use crate::connect::DirectTlsServerConnector;
22#[cfg(any(feature = "direct-tls", feature = "starttls", feature = "insecure-tcp"))]
23use crate::connect::DnsConfig;
24#[cfg(feature = "starttls")]
25use crate::connect::StartTlsServerConnector;
26#[cfg(feature = "insecure-tcp")]
27use crate::connect::TcpServerConnector;
28
29mod iq;
30pub(crate) mod login;
31pub(crate) mod receiver;
32pub(crate) mod sender;
33mod stream;
34mod worker;
35
36pub use iq::{IqFailure, IqRequest, IqResponse, IqResponseToken};
37
38/// XMPP client connection and state
39///
40/// This implements the `futures` crate's [`Stream`](#impl-Stream) to receive
41/// stream state changes as well as stanzas received via the stream.
42///
43/// To send stanzas, the [`send_stanza`][`Client::send_stanza`] method can be
44/// used.
45#[derive(Debug)]
46pub struct Client {
47    // Stanza receiver from the client worker
48    stanza_rx: mpsc::Receiver<Event>,
49    // Stanza sender to the StanzaStream
50    stream_tx: stanzastream::StanzaSender,
51    // Shutdown handle for the client worker
52    shutdown_tx: oneshot::Sender<()>,
53    // Client worker task
54    worker: JoinHandle<stanzastream::StanzaReceiver>,
55    // JID of the logged-in client
56    bound_jid: Option<Jid>,
57    // Stream features of the currently connected stream
58    features: Option<StreamFeatures>,
59    // Response tracker for IQs
60    iq_response_tracker: iq::IqResponseTracker,
61}
62
63impl Client {
64    /// Get the client's bound JID (the one reported by the XMPP
65    /// server).
66    pub fn bound_jid(&self) -> Option<&Jid> {
67        self.bound_jid.as_ref()
68    }
69
70    /// Send a stanza.
71    ///
72    /// This will automatically allocate an ID if the stanza has no ID set.
73    /// The returned `StanzaToken` is awaited up to the [`StanzaStage::Sent`]
74    /// stage, which means that this coroutine only returns once the stanza
75    /// has actually been written to the XMPP transport.
76    ///
77    /// Note that this does not imply that it has been *reeceived* by the
78    /// peer, nor that it has been successfully processed. To confirm that a
79    /// stanza has been received by a peer, the [`StanzaToken::wait_for`]
80    /// method can be called with [`StanzaStage::Acked`], but that stage will
81    /// only ever be reached if the server supports XEP-0198 and it has been
82    /// negotiated successfully (this may change in the future).
83    ///
84    /// For sending Iq request stanzas, it is recommended to use
85    /// [`send_iq`][`Self::send_iq`], which allows awaiting the response.
86    pub async fn send_stanza(&mut self, mut stanza: Stanza) -> Result<StanzaToken, io::Error> {
87        stanza.ensure_id();
88        let mut token = self.stream_tx.send(Box::new(stanza)).await;
89
90        match token.wait_for(StanzaStage::Sent).await {
91            // Queued < Sent, so it cannot be reached.
92            Some(StanzaState::Queued) => unreachable!(),
93
94            None | Some(StanzaState::Dropped) => Err(io::Error::new(
95                io::ErrorKind::NotConnected,
96                "stream disconnected fatally before stanza could be sent",
97            )),
98            Some(StanzaState::Failed { error }) => Err(error.into_io_error()),
99            Some(StanzaState::Sent { .. }) | Some(StanzaState::Acked { .. }) => Ok(token),
100        }
101    }
102
103    /// Send an IQ request and return a token to retrieve the response.
104    ///
105    /// This coroutine method will complete once the Iq has been sent to the
106    /// server. The returned `IqResponseToken` can be used to await the
107    /// response. See also the documentation of [`IqResponseToken`] for more
108    /// information on the behaviour of these tokens.
109    ///
110    /// **Note**: If an IQ response arrives after the `token` has been
111    /// dropped (e.g. due to a timeout), it will be delivered through the
112    /// `Stream` like any other stanza.
113    pub async fn send_iq(&mut self, to: Option<Jid>, req: IqRequest) -> IqResponseToken {
114        let (iq, mut token) = self.iq_response_tracker.allocate_iq_handle(
115            // from is always None for a client
116            None, to, req,
117        );
118        let stanza_token = self.stream_tx.send(Box::new(iq.into())).await;
119
120        token.set_stanza_token(stanza_token);
121        token
122    }
123
124    /// Get the stream features (`<stream:features/>`) of the underlying
125    /// stream.
126    ///
127    /// If the stream has not completed negotiation yet, this will return
128    /// `None`. Note that stream features may change at any point due to a
129    /// transparent reconnect.
130    pub fn get_stream_features(&self) -> Option<&StreamFeatures> {
131        self.features.as_ref()
132    }
133
134    /// Close the client cleanly.
135    ///
136    /// This performs an orderly stream shutdown, ensuring that all resources
137    /// are correctly cleaned up.
138    pub async fn send_end(self) -> Result<(), Error> {
139        self.shutdown_tx.send(()).expect("ClientWorker crashed.");
140
141        let stream_rx = self.worker.await.unwrap();
142        let stream = StanzaStream::reunite(self.stream_tx, stream_rx);
143        stream.close().await;
144
145        Ok(())
146    }
147
148    /// Split the client into [`ClientSender`] and [`ClientReceiver`].
149    pub fn split(self) -> (ClientSender, ClientReceiver) {
150        let client = Arc::new(Mutex::new(self));
151
152        let sender = ClientSender(client.clone());
153        let receiver = ClientReceiver(client);
154
155        (sender, receiver)
156    }
157
158    /// Reunite a [`ClientSender`] and [`ClientReceiver`].
159    ///
160    /// # Panics
161    ///
162    /// This functions returns an error if the [`ClientSender`] and
163    /// [`ClientReceiver`] don't come from the same [`Client`].
164    pub fn reunite(sender: ClientSender, receiver: ClientReceiver) -> Self {
165        assert!(
166            Arc::ptr_eq(&sender.0, &receiver.0),
167            "Unrelated ClientSender and ClientReceiver passed to reunite."
168        );
169
170        drop(sender);
171
172        let inner = Arc::try_unwrap(receiver.0).expect("Failed to unwrap ClientReceiver Arc");
173        inner.into_inner()
174    }
175}
176
#[cfg(feature = "direct-tls")]
impl Client {
    /// Start a new XMPP client using DirectTLS transport and autoreconnect
    ///
    /// It uses the `_xmpps-client._tcp` SRV lookup (RFC 7590, XEP-0368) on
    /// the domain of the given JID for connector details, together with the
    /// default [`Timeouts`].
    pub fn new_direct_tls<J: Into<Jid>, P: Into<String>>(jid: J, password: P) -> Self {
        // Convert once up front: the domain is needed for the DNS config
        // before the JID is handed on.
        let jid: Jid = jid.into();
        let dns_config = DnsConfig::srv_xmpps(jid.domain().as_ref());
        Self::new_direct_tls_with_config(jid, password, dns_config, Timeouts::default())
    }

    /// Start a new XMPP client with direct TLS transport, useful for testing or
    /// when one does not want to rely on DNS lookups
    ///
    /// The connector is built from the given `dns_config`, and `timeouts`
    /// is passed on unchanged to
    /// [`new_with_connector`][`Self::new_with_connector`].
    pub fn new_direct_tls_with_config<J: Into<Jid>, P: Into<String>>(
        jid: J,
        password: P,
        dns_config: DnsConfig,
        timeouts: Timeouts,
    ) -> Self {
        Self::new_with_connector(
            jid,
            password,
            DirectTlsServerConnector::from(dns_config),
            timeouts,
        )
    }
}
209
#[cfg(feature = "starttls")]
impl Client {
    /// Create an XMPP client that connects via StartTLS and reconnects
    /// automatically.
    ///
    /// The DNS configuration is derived from the domain of `jid` and the
    /// default [`Timeouts`] are used. The returned instance has to be
    /// polled for it to connect and yield events.
    pub fn new<J: Into<Jid>, P: Into<String>>(jid: J, password: P) -> Self {
        let account: Jid = jid.into();
        let srv = DnsConfig::srv_default_client(account.domain().as_ref());
        let timeouts = Timeouts::default();
        Self::new_starttls(account, password, srv, timeouts)
    }

    /// Create an XMPP client that connects via StartTLS using an explicit
    /// DNS configuration and explicit timeouts.
    pub fn new_starttls<J: Into<Jid>, P: Into<String>>(
        jid: J,
        password: P,
        dns_config: DnsConfig,
        timeouts: Timeouts,
    ) -> Self {
        let connector = StartTlsServerConnector::from(dns_config);
        Self::new_with_connector(jid, password, connector, timeouts)
    }
}
237
#[cfg(feature = "insecure-tcp")]
impl Client {
    /// Create an XMPP client that talks over an insecure plaintext
    /// connection, using an explicit DNS configuration and explicit
    /// timeouts.
    pub fn new_plaintext<J: Into<Jid>, P: Into<String>>(
        jid: J,
        password: P,
        dns_config: DnsConfig,
        timeouts: Timeouts,
    ) -> Self {
        let connector = TcpServerConnector::from(dns_config);
        Self::new_with_connector(jid, password, connector, timeouts)
    }
}
255
256impl Client {
257    /// Start a new client given that the JID is already parsed.
258    pub fn new_with_connector<J: Into<Jid>, P: Into<String>, C: ServerConnector>(
259        jid: J,
260        password: P,
261        connector: C,
262        timeouts: Timeouts,
263    ) -> Self {
264        let stream = StanzaStream::new_c2s(connector, jid.into(), password.into(), timeouts, 16);
265        let (stream_tx, stream_rx) = stream.split();
266
267        let iq_response_tracker = iq::IqResponseTracker::new();
268        let (worker, shutdown_tx, stanza_rx) =
269            worker::ClientWorker::new(stream_rx, iq_response_tracker.clone(), 16);
270
271        let worker = tokio::task::spawn(async move { worker.run().await });
272
273        Self {
274            stream_tx,
275            stanza_rx,
276            worker,
277            shutdown_tx,
278            iq_response_tracker,
279            bound_jid: None,
280            features: None,
281        }
282    }
283}