tokio_xmpp/client/mod.rs
1// Copyright (c) 2019 Emmanuel Gil Peyrot <linkmauve@linkmauve.fr>
2//
3// This Source Code Form is subject to the terms of the Mozilla Public
4// License, v. 2.0. If a copy of the MPL was not distributed with this
5// file, You can obtain one at http://mozilla.org/MPL/2.0/.
6
7use std::io;
8
9use xmpp_parsers::{jid::Jid, stream_features::StreamFeatures};
10
11use crate::{
12 connect::ServerConnector,
13 error::Error,
14 stanzastream::{StanzaStage, StanzaState, StanzaStream, StanzaToken},
15 xmlstream::Timeouts,
16 Stanza,
17};
18
19#[cfg(any(feature = "starttls", feature = "insecure-tcp"))]
20use crate::connect::DnsConfig;
21#[cfg(feature = "insecure-tcp")]
22use crate::connect::TcpServerConnector;
23#[cfg(feature = "starttls")]
24use crate::connect::{DirectTlsServerConnector, StartTlsServerConnector};
25
26mod iq;
27pub(crate) mod login;
28mod stream;
29
30pub use iq::{IqFailure, IqRequest, IqResponse, IqResponseToken};
31
32/// XMPP client connection and state
33///
34/// This implements the `futures` crate's [`Stream`](#impl-Stream) to receive
35/// stream state changes as well as stanzas received via the stream.
36///
37/// To send stanzas, the [`send_stanza`][`Client::send_stanza`] method can be
38/// used.
39#[derive(Debug)]
40pub struct Client {
41 stream: StanzaStream,
42 bound_jid: Option<Jid>,
43 features: Option<StreamFeatures>,
44 iq_response_tracker: iq::IqResponseTracker,
45}
46
47impl Client {
48 /// Get the client's bound JID (the one reported by the XMPP
49 /// server).
50 pub fn bound_jid(&self) -> Option<&Jid> {
51 self.bound_jid.as_ref()
52 }
53
54 /// Send a stanza.
55 ///
56 /// This will automatically allocate an ID if the stanza has no ID set.
57 /// The returned `StanzaToken` is awaited up to the [`StanzaStage::Sent`]
58 /// stage, which means that this coroutine only returns once the stanza
59 /// has actually been written to the XMPP transport.
60 ///
61 /// Note that this does not imply that it has been *reeceived* by the
62 /// peer, nor that it has been successfully processed. To confirm that a
63 /// stanza has been received by a peer, the [`StanzaToken::wait_for`]
64 /// method can be called with [`StanzaStage::Acked`], but that stage will
65 /// only ever be reached if the server supports XEP-0198 and it has been
66 /// negotiated successfully (this may change in the future).
67 ///
68 /// For sending Iq request stanzas, it is recommended to use
69 /// [`send_iq`][`Self::send_iq`], which allows awaiting the response.
70 pub async fn send_stanza(&mut self, mut stanza: Stanza) -> Result<StanzaToken, io::Error> {
71 stanza.ensure_id();
72 let mut token = self.stream.send(Box::new(stanza)).await;
73 match token.wait_for(StanzaStage::Sent).await {
74 // Queued < Sent, so it cannot be reached.
75 Some(StanzaState::Queued) => unreachable!(),
76
77 None | Some(StanzaState::Dropped) => Err(io::Error::new(
78 io::ErrorKind::NotConnected,
79 "stream disconnected fatally before stanza could be sent",
80 )),
81 Some(StanzaState::Failed { error }) => Err(error.into_io_error()),
82 Some(StanzaState::Sent { .. }) | Some(StanzaState::Acked { .. }) => Ok(token),
83 }
84 }
85
86 /// Send an IQ request and return a token to retrieve the response.
87 ///
88 /// This coroutine method will complete once the Iq has been sent to the
89 /// server. The returned `IqResponseToken` can be used to await the
90 /// response. See also the documentation of [`IqResponseToken`] for more
91 /// information on the behaviour of these tokens.
92 ///
93 /// **Important**: Even though IQ responses are delivered through the
94 /// returned token (and never through the `Stream`), the
95 /// [`Stream`][`futures::Stream`]
96 /// implementation of the [`Client`] **must be polled** to make progress
97 /// on the stream and to process incoming stanzas and thus to deliver them
98 /// to the returned token.
99 ///
100 /// **Note**: If an IQ response arrives after the `token` has been
101 /// dropped (e.g. due to a timeout), it will be delivered through the
102 /// `Stream` like any other stanza.
103 pub async fn send_iq(&mut self, to: Option<Jid>, req: IqRequest) -> IqResponseToken {
104 let (iq, mut token) = self.iq_response_tracker.allocate_iq_handle(
105 // from is always None for a client
106 None, to, req,
107 );
108 let stanza_token = self.stream.send(Box::new(iq.into())).await;
109 token.set_stanza_token(stanza_token);
110 token
111 }
112
113 /// Get the stream features (`<stream:features/>`) of the underlying
114 /// stream.
115 ///
116 /// If the stream has not completed negotiation yet, this will return
117 /// `None`. Note that stream features may change at any point due to a
118 /// transparent reconnect.
119 pub fn get_stream_features(&self) -> Option<&StreamFeatures> {
120 self.features.as_ref()
121 }
122
123 /// Close the client cleanly.
124 ///
125 /// This performs an orderly stream shutdown, ensuring that all resources
126 /// are correctly cleaned up.
127 pub async fn send_end(self) -> Result<(), Error> {
128 self.stream.close().await;
129 Ok(())
130 }
131}
132
133#[cfg(feature = "starttls")]
134impl Client {
135 /// Start a new XMPP client using StartTLS transport and autoreconnect
136 ///
137 /// Start polling the returned instance so that it will connect
138 /// and yield events.
139 pub fn new<J: Into<Jid>, P: Into<String>>(jid: J, password: P) -> Self {
140 let jid = jid.into();
141 let dns_config = DnsConfig::srv_default_client(jid.domain().as_ref());
142 Self::new_starttls(jid, password, dns_config, Timeouts::default())
143 }
144
145 /// Start a new XMPP client with StartTLS transport and specific DNS config
146 pub fn new_starttls<J: Into<Jid>, P: Into<String>>(
147 jid: J,
148 password: P,
149 dns_config: DnsConfig,
150 timeouts: Timeouts,
151 ) -> Self {
152 Self::new_with_connector(
153 jid,
154 password,
155 StartTlsServerConnector::from(dns_config),
156 timeouts,
157 )
158 }
159
160 /// Start a new XMPP client with direct TLS transport
161 pub fn new_direct_tls<J: Into<Jid>, P: Into<String>>(
162 jid: J,
163 password: P,
164 dns_config: DnsConfig,
165 timeouts: Timeouts,
166 ) -> Self {
167 Self::new_with_connector(
168 jid,
169 password,
170 DirectTlsServerConnector::from(dns_config),
171 timeouts,
172 )
173 }
174
175 /// Start a new XMPP client with direct TLS using RFC 7590 _xmpps-client._tcp
176 pub fn new_xmpps<J: Into<Jid>, P: Into<String>>(
177 jid: J,
178 password: P,
179 timeouts: Timeouts,
180 ) -> Self {
181 let jid_ref = jid.into();
182 let dns_config = DnsConfig::srv_xmpps(jid_ref.domain().as_ref());
183 Self::new_with_connector(
184 jid_ref,
185 password,
186 DirectTlsServerConnector::from(dns_config),
187 timeouts,
188 )
189 }
190}
191
192#[cfg(feature = "insecure-tcp")]
193impl Client {
194 /// Start a new XMPP client with plaintext insecure connection and specific DNS config
195 pub fn new_plaintext<J: Into<Jid>, P: Into<String>>(
196 jid: J,
197 password: P,
198 dns_config: DnsConfig,
199 timeouts: Timeouts,
200 ) -> Self {
201 Self::new_with_connector(
202 jid,
203 password,
204 TcpServerConnector::from(dns_config),
205 timeouts,
206 )
207 }
208}
209
210impl Client {
211 /// Start a new client given that the JID is already parsed.
212 pub fn new_with_connector<J: Into<Jid>, P: Into<String>, C: ServerConnector>(
213 jid: J,
214 password: P,
215 connector: C,
216 timeouts: Timeouts,
217 ) -> Self {
218 Self {
219 stream: StanzaStream::new_c2s(connector, jid.into(), password.into(), timeouts, 16),
220 bound_jid: None,
221 features: None,
222 iq_response_tracker: iq::IqResponseTracker::new(),
223 }
224 }
225}