xmpp/
agent.rs

// Copyright (c) 2023 xmpp-rs contributors.
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
6
7use alloc::sync::Arc;
8use std::collections::HashMap;
9use std::path::{Path, PathBuf};
10use tokio::{
11    io,
12    sync::{mpsc, oneshot, RwLock},
13};
14
15use crate::{
16    jid::{BareJid, Jid},
17    message, muc,
18    parsers::{disco::DiscoInfoResult, message::Message, presence::Presence, roster::Roster},
19    presence::send::make_initial_presence,
20    stream::{xml_stream_worker, NonTransactional, Request},
21    upload, Error, Event, RoomNick,
22};
23use tokio_xmpp::{Client as TokioXmppClient, IqFailure, IqRequest, IqResponse, Stanza};
24
/// Builds the I/O error used in this module whenever the channel to the XML
/// stream worker is unusable: either a request could not be queued, or the
/// reply channel was dropped before an answer arrived.
///
/// The error kind is [`io::ErrorKind::ConnectionReset`] and the message is
/// `"lost xml stream worker"`.
fn error_foo() -> io::Error {
    io::Error::new(io::ErrorKind::ConnectionReset, "lost xml stream worker")
}
28
29pub struct Agent {
30    pub(crate) default_nick: Arc<RwLock<RoomNick>>,
31    pub(crate) lang: Arc<Vec<String>>,
32    pub(crate) disco: DiscoInfoResult,
33    pub(crate) node: String,
34    pub(crate) uploads: Vec<(String, Jid, PathBuf)>,
35    pub(crate) awaiting_disco_bookmarks_type: bool,
36    // Mapping of room->nick
37    pub(crate) rooms_joined: HashMap<BareJid, RoomNick>,
38    pub(crate) rooms_joining: HashMap<BareJid, RoomNick>,
39    pub(crate) rooms_leaving: HashMap<BareJid, RoomNick>,
40    // Communication with the xml stream worker
41    pub(crate) cmdq: Arc<RwLock<mpsc::UnboundedSender<Request>>>,
42    pub(crate) miscq: Arc<RwLock<mpsc::UnboundedReceiver<NonTransactional>>>,
43    pub(crate) eventq: Arc<RwLock<mpsc::UnboundedReceiver<Event>>>,
44}
45
46impl Agent {
47    pub async fn new(
48        client: TokioXmppClient,
49        default_nick: RoomNick,
50        lang: Vec<String>,
51        disco: DiscoInfoResult,
52        node: String,
53    ) -> Agent {
54        let (cmdtx, cmdrx) = mpsc::unbounded_channel();
55        let (misctx, miscrx) = mpsc::unbounded_channel();
56        let (eventtx, eventrx) = mpsc::unbounded_channel();
57        let agent = Agent {
58            default_nick: Arc::new(RwLock::new(default_nick)),
59            lang: Arc::new(lang),
60            disco: disco.clone(),
61            node: node.clone(),
62            uploads: Vec::new(),
63            awaiting_disco_bookmarks_type: false,
64            rooms_joined: HashMap::new(),
65            rooms_joining: HashMap::new(),
66            rooms_leaving: HashMap::new(),
67            cmdq: Arc::new(RwLock::new(cmdtx)),
68            miscq: Arc::new(RwLock::new(miscrx)),
69            eventq: Arc::new(RwLock::new(eventrx)),
70        };
71        let _ = tokio::spawn(xml_stream_worker(client, cmdrx, misctx, eventtx));
72        let presence = make_initial_presence(&disco, &node).into();
73        let _ = agent.send_presence(presence).await;
74        let req = IqRequest::Get(
75            Roster {
76                ver: None,
77                items: vec![],
78            }
79            .into(),
80        );
81        let _ = agent.send_iq(None::<Jid>, req).await;
82        agent
83    }
84
85    pub async fn disconnect(&self) -> io::Result<()> {
86        let (tx, rx) = oneshot::channel();
87        let req = Request::Disconnect { response: tx };
88
89        match self.cmdq.write().await.send(req) {
90            Ok(()) => (),
91            Err(_) => return Err(error_foo()),
92        };
93        match rx.await {
94            Ok(v) => v,
95            Err(_) => return Err(error_foo()),
96        }
97    }
98
99    pub async fn send_iq(&self, to: Option<Jid>, data: IqRequest) -> Result<IqResponse, IqFailure> {
100        let (tx, rx) = oneshot::channel();
101        let req = Request::SendIq {
102            to,
103            data,
104            response: tx,
105        };
106
107        match self.cmdq.write().await.send(req) {
108            Ok(_) => (),
109            Err(_) => return Err(IqFailure::SendError(error_foo())),
110        };
111        match rx.await {
112            Ok(v) => v,
113            Err(_) => return Err(IqFailure::SendError(error_foo())),
114        }
115    }
116
117    pub async fn send_message(&mut self, message: Message) -> io::Result<()> {
118        let (tx, rx) = oneshot::channel();
119        let req = Request::SendMessage {
120            message,
121            response: tx,
122        };
123
124        match self.cmdq.write().await.send(req) {
125            Ok(()) => (),
126            Err(_) => return Err(error_foo()),
127        };
128        match rx.await {
129            Ok(v) => v,
130            Err(_) => return Err(error_foo()),
131        }
132    }
133
134    pub async fn send_presence(&self, presence: Presence) -> io::Result<()> {
135        let (tx, rx) = oneshot::channel();
136        let req = Request::SendPresence {
137            presence,
138            response: tx,
139        };
140
141        match self.cmdq.write().await.send(req) {
142            Ok(()) => (),
143            Err(_) => return Err(error_foo()),
144        };
145        match rx.await {
146            Ok(v) => v,
147            Err(_) => return Err(error_foo()),
148        }
149    }
150
151    pub fn misc_receiver(&mut self) -> Arc<RwLock<mpsc::UnboundedReceiver<NonTransactional>>> {
152        Arc::clone(&self.miscq)
153    }
154
155    pub fn events(&mut self) -> Arc<RwLock<mpsc::UnboundedReceiver<Event>>> {
156        Arc::clone(&self.eventq)
157    }
158
159    // TODO: Old API, remove? Readapt?
160    pub async fn join_room<'a>(&mut self, settings: muc::room::JoinRoomSettings<'a>) {
161        muc::room::join_room(self, settings).await
162    }
163
164    /// Request to leave a chatroom.
165    ///
166    /// If successful, an [Event::RoomLeft] event will be produced. This method does not remove the room
167    /// from bookmarks nor remove the autojoin flag. See [muc::room::leave_room] for more information.
168    pub async fn leave_room<'a>(&mut self, settings: muc::room::LeaveRoomSettings<'a>) {
169        muc::room::leave_room(self, settings).await
170    }
171
172    pub async fn send_stanza(&self, _stanza: Stanza) -> Result<(), Error> {
173        Ok(())
174    }
175
176    pub async fn send_raw_message<'a>(&mut self, settings: message::send::RawMessageSettings<'a>) {
177        message::send::send_raw_message(self, settings).await
178    }
179
180    pub async fn send_room_message<'a>(&mut self, settings: muc::room::RoomMessageSettings<'a>) {
181        muc::room::send_room_message(self, settings).await
182    }
183
184    pub async fn send_room_private_message<'a>(
185        &mut self,
186        settings: muc::private_message::RoomPrivateMessageSettings<'a>,
187    ) {
188        muc::private_message::send_room_private_message(self, settings).await
189    }
190
191    pub async fn upload_file_with(&mut self, service: &str, path: &Path) {
192        upload::send::upload_file_with(self, service, path).await
193    }
194
195    /// Get the bound jid of the client.
196    ///
197    /// If the client is not connected, this will be None.
198    pub fn bound_jid(&self) -> Option<&Jid> {
199        // self.client.bound_jid()
200        None
201    }
202}