p2p_chat/network/handlers/
mailbox.rs

1//! This module contains the handlers for mailbox-related network events.
2use super::super::{NetworkLayer, NetworkResponse};
3use crate::storage::MailboxStore;
4use crate::types::{MailboxRequest, MailboxResponse};
5use anyhow::Result;
6use libp2p::request_response::{self, OutboundRequestId, ResponseChannel};
7use tracing::{debug, error, info, warn};
8
9impl NetworkLayer {
10    /// Handles an event from the `MailboxBehaviour`.
11    ///
12    /// This function is called when an event is received from the `MailboxBehaviour`.
13    /// It dispatches the event to the appropriate handler.
14    ///
15    /// # Arguments
16    ///
17    /// * `event` - The `request_response::Event<MailboxRequest, MailboxResponse>` to handle.
18    ///
19    /// # Errors
20    ///
21    /// This function will return an error if handling the event fails.
22    pub(super) async fn handle_mailbox_event(
23        &mut self,
24        event: request_response::Event<MailboxRequest, MailboxResponse>,
25    ) -> Result<()> {
26        match event {
27            request_response::Event::Message { message, .. } => match message {
28                request_response::Message::Request {
29                    request, channel, ..
30                } => {
31                    self.handle_mailbox_request(request, channel).await?;
32                }
33                request_response::Message::Response {
34                    request_id,
35                    response,
36                } => {
37                    self.handle_mailbox_response(request_id, response).await?;
38                }
39            },
40            request_response::Event::OutboundFailure {
41                request_id, error, ..
42            } => {
43                warn!("Mailbox request failed: {:?}", error);
44                if let Some(sender) = self.pending_requests.remove(&request_id) {
45                    let _ = sender.send(NetworkResponse::Error(format!(
46                        "Request failed: {:?}",
47                        error
48                    )));
49                }
50            }
51            request_response::Event::InboundFailure { error, .. } => {
52                warn!("Mailbox inbound failure: {:?}", error);
53            }
54            _ => {}
55        }
56
57        Ok(())
58    }
59
60    /// Handles an inbound mailbox request.
61    async fn handle_mailbox_request(
62        &mut self,
63        request: MailboxRequest,
64        channel: ResponseChannel<MailboxResponse>,
65    ) -> Result<()> {
66        // Log request with readable format.
67        match &request {
68            MailboxRequest::Put { recipient, message } => {
69                debug!(
70                    "Network mailbox request: Put {{ recipient: {}, message_id: {}, sender: {} }}",
71                    hex::encode(&recipient[..8]),
72                    message.id,
73                    message.sender
74                );
75            }
76            MailboxRequest::Fetch { recipient, limit } => {
77                debug!(
78                    "Network mailbox request: Fetch {{ recipient: {}, limit: {} }}",
79                    hex::encode(&recipient[..8]),
80                    limit
81                );
82            }
83            MailboxRequest::Ack { recipient, msg_ids } => {
84                debug!(
85                    "Network mailbox request: Ack {{ recipient: {}, msg_ids: {:?} }}",
86                    hex::encode(&recipient[..8]),
87                    msg_ids
88                );
89            }
90        }
91
92        let response = if let Some(ref storage) = self.mailbox_storage {
93            match request {
94                MailboxRequest::Put { recipient, message } => {
95                    match storage.store_message(recipient, message).await {
96                        Ok(()) => {
97                            info!(
98                                "Successfully stored message in mailbox for recipient: {}",
99                                hex::encode(&recipient[..8])
100                            );
101
102                            if let Err(e) = self.start_providing_for_recipient(recipient) {
103                                debug!(
104                                    "Failed to register as provider for recipient {}: {}",
105                                    hex::encode(&recipient[..8]),
106                                    e
107                                );
108                            } else {
109                                debug!(
110                                    "Registered as provider for recipient: {}",
111                                    hex::encode(&recipient[..8])
112                                );
113                            }
114
115                            MailboxResponse::PutResult { success: true }
116                        }
117                        Err(e) => {
118                            error!("Failed to store mailbox message: {}", e);
119                            MailboxResponse::PutResult { success: false }
120                        }
121                    }
122                }
123                MailboxRequest::Fetch { recipient, limit } => {
124                    match storage.fetch_messages(recipient, limit).await {
125                        Ok(messages) => {
126                            info!(
127                                "Fetched {} messages for recipient: {}",
128                                messages.len(),
129                                hex::encode(&recipient[..8])
130                            );
131                            MailboxResponse::Messages { items: messages }
132                        }
133                        Err(e) => {
134                            error!("Failed to fetch mailbox messages: {}", e);
135                            MailboxResponse::Messages { items: vec![] }
136                        }
137                    }
138                }
139                MailboxRequest::Ack { recipient, msg_ids } => {
140                    match storage.delete_messages(recipient, msg_ids).await {
141                        Ok(deleted) => {
142                            info!(
143                                "Deleted {} messages for recipient: {}",
144                                deleted,
145                                hex::encode(&recipient[..8])
146                            );
147
148                            match storage.fetch_messages(recipient, 1).await {
149                                Ok(remaining_messages) if remaining_messages.is_empty() => {
150                                    debug!(
151                                        "No more messages for recipient {}, could stop DHT announcement",
152                                        hex::encode(&recipient[..8])
153                                    );
154                                }
155                                Ok(_) => {
156                                    debug!(
157                                        "Still have messages for recipient {}, keeping DHT announcement",
158                                        hex::encode(&recipient[..8])
159                                    );
160                                }
161                                Err(e) => {
162                                    debug!("Failed to check remaining messages for cleanup: {}", e);
163                                }
164                            }
165
166                            MailboxResponse::AckResult { deleted }
167                        }
168                        Err(e) => {
169                            error!("Failed to delete mailbox messages: {}", e);
170                            MailboxResponse::AckResult { deleted: 0 }
171                        }
172                    }
173                }
174            }
175        } else {
176            debug!("No mailbox storage available, returning default responses");
177            match request {
178                MailboxRequest::Put { .. } => MailboxResponse::PutResult { success: false },
179                MailboxRequest::Fetch { .. } => MailboxResponse::Messages { items: vec![] },
180                MailboxRequest::Ack { .. } => MailboxResponse::AckResult { deleted: 0 },
181            }
182        };
183
184        let _ = self
185            .swarm
186            .behaviour_mut()
187            .mailbox
188            .send_response(channel, response);
189        Ok(())
190    }
191
192    /// Handles an outbound mailbox response.
193    async fn handle_mailbox_response(
194        &mut self,
195        request_id: OutboundRequestId,
196        response: MailboxResponse,
197    ) -> Result<()> {
198        if let Some(sender) = self.pending_requests.remove(&request_id) {
199            match response {
200                MailboxResponse::PutResult { success } => {
201                    let _ = sender.send(NetworkResponse::MailboxPutResult { success });
202                }
203                MailboxResponse::Messages { items } => {
204                    let _ = sender.send(NetworkResponse::MailboxMessages { messages: items });
205                }
206                MailboxResponse::AckResult { deleted } => {
207                    let _ = sender.send(NetworkResponse::MailboxAckResult { deleted });
208                }
209            }
210        }
211
212        Ok(())
213    }
214}