p2p_chat/sync/engine/discovery/queries/
discover.rs

1//! This module contains logic for discovering mailbox providers in the DHT.
2use crate::crypto::StorageEncryption;
3use crate::mailbox::{make_mailbox_provider_key, make_recipient_mailbox_key};
4use crate::sync::engine::{DhtQueryState, SyncEngine};
5use anyhow::Result;
6use std::time::{Duration, Instant};
7use tracing::{debug, error, info, trace};
8
9impl SyncEngine {
10    /// Discovers mailbox providers.
11    ///
12    /// This function attempts to discover new mailbox providers in the DHT.
13    /// It is a wrapper around `discover_mailboxes_if_needed` with `force` set to `false`.
14    ///
15    /// # Errors
16    ///
17    /// This function will return an error if the discovery process fails.
18    pub async fn discover_mailboxes(&mut self) -> Result<()> {
19        self.discover_mailboxes_if_needed(false).await
20    }
21
22    /// Loads cached mailboxes from the database into `discovered_mailboxes`.
23    ///
24    /// # Errors
25    ///
26    /// This function will return an error if loading the cached mailboxes fails.
27    async fn load_cached_mailboxes(&mut self) -> Result<()> {
28        let cached = self.known_mailboxes.list_mailboxes().await?;
29
30        if !cached.is_empty() {
31            info!(
32                "Loaded {} cached mailboxes from database",
33                cached.len()
34            );
35
36            for mailbox in cached {
37                self.discovered_mailboxes.insert(mailbox.peer_id);
38                trace!("Loaded cached mailbox: {}", mailbox.peer_id);
39            }
40        }
41
42        Ok(())
43    }
44
45    /// Discovers mailbox providers in the DHT if needed.
46    ///
47    /// This function checks if there are enough available mailboxes or if a recent
48    /// discovery has already been performed (rate-limiting). It can be forced
49    /// to run a discovery regardless of these conditions.
50    ///
51    /// # Arguments
52    ///
53    /// * `force` - If `true`, a discovery will be performed even if conditions
54    ///             for skipping are met.
55    ///
56    /// # Errors
57    ///
58    /// This function will return an error if the discovery process fails.
59    pub async fn discover_mailboxes_if_needed(&mut self, force: bool) -> Result<()> {
60        // On first call (when discovered_mailboxes is empty), load cached mailboxes.
61        if self.discovered_mailboxes.is_empty() {
62            if let Err(e) = self.load_cached_mailboxes().await {
63                error!("Failed to load cached mailboxes: {}", e);
64            }
65        }
66
67        let current_mailbox_count = self.discovered_mailboxes.len();
68        let available_mailbox_count = self.get_available_mailboxes().len();
69
70        if !force {
71            if available_mailbox_count >= 2 {
72                trace!(
73                    "Have {} available mailboxes, skipping discovery",
74                    available_mailbox_count
75                );
76                return Ok(());
77            }
78
79            if let Some(last_discovery) = self.last_discovery_time {
80                if last_discovery.elapsed() < Duration::from_secs(30) {
81                    trace!(
82                        "Last discovery was {:?} ago, skipping (rate limited)",
83                        last_discovery.elapsed()
84                    );
85                    return Ok(());
86                }
87            }
88        }
89
90        trace!(
91            "Discovering mailbox providers in DHT (currently have: {}, available: {}, force: {})",
92            current_mailbox_count,
93            available_mailbox_count,
94            force
95        );
96
97        let Some(network) = &self.network else {
98            debug!("No network handle available for mailbox discovery");
99            return Ok(());
100        };
101
102        self.last_discovery_time = Some(Instant::now());
103
104        // Start a DHT query for general mailbox providers.
105        let general_mailbox_key = make_mailbox_provider_key();
106        if !self.has_pending_query_for(&general_mailbox_key) {
107            match network
108                .start_dht_provider_query(general_mailbox_key.clone())
109                .await
110            {
111                Ok(query_id) => {
112                    let query_state = DhtQueryState {
113                        key: general_mailbox_key,
114                        started_at: Instant::now(),
115                        received_results: false,
116                    };
117                    self.pending_dht_queries.insert(query_id, query_state);
118                    trace!(
119                        "Started DHT query for general mailbox providers: {:?}",
120                        query_id
121                    );
122                }
123                Err(e) => {
124                    error!("Failed to start DHT query for general mailboxes: {}", e);
125                }
126            }
127        } else {
128            trace!("Skipping DHT query for general mailbox providers; query already pending");
129        }
130
131        // Start a DHT query for recipient-specific mailbox providers (for our own hash).
132        let our_recipient_hash =
133            StorageEncryption::derive_recipient_hash(&self.identity.hpke_public_key());
134        let recipient_mailbox_key = make_recipient_mailbox_key(our_recipient_hash);
135
136        if !self.has_pending_query_for(&recipient_mailbox_key) {
137            match network
138                .start_dht_provider_query(recipient_mailbox_key.clone())
139                .await
140            {
141                Ok(query_id) => {
142                    let query_state = DhtQueryState {
143                        key: recipient_mailbox_key,
144                        started_at: Instant::now(),
145                        received_results: false,
146                    };
147                    self.pending_dht_queries.insert(query_id, query_state);
148                    trace!(
149                        "Started DHT query for recipient-specific mailbox providers: {:?}",
150                        query_id
151                    );
152                }
153                Err(e) => {
154                    trace!(
155                        "Failed to start DHT query for recipient-specific mailboxes: {}",
156                        e
157                    );
158                }
159            }
160        } else {
161            trace!("Skipping DHT query for recipient-specific mailboxes; query already pending");
162        }
163
164        Ok(())
165    }
166}