p2p_chat/sync/engine/discovery/queries/
discover.rs1use crate::crypto::StorageEncryption;
3use crate::mailbox::{make_mailbox_provider_key, make_recipient_mailbox_key};
4use crate::sync::engine::{DhtQueryState, SyncEngine};
5use anyhow::Result;
6use std::time::{Duration, Instant};
7use tracing::{debug, error, info, trace};
8
9impl SyncEngine {
10 pub async fn discover_mailboxes(&mut self) -> Result<()> {
19 self.discover_mailboxes_if_needed(false).await
20 }
21
22 async fn load_cached_mailboxes(&mut self) -> Result<()> {
28 let cached = self.known_mailboxes.list_mailboxes().await?;
29
30 if !cached.is_empty() {
31 info!(
32 "Loaded {} cached mailboxes from database",
33 cached.len()
34 );
35
36 for mailbox in cached {
37 self.discovered_mailboxes.insert(mailbox.peer_id);
38 trace!("Loaded cached mailbox: {}", mailbox.peer_id);
39 }
40 }
41
42 Ok(())
43 }
44
45 pub async fn discover_mailboxes_if_needed(&mut self, force: bool) -> Result<()> {
60 if self.discovered_mailboxes.is_empty() {
62 if let Err(e) = self.load_cached_mailboxes().await {
63 error!("Failed to load cached mailboxes: {}", e);
64 }
65 }
66
67 let current_mailbox_count = self.discovered_mailboxes.len();
68 let available_mailbox_count = self.get_available_mailboxes().len();
69
70 if !force {
71 if available_mailbox_count >= 2 {
72 trace!(
73 "Have {} available mailboxes, skipping discovery",
74 available_mailbox_count
75 );
76 return Ok(());
77 }
78
79 if let Some(last_discovery) = self.last_discovery_time {
80 if last_discovery.elapsed() < Duration::from_secs(30) {
81 trace!(
82 "Last discovery was {:?} ago, skipping (rate limited)",
83 last_discovery.elapsed()
84 );
85 return Ok(());
86 }
87 }
88 }
89
90 trace!(
91 "Discovering mailbox providers in DHT (currently have: {}, available: {}, force: {})",
92 current_mailbox_count,
93 available_mailbox_count,
94 force
95 );
96
97 let Some(network) = &self.network else {
98 debug!("No network handle available for mailbox discovery");
99 return Ok(());
100 };
101
102 self.last_discovery_time = Some(Instant::now());
103
104 let general_mailbox_key = make_mailbox_provider_key();
106 if !self.has_pending_query_for(&general_mailbox_key) {
107 match network
108 .start_dht_provider_query(general_mailbox_key.clone())
109 .await
110 {
111 Ok(query_id) => {
112 let query_state = DhtQueryState {
113 key: general_mailbox_key,
114 started_at: Instant::now(),
115 received_results: false,
116 };
117 self.pending_dht_queries.insert(query_id, query_state);
118 trace!(
119 "Started DHT query for general mailbox providers: {:?}",
120 query_id
121 );
122 }
123 Err(e) => {
124 error!("Failed to start DHT query for general mailboxes: {}", e);
125 }
126 }
127 } else {
128 trace!("Skipping DHT query for general mailbox providers; query already pending");
129 }
130
131 let our_recipient_hash =
133 StorageEncryption::derive_recipient_hash(&self.identity.hpke_public_key());
134 let recipient_mailbox_key = make_recipient_mailbox_key(our_recipient_hash);
135
136 if !self.has_pending_query_for(&recipient_mailbox_key) {
137 match network
138 .start_dht_provider_query(recipient_mailbox_key.clone())
139 .await
140 {
141 Ok(query_id) => {
142 let query_state = DhtQueryState {
143 key: recipient_mailbox_key,
144 started_at: Instant::now(),
145 received_results: false,
146 };
147 self.pending_dht_queries.insert(query_id, query_state);
148 trace!(
149 "Started DHT query for recipient-specific mailbox providers: {:?}",
150 query_id
151 );
152 }
153 Err(e) => {
154 trace!(
155 "Failed to start DHT query for recipient-specific mailboxes: {}",
156 e
157 );
158 }
159 }
160 } else {
161 trace!("Skipping DHT query for recipient-specific mailboxes; query already pending");
162 }
163
164 Ok(())
165 }
166}