p2p_chat/sync/engine/mailbox/
fetch.rs1use std::time::Instant;
3
4use anyhow::{anyhow, Result};
5use libp2p::PeerId;
6use tracing::{debug, error, info, trace};
7use uuid::Uuid;
8
9use crate::crypto::StorageEncryption;
10use crate::sync::retry::RetryPolicy;
11
12use super::super::SyncEngine;
13
14impl SyncEngine {
15 pub async fn fetch_from_mailboxes(&mut self) -> Result<()> {
25 if self.discovered_mailboxes.is_empty() {
26 trace!("No mailbox nodes discovered, skipping fetch cycle.");
27 return Ok(());
28 }
29
30 let available_mailboxes = self.get_available_mailboxes();
31 if available_mailboxes.is_empty() {
32 trace!("All discovered mailboxes are currently backed off, skipping fetch cycle.");
33 return Ok(());
34 }
35
36 let mut total_processed = 0;
37 for peer_id in available_mailboxes.iter() {
38 if !self.discovered_mailboxes.contains(peer_id) {
39 debug!(
40 "Skipping fetch from mailbox {} - was removed during iteration",
41 peer_id
42 );
43 continue;
44 }
45
46 if !self.backoff_manager.can_attempt(peer_id) {
47 debug!("Skipping fetch from backed-off mailbox {}", peer_id);
48 continue;
49 }
50
51 match self.fetch_from_single_mailbox(*peer_id).await {
52 Ok(processed_ids) => {
53 total_processed += processed_ids.len();
54 }
55 Err(e) => {
56 error!("Scheduled fetch from mailbox {} failed: {}", peer_id, e);
57 }
58 }
59 }
60
61 if total_processed > 0 {
62 info!(
63 "Fetch cycle completed: {} messages processed across all mailboxes",
64 total_processed
65 );
66 } else {
67 trace!("Fetch cycle completed: no new messages found");
68 }
69
70 Ok(())
71 }
72
73 pub async fn fetch_from_single_mailbox(&mut self, peer_id: PeerId) -> Result<Vec<Uuid>> {
91 let Some(network) = self.network.clone() else {
92 debug!("No network handle available for single mailbox fetch");
93 return Ok(vec![]);
94 };
95
96 let recipient_hash =
97 StorageEncryption::derive_recipient_hash(&self.identity.hpke_public_key());
98
99 debug!("Sync: Fetching messages from mailbox {}", peer_id);
100
101 let start_time = Instant::now();
102 let retry_policy = RetryPolicy::fast_mailbox();
103
104 let fetch_result = retry_policy
105 .retry_with_jitter(|| async {
106 network
107 .mailbox_fetch(peer_id, recipient_hash, 100)
108 .await
109 .map_err(|e| anyhow!("Fetch failed: {}", e))
110 })
111 .await;
112
113 match fetch_result {
114 Ok(messages) => {
115 self.update_mailbox_performance(peer_id, true, start_time.elapsed()).await;
116
117 if messages.is_empty() {
118 trace!("No messages found in mailbox {}", peer_id);
119 return Ok(vec![]);
120 }
121 info!(
122 "Retrieved {} messages from mailbox {}",
123 messages.len(),
124 peer_id
125 );
126
127 match self.process_mailbox_messages(messages).await {
128 Ok(processed_ids) => {
129 if !processed_ids.is_empty() {
130 info!(
131 "Successfully processed {} new messages from mailbox {}",
132 processed_ids.len(),
133 peer_id
134 );
135 if let Err(e) = self
136 .acknowledge_mailbox_messages(processed_ids.clone())
137 .await
138 {
139 error!("Failed to ACK messages to mailbox {}: {}", peer_id, e);
140 }
141 }
142 Ok(processed_ids)
143 }
144 Err(e) => {
145 error!("Failed to process messages from mailbox {}: {}", peer_id, e);
146 Err(e)
147 }
148 }
149 }
150 Err(e) => {
151 let fast_policy = RetryPolicy::fast_mailbox();
152 for _ in 0..fast_policy.max_attempts {
153 self.update_mailbox_performance(
154 peer_id,
155 false,
156 start_time.elapsed() / fast_policy.max_attempts,
157 ).await;
158 }
159
160 if self.should_forget_mailbox(peer_id) {
161 self.forget_failing_mailbox(peer_id).await;
162 }
163
164 error!(
165 "Failed to fetch from mailbox {} after retries: {}",
166 peer_id, e
167 );
168 Err(e)
169 }
170 }
171 }
172}