p2p_chat/network/
handle.rs

1//! This module defines the `NetworkHandle`, which is the main API for
2//! interacting with the `NetworkLayer` from other parts of the application.
3use anyhow::{anyhow, Result};
4use libp2p::{kad, PeerId};
5use tokio::sync::{mpsc, oneshot};
6
7use crate::types::{ChatRequest, EncryptedMessage, Message};
8
9use super::message::{NetworkCommand, NetworkResponse};
10
11/// A handle for interacting with the `NetworkLayer`.
12///
13/// This struct provides a thread-safe way to send commands to the `NetworkLayer`
14/// and receive responses.
15#[derive(Clone)]
16pub struct NetworkHandle {
17    pub(super) command_sender: mpsc::UnboundedSender<NetworkCommand>,
18}
19
20impl NetworkHandle {
21    /// Sends a chat message to a peer.
22    ///
23    /// # Arguments
24    ///
25    /// * `peer_id` - The `PeerId` of the recipient.
26    /// * `message` - The message to send.
27    ///
28    /// # Errors
29    ///
30    /// This function will return an error if the message cannot be sent.
31    pub async fn send_message(&self, peer_id: PeerId, message: Message) -> Result<()> {
32        let (tx, rx) = oneshot::channel();
33        self.command_sender.send(NetworkCommand::SendMessage {
34            peer_id,
35            message,
36            response: tx,
37        })?;
38
39        match rx.await? {
40            NetworkResponse::MessageSent => Ok(()),
41            NetworkResponse::Error(e) => Err(anyhow!(e)),
42            _ => Err(anyhow!("Unexpected response")),
43        }
44    }
45
46    /// Sends a chat request to a peer.
47    ///
48    /// This can be used for things like sending delivery confirmations or read receipts.
49    ///
50    /// # Arguments
51    ///
52    /// * `peer_id` - The `PeerId` of the recipient.
53    /// * `request` - The chat request to send.
54    ///
55    /// # Errors
56    ///
57    /// This function will return an error if the request cannot be sent.
58    pub async fn send_chat_request(&self, peer_id: PeerId, request: ChatRequest) -> Result<()> {
59        let (tx, rx) = oneshot::channel();
60        self.command_sender.send(NetworkCommand::SendChatRequest {
61            peer_id,
62            request,
63            response: tx,
64        })?;
65
66        match rx.await? {
67            NetworkResponse::MessageSent => Ok(()),
68            NetworkResponse::Error(e) => Err(anyhow!(e)),
69            _ => Err(anyhow!("Unexpected response")),
70        }
71    }
72
73    /// Gets the list of connected peers.
74    ///
75    /// # Errors
76    ///
77    /// This function will return an error if the list of peers cannot be retrieved.
78    pub async fn get_connected_peers(&self) -> Result<Vec<PeerId>> {
79        let (tx, rx) = oneshot::channel();
80        self.command_sender
81            .send(NetworkCommand::GetConnectedPeers { response: tx })?;
82
83        match rx.await? {
84            NetworkResponse::ConnectedPeers { peers } => Ok(peers),
85            NetworkResponse::Error(e) => Err(anyhow!(e)),
86            _ => Err(anyhow!("Unexpected response")),
87        }
88    }
89
90    /// Puts a message into a mailbox.
91    ///
92    /// # Arguments
93    ///
94    /// * `peer_id` - The `PeerId` of the mailbox node.
95    /// * `recipient` - The hash of the recipient's public key.
96    /// * `message` - The encrypted message to store.
97    ///
98    /// # Errors
99    ///
100    /// This function will return an error if the message cannot be stored.
101    pub async fn mailbox_put(
102        &self,
103        peer_id: PeerId,
104        recipient: [u8; 32],
105        message: EncryptedMessage,
106    ) -> Result<bool> {
107        let (tx, rx) = oneshot::channel();
108        self.command_sender.send(NetworkCommand::MailboxPut {
109            peer_id,
110            recipient,
111            message,
112            response: tx,
113        })?;
114        match rx.await? {
115            NetworkResponse::MailboxPutResult { success } => Ok(success),
116            NetworkResponse::Error(e) => Err(anyhow!(e)),
117            _ => Err(anyhow!("Unexpected response")),
118        }
119    }
120
121    /// Fetches messages from a mailbox.
122    ///
123    /// # Arguments
124    ///
125    /// * `peer_id` - The `PeerId` of the mailbox node.
126    /// * `recipient` - The hash of the recipient's public key.
127    /// * `limit` - The maximum number of messages to fetch.
128    ///
129    /// # Errors
130    ///
131    /// This function will return an error if the messages cannot be fetched.
132    pub async fn mailbox_fetch(
133        &self,
134        peer_id: PeerId,
135        recipient: [u8; 32],
136        limit: usize,
137    ) -> Result<Vec<EncryptedMessage>> {
138        let (tx, rx) = oneshot::channel();
139        self.command_sender.send(NetworkCommand::MailboxFetch {
140            peer_id,
141            recipient,
142            limit,
143            response: tx,
144        })?;
145        match rx.await? {
146            NetworkResponse::MailboxMessages { messages } => Ok(messages),
147            NetworkResponse::Error(e) => Err(anyhow!(e)),
148            _ => Err(anyhow!("Unexpected response")),
149        }
150    }
151
152    /// Acknowledges the receipt of messages from a mailbox.
153    ///
154    /// This will delete the acknowledged messages from the mailbox.
155    ///
156    /// # Arguments
157    ///
158    /// * `peer_id` - The `PeerId` of the mailbox node.
159    /// * `recipient` - The hash of the recipient's public key.
160    /// * `msg_ids` - The IDs of the messages to acknowledge.
161    ///
162    /// # Errors
163    ///
164    /// This function will return an error if the messages cannot be acknowledged.
165    pub async fn mailbox_ack(
166        &self,
167        peer_id: PeerId,
168        recipient: [u8; 32],
169        msg_ids: Vec<uuid::Uuid>,
170    ) -> Result<usize> {
171        let (tx, rx) = oneshot::channel();
172        self.command_sender.send(NetworkCommand::MailboxAck {
173            peer_id,
174            recipient,
175            msg_ids,
176            response: tx,
177        })?;
178        match rx.await? {
179            NetworkResponse::MailboxAckResult { deleted } => Ok(deleted),
180            NetworkResponse::Error(e) => Err(anyhow!(e)),
181            _ => Err(anyhow!("Unexpected response")),
182        }
183    }
184
185    /// Starts a Kademlia DHT query to find providers for a key.
186    ///
187    /// # Arguments
188    ///
189    /// * `key` - The key to find providers for.
190    ///
191    /// # Errors
192    ///
193    /// This function will return an error if the query cannot be started.
194    pub async fn start_dht_provider_query(&self, key: kad::RecordKey) -> Result<kad::QueryId> {
195        let (tx, rx) = oneshot::channel();
196        self.command_sender
197            .send(NetworkCommand::StartDhtProviderQuery { key, response: tx })?;
198        rx.await?
199    }
200}