p2p_chat/sync/engine/discovery/queries/handlers.rs
1//! This module contains handlers for processing Kademlia DHT query results
2//! within the synchronization engine.
3use crate::storage::KnownMailbox;
4use crate::sync::engine::events::DhtQueryResult;
5use crate::sync::engine::SyncEngine;
6use anyhow::Result;
7use libp2p::kad;
8use tracing::{debug, error, info, trace};
9
10impl SyncEngine {
11 /// Handles the result of a DHT query.
12 ///
13 /// This function processes `DhtQueryResult`s, typically updating the list
14 /// of discovered mailbox providers and triggering actions like retrying the outbox.
15 ///
16 /// # Arguments
17 ///
18 /// * `key` - The `kad::RecordKey` that the query was performed for.
19 /// * `result` - The `DhtQueryResult` containing the outcome of the query.
20 ///
21 /// # Errors
22 ///
23 /// This function will return an error if processing the result fails, e.g.,
24 /// if there are issues saving a new mailbox to the database.
25 pub async fn handle_dht_query_result(
26 &mut self,
27 key: kad::RecordKey,
28 result: DhtQueryResult,
29 ) -> Result<()> {
30 match result {
31 DhtQueryResult::ProvidersFound {
32 providers,
33 finished,
34 } => {
35 let key_str = String::from_utf8_lossy(key.as_ref());
36 if !providers.is_empty() {
37 info!(
38 "Found {} providers for key {} (finished: {})",
39 providers.len(),
40 key_str,
41 finished
42 );
43 }
44
45 let mut new_providers = 0;
46 for provider in providers {
47 if !self.backoff_manager.can_attempt(&provider) {
48 if let Some(retry_time) = self.backoff_manager.time_until_retry(&provider) {
49 debug!(
50 "Skipping backed-off mailbox {} (retry in {:?})",
51 provider, retry_time
52 );
53 }
54 continue;
55 }
56
57 if self.discovered_mailboxes.insert(provider) {
58 new_providers += 1;
59 info!("Discovered new mailbox provider: {}", provider);
60 self.backoff_manager.record_success(&provider);
61
62 // Save newly discovered mailbox to database.
63 let known_mailbox = KnownMailbox::new(provider);
64 if let Err(e) = self.known_mailboxes.add_mailbox(known_mailbox).await {
65 error!("Failed to save mailbox {} to database: {}", provider, e);
66 } else {
67 trace!("Saved mailbox {} to database cache", provider);
68 }
69 }
70 }
71
72 if new_providers > 0 {
73 info!(
74 "Added {} new mailbox provider(s) to the pool.",
75 new_providers
76 );
77
78 if let Err(e) = self.retry_outbox().await {
79 error!(
80 "Failed to retry outbox after discovering new mailboxes: {}",
81 e
82 );
83 }
84 }
85 }
86 DhtQueryResult::QueryFailed { error } => {
87 let key_str = String::from_utf8_lossy(key.as_ref());
88 trace!("DHT query failed for key {}: {}", key_str, error);
89 }
90 }
91 Ok(())
92 }
93}