p2p_chat/network/handle.rs
1//! This module defines the `NetworkHandle`, which is the main API for
2//! interacting with the `NetworkLayer` from other parts of the application.
3use anyhow::{anyhow, Result};
4use libp2p::{kad, PeerId};
5use tokio::sync::{mpsc, oneshot};
6
7use crate::types::{ChatRequest, EncryptedMessage, Message};
8
9use super::message::{NetworkCommand, NetworkResponse};
10
11/// A handle for interacting with the `NetworkLayer`.
12///
13/// This struct provides a thread-safe way to send commands to the `NetworkLayer`
14/// and receive responses.
15#[derive(Clone)]
16pub struct NetworkHandle {
17 pub(super) command_sender: mpsc::UnboundedSender<NetworkCommand>,
18}
19
20impl NetworkHandle {
21 /// Sends a chat message to a peer.
22 ///
23 /// # Arguments
24 ///
25 /// * `peer_id` - The `PeerId` of the recipient.
26 /// * `message` - The message to send.
27 ///
28 /// # Errors
29 ///
30 /// This function will return an error if the message cannot be sent.
31 pub async fn send_message(&self, peer_id: PeerId, message: Message) -> Result<()> {
32 let (tx, rx) = oneshot::channel();
33 self.command_sender.send(NetworkCommand::SendMessage {
34 peer_id,
35 message,
36 response: tx,
37 })?;
38
39 match rx.await? {
40 NetworkResponse::MessageSent => Ok(()),
41 NetworkResponse::Error(e) => Err(anyhow!(e)),
42 _ => Err(anyhow!("Unexpected response")),
43 }
44 }
45
46 /// Sends a chat request to a peer.
47 ///
48 /// This can be used for things like sending delivery confirmations or read receipts.
49 ///
50 /// # Arguments
51 ///
52 /// * `peer_id` - The `PeerId` of the recipient.
53 /// * `request` - The chat request to send.
54 ///
55 /// # Errors
56 ///
57 /// This function will return an error if the request cannot be sent.
58 pub async fn send_chat_request(&self, peer_id: PeerId, request: ChatRequest) -> Result<()> {
59 let (tx, rx) = oneshot::channel();
60 self.command_sender.send(NetworkCommand::SendChatRequest {
61 peer_id,
62 request,
63 response: tx,
64 })?;
65
66 match rx.await? {
67 NetworkResponse::MessageSent => Ok(()),
68 NetworkResponse::Error(e) => Err(anyhow!(e)),
69 _ => Err(anyhow!("Unexpected response")),
70 }
71 }
72
73 /// Gets the list of connected peers.
74 ///
75 /// # Errors
76 ///
77 /// This function will return an error if the list of peers cannot be retrieved.
78 pub async fn get_connected_peers(&self) -> Result<Vec<PeerId>> {
79 let (tx, rx) = oneshot::channel();
80 self.command_sender
81 .send(NetworkCommand::GetConnectedPeers { response: tx })?;
82
83 match rx.await? {
84 NetworkResponse::ConnectedPeers { peers } => Ok(peers),
85 NetworkResponse::Error(e) => Err(anyhow!(e)),
86 _ => Err(anyhow!("Unexpected response")),
87 }
88 }
89
90 /// Puts a message into a mailbox.
91 ///
92 /// # Arguments
93 ///
94 /// * `peer_id` - The `PeerId` of the mailbox node.
95 /// * `recipient` - The hash of the recipient's public key.
96 /// * `message` - The encrypted message to store.
97 ///
98 /// # Errors
99 ///
100 /// This function will return an error if the message cannot be stored.
101 pub async fn mailbox_put(
102 &self,
103 peer_id: PeerId,
104 recipient: [u8; 32],
105 message: EncryptedMessage,
106 ) -> Result<bool> {
107 let (tx, rx) = oneshot::channel();
108 self.command_sender.send(NetworkCommand::MailboxPut {
109 peer_id,
110 recipient,
111 message,
112 response: tx,
113 })?;
114 match rx.await? {
115 NetworkResponse::MailboxPutResult { success } => Ok(success),
116 NetworkResponse::Error(e) => Err(anyhow!(e)),
117 _ => Err(anyhow!("Unexpected response")),
118 }
119 }
120
121 /// Fetches messages from a mailbox.
122 ///
123 /// # Arguments
124 ///
125 /// * `peer_id` - The `PeerId` of the mailbox node.
126 /// * `recipient` - The hash of the recipient's public key.
127 /// * `limit` - The maximum number of messages to fetch.
128 ///
129 /// # Errors
130 ///
131 /// This function will return an error if the messages cannot be fetched.
132 pub async fn mailbox_fetch(
133 &self,
134 peer_id: PeerId,
135 recipient: [u8; 32],
136 limit: usize,
137 ) -> Result<Vec<EncryptedMessage>> {
138 let (tx, rx) = oneshot::channel();
139 self.command_sender.send(NetworkCommand::MailboxFetch {
140 peer_id,
141 recipient,
142 limit,
143 response: tx,
144 })?;
145 match rx.await? {
146 NetworkResponse::MailboxMessages { messages } => Ok(messages),
147 NetworkResponse::Error(e) => Err(anyhow!(e)),
148 _ => Err(anyhow!("Unexpected response")),
149 }
150 }
151
152 /// Acknowledges the receipt of messages from a mailbox.
153 ///
154 /// This will delete the acknowledged messages from the mailbox.
155 ///
156 /// # Arguments
157 ///
158 /// * `peer_id` - The `PeerId` of the mailbox node.
159 /// * `recipient` - The hash of the recipient's public key.
160 /// * `msg_ids` - The IDs of the messages to acknowledge.
161 ///
162 /// # Errors
163 ///
164 /// This function will return an error if the messages cannot be acknowledged.
165 pub async fn mailbox_ack(
166 &self,
167 peer_id: PeerId,
168 recipient: [u8; 32],
169 msg_ids: Vec<uuid::Uuid>,
170 ) -> Result<usize> {
171 let (tx, rx) = oneshot::channel();
172 self.command_sender.send(NetworkCommand::MailboxAck {
173 peer_id,
174 recipient,
175 msg_ids,
176 response: tx,
177 })?;
178 match rx.await? {
179 NetworkResponse::MailboxAckResult { deleted } => Ok(deleted),
180 NetworkResponse::Error(e) => Err(anyhow!(e)),
181 _ => Err(anyhow!("Unexpected response")),
182 }
183 }
184
185 /// Starts a Kademlia DHT query to find providers for a key.
186 ///
187 /// # Arguments
188 ///
189 /// * `key` - The key to find providers for.
190 ///
191 /// # Errors
192 ///
193 /// This function will return an error if the query cannot be started.
194 pub async fn start_dht_provider_query(&self, key: kad::RecordKey) -> Result<kad::QueryId> {
195 let (tx, rx) = oneshot::channel();
196 self.command_sender
197 .send(NetworkCommand::StartDhtProviderQuery { key, response: tx })?;
198 rx.await?
199 }
200}