p2p_chat/network/handlers/
mailbox.rs1use super::super::{NetworkLayer, NetworkResponse};
3use crate::storage::MailboxStore;
4use crate::types::{MailboxRequest, MailboxResponse};
5use anyhow::Result;
6use libp2p::request_response::{self, OutboundRequestId, ResponseChannel};
7use tracing::{debug, error, info, warn};
8
9impl NetworkLayer {
10 pub(super) async fn handle_mailbox_event(
23 &mut self,
24 event: request_response::Event<MailboxRequest, MailboxResponse>,
25 ) -> Result<()> {
26 match event {
27 request_response::Event::Message { message, .. } => match message {
28 request_response::Message::Request {
29 request, channel, ..
30 } => {
31 self.handle_mailbox_request(request, channel).await?;
32 }
33 request_response::Message::Response {
34 request_id,
35 response,
36 } => {
37 self.handle_mailbox_response(request_id, response).await?;
38 }
39 },
40 request_response::Event::OutboundFailure {
41 request_id, error, ..
42 } => {
43 warn!("Mailbox request failed: {:?}", error);
44 if let Some(sender) = self.pending_requests.remove(&request_id) {
45 let _ = sender.send(NetworkResponse::Error(format!(
46 "Request failed: {:?}",
47 error
48 )));
49 }
50 }
51 request_response::Event::InboundFailure { error, .. } => {
52 warn!("Mailbox inbound failure: {:?}", error);
53 }
54 _ => {}
55 }
56
57 Ok(())
58 }
59
60 async fn handle_mailbox_request(
62 &mut self,
63 request: MailboxRequest,
64 channel: ResponseChannel<MailboxResponse>,
65 ) -> Result<()> {
66 match &request {
68 MailboxRequest::Put { recipient, message } => {
69 debug!(
70 "Network mailbox request: Put {{ recipient: {}, message_id: {}, sender: {} }}",
71 hex::encode(&recipient[..8]),
72 message.id,
73 message.sender
74 );
75 }
76 MailboxRequest::Fetch { recipient, limit } => {
77 debug!(
78 "Network mailbox request: Fetch {{ recipient: {}, limit: {} }}",
79 hex::encode(&recipient[..8]),
80 limit
81 );
82 }
83 MailboxRequest::Ack { recipient, msg_ids } => {
84 debug!(
85 "Network mailbox request: Ack {{ recipient: {}, msg_ids: {:?} }}",
86 hex::encode(&recipient[..8]),
87 msg_ids
88 );
89 }
90 }
91
92 let response = if let Some(ref storage) = self.mailbox_storage {
93 match request {
94 MailboxRequest::Put { recipient, message } => {
95 match storage.store_message(recipient, message).await {
96 Ok(()) => {
97 info!(
98 "Successfully stored message in mailbox for recipient: {}",
99 hex::encode(&recipient[..8])
100 );
101
102 if let Err(e) = self.start_providing_for_recipient(recipient) {
103 debug!(
104 "Failed to register as provider for recipient {}: {}",
105 hex::encode(&recipient[..8]),
106 e
107 );
108 } else {
109 debug!(
110 "Registered as provider for recipient: {}",
111 hex::encode(&recipient[..8])
112 );
113 }
114
115 MailboxResponse::PutResult { success: true }
116 }
117 Err(e) => {
118 error!("Failed to store mailbox message: {}", e);
119 MailboxResponse::PutResult { success: false }
120 }
121 }
122 }
123 MailboxRequest::Fetch { recipient, limit } => {
124 match storage.fetch_messages(recipient, limit).await {
125 Ok(messages) => {
126 info!(
127 "Fetched {} messages for recipient: {}",
128 messages.len(),
129 hex::encode(&recipient[..8])
130 );
131 MailboxResponse::Messages { items: messages }
132 }
133 Err(e) => {
134 error!("Failed to fetch mailbox messages: {}", e);
135 MailboxResponse::Messages { items: vec![] }
136 }
137 }
138 }
139 MailboxRequest::Ack { recipient, msg_ids } => {
140 match storage.delete_messages(recipient, msg_ids).await {
141 Ok(deleted) => {
142 info!(
143 "Deleted {} messages for recipient: {}",
144 deleted,
145 hex::encode(&recipient[..8])
146 );
147
148 match storage.fetch_messages(recipient, 1).await {
149 Ok(remaining_messages) if remaining_messages.is_empty() => {
150 debug!(
151 "No more messages for recipient {}, could stop DHT announcement",
152 hex::encode(&recipient[..8])
153 );
154 }
155 Ok(_) => {
156 debug!(
157 "Still have messages for recipient {}, keeping DHT announcement",
158 hex::encode(&recipient[..8])
159 );
160 }
161 Err(e) => {
162 debug!("Failed to check remaining messages for cleanup: {}", e);
163 }
164 }
165
166 MailboxResponse::AckResult { deleted }
167 }
168 Err(e) => {
169 error!("Failed to delete mailbox messages: {}", e);
170 MailboxResponse::AckResult { deleted: 0 }
171 }
172 }
173 }
174 }
175 } else {
176 debug!("No mailbox storage available, returning default responses");
177 match request {
178 MailboxRequest::Put { .. } => MailboxResponse::PutResult { success: false },
179 MailboxRequest::Fetch { .. } => MailboxResponse::Messages { items: vec![] },
180 MailboxRequest::Ack { .. } => MailboxResponse::AckResult { deleted: 0 },
181 }
182 };
183
184 let _ = self
185 .swarm
186 .behaviour_mut()
187 .mailbox
188 .send_response(channel, response);
189 Ok(())
190 }
191
192 async fn handle_mailbox_response(
194 &mut self,
195 request_id: OutboundRequestId,
196 response: MailboxResponse,
197 ) -> Result<()> {
198 if let Some(sender) = self.pending_requests.remove(&request_id) {
199 match response {
200 MailboxResponse::PutResult { success } => {
201 let _ = sender.send(NetworkResponse::MailboxPutResult { success });
202 }
203 MailboxResponse::Messages { items } => {
204 let _ = sender.send(NetworkResponse::MailboxMessages { messages: items });
205 }
206 MailboxResponse::AckResult { deleted } => {
207 let _ = sender.send(NetworkResponse::MailboxAckResult { deleted });
208 }
209 }
210 }
211
212 Ok(())
213 }
214}