p2p_chat/network/handlers/
discovery.rs

1//! This module contains the handlers for discovery-related network events.
2use super::super::NetworkLayer;
3use crate::net::discovery::DiscoveryBehaviourEvent;
4use crate::sync::{DhtQueryResult, SyncEvent};
5use anyhow::Result;
6use libp2p::kad;
7use std::collections::HashSet;
8use tracing::{debug, error, info, trace};
9
10impl NetworkLayer {
11    /// Handles an event from the `DiscoveryBehaviour`.
12    ///
13    /// This function is called when an event is received from the `DiscoveryBehaviour`.
14    /// It dispatches the event to the appropriate handler.
15    ///
16    /// # Arguments
17    ///
18    /// * `event` - The `DiscoveryBehaviourEvent` to handle.
19    ///
20    /// # Errors
21    ///
22    /// This function will return an error if handling the event fails.
23    pub(super) async fn handle_discovery_event(
24        &mut self,
25        event: DiscoveryBehaviourEvent,
26    ) -> Result<()> {
27        match event {
28            DiscoveryBehaviourEvent::Mdns(mdns_event) => match mdns_event {
29                libp2p::mdns::Event::Discovered(list) => {
30                    for (peer_id, multiaddr) in list {
31                        info!("Discovered peer via mDNS: {} at {}", peer_id, multiaddr);
32
33                        if self.blocked_peers.contains_key(&peer_id) {
34                            debug!("Skipping mDNS discovery for blocked peer {}", peer_id);
35                            continue;
36                        }
37
38                        self.swarm
39                            .behaviour_mut()
40                            .discovery
41                            .add_peer_address(peer_id, multiaddr.clone());
42
43                        if let Err(e) = self.swarm.dial(multiaddr) {
44                            trace!(
45                                "Failed to proactively dial discovered peer {}: {}",
46                                peer_id,
47                                e
48                            );
49                        }
50                    }
51                }
52                libp2p::mdns::Event::Expired(list) => {
53                    for (peer_id, _) in list {
54                        trace!("mDNS record expired for peer: {}", peer_id);
55                    }
56                }
57            },
58            DiscoveryBehaviourEvent::Kademlia(kad_event) => {
59                self.handle_kademlia_event(kad_event).await?;
60            }
61        }
62
63        Ok(())
64    }
65
66    /// Handles a Kademlia event.
67    async fn handle_kademlia_event(&mut self, event: kad::Event) -> Result<()> {
68        match event {
69            kad::Event::OutboundQueryProgressed { id, result, .. } => match result {
70                kad::QueryResult::GetProviders(Ok(kad::GetProvidersOk::FoundProviders {
71                    key,
72                    providers,
73                    ..
74                })) => {
75                    if !providers.is_empty() {
76                        trace!("Found {} providers for key: {:?}", providers.len(), key);
77                    }
78
79                    if let Some(sync_tx) = &self.sync_event_tx {
80                        let dht_result = DhtQueryResult::ProvidersFound {
81                            providers: providers.into_iter().collect(),
82                            finished: false,
83                        };
84                        let _ = sync_tx.send(SyncEvent::DhtQueryResult {
85                            query_id: id,
86                            result: dht_result,
87                        });
88                    }
89                }
90                kad::QueryResult::GetProviders(Ok(
91                    kad::GetProvidersOk::FinishedWithNoAdditionalRecord { .. },
92                )) => {
93                    trace!("DHT query {} finished with no additional providers", id);
94
95                    if let Some(sync_tx) = &self.sync_event_tx {
96                        let dht_result = DhtQueryResult::ProvidersFound {
97                            providers: HashSet::new(),
98                            finished: true,
99                        };
100                        let _ = sync_tx.send(SyncEvent::DhtQueryResult {
101                            query_id: id,
102                            result: dht_result,
103                        });
104                    }
105                }
106                kad::QueryResult::GetProviders(Err(e)) => {
107                    error!("DHT provider query {} failed: {:?}", id, e);
108
109                    if let Some(sync_tx) = &self.sync_event_tx {
110                        let dht_result = DhtQueryResult::QueryFailed {
111                            error: format!("{:?}", e),
112                        };
113                        let _ = sync_tx.send(SyncEvent::DhtQueryResult {
114                            query_id: id,
115                            result: dht_result,
116                        });
117                    }
118                }
119                _ => {}
120            },
121            kad::Event::RoutingUpdated { peer, .. } => {
122                trace!("Kademlia routing table updated for peer: {}", peer);
123            }
124            _ => {}
125        }
126
127        Ok(())
128    }
129}