tokio_xmpp/client/mod.rs
1// Copyright (c) 2019 Emmanuel Gil Peyrot <linkmauve@linkmauve.fr>
2//
3// This Source Code Form is subject to the terms of the Mozilla Public
4// License, v. 2.0. If a copy of the MPL was not distributed with this
5// file, You can obtain one at http://mozilla.org/MPL/2.0/.
6
7use std::io;
8
9use xmpp_parsers::{jid::Jid, stream_features::StreamFeatures};
10
11use crate::{
12 connect::ServerConnector,
13 error::Error,
14 stanzastream::{StanzaStage, StanzaState, StanzaStream, StanzaToken},
15 xmlstream::Timeouts,
16 Stanza,
17};
18
19#[cfg(feature = "direct-tls")]
20use crate::connect::DirectTlsServerConnector;
21#[cfg(any(feature = "direct-tls", feature = "starttls", feature = "insecure-tcp"))]
22use crate::connect::DnsConfig;
23#[cfg(feature = "starttls")]
24use crate::connect::StartTlsServerConnector;
25#[cfg(feature = "insecure-tcp")]
26use crate::connect::TcpServerConnector;
27
28mod iq;
29pub(crate) mod login;
30mod stream;
31pub use login::auth;
32
33pub use iq::{IqFailure, IqRequest, IqResponse, IqResponseToken};
34
/// XMPP client connection and state
///
/// This implements the `futures` crate's [`Stream`](#impl-Stream) to receive
/// stream state changes as well as stanzas received via the stream.
///
/// To send stanzas, the [`send_stanza`][`Client::send_stanza`] method can be
/// used. IQ requests whose response should be awaited are better sent with
/// [`send_iq`][`Client::send_iq`].
#[derive(Debug)]
pub struct Client {
    /// Underlying stanza stream; every outgoing stanza is handed to it and
    /// it is closed by [`Client::send_end`].
    stream: StanzaStream,
    /// JID reported by the server, exposed through [`Client::bound_jid`].
    /// Starts out as `None` when the client is constructed.
    bound_jid: Option<Jid>,
    /// Stream features of the underlying stream, exposed through
    /// [`Client::get_stream_features`]. Starts out as `None` when the client
    /// is constructed.
    features: Option<StreamFeatures>,
    /// Allocates the IQ stanza and response token for each request sent via
    /// [`Client::send_iq`].
    iq_response_tracker: iq::IqResponseTracker,
}
49
50impl Client {
51 /// Get the client's bound JID (the one reported by the XMPP
52 /// server).
53 pub fn bound_jid(&self) -> Option<&Jid> {
54 self.bound_jid.as_ref()
55 }
56
57 /// Send a stanza.
58 ///
59 /// This will automatically allocate an ID if the stanza has no ID set.
60 /// The returned `StanzaToken` is awaited up to the [`StanzaStage::Sent`]
61 /// stage, which means that this coroutine only returns once the stanza
62 /// has actually been written to the XMPP transport.
63 ///
64 /// Note that this does not imply that it has been *reeceived* by the
65 /// peer, nor that it has been successfully processed. To confirm that a
66 /// stanza has been received by a peer, the [`StanzaToken::wait_for`]
67 /// method can be called with [`StanzaStage::Acked`], but that stage will
68 /// only ever be reached if the server supports XEP-0198 and it has been
69 /// negotiated successfully (this may change in the future).
70 ///
71 /// For sending Iq request stanzas, it is recommended to use
72 /// [`send_iq`][`Self::send_iq`], which allows awaiting the response.
73 pub async fn send_stanza(&mut self, mut stanza: Stanza) -> Result<StanzaToken, io::Error> {
74 stanza.ensure_id();
75 let mut token = self.stream.send(Box::new(stanza)).await;
76 match token.wait_for(StanzaStage::Sent).await {
77 // Queued < Sent, so it cannot be reached.
78 Some(StanzaState::Queued) => unreachable!(),
79
80 None | Some(StanzaState::Dropped) => Err(io::Error::new(
81 io::ErrorKind::NotConnected,
82 "stream disconnected fatally before stanza could be sent",
83 )),
84 Some(StanzaState::Failed { error }) => Err(error.into_io_error()),
85 Some(StanzaState::Sent { .. }) | Some(StanzaState::Acked { .. }) => Ok(token),
86 }
87 }
88
89 /// Send an IQ request and return a token to retrieve the response.
90 ///
91 /// This coroutine method will complete once the Iq has been sent to the
92 /// server. The returned `IqResponseToken` can be used to await the
93 /// response. See also the documentation of [`IqResponseToken`] for more
94 /// information on the behaviour of these tokens.
95 ///
96 /// **Important**: Even though IQ responses are delivered through the
97 /// returned token (and never through the `Stream`), the
98 /// [`Stream`][`futures::Stream`]
99 /// implementation of the [`Client`] **must be polled** to make progress
100 /// on the stream and to process incoming stanzas and thus to deliver them
101 /// to the returned token.
102 ///
103 /// **Note**: If an IQ response arrives after the `token` has been
104 /// dropped (e.g. due to a timeout), it will be delivered through the
105 /// `Stream` like any other stanza.
106 pub async fn send_iq(&mut self, to: Option<Jid>, req: IqRequest) -> IqResponseToken {
107 let (iq, mut token) = self.iq_response_tracker.allocate_iq_handle(
108 // from is always None for a client
109 None, to, req,
110 );
111 let stanza_token = self.stream.send(Box::new(iq.into())).await;
112 token.set_stanza_token(stanza_token);
113 token
114 }
115
116 /// Get the stream features (`<stream:features/>`) of the underlying
117 /// stream.
118 ///
119 /// If the stream has not completed negotiation yet, this will return
120 /// `None`. Note that stream features may change at any point due to a
121 /// transparent reconnect.
122 pub fn get_stream_features(&self) -> Option<&StreamFeatures> {
123 self.features.as_ref()
124 }
125
126 /// Close the client cleanly.
127 ///
128 /// This performs an orderly stream shutdown, ensuring that all resources
129 /// are correctly cleaned up.
130 pub async fn send_end(self) -> Result<(), Error> {
131 self.stream.close().await;
132 Ok(())
133 }
134}
135
#[cfg(feature = "direct-tls")]
impl Client {
    /// Create a new XMPP client using DirectTLS transport and autoreconnect.
    ///
    /// Uses the RFC 7590 `_xmpps-client._tcp` lookup on the JID's domain to
    /// obtain the connector details, together with the default timeouts.
    pub fn new_direct_tls<J: Into<Jid>, P: Into<String>>(jid: J, password: P) -> Self {
        let jid: Jid = jid.into();
        // The SRV configuration must be derived before `jid` is moved into
        // the constructor below.
        let srv_lookup = DnsConfig::srv_xmpps(jid.domain().as_ref());
        let connector = DirectTlsServerConnector::from(srv_lookup);
        Self::new_with_connector(jid, password, connector, Timeouts::default())
    }

