1use alloc::sync::Arc;
8use std::collections::HashMap;
9use std::path::{Path, PathBuf};
10use tokio::{
11 io,
12 sync::{mpsc, oneshot, RwLock},
13};
14
15use crate::{
16 jid::{BareJid, Jid},
17 message, muc,
18 parsers::{disco::DiscoInfoResult, message::Message, presence::Presence, roster::Roster},
19 presence::send::make_initial_presence,
20 stream::{xml_stream_worker, NonTransactional, Request},
21 upload, Error, Event, RoomNick,
22};
23use tokio_xmpp::{Client as TokioXmppClient, IqFailure, IqRequest, IqResponse, Stanza};
24
/// Builds the I/O error reported whenever a request cannot be exchanged with
/// the XML stream worker (the command channel or the reply channel is closed).
fn error_foo() -> io::Error {
    let kind = io::ErrorKind::ConnectionReset;
    io::Error::new(kind, "lost xml stream worker")
}
28
29pub struct Agent {
30 pub(crate) default_nick: Arc<RwLock<RoomNick>>,
31 pub(crate) lang: Arc<Vec<String>>,
32 pub(crate) disco: DiscoInfoResult,
33 pub(crate) node: String,
34 pub(crate) uploads: Vec<(String, Jid, PathBuf)>,
35 pub(crate) awaiting_disco_bookmarks_type: bool,
36 pub(crate) rooms_joined: HashMap<BareJid, RoomNick>,
38 pub(crate) rooms_joining: HashMap<BareJid, RoomNick>,
39 pub(crate) rooms_leaving: HashMap<BareJid, RoomNick>,
40 pub(crate) cmdq: Arc<RwLock<mpsc::UnboundedSender<Request>>>,
42 pub(crate) miscq: Arc<RwLock<mpsc::UnboundedReceiver<NonTransactional>>>,
43 pub(crate) eventq: Arc<RwLock<mpsc::UnboundedReceiver<Event>>>,
44}
45
46impl Agent {
47 pub async fn new(
48 client: TokioXmppClient,
49 default_nick: RoomNick,
50 lang: Vec<String>,
51 disco: DiscoInfoResult,
52 node: String,
53 ) -> Agent {
54 let (cmdtx, cmdrx) = mpsc::unbounded_channel();
55 let (misctx, miscrx) = mpsc::unbounded_channel();
56 let (eventtx, eventrx) = mpsc::unbounded_channel();
57 let agent = Agent {
58 default_nick: Arc::new(RwLock::new(default_nick)),
59 lang: Arc::new(lang),
60 disco: disco.clone(),
61 node: node.clone(),
62 uploads: Vec::new(),
63 awaiting_disco_bookmarks_type: false,
64 rooms_joined: HashMap::new(),
65 rooms_joining: HashMap::new(),
66 rooms_leaving: HashMap::new(),
67 cmdq: Arc::new(RwLock::new(cmdtx)),
68 miscq: Arc::new(RwLock::new(miscrx)),
69 eventq: Arc::new(RwLock::new(eventrx)),
70 };
71 let _ = tokio::spawn(xml_stream_worker(client, cmdrx, misctx, eventtx));
72 let presence = make_initial_presence(&disco, &node).into();
73 let _ = agent.send_presence(presence).await;
74 let req = IqRequest::Get(
75 Roster {
76 ver: None,
77 items: vec![],
78 }
79 .into(),
80 );
81 let _ = agent.send_iq(None::<Jid>, req).await;
82 agent
83 }
84
85 pub async fn disconnect(&self) -> io::Result<()> {
86 let (tx, rx) = oneshot::channel();
87 let req = Request::Disconnect { response: tx };
88
89 match self.cmdq.write().await.send(req) {
90 Ok(()) => (),
91 Err(_) => return Err(error_foo()),
92 };
93 match rx.await {
94 Ok(v) => v,
95 Err(_) => return Err(error_foo()),
96 }
97 }
98
99 pub async fn send_iq(&self, to: Option<Jid>, data: IqRequest) -> Result<IqResponse, IqFailure> {
100 let (tx, rx) = oneshot::channel();
101 let req = Request::SendIq {
102 to,
103 data,
104 response: tx,
105 };
106
107 match self.cmdq.write().await.send(req) {
108 Ok(_) => (),
109 Err(_) => return Err(IqFailure::SendError(error_foo())),
110 };
111 match rx.await {
112 Ok(v) => v,
113 Err(_) => return Err(IqFailure::SendError(error_foo())),
114 }
115 }
116
117 pub async fn send_message(&mut self, message: Message) -> io::Result<()> {
118 let (tx, rx) = oneshot::channel();
119 let req = Request::SendMessage {
120 message,
121 response: tx,
122 };
123
124 match self.cmdq.write().await.send(req) {
125 Ok(()) => (),
126 Err(_) => return Err(error_foo()),
127 };
128 match rx.await {
129 Ok(v) => v,
130 Err(_) => return Err(error_foo()),
131 }
132 }
133
134 pub async fn send_presence(&self, presence: Presence) -> io::Result<()> {
135 let (tx, rx) = oneshot::channel();
136 let req = Request::SendPresence {
137 presence,
138 response: tx,
139 };
140
141 match self.cmdq.write().await.send(req) {
142 Ok(()) => (),
143 Err(_) => return Err(error_foo()),
144 };
145 match rx.await {
146 Ok(v) => v,
147 Err(_) => return Err(error_foo()),
148 }
149 }
150
151 pub fn misc_receiver(&mut self) -> Arc<RwLock<mpsc::UnboundedReceiver<NonTransactional>>> {
152 Arc::clone(&self.miscq)
153 }
154
155 pub fn events(&mut self) -> Arc<RwLock<mpsc::UnboundedReceiver<Event>>> {
156 Arc::clone(&self.eventq)
157 }
158
159 pub async fn join_room<'a>(&mut self, settings: muc::room::JoinRoomSettings<'a>) {
161 muc::room::join_room(self, settings).await
162 }
163
164 pub async fn leave_room<'a>(&mut self, settings: muc::room::LeaveRoomSettings<'a>) {
169 muc::room::leave_room(self, settings).await
170 }
171
172 pub async fn send_stanza(&self, _stanza: Stanza) -> Result<(), Error> {
173 Ok(())
174 }
175
176 pub async fn send_raw_message<'a>(&mut self, settings: message::send::RawMessageSettings<'a>) {
177 message::send::send_raw_message(self, settings).await
178 }
179
180 pub async fn send_room_message<'a>(&mut self, settings: muc::room::RoomMessageSettings<'a>) {
181 muc::room::send_room_message(self, settings).await
182 }
183
184 pub async fn send_room_private_message<'a>(
185 &mut self,
186 settings: muc::private_message::RoomPrivateMessageSettings<'a>,
187 ) {
188 muc::private_message::send_room_private_message(self, settings).await
189 }
190
191 pub async fn upload_file_with(&mut self, service: &str, path: &Path) {
192 upload::send::upload_file_with(self, service, path).await
193 }
194
195 pub fn bound_jid(&self) -> Option<&Jid> {
199 None
201 }
202}