p2p_chat/sync/engine/mailbox/
fetch.rs

1//! This module contains logic for fetching messages from mailbox providers.
2use std::time::Instant;
3
4use anyhow::{anyhow, Result};
5use libp2p::PeerId;
6use tracing::{debug, error, info, trace};
7use uuid::Uuid;
8
9use crate::crypto::StorageEncryption;
10use crate::sync::retry::RetryPolicy;
11
12use super::super::SyncEngine;
13
14impl SyncEngine {
15    /// Fetches messages from all discovered and available mailbox providers.
16    ///
17    /// This function iterates through available mailboxes, attempting to fetch
18    /// messages from each. It skips mailboxes that are currently backed off.
19    ///
20    /// # Errors
21    ///
22    /// This function will return an error if fetching from any mailbox fails,
23    /// but it continues to try other mailboxes.
24    pub async fn fetch_from_mailboxes(&mut self) -> Result<()> {
25        if self.discovered_mailboxes.is_empty() {
26            trace!("No mailbox nodes discovered, skipping fetch cycle.");
27            return Ok(());
28        }
29
30        let available_mailboxes = self.get_available_mailboxes();
31        if available_mailboxes.is_empty() {
32            trace!("All discovered mailboxes are currently backed off, skipping fetch cycle.");
33            return Ok(());
34        }
35
36        let mut total_processed = 0;
37        for peer_id in available_mailboxes.iter() {
38            if !self.discovered_mailboxes.contains(peer_id) {
39                debug!(
40                    "Skipping fetch from mailbox {} - was removed during iteration",
41                    peer_id
42                );
43                continue;
44            }
45
46            if !self.backoff_manager.can_attempt(peer_id) {
47                debug!("Skipping fetch from backed-off mailbox {}", peer_id);
48                continue;
49            }
50
51            match self.fetch_from_single_mailbox(*peer_id).await {
52                Ok(processed_ids) => {
53                    total_processed += processed_ids.len();
54                }
55                Err(e) => {
56                    error!("Scheduled fetch from mailbox {} failed: {}", peer_id, e);
57                }
58            }
59        }
60
61        if total_processed > 0 {
62            info!(
63                "Fetch cycle completed: {} messages processed across all mailboxes",
64                total_processed
65            );
66        } else {
67            trace!("Fetch cycle completed: no new messages found");
68        }
69
70        Ok(())
71    }
72
73    /// Fetches messages from a single mailbox provider.
74    ///
75    /// This function attempts to fetch messages from a specified mailbox,
76    /// processes them, and then acknowledges their receipt. It updates the
77    /// performance metrics for the mailbox based on the outcome.
78    ///
79    /// # Arguments
80    ///
81    /// * `peer_id` - The `PeerId` of the mailbox provider to fetch from.
82    ///
83    /// # Returns
84    ///
85    /// A `Vec` of `Uuid`s representing the IDs of processed messages.
86    ///
87    /// # Errors
88    ///
89    /// This function will return an error if fetching or processing messages fails.
90    pub async fn fetch_from_single_mailbox(&mut self, peer_id: PeerId) -> Result<Vec<Uuid>> {
91        let Some(network) = self.network.clone() else {
92            debug!("No network handle available for single mailbox fetch");
93            return Ok(vec![]);
94        };
95
96        let recipient_hash =
97            StorageEncryption::derive_recipient_hash(&self.identity.hpke_public_key());
98
99        debug!("Sync: Fetching messages from mailbox {}", peer_id);
100
101        let start_time = Instant::now();
102        let retry_policy = RetryPolicy::fast_mailbox();
103
104        let fetch_result = retry_policy
105            .retry_with_jitter(|| async {
106                network
107                    .mailbox_fetch(peer_id, recipient_hash, 100)
108                    .await
109                    .map_err(|e| anyhow!("Fetch failed: {}", e))
110            })
111            .await;
112
113        match fetch_result {
114            Ok(messages) => {
115                self.update_mailbox_performance(peer_id, true, start_time.elapsed()).await;
116
117                if messages.is_empty() {
118                    trace!("No messages found in mailbox {}", peer_id);
119                    return Ok(vec![]);
120                }
121                info!(
122                    "Retrieved {} messages from mailbox {}",
123                    messages.len(),
124                    peer_id
125                );
126
127                match self.process_mailbox_messages(messages).await {
128                    Ok(processed_ids) => {
129                        if !processed_ids.is_empty() {
130                            info!(
131                                "Successfully processed {} new messages from mailbox {}",
132                                processed_ids.len(),
133                                peer_id
134                            );
135                            if let Err(e) = self
136                                .acknowledge_mailbox_messages(processed_ids.clone())
137                                .await
138                            {
139                                error!("Failed to ACK messages to mailbox {}: {}", peer_id, e);
140                            }
141                        }
142                        Ok(processed_ids)
143                    }
144                    Err(e) => {
145                        error!("Failed to process messages from mailbox {}: {}", peer_id, e);
146                        Err(e)
147                    }
148                }
149            }
150            Err(e) => {
151                let fast_policy = RetryPolicy::fast_mailbox();
152                for _ in 0..fast_policy.max_attempts {
153                    self.update_mailbox_performance(
154                        peer_id,
155                        false,
156                        start_time.elapsed() / fast_policy.max_attempts,
157                    ).await;
158                }
159
160                if self.should_forget_mailbox(peer_id) {
161                    self.forget_failing_mailbox(peer_id).await;
162                }
163
164                error!(
165                    "Failed to fetch from mailbox {} after retries: {}",
166                    peer_id, e
167                );
168                Err(e)
169            }
170        }
171    }
172}