    /// Create a new XMPP client with direct TLS transport and an explicit
    /// DNS configuration and timeouts.
    ///
    /// Useful for testing, or when one does not want to rely on DNS lookups.
    pub fn new_direct_tls_with_config<J: Into<Jid>, P: Into<String>>(
        jid: J,
        password: P,
        dns_config: DnsConfig,
        timeouts: Timeouts,
    ) -> Self {
        let connector = DirectTlsServerConnector::from(dns_config);
        Self::new_with_connector(jid, password, connector, timeouts)
    }
}
168
#[cfg(feature = "starttls")]
impl Client {
    /// Create a new XMPP client using StartTLS transport and autoreconnect.
    ///
    /// The DNS configuration is the default client SRV lookup for the JID's
    /// domain, and the default timeouts are used. Start polling the returned
    /// instance so that it will connect and yield events.
    pub fn new<J: Into<Jid>, P: Into<String>>(jid: J, password: P) -> Self {
        let account: Jid = jid.into();
        // Derive the SRV configuration before `account` is moved below.
        let srv_lookup = DnsConfig::srv_default_client(account.domain().as_ref());
        Self::new_starttls(account, password, srv_lookup, Timeouts::default())
    }

    /// Create a new XMPP client with StartTLS transport, using the given DNS
    /// configuration and timeouts.
    pub fn new_starttls<J: Into<Jid>, P: Into<String>>(
        jid: J,
        password: P,
        dns_config: DnsConfig,
        timeouts: Timeouts,
    ) -> Self {
        let connector = StartTlsServerConnector::from(dns_config);
        Self::new_with_connector(jid, password, connector, timeouts)
    }
}
196
#[cfg(feature = "insecure-tcp")]
impl Client {
    /// Create a new XMPP client over a plaintext, insecure connection, using
    /// the given DNS configuration and timeouts.
    pub fn new_plaintext<J: Into<Jid>, P: Into<String>>(
        jid: J,
        password: P,
        dns_config: DnsConfig,
        timeouts: Timeouts,
    ) -> Self {
        let connector = TcpServerConnector::from(dns_config);
        Self::new_with_connector(jid, password, connector, timeouts)
    }
}
214
215impl Client {
216 /// Start a new client given that the JID is already parsed.
217 pub fn new_with_connector<J: Into<Jid>, P: Into<String>, C: ServerConnector>(
218 jid: J,
219 password: P,
220 connector: C,
221 timeouts: Timeouts,
222 ) -> Self {
223 Self {
224 stream: StanzaStream::new_c2s(connector, jid.into(), password.into(), timeouts, 16),
225 bound_jid: None,
226 features: None,
227 iq_response_tracker: iq::IqResponseTracker::new(),
228 }
229 }
230}