libp2p_kad/
behaviour.rs

1// Copyright 2018 Parity Technologies (UK) Ltd.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21//! Implementation of the `Kademlia` network behaviour.
22
23mod test;
24
25use crate::addresses::Addresses;
26use crate::handler::{Handler, HandlerEvent, HandlerIn, RequestId};
27use crate::jobs::*;
28use crate::kbucket::{self, Distance, KBucketsTable, NodeStatus};
29use crate::protocol::{ConnectionType, KadPeer, ProtocolConfig};
30use crate::query::{Query, QueryConfig, QueryId, QueryPool, QueryPoolState};
31use crate::record::{
32    self,
33    store::{self, RecordStore},
34    ProviderRecord, Record,
35};
36use crate::K_VALUE;
37use fnv::{FnvHashMap, FnvHashSet};
38use instant::Instant;
39use libp2p_core::{ConnectedPoint, Endpoint, Multiaddr};
40use libp2p_identity::PeerId;
41use libp2p_swarm::behaviour::{
42    AddressChange, ConnectionClosed, ConnectionEstablished, DialFailure, FromSwarm,
43};
44use libp2p_swarm::{
45    dial_opts::{self, DialOpts},
46    ConnectionDenied, ConnectionHandler, ConnectionId, DialError, ExternalAddresses,
47    ListenAddresses, NetworkBehaviour, NotifyHandler, StreamProtocol, THandler, THandlerInEvent,
48    THandlerOutEvent, ToSwarm,
49};
50use smallvec::SmallVec;
51use std::collections::{BTreeMap, HashMap, HashSet, VecDeque};
52use std::fmt;
53use std::num::NonZeroUsize;
54use std::task::{Context, Poll, Waker};
55use std::time::Duration;
56use std::vec;
57use thiserror::Error;
58use tracing::Level;
59
60pub use crate::query::QueryStats;
61
62/// `Behaviour` is a `NetworkBehaviour` that implements the libp2p
63/// Kademlia protocol.
64pub struct Behaviour<TStore> {
65    /// The Kademlia routing table.
66    kbuckets: KBucketsTable<kbucket::Key<PeerId>, Addresses>,
67
68    /// The k-bucket insertion strategy.
69    kbucket_inserts: BucketInserts,
70
71    /// Configuration of the wire protocol.
72    protocol_config: ProtocolConfig,
73
74    /// Configuration of [`RecordStore`] filtering.
75    record_filtering: StoreInserts,
76
77    /// The currently active (i.e. in-progress) queries.
78    queries: QueryPool<QueryInner>,
79
80    /// The currently connected peers.
81    ///
82    /// This is a superset of the connected peers currently in the routing table.
83    connected_peers: FnvHashSet<PeerId>,
84
85    /// Periodic job for re-publication of provider records for keys
86    /// provided by the local node.
87    add_provider_job: Option<AddProviderJob>,
88
89    /// Periodic job for (re-)replication and (re-)publishing of
90    /// regular (value-)records.
91    put_record_job: Option<PutRecordJob>,
92
93    /// The TTL of regular (value-)records.
94    record_ttl: Option<Duration>,
95
96    /// The TTL of provider records.
97    provider_record_ttl: Option<Duration>,
98
99    /// Queued events to return when the behaviour is being polled.
100    queued_events: VecDeque<ToSwarm<Event, HandlerIn>>,
101
102    listen_addresses: ListenAddresses,
103
104    external_addresses: ExternalAddresses,
105
106    connections: HashMap<ConnectionId, PeerId>,
107
108    /// See [`Config::caching`].
109    caching: Caching,
110
111    local_peer_id: PeerId,
112
113    mode: Mode,
114    auto_mode: bool,
115    no_events_waker: Option<Waker>,
116
117    /// The record storage.
118    store: TStore,
119}
120
121/// The configurable strategies for the insertion of peers
122/// and their addresses into the k-buckets of the Kademlia
123/// routing table.
124#[derive(Copy, Clone, Debug, PartialEq, Eq)]
125pub enum BucketInserts {
126    /// Whenever a connection to a peer is established as a
127    /// result of a dialing attempt and that peer is not yet
128    /// in the routing table, it is inserted as long as there
129    /// is a free slot in the corresponding k-bucket. If the
130    /// k-bucket is full but still has a free pending slot,
131    /// it may be inserted into the routing table at a later time if an unresponsive
132    /// disconnected peer is evicted from the bucket.
133    OnConnected,
134    /// New peers and addresses are only added to the routing table via
135    /// explicit calls to [`Behaviour::add_address`].
136    ///
137    /// > **Note**: Even though peers can only get into the
138    /// > routing table as a result of [`Behaviour::add_address`],
139    /// > routing table entries are still updated as peers
140    /// > connect and disconnect (i.e. the order of the entries
141    /// > as well as the network addresses).
142    Manual,
143}
144
145/// The configurable filtering strategies for the acceptance of
146/// incoming records.
147///
148/// This can be used for e.g. signature verification or validating
149/// the accompanying [`Key`].
150///
151/// [`Key`]: crate::record::Key
152#[derive(Copy, Clone, Debug, PartialEq, Eq)]
153pub enum StoreInserts {
154    /// Whenever a (provider) record is received,
155    /// the record is forwarded immediately to the [`RecordStore`].
156    Unfiltered,
157    /// Whenever a (provider) record is received, an event is emitted.
158    /// Provider records generate a [`InboundRequest::AddProvider`] under [`Event::InboundRequest`],
159    /// normal records generate a [`InboundRequest::PutRecord`] under [`Event::InboundRequest`].
160    ///
161    /// When deemed valid, a (provider) record needs to be explicitly stored in
162    /// the [`RecordStore`] via [`RecordStore::put`] or [`RecordStore::add_provider`],
163    /// whichever is applicable. A mutable reference to the [`RecordStore`] can
164    /// be retrieved via [`Behaviour::store_mut`].
165    FilterBoth,
166}
167
168/// The configuration for the `Kademlia` behaviour.
169///
170/// The configuration is consumed by [`Behaviour::new`].
171#[derive(Debug, Clone)]
172pub struct Config {
173    kbucket_pending_timeout: Duration,
174    query_config: QueryConfig,
175    protocol_config: ProtocolConfig,
176    record_ttl: Option<Duration>,
177    record_replication_interval: Option<Duration>,
178    record_publication_interval: Option<Duration>,
179    record_filtering: StoreInserts,
180    provider_record_ttl: Option<Duration>,
181    provider_publication_interval: Option<Duration>,
182    kbucket_inserts: BucketInserts,
183    caching: Caching,
184}
185
186impl Default for Config {
187    fn default() -> Self {
188        Config {
189            kbucket_pending_timeout: Duration::from_secs(60),
190            query_config: QueryConfig::default(),
191            protocol_config: Default::default(),
192            record_ttl: Some(Duration::from_secs(36 * 60 * 60)),
193            record_replication_interval: Some(Duration::from_secs(60 * 60)),
194            record_publication_interval: Some(Duration::from_secs(24 * 60 * 60)),
195            record_filtering: StoreInserts::Unfiltered,
196            provider_publication_interval: Some(Duration::from_secs(12 * 60 * 60)),
197            provider_record_ttl: Some(Duration::from_secs(24 * 60 * 60)),
198            kbucket_inserts: BucketInserts::OnConnected,
199            caching: Caching::Enabled { max_peers: 1 },
200        }
201    }
202}
203
204/// The configuration for Kademlia "write-back" caching after successful
205/// lookups via [`Behaviour::get_record`].
206#[derive(Debug, Clone)]
207pub enum Caching {
208    /// Caching is disabled and the peers closest to records being looked up
209    /// that do not return a record are not tracked, i.e.
210    /// [`GetRecordOk::FinishedWithNoAdditionalRecord`] is always empty.
211    Disabled,
212    /// Up to `max_peers` peers not returning a record that are closest to the key
213    /// being looked up are tracked and returned in [`GetRecordOk::FinishedWithNoAdditionalRecord`].
214    /// The write-back operation must be performed explicitly, if
215    /// desired and after choosing a record from the results, via [`Behaviour::put_record_to`].
216    Enabled { max_peers: u16 },
217}
218
219impl Config {
220    /// Sets custom protocol names.
221    ///
222    /// Kademlia nodes only communicate with other nodes using the same protocol
223    /// name. Using custom name(s) therefore allows to segregate the DHT from
224    /// others, if that is desired.
225    ///
226    /// More than one protocol name can be supplied. In this case the node will
227    /// be able to talk to other nodes supporting any of the provided names.
228    /// Multiple names must be used with caution to avoid network partitioning.
229    pub fn set_protocol_names(&mut self, names: Vec<StreamProtocol>) -> &mut Self {
230        self.protocol_config.set_protocol_names(names);
231        self
232    }
233
234    /// Sets the timeout for a single query.
235    ///
236    /// > **Note**: A single query usually comprises at least as many requests
237    /// > as the replication factor, i.e. this is not a request timeout.
238    ///
239    /// The default is 60 seconds.
240    pub fn set_query_timeout(&mut self, timeout: Duration) -> &mut Self {
241        self.query_config.timeout = timeout;
242        self
243    }
244
245    /// Sets the replication factor to use.
246    ///
247    /// The replication factor determines to how many closest peers
248    /// a record is replicated. The default is [`K_VALUE`].
249    pub fn set_replication_factor(&mut self, replication_factor: NonZeroUsize) -> &mut Self {
250        self.query_config.replication_factor = replication_factor;
251        self
252    }
253
254    /// Sets the allowed level of parallelism for iterative queries.
255    ///
256    /// The `α` parameter in the Kademlia paper. The maximum number of peers
257    /// that an iterative query is allowed to wait for in parallel while
258    /// iterating towards the closest nodes to a target. Defaults to
259    /// `ALPHA_VALUE`.
260    ///
261    /// This only controls the level of parallelism of an iterative query, not
262    /// the level of parallelism of a query to a fixed set of peers.
263    ///
264    /// When used with [`Config::disjoint_query_paths`] it equals
265    /// the amount of disjoint paths used.
266    pub fn set_parallelism(&mut self, parallelism: NonZeroUsize) -> &mut Self {
267        self.query_config.parallelism = parallelism;
268        self
269    }
270
271    /// Require iterative queries to use disjoint paths for increased resiliency
272    /// in the presence of potentially adversarial nodes.
273    ///
274    /// When enabled the number of disjoint paths used equals the configured
275    /// parallelism.
276    ///
277    /// See the S/Kademlia paper for more information on the high level design
278    /// as well as its security improvements.
279    pub fn disjoint_query_paths(&mut self, enabled: bool) -> &mut Self {
280        self.query_config.disjoint_query_paths = enabled;
281        self
282    }
283
284    /// Sets the TTL for stored records.
285    ///
286    /// The TTL should be significantly longer than the (re-)publication
287    /// interval, to avoid premature expiration of records. The default is 36
288    /// hours.
289    ///
290    /// `None` means records never expire.
291    ///
292    /// Does not apply to provider records.
293    pub fn set_record_ttl(&mut self, record_ttl: Option<Duration>) -> &mut Self {
294        self.record_ttl = record_ttl;
295        self
296    }
297
298    /// Sets whether or not records should be filtered before being stored.
299    ///
300    /// See [`StoreInserts`] for the different values.
301    /// Defaults to [`StoreInserts::Unfiltered`].
302    pub fn set_record_filtering(&mut self, filtering: StoreInserts) -> &mut Self {
303        self.record_filtering = filtering;
304        self
305    }
306
307    /// Sets the (re-)replication interval for stored records.
308    ///
309    /// Periodic replication of stored records ensures that the records
310    /// are always replicated to the available nodes closest to the key in the
311    /// context of DHT topology changes (i.e. nodes joining and leaving), thus
312    /// ensuring persistence until the record expires. Replication does not
313    /// prolong the regular lifetime of a record (for otherwise it would live
314    /// forever regardless of the configured TTL). The expiry of a record
315    /// is only extended through re-publication.
316    ///
317    /// This interval should be significantly shorter than the publication
318    /// interval, to ensure persistence between re-publications. The default
319    /// is 1 hour.
320    ///
321    /// `None` means that stored records are never re-replicated.
322    ///
323    /// Does not apply to provider records.
324    pub fn set_replication_interval(&mut self, interval: Option<Duration>) -> &mut Self {
325        self.record_replication_interval = interval;
326        self
327    }
328
329    /// Sets the (re-)publication interval of stored records.
330    ///
331    /// Records persist in the DHT until they expire. By default, published
332    /// records are re-published in regular intervals for as long as the record
333    /// exists in the local storage of the original publisher, thereby extending
334    /// the records lifetime.
335    ///
336    /// This interval should be significantly shorter than the record TTL, to
337    /// ensure records do not expire prematurely. The default is 24 hours.
338    ///
339    /// `None` means that stored records are never automatically re-published.
340    ///
341    /// Does not apply to provider records.
342    pub fn set_publication_interval(&mut self, interval: Option<Duration>) -> &mut Self {
343        self.record_publication_interval = interval;
344        self
345    }
346
347    /// Sets the TTL for provider records.
348    ///
349    /// `None` means that stored provider records never expire.
350    ///
351    /// Must be significantly larger than the provider publication interval.
352    pub fn set_provider_record_ttl(&mut self, ttl: Option<Duration>) -> &mut Self {
353        self.provider_record_ttl = ttl;
354        self
355    }
356
357    /// Sets the interval at which provider records for keys provided
358    /// by the local node are re-published.
359    ///
360    /// `None` means that stored provider records are never automatically
361    /// re-published.
362    ///
363    /// Must be significantly less than the provider record TTL.
364    pub fn set_provider_publication_interval(&mut self, interval: Option<Duration>) -> &mut Self {
365        self.provider_publication_interval = interval;
366        self
367    }
368
369    /// Modifies the maximum allowed size of individual Kademlia packets.
370    ///
371    /// It might be necessary to increase this value if trying to put large
372    /// records.
373    pub fn set_max_packet_size(&mut self, size: usize) -> &mut Self {
374        self.protocol_config.set_max_packet_size(size);
375        self
376    }
377
378    /// Sets the k-bucket insertion strategy for the Kademlia routing table.
379    pub fn set_kbucket_inserts(&mut self, inserts: BucketInserts) -> &mut Self {
380        self.kbucket_inserts = inserts;
381        self
382    }
383
384    /// Sets the [`Caching`] strategy to use for successful lookups.
385    ///
386    /// The default is [`Caching::Enabled`] with a `max_peers` of 1.
387    /// Hence, with default settings and a lookup quorum of 1, a successful lookup
388    /// will result in the record being cached at the closest node to the key that
389    /// did not return the record, i.e. the standard Kademlia behaviour.
390    pub fn set_caching(&mut self, c: Caching) -> &mut Self {
391        self.caching = c;
392        self
393    }
394}
395
396impl<TStore> Behaviour<TStore>
397where
398    TStore: RecordStore + Send + 'static,
399{
400    /// Creates a new `Kademlia` network behaviour with a default configuration.
401    pub fn new(id: PeerId, store: TStore) -> Self {
402        Self::with_config(id, store, Default::default())
403    }
404
405    /// Get the protocol name of this kademlia instance.
406    pub fn protocol_names(&self) -> &[StreamProtocol] {
407        self.protocol_config.protocol_names()
408    }
409
410    /// Creates a new `Kademlia` network behaviour with the given configuration.
411    pub fn with_config(id: PeerId, store: TStore, config: Config) -> Self {
412        let local_key = kbucket::Key::from(id);
413
414        let put_record_job = config
415            .record_replication_interval
416            .or(config.record_publication_interval)
417            .map(|interval| {
418                PutRecordJob::new(
419                    id,
420                    interval,
421                    config.record_publication_interval,
422                    config.record_ttl,
423                )
424            });
425
426        let add_provider_job = config
427            .provider_publication_interval
428            .map(AddProviderJob::new);
429
430        Behaviour {
431            store,
432            caching: config.caching,
433            kbuckets: KBucketsTable::new(local_key, config.kbucket_pending_timeout),
434            kbucket_inserts: config.kbucket_inserts,
435            protocol_config: config.protocol_config,
436            record_filtering: config.record_filtering,
437            queued_events: VecDeque::with_capacity(config.query_config.replication_factor.get()),
438            listen_addresses: Default::default(),
439            queries: QueryPool::new(config.query_config),
440            connected_peers: Default::default(),
441            add_provider_job,
442            put_record_job,
443            record_ttl: config.record_ttl,
444            provider_record_ttl: config.provider_record_ttl,
445            external_addresses: Default::default(),
446            local_peer_id: id,
447            connections: Default::default(),
448            mode: Mode::Client,
449            auto_mode: true,
450            no_events_waker: None,
451        }
452    }
453
454    /// Gets an iterator over immutable references to all running queries.
455    pub fn iter_queries(&self) -> impl Iterator<Item = QueryRef<'_>> {
456        self.queries.iter().filter_map(|query| {
457            if !query.is_finished() {
458                Some(QueryRef { query })
459            } else {
460                None
461            }
462        })
463    }
464
465    /// Gets an iterator over mutable references to all running queries.
466    pub fn iter_queries_mut(&mut self) -> impl Iterator<Item = QueryMut<'_>> {
467        self.queries.iter_mut().filter_map(|query| {
468            if !query.is_finished() {
469                Some(QueryMut { query })
470            } else {
471                None
472            }
473        })
474    }
475
476    /// Gets an immutable reference to a running query, if it exists.
477    pub fn query(&self, id: &QueryId) -> Option<QueryRef<'_>> {
478        self.queries.get(id).and_then(|query| {
479            if !query.is_finished() {
480                Some(QueryRef { query })
481            } else {
482                None
483            }
484        })
485    }
486
487    /// Gets a mutable reference to a running query, if it exists.
488    pub fn query_mut<'a>(&'a mut self, id: &QueryId) -> Option<QueryMut<'a>> {
489        self.queries.get_mut(id).and_then(|query| {
490            if !query.is_finished() {
491                Some(QueryMut { query })
492            } else {
493                None
494            }
495        })
496    }
497
498    /// Adds a known listen address of a peer participating in the DHT to the
499    /// routing table.
500    ///
501    /// Explicitly adding addresses of peers serves two purposes:
502    ///
503    ///   1. In order for a node to join the DHT, it must know about at least
504    ///      one other node of the DHT.
505    ///
506    ///   2. When a remote peer initiates a connection and that peer is not
507    ///      yet in the routing table, the `Kademlia` behaviour must be
508    ///      informed of an address on which that peer is listening for
509    ///      connections before it can be added to the routing table
510    ///      from where it can subsequently be discovered by all peers
511    ///      in the DHT.
512    ///
513    /// If the routing table has been updated as a result of this operation,
514    /// a [`Event::RoutingUpdated`] event is emitted.
515    pub fn add_address(&mut self, peer: &PeerId, address: Multiaddr) -> RoutingUpdate {
516        // ensuring address is a fully-qualified /p2p multiaddr
517        let Ok(address) = address.with_p2p(*peer) else {
518            return RoutingUpdate::Failed;
519        };
520        let key = kbucket::Key::from(*peer);
521        match self.kbuckets.entry(&key) {
522            kbucket::Entry::Present(mut entry, _) => {
523                if entry.value().insert(address) {
524                    self.queued_events
525                        .push_back(ToSwarm::GenerateEvent(Event::RoutingUpdated {
526                            peer: *peer,
527                            is_new_peer: false,
528                            addresses: entry.value().clone(),
529                            old_peer: None,
530                            bucket_range: self
531                                .kbuckets
532                                .bucket(&key)
533                                .map(|b| b.range())
534                                .expect("Not kbucket::Entry::SelfEntry."),
535                        }))
536                }
537                RoutingUpdate::Success
538            }
539            kbucket::Entry::Pending(mut entry, _) => {
540                entry.value().insert(address);
541                RoutingUpdate::Pending
542            }
543            kbucket::Entry::Absent(entry) => {
544                let addresses = Addresses::new(address);
545                let status = if self.connected_peers.contains(peer) {
546                    NodeStatus::Connected
547                } else {
548                    NodeStatus::Disconnected
549                };
550                match entry.insert(addresses.clone(), status) {
551                    kbucket::InsertResult::Inserted => {
552                        self.queued_events.push_back(ToSwarm::GenerateEvent(
553                            Event::RoutingUpdated {
554                                peer: *peer,
555                                is_new_peer: true,
556                                addresses,
557                                old_peer: None,
558                                bucket_range: self
559                                    .kbuckets
560                                    .bucket(&key)
561                                    .map(|b| b.range())
562                                    .expect("Not kbucket::Entry::SelfEntry."),
563                            },
564                        ));
565                        RoutingUpdate::Success
566                    }
567                    kbucket::InsertResult::Full => {
568                        tracing::debug!(%peer, "Bucket full. Peer not added to routing table");
569                        RoutingUpdate::Failed
570                    }
571                    kbucket::InsertResult::Pending { disconnected } => {
572                        self.queued_events.push_back(ToSwarm::Dial {
573                            opts: DialOpts::peer_id(disconnected.into_preimage())
574                                .condition(dial_opts::PeerCondition::NotDialing)
575                                .build(),
576                        });
577                        RoutingUpdate::Pending
578                    }
579                }
580            }
581            kbucket::Entry::SelfEntry => RoutingUpdate::Failed,
582        }
583    }
584
585    /// Removes an address of a peer from the routing table.
586    ///
587    /// If the given address is the last address of the peer in the
588    /// routing table, the peer is removed from the routing table
589    /// and `Some` is returned with a view of the removed entry.
590    /// The same applies if the peer is currently pending insertion
591    /// into the routing table.
592    ///
593    /// If the given peer or address is not in the routing table,
594    /// this is a no-op.
595    pub fn remove_address(
596        &mut self,
597        peer: &PeerId,
598        address: &Multiaddr,
599    ) -> Option<kbucket::EntryView<kbucket::Key<PeerId>, Addresses>> {
600        let address = &address.to_owned().with_p2p(*peer).ok()?;
601        let key = kbucket::Key::from(*peer);
602        match self.kbuckets.entry(&key) {
603            kbucket::Entry::Present(mut entry, _) => {
604                if entry.value().remove(address).is_err() {
605                    Some(entry.remove()) // it is the last address, thus remove the peer.
606                } else {
607                    None
608                }
609            }
610            kbucket::Entry::Pending(mut entry, _) => {
611                if entry.value().remove(address).is_err() {
612                    Some(entry.remove()) // it is the last address, thus remove the peer.
613                } else {
614                    None
615                }
616            }
617            kbucket::Entry::Absent(..) | kbucket::Entry::SelfEntry => None,
618        }
619    }
620
621    /// Removes a peer from the routing table.
622    ///
623    /// Returns `None` if the peer was not in the routing table,
624    /// not even pending insertion.
625    pub fn remove_peer(
626        &mut self,
627        peer: &PeerId,
628    ) -> Option<kbucket::EntryView<kbucket::Key<PeerId>, Addresses>> {
629        let key = kbucket::Key::from(*peer);
630        match self.kbuckets.entry(&key) {
631            kbucket::Entry::Present(entry, _) => Some(entry.remove()),
632            kbucket::Entry::Pending(entry, _) => Some(entry.remove()),
633            kbucket::Entry::Absent(..) | kbucket::Entry::SelfEntry => None,
634        }
635    }
636
637    /// Returns an iterator over all non-empty buckets in the routing table.
638    pub fn kbuckets(
639        &mut self,
640    ) -> impl Iterator<Item = kbucket::KBucketRef<'_, kbucket::Key<PeerId>, Addresses>> {
641        self.kbuckets.iter().filter(|b| !b.is_empty())
642    }
643
644    /// Returns the k-bucket for the distance to the given key.
645    ///
646    /// Returns `None` if the given key refers to the local key.
647    pub fn kbucket<K>(
648        &mut self,
649        key: K,
650    ) -> Option<kbucket::KBucketRef<'_, kbucket::Key<PeerId>, Addresses>>
651    where
652        K: Into<kbucket::Key<K>> + Clone,
653    {
654        self.kbuckets.bucket(&key.into())
655    }
656
657    /// Initiates an iterative query for the closest peers to the given key.
658    ///
659    /// The result of the query is delivered in a
660    /// [`Event::OutboundQueryProgressed{QueryResult::GetClosestPeers}`].
661    pub fn get_closest_peers<K>(&mut self, key: K) -> QueryId
662    where
663        K: Into<kbucket::Key<K>> + Into<Vec<u8>> + Clone,
664    {
665        let target: kbucket::Key<K> = key.clone().into();
666        let key: Vec<u8> = key.into();
667        let info = QueryInfo::GetClosestPeers {
668            key,
669            step: ProgressStep::first(),
670        };
671        let peer_keys: Vec<kbucket::Key<PeerId>> = self.kbuckets.closest_keys(&target).collect();
672        let inner = QueryInner::new(info);
673        self.queries.add_iter_closest(target, peer_keys, inner)
674    }
675
676    /// Returns closest peers to the given key; takes peers from local routing table only.
677    pub fn get_closest_local_peers<'a, K: Clone>(
678        &'a mut self,
679        key: &'a kbucket::Key<K>,
680    ) -> impl Iterator<Item = kbucket::Key<PeerId>> + 'a {
681        self.kbuckets.closest_keys(key)
682    }
683
684    /// Performs a lookup for a record in the DHT.
685    ///
686    /// The result of this operation is delivered in a
687    /// [`Event::OutboundQueryProgressed{QueryResult::GetRecord}`].
688    pub fn get_record(&mut self, key: record::Key) -> QueryId {
689        let record = if let Some(record) = self.store.get(&key) {
690            if record.is_expired(Instant::now()) {
691                self.store.remove(&key);
692                None
693            } else {
694                Some(PeerRecord {
695                    peer: None,
696                    record: record.into_owned(),
697                })
698            }
699        } else {
700            None
701        };
702
703        let step = ProgressStep::first();
704
705        let target = kbucket::Key::new(key.clone());
706        let info = if record.is_some() {
707            QueryInfo::GetRecord {
708                key,
709                step: step.next(),
710                found_a_record: true,
711                cache_candidates: BTreeMap::new(),
712            }
713        } else {
714            QueryInfo::GetRecord {
715                key,
716                step: step.clone(),
717                found_a_record: false,
718                cache_candidates: BTreeMap::new(),
719            }
720        };
721        let peers = self.kbuckets.closest_keys(&target);
722        let inner = QueryInner::new(info);
723        let id = self.queries.add_iter_closest(target.clone(), peers, inner);
724
725        // No queries were actually done for the results yet.
726        let stats = QueryStats::empty();
727
728        if let Some(record) = record {
729            self.queued_events
730                .push_back(ToSwarm::GenerateEvent(Event::OutboundQueryProgressed {
731                    id,
732                    result: QueryResult::GetRecord(Ok(GetRecordOk::FoundRecord(record))),
733                    step,
734                    stats,
735                }));
736        }
737
738        id
739    }
740
741    /// Stores a record in the DHT, locally as well as at the nodes
742    /// closest to the key as per the xor distance metric.
743    ///
744    /// Returns `Ok` if a record has been stored locally, providing the
745    /// `QueryId` of the initial query that replicates the record in the DHT.
746    /// The result of the query is eventually reported as a
747    /// [`Event::OutboundQueryProgressed{QueryResult::PutRecord}`].
748    ///
749    /// The record is always stored locally with the given expiration. If the record's
750    /// expiration is `None`, the common case, it does not expire in local storage
751    /// but is still replicated with the configured record TTL. To remove the record
752    /// locally and stop it from being re-published in the DHT, see [`Behaviour::remove_record`].
753    ///
754    /// After the initial publication of the record, it is subject to (re-)replication
755    /// and (re-)publication as per the configured intervals. Periodic (re-)publication
756    /// does not update the record's expiration in local storage, thus a given record
757    /// with an explicit expiration will always expire at that instant and until then
758    /// is subject to regular (re-)replication and (re-)publication.
759    pub fn put_record(
760        &mut self,
761        mut record: Record,
762        quorum: Quorum,
763    ) -> Result<QueryId, store::Error> {
764        record.publisher = Some(*self.kbuckets.local_key().preimage());
765        self.store.put(record.clone())?;
766        record.expires = record
767            .expires
768            .or_else(|| self.record_ttl.map(|ttl| Instant::now() + ttl));
769        let quorum = quorum.eval(self.queries.config().replication_factor);
770        let target = kbucket::Key::new(record.key.clone());
771        let peers = self.kbuckets.closest_keys(&target);
772        let context = PutRecordContext::Publish;
773        let info = QueryInfo::PutRecord {
774            context,
775            record,
776            quorum,
777            phase: PutRecordPhase::GetClosestPeers,
778        };
779        let inner = QueryInner::new(info);
780        Ok(self.queries.add_iter_closest(target.clone(), peers, inner))
781    }
782
783    /// Stores a record at specific peers, without storing it locally.
784    ///
785    /// The given [`Quorum`] is understood in the context of the total
786    /// number of distinct peers given.
787    ///
788    /// If the record's expiration is `None`, the configured record TTL is used.
789    ///
790    /// > **Note**: This is not a regular Kademlia DHT operation. It needs to be
791    /// > used to selectively update or store a record to specific peers
792    /// > for the purpose of e.g. making sure these peers have the latest
793    /// > "version" of a record or to "cache" a record at further peers
794    /// > to increase the lookup success rate on the DHT for other peers.
795    /// >
796    /// > In particular, there is no automatic storing of records performed, and this
797    /// > method must be used to ensure the standard Kademlia
798    /// > procedure of "caching" (i.e. storing) a found record at the closest
799    /// > node to the key that _did not_ return it.
800    pub fn put_record_to<I>(&mut self, mut record: Record, peers: I, quorum: Quorum) -> QueryId
801    where
802        I: ExactSizeIterator<Item = PeerId>,
803    {
804        let quorum = if peers.len() > 0 {
805            quorum.eval(NonZeroUsize::new(peers.len()).expect("> 0"))
806        } else {
807            // If no peers are given, we just let the query fail immediately
808            // due to the fact that the quorum must be at least one, instead of
809            // introducing a new kind of error.
810            NonZeroUsize::new(1).expect("1 > 0")
811        };
812        record.expires = record
813            .expires
814            .or_else(|| self.record_ttl.map(|ttl| Instant::now() + ttl));
815        let context = PutRecordContext::Custom;
816        let info = QueryInfo::PutRecord {
817            context,
818            record,
819            quorum,
820            phase: PutRecordPhase::PutRecord {
821                success: Vec::new(),
822                get_closest_peers_stats: QueryStats::empty(),
823            },
824        };
825        let inner = QueryInner::new(info);
826        self.queries.add_fixed(peers, inner)
827    }
828
829    /// Removes the record with the given key from _local_ storage,
830    /// if the local node is the publisher of the record.
831    ///
832    /// Has no effect if a record for the given key is stored locally but
833    /// the local node is not a publisher of the record.
834    ///
835    /// This is a _local_ operation. However, it also has the effect that
836    /// the record will no longer be periodically re-published, allowing the
837    /// record to eventually expire throughout the DHT.
838    pub fn remove_record(&mut self, key: &record::Key) {
839        if let Some(r) = self.store.get(key) {
840            if r.publisher.as_ref() == Some(self.kbuckets.local_key().preimage()) {
841                self.store.remove(key)
842            }
843        }
844    }
845
846    /// Gets a mutable reference to the record store.
847    pub fn store_mut(&mut self) -> &mut TStore {
848        &mut self.store
849    }
850
851    /// Bootstraps the local node to join the DHT.
852    ///
853    /// Bootstrapping is a multi-step operation that starts with a lookup of the local node's
854    /// own ID in the DHT. This introduces the local node to the other nodes
855    /// in the DHT and populates its routing table with the closest neighbours.
856    ///
857    /// Subsequently, all buckets farther from the bucket of the closest neighbour are
858    /// refreshed by initiating an additional bootstrapping query for each such
859    /// bucket with random keys.
860    ///
861    /// Returns `Ok` if bootstrapping has been initiated with a self-lookup, providing the
862    /// `QueryId` for the entire bootstrapping process. The progress of bootstrapping is
863    /// reported via [`Event::OutboundQueryProgressed{QueryResult::Bootstrap}`] events,
864    /// with one such event per bootstrapping query.
865    ///
866    /// Returns `Err` if bootstrapping is impossible due an empty routing table.
867    ///
868    /// > **Note**: Bootstrapping requires at least one node of the DHT to be known.
869    /// > See [`Behaviour::add_address`].
870    pub fn bootstrap(&mut self) -> Result<QueryId, NoKnownPeers> {
871        let local_key = self.kbuckets.local_key().clone();
872        let info = QueryInfo::Bootstrap {
873            peer: *local_key.preimage(),
874            remaining: None,
875            step: ProgressStep::first(),
876        };
877        let peers = self.kbuckets.closest_keys(&local_key).collect::<Vec<_>>();
878        if peers.is_empty() {
879            Err(NoKnownPeers())
880        } else {
881            let inner = QueryInner::new(info);
882            Ok(self.queries.add_iter_closest(local_key, peers, inner))
883        }
884    }
885
886    /// Establishes the local node as a provider of a value for the given key.
887    ///
888    /// This operation publishes a provider record with the given key and
889    /// identity of the local node to the peers closest to the key, thus establishing
890    /// the local node as a provider.
891    ///
892    /// Returns `Ok` if a provider record has been stored locally, providing the
893    /// `QueryId` of the initial query that announces the local node as a provider.
894    ///
895    /// The publication of the provider records is periodically repeated as per the
896    /// configured interval, to renew the expiry and account for changes to the DHT
897    /// topology. A provider record may be removed from local storage and
898    /// thus no longer re-published by calling [`Behaviour::stop_providing`].
899    ///
900    /// In contrast to the standard Kademlia push-based model for content distribution
901    /// implemented by [`Behaviour::put_record`], the provider API implements a
902    /// pull-based model that may be used in addition or as an alternative.
903    /// The means by which the actual value is obtained from a provider is out of scope
904    /// of the libp2p Kademlia provider API.
905    ///
906    /// The results of the (repeated) provider announcements sent by this node are
907    /// reported via [`Event::OutboundQueryProgressed{QueryResult::StartProviding}`].
908    pub fn start_providing(&mut self, key: record::Key) -> Result<QueryId, store::Error> {
909        // Note: We store our own provider records locally without local addresses
910        // to avoid redundant storage and outdated addresses. Instead these are
911        // acquired on demand when returning a `ProviderRecord` for the local node.
912        let local_addrs = Vec::new();
913        let record = ProviderRecord::new(
914            key.clone(),
915            *self.kbuckets.local_key().preimage(),
916            local_addrs,
917        );
918        self.store.add_provider(record)?;
919        let target = kbucket::Key::new(key.clone());
920        let peers = self.kbuckets.closest_keys(&target);
921        let context = AddProviderContext::Publish;
922        let info = QueryInfo::AddProvider {
923            context,
924            key,
925            phase: AddProviderPhase::GetClosestPeers,
926        };
927        let inner = QueryInner::new(info);
928        let id = self.queries.add_iter_closest(target.clone(), peers, inner);
929        Ok(id)
930    }
931
932    /// Stops the local node from announcing that it is a provider for the given key.
933    ///
934    /// This is a local operation. The local node will still be considered as a
935    /// provider for the key by other nodes until these provider records expire.
936    pub fn stop_providing(&mut self, key: &record::Key) {
937        self.store
938            .remove_provider(key, self.kbuckets.local_key().preimage());
939    }
940
941    /// Performs a lookup for providers of a value to the given key.
942    ///
943    /// The result of this operation is delivered in a
944    /// reported via [`Event::OutboundQueryProgressed{QueryResult::GetProviders}`].
945    pub fn get_providers(&mut self, key: record::Key) -> QueryId {
946        let providers: HashSet<_> = self
947            .store
948            .providers(&key)
949            .into_iter()
950            .filter(|p| !p.is_expired(Instant::now()))
951            .map(|p| p.provider)
952            .collect();
953
954        let step = ProgressStep::first();
955
956        let info = QueryInfo::GetProviders {
957            key: key.clone(),
958            providers_found: providers.len(),
959            step: if providers.is_empty() {
960                step.clone()
961            } else {
962                step.next()
963            },
964        };
965
966        let target = kbucket::Key::new(key.clone());
967        let peers = self.kbuckets.closest_keys(&target);
968        let inner = QueryInner::new(info);
969        let id = self.queries.add_iter_closest(target.clone(), peers, inner);
970
971        // No queries were actually done for the results yet.
972        let stats = QueryStats::empty();
973
974        if !providers.is_empty() {
975            self.queued_events
976                .push_back(ToSwarm::GenerateEvent(Event::OutboundQueryProgressed {
977                    id,
978                    result: QueryResult::GetProviders(Ok(GetProvidersOk::FoundProviders {
979                        key,
980                        providers,
981                    })),
982                    step,
983                    stats,
984                }));
985        }
986        id
987    }
988
989    /// Set the [`Mode`] in which we should operate.
990    ///
991    /// By default, we are in [`Mode::Client`] and will swap into [`Mode::Server`] as soon as we have a confirmed, external address via [`FromSwarm::ExternalAddrConfirmed`].
992    ///
993    /// Setting a mode via this function disables this automatic behaviour and unconditionally operates in the specified mode.
994    /// To reactivate the automatic configuration, pass [`None`] instead.
995    pub fn set_mode(&mut self, mode: Option<Mode>) {
996        match mode {
997            Some(mode) => {
998                self.mode = mode;
999                self.auto_mode = false;
1000                self.reconfigure_mode();
1001            }
1002            None => {
1003                self.auto_mode = true;
1004                self.determine_mode_from_external_addresses();
1005            }
1006        }
1007
1008        if let Some(waker) = self.no_events_waker.take() {
1009            waker.wake();
1010        }
1011    }
1012
1013    fn reconfigure_mode(&mut self) {
1014        if self.connections.is_empty() {
1015            return;
1016        }
1017
1018        let num_connections = self.connections.len();
1019
1020        tracing::debug!(
1021            "Re-configuring {} established connection{}",
1022            num_connections,
1023            if num_connections > 1 { "s" } else { "" }
1024        );
1025
1026        self.queued_events
1027            .extend(
1028                self.connections
1029                    .iter()
1030                    .map(|(conn_id, peer_id)| ToSwarm::NotifyHandler {
1031                        peer_id: *peer_id,
1032                        handler: NotifyHandler::One(*conn_id),
1033                        event: HandlerIn::ReconfigureMode {
1034                            new_mode: self.mode,
1035                        },
1036                    }),
1037            );
1038    }
1039
1040    fn determine_mode_from_external_addresses(&mut self) {
1041        let old_mode = self.mode;
1042
1043        self.mode = match (self.external_addresses.as_slice(), self.mode) {
1044            ([], Mode::Server) => {
1045                tracing::debug!("Switching to client-mode because we no longer have any confirmed external addresses");
1046
1047                Mode::Client
1048            }
1049            ([], Mode::Client) => {
1050                // Previously client-mode, now also client-mode because no external addresses.
1051
1052                Mode::Client
1053            }
1054            (confirmed_external_addresses, Mode::Client) => {
1055                if tracing::enabled!(Level::DEBUG) {
1056                    let confirmed_external_addresses =
1057                        to_comma_separated_list(confirmed_external_addresses);
1058
1059                    tracing::debug!("Switching to server-mode assuming that one of [{confirmed_external_addresses}] is externally reachable");
1060                }
1061
1062                Mode::Server
1063            }
1064            (confirmed_external_addresses, Mode::Server) => {
1065                debug_assert!(
1066                    !confirmed_external_addresses.is_empty(),
1067                    "Previous match arm handled empty list"
1068                );
1069
1070                // Previously, server-mode, now also server-mode because > 1 external address. Don't log anything to avoid spam.
1071
1072                Mode::Server
1073            }
1074        };
1075
1076        self.reconfigure_mode();
1077
1078        if old_mode != self.mode {
1079            self.queued_events
1080                .push_back(ToSwarm::GenerateEvent(Event::ModeChanged {
1081                    new_mode: self.mode,
1082                }));
1083        }
1084    }
1085
1086    /// Processes discovered peers from a successful request in an iterative `Query`.
1087    fn discovered<'a, I>(&'a mut self, query_id: &QueryId, source: &PeerId, peers: I)
1088    where
1089        I: Iterator<Item = &'a KadPeer> + Clone,
1090    {
1091        let local_id = self.kbuckets.local_key().preimage();
1092        let others_iter = peers.filter(|p| &p.node_id != local_id);
1093        if let Some(query) = self.queries.get_mut(query_id) {
1094            tracing::trace!(peer=%source, query=?query_id, "Request to peer in query succeeded");
1095            for peer in others_iter.clone() {
1096                tracing::trace!(
1097                    ?peer,
1098                    %source,
1099                    query=?query_id,
1100                    "Peer reported by source in query"
1101                );
1102                let addrs = peer.multiaddrs.iter().cloned().collect();
1103                query.inner.addresses.insert(peer.node_id, addrs);
1104            }
1105            query.on_success(source, others_iter.cloned().map(|kp| kp.node_id))
1106        }
1107    }
1108
1109    /// Finds the closest peers to a `target` in the context of a request by
1110    /// the `source` peer, such that the `source` peer is never included in the
1111    /// result.
1112    fn find_closest<T: Clone>(
1113        &mut self,
1114        target: &kbucket::Key<T>,
1115        source: &PeerId,
1116    ) -> Vec<KadPeer> {
1117        if target == self.kbuckets.local_key() {
1118            Vec::new()
1119        } else {
1120            self.kbuckets
1121                .closest(target)
1122                .filter(|e| e.node.key.preimage() != source)
1123                .take(self.queries.config().replication_factor.get())
1124                .map(KadPeer::from)
1125                .collect()
1126        }
1127    }
1128
1129    /// Collects all peers who are known to be providers of the value for a given `Multihash`.
1130    fn provider_peers(&mut self, key: &record::Key, source: &PeerId) -> Vec<KadPeer> {
1131        let kbuckets = &mut self.kbuckets;
1132        let connected = &mut self.connected_peers;
1133        let listen_addresses = &self.listen_addresses;
1134        let external_addresses = &self.external_addresses;
1135
1136        self.store
1137            .providers(key)
1138            .into_iter()
1139            .filter_map(move |p| {
1140                if &p.provider != source {
1141                    let node_id = p.provider;
1142                    let multiaddrs = p.addresses;
1143                    let connection_ty = if connected.contains(&node_id) {
1144                        ConnectionType::Connected
1145                    } else {
1146                        ConnectionType::NotConnected
1147                    };
1148                    if multiaddrs.is_empty() {
1149                        // The provider is either the local node and we fill in
1150                        // the local addresses on demand, or it is a legacy
1151                        // provider record without addresses, in which case we
1152                        // try to find addresses in the routing table, as was
1153                        // done before provider records were stored along with
1154                        // their addresses.
1155                        if &node_id == kbuckets.local_key().preimage() {
1156                            Some(
1157                                listen_addresses
1158                                    .iter()
1159                                    .chain(external_addresses.iter())
1160                                    .cloned()
1161                                    .collect::<Vec<_>>(),
1162                            )
1163                        } else {
1164                            let key = kbucket::Key::from(node_id);
1165                            kbuckets
1166                                .entry(&key)
1167                                .view()
1168                                .map(|e| e.node.value.clone().into_vec())
1169                        }
1170                    } else {
1171                        Some(multiaddrs)
1172                    }
1173                    .map(|multiaddrs| KadPeer {
1174                        node_id,
1175                        multiaddrs,
1176                        connection_ty,
1177                    })
1178                } else {
1179                    None
1180                }
1181            })
1182            .take(self.queries.config().replication_factor.get())
1183            .collect()
1184    }
1185
1186    /// Starts an iterative `ADD_PROVIDER` query for the given key.
1187    fn start_add_provider(&mut self, key: record::Key, context: AddProviderContext) {
1188        let info = QueryInfo::AddProvider {
1189            context,
1190            key: key.clone(),
1191            phase: AddProviderPhase::GetClosestPeers,
1192        };
1193        let target = kbucket::Key::new(key);
1194        let peers = self.kbuckets.closest_keys(&target);
1195        let inner = QueryInner::new(info);
1196        self.queries.add_iter_closest(target.clone(), peers, inner);
1197    }
1198
1199    /// Starts an iterative `PUT_VALUE` query for the given record.
1200    fn start_put_record(&mut self, record: Record, quorum: Quorum, context: PutRecordContext) {
1201        let quorum = quorum.eval(self.queries.config().replication_factor);
1202        let target = kbucket::Key::new(record.key.clone());
1203        let peers = self.kbuckets.closest_keys(&target);
1204        let info = QueryInfo::PutRecord {
1205            record,
1206            quorum,
1207            context,
1208            phase: PutRecordPhase::GetClosestPeers,
1209        };
1210        let inner = QueryInner::new(info);
1211        self.queries.add_iter_closest(target.clone(), peers, inner);
1212    }
1213
1214    /// Updates the routing table with a new connection status and address of a peer.
1215    fn connection_updated(
1216        &mut self,
1217        peer: PeerId,
1218        address: Option<Multiaddr>,
1219        new_status: NodeStatus,
1220    ) {
1221        let key = kbucket::Key::from(peer);
1222        match self.kbuckets.entry(&key) {
1223            kbucket::Entry::Present(mut entry, old_status) => {
1224                if old_status != new_status {
1225                    entry.update(new_status)
1226                }
1227                if let Some(address) = address {
1228                    if entry.value().insert(address) {
1229                        self.queued_events.push_back(ToSwarm::GenerateEvent(
1230                            Event::RoutingUpdated {
1231                                peer,
1232                                is_new_peer: false,
1233                                addresses: entry.value().clone(),
1234                                old_peer: None,
1235                                bucket_range: self
1236                                    .kbuckets
1237                                    .bucket(&key)
1238                                    .map(|b| b.range())
1239                                    .expect("Not kbucket::Entry::SelfEntry."),
1240                            },
1241                        ))
1242                    }
1243                }
1244            }
1245
1246            kbucket::Entry::Pending(mut entry, old_status) => {
1247                if let Some(address) = address {
1248                    entry.value().insert(address);
1249                }
1250                if old_status != new_status {
1251                    entry.update(new_status);
1252                }
1253            }
1254
1255            kbucket::Entry::Absent(entry) => {
1256                // Only connected nodes with a known address are newly inserted.
1257                if new_status != NodeStatus::Connected {
1258                    return;
1259                }
1260                match (address, self.kbucket_inserts) {
1261                    (None, _) => {
1262                        self.queued_events
1263                            .push_back(ToSwarm::GenerateEvent(Event::UnroutablePeer { peer }));
1264                    }
1265                    (Some(a), BucketInserts::Manual) => {
1266                        self.queued_events
1267                            .push_back(ToSwarm::GenerateEvent(Event::RoutablePeer {
1268                                peer,
1269                                address: a,
1270                            }));
1271                    }
1272                    (Some(a), BucketInserts::OnConnected) => {
1273                        let addresses = Addresses::new(a);
1274                        match entry.insert(addresses.clone(), new_status) {
1275                            kbucket::InsertResult::Inserted => {
1276                                let event = Event::RoutingUpdated {
1277                                    peer,
1278                                    is_new_peer: true,
1279                                    addresses,
1280                                    old_peer: None,
1281                                    bucket_range: self
1282                                        .kbuckets
1283                                        .bucket(&key)
1284                                        .map(|b| b.range())
1285                                        .expect("Not kbucket::Entry::SelfEntry."),
1286                                };
1287                                self.queued_events.push_back(ToSwarm::GenerateEvent(event));
1288                            }
1289                            kbucket::InsertResult::Full => {
1290                                tracing::debug!(
1291                                    %peer,
1292                                    "Bucket full. Peer not added to routing table"
1293                                );
1294                                let address = addresses.first().clone();
1295                                self.queued_events.push_back(ToSwarm::GenerateEvent(
1296                                    Event::RoutablePeer { peer, address },
1297                                ));
1298                            }
1299                            kbucket::InsertResult::Pending { disconnected } => {
1300                                let address = addresses.first().clone();
1301                                self.queued_events.push_back(ToSwarm::GenerateEvent(
1302                                    Event::PendingRoutablePeer { peer, address },
1303                                ));
1304
1305                                // `disconnected` might already be in the process of re-connecting.
1306                                // In other words `disconnected` might have already re-connected but
1307                                // is not yet confirmed to support the Kademlia protocol via
1308                                // [`HandlerEvent::ProtocolConfirmed`].
1309                                //
1310                                // Only try dialing peer if not currently connected.
1311                                if !self.connected_peers.contains(disconnected.preimage()) {
1312                                    self.queued_events.push_back(ToSwarm::Dial {
1313                                        opts: DialOpts::peer_id(disconnected.into_preimage())
1314                                            .condition(dial_opts::PeerCondition::NotDialing)
1315                                            .build(),
1316                                    })
1317                                }
1318                            }
1319                        }
1320                    }
1321                }
1322            }
1323            _ => {}
1324        }
1325    }
1326
1327    /// Handles a finished (i.e. successful) query.
1328    fn query_finished(&mut self, q: Query<QueryInner>) -> Option<Event> {
1329        let query_id = q.id();
1330        tracing::trace!(query=?query_id, "Query finished");
1331        let result = q.into_result();
1332        match result.inner.info {
1333            QueryInfo::Bootstrap {
1334                peer,
1335                remaining,
1336                mut step,
1337            } => {
1338                let local_key = self.kbuckets.local_key().clone();
1339                let mut remaining = remaining.unwrap_or_else(|| {
1340                    debug_assert_eq!(&peer, local_key.preimage());
1341                    // The lookup for the local key finished. To complete the bootstrap process,
1342                    // a bucket refresh should be performed for every bucket farther away than
1343                    // the first non-empty bucket (which are most likely no more than the last
1344                    // few, i.e. farthest, buckets).
1345                    self.kbuckets
1346                        .iter()
1347                        .skip_while(|b| b.is_empty())
1348                        .skip(1) // Skip the bucket with the closest neighbour.
1349                        .map(|b| {
1350                            // Try to find a key that falls into the bucket. While such keys can
1351                            // be generated fully deterministically, the current libp2p kademlia
1352                            // wire protocol requires transmission of the preimages of the actual
1353                            // keys in the DHT keyspace, hence for now this is just a "best effort"
1354                            // to find a key that hashes into a specific bucket. The probabilities
1355                            // of finding a key in the bucket `b` with as most 16 trials are as
1356                            // follows:
1357                            //
1358                            // Pr(bucket-255) = 1 - (1/2)^16   ~= 1
1359                            // Pr(bucket-254) = 1 - (3/4)^16   ~= 1
1360                            // Pr(bucket-253) = 1 - (7/8)^16   ~= 0.88
1361                            // Pr(bucket-252) = 1 - (15/16)^16 ~= 0.64
1362                            // ...
1363                            let mut target = kbucket::Key::from(PeerId::random());
1364                            for _ in 0..16 {
1365                                let d = local_key.distance(&target);
1366                                if b.contains(&d) {
1367                                    break;
1368                                }
1369                                target = kbucket::Key::from(PeerId::random());
1370                            }
1371                            target
1372                        })
1373                        .collect::<Vec<_>>()
1374                        .into_iter()
1375                });
1376
1377                let num_remaining = remaining.len() as u32;
1378
1379                if let Some(target) = remaining.next() {
1380                    let info = QueryInfo::Bootstrap {
1381                        peer: *target.preimage(),
1382                        remaining: Some(remaining),
1383                        step: step.next(),
1384                    };
1385                    let peers = self.kbuckets.closest_keys(&target);
1386                    let inner = QueryInner::new(info);
1387                    self.queries
1388                        .continue_iter_closest(query_id, target.clone(), peers, inner);
1389                } else {
1390                    step.last = true;
1391                };
1392
1393                Some(Event::OutboundQueryProgressed {
1394                    id: query_id,
1395                    stats: result.stats,
1396                    result: QueryResult::Bootstrap(Ok(BootstrapOk {
1397                        peer,
1398                        num_remaining,
1399                    })),
1400                    step,
1401                })
1402            }
1403
1404            QueryInfo::GetClosestPeers { key, mut step } => {
1405                step.last = true;
1406
1407                Some(Event::OutboundQueryProgressed {
1408                    id: query_id,
1409                    stats: result.stats,
1410                    result: QueryResult::GetClosestPeers(Ok(GetClosestPeersOk {
1411                        key,
1412                        peers: result.peers.collect(),
1413                    })),
1414                    step,
1415                })
1416            }
1417
1418            QueryInfo::GetProviders { mut step, .. } => {
1419                step.last = true;
1420
1421                Some(Event::OutboundQueryProgressed {
1422                    id: query_id,
1423                    stats: result.stats,
1424                    result: QueryResult::GetProviders(Ok(
1425                        GetProvidersOk::FinishedWithNoAdditionalRecord {
1426                            closest_peers: result.peers.collect(),
1427                        },
1428                    )),
1429                    step,
1430                })
1431            }
1432
1433            QueryInfo::AddProvider {
1434                context,
1435                key,
1436                phase: AddProviderPhase::GetClosestPeers,
1437            } => {
1438                let provider_id = self.local_peer_id;
1439                let external_addresses = self.external_addresses.iter().cloned().collect();
1440                let inner = QueryInner::new(QueryInfo::AddProvider {
1441                    context,
1442                    key,
1443                    phase: AddProviderPhase::AddProvider {
1444                        provider_id,
1445                        external_addresses,
1446                        get_closest_peers_stats: result.stats,
1447                    },
1448                });
1449                self.queries.continue_fixed(query_id, result.peers, inner);
1450                None
1451            }
1452
1453            QueryInfo::AddProvider {
1454                context,
1455                key,
1456                phase:
1457                    AddProviderPhase::AddProvider {
1458                        get_closest_peers_stats,
1459                        ..
1460                    },
1461            } => match context {
1462                AddProviderContext::Publish => Some(Event::OutboundQueryProgressed {
1463                    id: query_id,
1464                    stats: get_closest_peers_stats.merge(result.stats),
1465                    result: QueryResult::StartProviding(Ok(AddProviderOk { key })),
1466                    step: ProgressStep::first_and_last(),
1467                }),
1468                AddProviderContext::Republish => Some(Event::OutboundQueryProgressed {
1469                    id: query_id,
1470                    stats: get_closest_peers_stats.merge(result.stats),
1471                    result: QueryResult::RepublishProvider(Ok(AddProviderOk { key })),
1472                    step: ProgressStep::first_and_last(),
1473                }),
1474            },
1475
1476            QueryInfo::GetRecord {
1477                key,
1478                mut step,
1479                found_a_record,
1480                cache_candidates,
1481            } => {
1482                step.last = true;
1483
1484                let results = if found_a_record {
1485                    Ok(GetRecordOk::FinishedWithNoAdditionalRecord { cache_candidates })
1486                } else {
1487                    Err(GetRecordError::NotFound {
1488                        key,
1489                        closest_peers: result.peers.collect(),
1490                    })
1491                };
1492                Some(Event::OutboundQueryProgressed {
1493                    id: query_id,
1494                    stats: result.stats,
1495                    result: QueryResult::GetRecord(results),
1496                    step,
1497                })
1498            }
1499
1500            QueryInfo::PutRecord {
1501                context,
1502                record,
1503                quorum,
1504                phase: PutRecordPhase::GetClosestPeers,
1505            } => {
1506                let info = QueryInfo::PutRecord {
1507                    context,
1508                    record,
1509                    quorum,
1510                    phase: PutRecordPhase::PutRecord {
1511                        success: vec![],
1512                        get_closest_peers_stats: result.stats,
1513                    },
1514                };
1515                let inner = QueryInner::new(info);
1516                self.queries.continue_fixed(query_id, result.peers, inner);
1517                None
1518            }
1519
1520            QueryInfo::PutRecord {
1521                context,
1522                record,
1523                quorum,
1524                phase:
1525                    PutRecordPhase::PutRecord {
1526                        success,
1527                        get_closest_peers_stats,
1528                    },
1529            } => {
1530                let mk_result = |key: record::Key| {
1531                    if success.len() >= quorum.get() {
1532                        Ok(PutRecordOk { key })
1533                    } else {
1534                        Err(PutRecordError::QuorumFailed {
1535                            key,
1536                            quorum,
1537                            success,
1538                        })
1539                    }
1540                };
1541                match context {
1542                    PutRecordContext::Publish | PutRecordContext::Custom => {
1543                        Some(Event::OutboundQueryProgressed {
1544                            id: query_id,
1545                            stats: get_closest_peers_stats.merge(result.stats),
1546                            result: QueryResult::PutRecord(mk_result(record.key)),
1547                            step: ProgressStep::first_and_last(),
1548                        })
1549                    }
1550                    PutRecordContext::Republish => Some(Event::OutboundQueryProgressed {
1551                        id: query_id,
1552                        stats: get_closest_peers_stats.merge(result.stats),
1553                        result: QueryResult::RepublishRecord(mk_result(record.key)),
1554                        step: ProgressStep::first_and_last(),
1555                    }),
1556                    PutRecordContext::Replicate => {
1557                        tracing::debug!(record=?record.key, "Record replicated");
1558                        None
1559                    }
1560                }
1561            }
1562        }
1563    }
1564
1565    /// Handles a query that timed out.
1566    fn query_timeout(&mut self, query: Query<QueryInner>) -> Option<Event> {
1567        let query_id = query.id();
1568        tracing::trace!(query=?query_id, "Query timed out");
1569        let result = query.into_result();
1570        match result.inner.info {
1571            QueryInfo::Bootstrap {
1572                peer,
1573                mut remaining,
1574                mut step,
1575            } => {
1576                let num_remaining = remaining.as_ref().map(|r| r.len().saturating_sub(1) as u32);
1577
1578                // Continue with the next bootstrap query if `remaining` is not empty.
1579                if let Some((target, remaining)) =
1580                    remaining.take().and_then(|mut r| Some((r.next()?, r)))
1581                {
1582                    let info = QueryInfo::Bootstrap {
1583                        peer: target.clone().into_preimage(),
1584                        remaining: Some(remaining),
1585                        step: step.next(),
1586                    };
1587                    let peers = self.kbuckets.closest_keys(&target);
1588                    let inner = QueryInner::new(info);
1589                    self.queries
1590                        .continue_iter_closest(query_id, target.clone(), peers, inner);
1591                } else {
1592                    step.last = true;
1593                }
1594
1595                Some(Event::OutboundQueryProgressed {
1596                    id: query_id,
1597                    stats: result.stats,
1598                    result: QueryResult::Bootstrap(Err(BootstrapError::Timeout {
1599                        peer,
1600                        num_remaining,
1601                    })),
1602                    step,
1603                })
1604            }
1605
1606            QueryInfo::AddProvider { context, key, .. } => Some(match context {
1607                AddProviderContext::Publish => Event::OutboundQueryProgressed {
1608                    id: query_id,
1609                    stats: result.stats,
1610                    result: QueryResult::StartProviding(Err(AddProviderError::Timeout { key })),
1611                    step: ProgressStep::first_and_last(),
1612                },
1613                AddProviderContext::Republish => Event::OutboundQueryProgressed {
1614                    id: query_id,
1615                    stats: result.stats,
1616                    result: QueryResult::RepublishProvider(Err(AddProviderError::Timeout { key })),
1617                    step: ProgressStep::first_and_last(),
1618                },
1619            }),
1620
1621            QueryInfo::GetClosestPeers { key, mut step } => {
1622                step.last = true;
1623
1624                Some(Event::OutboundQueryProgressed {
1625                    id: query_id,
1626                    stats: result.stats,
1627                    result: QueryResult::GetClosestPeers(Err(GetClosestPeersError::Timeout {
1628                        key,
1629                        peers: result.peers.collect(),
1630                    })),
1631                    step,
1632                })
1633            }
1634
1635            QueryInfo::PutRecord {
1636                record,
1637                quorum,
1638                context,
1639                phase,
1640            } => {
1641                let err = Err(PutRecordError::Timeout {
1642                    key: record.key,
1643                    quorum,
1644                    success: match phase {
1645                        PutRecordPhase::GetClosestPeers => vec![],
1646                        PutRecordPhase::PutRecord { ref success, .. } => success.clone(),
1647                    },
1648                });
1649                match context {
1650                    PutRecordContext::Publish | PutRecordContext::Custom => {
1651                        Some(Event::OutboundQueryProgressed {
1652                            id: query_id,
1653                            stats: result.stats,
1654                            result: QueryResult::PutRecord(err),
1655                            step: ProgressStep::first_and_last(),
1656                        })
1657                    }
1658                    PutRecordContext::Republish => Some(Event::OutboundQueryProgressed {
1659                        id: query_id,
1660                        stats: result.stats,
1661                        result: QueryResult::RepublishRecord(err),
1662                        step: ProgressStep::first_and_last(),
1663                    }),
1664                    PutRecordContext::Replicate => match phase {
1665                        PutRecordPhase::GetClosestPeers => {
1666                            tracing::warn!(
1667                                "Locating closest peers for replication failed: {:?}",
1668                                err
1669                            );
1670                            None
1671                        }
1672                        PutRecordPhase::PutRecord { .. } => {
1673                            tracing::debug!("Replicating record failed: {:?}", err);
1674                            None
1675                        }
1676                    },
1677                }
1678            }
1679
1680            QueryInfo::GetRecord { key, mut step, .. } => {
1681                step.last = true;
1682
1683                Some(Event::OutboundQueryProgressed {
1684                    id: query_id,
1685                    stats: result.stats,
1686                    result: QueryResult::GetRecord(Err(GetRecordError::Timeout { key })),
1687                    step,
1688                })
1689            }
1690
1691            QueryInfo::GetProviders { key, mut step, .. } => {
1692                step.last = true;
1693
1694                Some(Event::OutboundQueryProgressed {
1695                    id: query_id,
1696                    stats: result.stats,
1697                    result: QueryResult::GetProviders(Err(GetProvidersError::Timeout {
1698                        key,
1699                        closest_peers: result.peers.collect(),
1700                    })),
1701                    step,
1702                })
1703            }
1704        }
1705    }
1706
1707    /// Processes a record received from a peer.
1708    fn record_received(
1709        &mut self,
1710        source: PeerId,
1711        connection: ConnectionId,
1712        request_id: RequestId,
1713        mut record: Record,
1714    ) {
1715        if record.publisher.as_ref() == Some(self.kbuckets.local_key().preimage()) {
1716            // If the (alleged) publisher is the local node, do nothing. The record of
1717            // the original publisher should never change as a result of replication
1718            // and the publisher is always assumed to have the "right" value.
1719            self.queued_events.push_back(ToSwarm::NotifyHandler {
1720                peer_id: source,
1721                handler: NotifyHandler::One(connection),
1722                event: HandlerIn::PutRecordRes {
1723                    key: record.key,
1724                    value: record.value,
1725                    request_id,
1726                },
1727            });
1728            return;
1729        }
1730
1731        let now = Instant::now();
1732
1733        // Calculate the expiration exponentially inversely proportional to the
1734        // number of nodes between the local node and the closest node to the key
1735        // (beyond the replication factor). This ensures avoiding over-caching
1736        // outside of the k closest nodes to a key.
1737        let target = kbucket::Key::new(record.key.clone());
1738        let num_between = self.kbuckets.count_nodes_between(&target);
1739        let k = self.queries.config().replication_factor.get();
1740        let num_beyond_k = (usize::max(k, num_between) - k) as u32;
1741        let expiration = self
1742            .record_ttl
1743            .map(|ttl| now + exp_decrease(ttl, num_beyond_k));
1744        // The smaller TTL prevails. Only if neither TTL is set is the record
1745        // stored "forever".
1746        record.expires = record.expires.or(expiration).min(expiration);
1747
1748        if let Some(job) = self.put_record_job.as_mut() {
1749            // Ignore the record in the next run of the replication
1750            // job, since we can assume the sender replicated the
1751            // record to the k closest peers. Effectively, only
1752            // one of the k closest peers performs a replication
1753            // in the configured interval, assuming a shared interval.
1754            job.skip(record.key.clone())
1755        }
1756
1757        // While records received from a publisher, as well as records that do
1758        // not exist locally should always (attempted to) be stored, there is a
1759        // choice here w.r.t. the handling of replicated records whose keys refer
1760        // to records that exist locally: The value and / or the publisher may
1761        // either be overridden or left unchanged. At the moment and in the
1762        // absence of a decisive argument for another option, both are always
1763        // overridden as it avoids having to load the existing record in the
1764        // first place.
1765
1766        if !record.is_expired(now) {
1767            // The record is cloned because of the weird libp2p protocol
1768            // requirement to send back the value in the response, although this
1769            // is a waste of resources.
1770            match self.record_filtering {
1771                StoreInserts::Unfiltered => match self.store.put(record.clone()) {
1772                    Ok(()) => {
1773                        tracing::debug!(
1774                            record=?record.key,
1775                            "Record stored: {} bytes",
1776                            record.value.len()
1777                        );
1778                        self.queued_events.push_back(ToSwarm::GenerateEvent(
1779                            Event::InboundRequest {
1780                                request: InboundRequest::PutRecord {
1781                                    source,
1782                                    connection,
1783                                    record: None,
1784                                },
1785                            },
1786                        ));
1787                    }
1788                    Err(e) => {
1789                        tracing::info!("Record not stored: {:?}", e);
1790                        self.queued_events.push_back(ToSwarm::NotifyHandler {
1791                            peer_id: source,
1792                            handler: NotifyHandler::One(connection),
1793                            event: HandlerIn::Reset(request_id),
1794                        });
1795
1796                        return;
1797                    }
1798                },
1799                StoreInserts::FilterBoth => {
1800                    self.queued_events
1801                        .push_back(ToSwarm::GenerateEvent(Event::InboundRequest {
1802                            request: InboundRequest::PutRecord {
1803                                source,
1804                                connection,
1805                                record: Some(record.clone()),
1806                            },
1807                        }));
1808                }
1809            }
1810        }
1811
1812        // The remote receives a [`HandlerIn::PutRecordRes`] even in the
1813        // case where the record is discarded due to being expired. Given that
1814        // the remote sent the local node a [`HandlerEvent::PutRecord`]
1815        // request, the remote perceives the local node as one node among the k
1816        // closest nodes to the target. In addition returning
1817        // [`HandlerIn::PutRecordRes`] does not reveal any internal
1818        // information to a possibly malicious remote node.
1819        self.queued_events.push_back(ToSwarm::NotifyHandler {
1820            peer_id: source,
1821            handler: NotifyHandler::One(connection),
1822            event: HandlerIn::PutRecordRes {
1823                key: record.key,
1824                value: record.value,
1825                request_id,
1826            },
1827        })
1828    }
1829
1830    /// Processes a provider record received from a peer.
1831    fn provider_received(&mut self, key: record::Key, provider: KadPeer) {
1832        if &provider.node_id != self.kbuckets.local_key().preimage() {
1833            let record = ProviderRecord {
1834                key,
1835                provider: provider.node_id,
1836                expires: self.provider_record_ttl.map(|ttl| Instant::now() + ttl),
1837                addresses: provider.multiaddrs,
1838            };
1839            match self.record_filtering {
1840                StoreInserts::Unfiltered => {
1841                    if let Err(e) = self.store.add_provider(record) {
1842                        tracing::info!("Provider record not stored: {:?}", e);
1843                        return;
1844                    }
1845
1846                    self.queued_events
1847                        .push_back(ToSwarm::GenerateEvent(Event::InboundRequest {
1848                            request: InboundRequest::AddProvider { record: None },
1849                        }));
1850                }
1851                StoreInserts::FilterBoth => {
1852                    self.queued_events
1853                        .push_back(ToSwarm::GenerateEvent(Event::InboundRequest {
1854                            request: InboundRequest::AddProvider {
1855                                record: Some(record),
1856                            },
1857                        }));
1858                }
1859            }
1860        }
1861    }
1862
1863    fn address_failed(&mut self, peer_id: PeerId, address: &Multiaddr) {
1864        let key = kbucket::Key::from(peer_id);
1865
1866        if let Some(addrs) = self.kbuckets.entry(&key).value() {
1867            // TODO: Ideally, the address should only be removed if the error can
1868            // be classified as "permanent" but since `err` is currently a borrowed
1869            // trait object without a `'static` bound, even downcasting for inspection
1870            // of the error is not possible (and also not truly desirable or ergonomic).
1871            // The error passed in should rather be a dedicated enum.
1872            if addrs.remove(address).is_ok() {
1873                tracing::debug!(
1874                    peer=%peer_id,
1875                    %address,
1876                    "Address removed from peer due to error."
1877                );
1878            } else {
1879                // Despite apparently having no reachable address (any longer),
1880                // the peer is kept in the routing table with the last address to avoid
1881                // (temporary) loss of network connectivity to "flush" the routing
1882                // table. Once in, a peer is only removed from the routing table
1883                // if it is the least recently connected peer, currently disconnected
1884                // and is unreachable in the context of another peer pending insertion
1885                // into the same bucket. This is handled transparently by the
1886                // `KBucketsTable` and takes effect through `KBucketsTable::take_applied_pending`
1887                // within `Behaviour::poll`.
1888                tracing::debug!(
1889                    peer=%peer_id,
1890                    %address,
1891                    "Last remaining address of peer is unreachable."
1892                );
1893            }
1894        }
1895
1896        for query in self.queries.iter_mut() {
1897            if let Some(addrs) = query.inner.addresses.get_mut(&peer_id) {
1898                addrs.retain(|a| a != address);
1899            }
1900        }
1901    }
1902
1903    fn on_connection_established(
1904        &mut self,
1905        ConnectionEstablished {
1906            peer_id,
1907            failed_addresses,
1908            other_established,
1909            ..
1910        }: ConnectionEstablished,
1911    ) {
1912        for addr in failed_addresses {
1913            self.address_failed(peer_id, addr);
1914        }
1915
1916        // Peer's first connection.
1917        if other_established == 0 {
1918            self.connected_peers.insert(peer_id);
1919        }
1920    }
1921
1922    fn on_address_change(
1923        &mut self,
1924        AddressChange {
1925            peer_id: peer,
1926            old,
1927            new,
1928            ..
1929        }: AddressChange,
1930    ) {
1931        let (old, new) = (old.get_remote_address(), new.get_remote_address());
1932
1933        // Update routing table.
1934        if let Some(addrs) = self.kbuckets.entry(&kbucket::Key::from(peer)).value() {
1935            if addrs.replace(old, new) {
1936                tracing::debug!(
1937                    %peer,
1938                    old_address=%old,
1939                    new_address=%new,
1940                    "Old address replaced with new address for peer."
1941                );
1942            } else {
1943                tracing::debug!(
1944                    %peer,
1945                    old_address=%old,
1946                    new_address=%new,
1947                    "Old address not replaced with new address for peer as old address wasn't present.",
1948                );
1949            }
1950        } else {
1951            tracing::debug!(
1952                %peer,
1953                old_address=%old,
1954                new_address=%new,
1955                "Old address not replaced with new address for peer as peer is not present in the \
1956                 routing table."
1957            );
1958        }
1959
1960        // Update query address cache.
1961        //
1962        // Given two connected nodes: local node A and remote node B. Say node B
1963        // is not in node A's routing table. Additionally node B is part of the
1964        // `QueryInner::addresses` list of an ongoing query on node A. Say Node
1965        // B triggers an address change and then disconnects. Later on the
1966        // earlier mentioned query on node A would like to connect to node B.
1967        // Without replacing the address in the `QueryInner::addresses` set node
1968        // A would attempt to dial the old and not the new address.
1969        //
1970        // While upholding correctness, iterating through all discovered
1971        // addresses of a peer in all currently ongoing queries might have a
1972        // large performance impact. If so, the code below might be worth
1973        // revisiting.
1974        for query in self.queries.iter_mut() {
1975            if let Some(addrs) = query.inner.addresses.get_mut(&peer) {
1976                for addr in addrs.iter_mut() {
1977                    if addr == old {
1978                        *addr = new.clone();
1979                    }
1980                }
1981            }
1982        }
1983    }
1984
1985    fn on_dial_failure(&mut self, DialFailure { peer_id, error, .. }: DialFailure) {
1986        let Some(peer_id) = peer_id else { return };
1987
1988        match error {
1989            DialError::LocalPeerId { .. }
1990            | DialError::WrongPeerId { .. }
1991            | DialError::Aborted
1992            | DialError::Denied { .. }
1993            | DialError::Transport(_)
1994            | DialError::NoAddresses => {
1995                if let DialError::Transport(addresses) = error {
1996                    for (addr, _) in addresses {
1997                        self.address_failed(peer_id, addr)
1998                    }
1999                }
2000
2001                for query in self.queries.iter_mut() {
2002                    query.on_failure(&peer_id);
2003                }
2004            }
2005            DialError::DialPeerConditionFalse(
2006                dial_opts::PeerCondition::Disconnected
2007                | dial_opts::PeerCondition::NotDialing
2008                | dial_opts::PeerCondition::DisconnectedAndNotDialing,
2009            ) => {
2010                // We might (still) be connected, or about to be connected, thus do not report the
2011                // failure to the queries.
2012            }
2013            DialError::DialPeerConditionFalse(dial_opts::PeerCondition::Always) => {
2014                unreachable!("DialPeerCondition::Always can not trigger DialPeerConditionFalse.");
2015            }
2016        }
2017    }
2018
2019    fn on_connection_closed(
2020        &mut self,
2021        ConnectionClosed {
2022            peer_id,
2023            remaining_established,
2024            connection_id,
2025            ..
2026        }: ConnectionClosed,
2027    ) {
2028        self.connections.remove(&connection_id);
2029
2030        if remaining_established == 0 {
2031            for query in self.queries.iter_mut() {
2032                query.on_failure(&peer_id);
2033            }
2034            self.connection_updated(peer_id, None, NodeStatus::Disconnected);
2035            self.connected_peers.remove(&peer_id);
2036        }
2037    }
2038
2039    /// Preloads a new [`Handler`] with requests that are waiting to be sent to the newly connected peer.
2040    fn preload_new_handler(
2041        &mut self,
2042        handler: &mut Handler,
2043        connection_id: ConnectionId,
2044        peer: PeerId,
2045    ) {
2046        self.connections.insert(connection_id, peer);
2047        // Queue events for sending pending RPCs to the connected peer.
2048        // There can be only one pending RPC for a particular peer and query per definition.
2049        for (_peer_id, event) in self.queries.iter_mut().filter_map(|q| {
2050            q.inner
2051                .pending_rpcs
2052                .iter()
2053                .position(|(p, _)| p == &peer)
2054                .map(|p| q.inner.pending_rpcs.remove(p))
2055        }) {
2056            handler.on_behaviour_event(event)
2057        }
2058    }
2059}
2060
2061/// Exponentially decrease the given duration (base 2).
2062fn exp_decrease(ttl: Duration, exp: u32) -> Duration {
2063    Duration::from_secs(ttl.as_secs().checked_shr(exp).unwrap_or(0))
2064}
2065
2066impl<TStore> NetworkBehaviour for Behaviour<TStore>
2067where
2068    TStore: RecordStore + Send + 'static,
2069{
2070    type ConnectionHandler = Handler;
2071    type ToSwarm = Event;
2072
2073    fn handle_established_inbound_connection(
2074        &mut self,
2075        connection_id: ConnectionId,
2076        peer: PeerId,
2077        local_addr: &Multiaddr,
2078        remote_addr: &Multiaddr,
2079    ) -> Result<THandler<Self>, ConnectionDenied> {
2080        let connected_point = ConnectedPoint::Listener {
2081            local_addr: local_addr.clone(),
2082            send_back_addr: remote_addr.clone(),
2083        };
2084
2085        let mut handler = Handler::new(
2086            self.protocol_config.clone(),
2087            connected_point,
2088            peer,
2089            self.mode,
2090        );
2091        self.preload_new_handler(&mut handler, connection_id, peer);
2092
2093        Ok(handler)
2094    }
2095
2096    fn handle_established_outbound_connection(
2097        &mut self,
2098        connection_id: ConnectionId,
2099        peer: PeerId,
2100        addr: &Multiaddr,
2101        role_override: Endpoint,
2102    ) -> Result<THandler<Self>, ConnectionDenied> {
2103        let connected_point = ConnectedPoint::Dialer {
2104            address: addr.clone(),
2105            role_override,
2106        };
2107
2108        let mut handler = Handler::new(
2109            self.protocol_config.clone(),
2110            connected_point,
2111            peer,
2112            self.mode,
2113        );
2114        self.preload_new_handler(&mut handler, connection_id, peer);
2115
2116        Ok(handler)
2117    }
2118
2119    fn handle_pending_outbound_connection(
2120        &mut self,
2121        _connection_id: ConnectionId,
2122        maybe_peer: Option<PeerId>,
2123        _addresses: &[Multiaddr],
2124        _effective_role: Endpoint,
2125    ) -> Result<Vec<Multiaddr>, ConnectionDenied> {
2126        let peer_id = match maybe_peer {
2127            None => return Ok(vec![]),
2128            Some(peer) => peer,
2129        };
2130
2131        // We should order addresses from decreasing likelyhood of connectivity, so start with
2132        // the addresses of that peer in the k-buckets.
2133        let key = kbucket::Key::from(peer_id);
2134        let mut peer_addrs =
2135            if let kbucket::Entry::Present(mut entry, _) = self.kbuckets.entry(&key) {
2136                let addrs = entry.value().iter().cloned().collect::<Vec<_>>();
2137                debug_assert!(!addrs.is_empty(), "Empty peer addresses in routing table.");
2138                addrs
2139            } else {
2140                Vec::new()
2141            };
2142
2143        // We add to that a temporary list of addresses from the ongoing queries.
2144        for query in self.queries.iter() {
2145            if let Some(addrs) = query.inner.addresses.get(&peer_id) {
2146                peer_addrs.extend(addrs.iter().cloned())
2147            }
2148        }
2149
2150        Ok(peer_addrs)
2151    }
2152
2153    fn on_connection_handler_event(
2154        &mut self,
2155        source: PeerId,
2156        connection: ConnectionId,
2157        event: THandlerOutEvent<Self>,
2158    ) {
2159        match event {
2160            HandlerEvent::ProtocolConfirmed { endpoint } => {
2161                debug_assert!(self.connected_peers.contains(&source));
2162                // The remote's address can only be put into the routing table,
2163                // and thus shared with other nodes, if the local node is the dialer,
2164                // since the remote address on an inbound connection may be specific
2165                // to that connection (e.g. typically the TCP port numbers).
2166                let address = match endpoint {
2167                    ConnectedPoint::Dialer { address, .. } => Some(address),
2168                    ConnectedPoint::Listener { .. } => None,
2169                };
2170
2171                self.connection_updated(source, address, NodeStatus::Connected);
2172            }
2173
2174            HandlerEvent::ProtocolNotSupported { endpoint } => {
2175                let address = match endpoint {
2176                    ConnectedPoint::Dialer { address, .. } => Some(address),
2177                    ConnectedPoint::Listener { .. } => None,
2178                };
2179                self.connection_updated(source, address, NodeStatus::Disconnected);
2180            }
2181
2182            HandlerEvent::FindNodeReq { key, request_id } => {
2183                let closer_peers = self.find_closest(&kbucket::Key::new(key), &source);
2184
2185                self.queued_events
2186                    .push_back(ToSwarm::GenerateEvent(Event::InboundRequest {
2187                        request: InboundRequest::FindNode {
2188                            num_closer_peers: closer_peers.len(),
2189                        },
2190                    }));
2191
2192                self.queued_events.push_back(ToSwarm::NotifyHandler {
2193                    peer_id: source,
2194                    handler: NotifyHandler::One(connection),
2195                    event: HandlerIn::FindNodeRes {
2196                        closer_peers,
2197                        request_id,
2198                    },
2199                });
2200            }
2201
2202            HandlerEvent::FindNodeRes {
2203                closer_peers,
2204                query_id,
2205            } => {
2206                self.discovered(&query_id, &source, closer_peers.iter());
2207            }
2208
2209            HandlerEvent::GetProvidersReq { key, request_id } => {
2210                let provider_peers = self.provider_peers(&key, &source);
2211                let closer_peers = self.find_closest(&kbucket::Key::new(key), &source);
2212
2213                self.queued_events
2214                    .push_back(ToSwarm::GenerateEvent(Event::InboundRequest {
2215                        request: InboundRequest::GetProvider {
2216                            num_closer_peers: closer_peers.len(),
2217                            num_provider_peers: provider_peers.len(),
2218                        },
2219                    }));
2220
2221                self.queued_events.push_back(ToSwarm::NotifyHandler {
2222                    peer_id: source,
2223                    handler: NotifyHandler::One(connection),
2224                    event: HandlerIn::GetProvidersRes {
2225                        closer_peers,
2226                        provider_peers,
2227                        request_id,
2228                    },
2229                });
2230            }
2231
2232            HandlerEvent::GetProvidersRes {
2233                closer_peers,
2234                provider_peers,
2235                query_id,
2236            } => {
2237                let peers = closer_peers.iter().chain(provider_peers.iter());
2238                self.discovered(&query_id, &source, peers);
2239                if let Some(query) = self.queries.get_mut(&query_id) {
2240                    let stats = query.stats().clone();
2241                    if let QueryInfo::GetProviders {
2242                        ref key,
2243                        ref mut providers_found,
2244                        ref mut step,
2245                        ..
2246                    } = query.inner.info
2247                    {
2248                        *providers_found += provider_peers.len();
2249                        let providers = provider_peers.iter().map(|p| p.node_id).collect();
2250
2251                        self.queued_events.push_back(ToSwarm::GenerateEvent(
2252                            Event::OutboundQueryProgressed {
2253                                id: query_id,
2254                                result: QueryResult::GetProviders(Ok(
2255                                    GetProvidersOk::FoundProviders {
2256                                        key: key.clone(),
2257                                        providers,
2258                                    },
2259                                )),
2260                                step: step.clone(),
2261                                stats,
2262                            },
2263                        ));
2264                        *step = step.next();
2265                    }
2266                }
2267            }
2268            HandlerEvent::QueryError { query_id, error } => {
2269                tracing::debug!(
2270                    peer=%source,
2271                    query=?query_id,
2272                    "Request to peer in query failed with {:?}",
2273                    error
2274                );
2275                // If the query to which the error relates is still active,
2276                // signal the failure w.r.t. `source`.
2277                if let Some(query) = self.queries.get_mut(&query_id) {
2278                    query.on_failure(&source)
2279                }
2280            }
2281
2282            HandlerEvent::AddProvider { key, provider } => {
2283                // Only accept a provider record from a legitimate peer.
2284                if provider.node_id != source {
2285                    return;
2286                }
2287
2288                self.provider_received(key, provider);
2289            }
2290
2291            HandlerEvent::GetRecord { key, request_id } => {
2292                // Lookup the record locally.
2293                let record = match self.store.get(&key) {
2294                    Some(record) => {
2295                        if record.is_expired(Instant::now()) {
2296                            self.store.remove(&key);
2297                            None
2298                        } else {
2299                            Some(record.into_owned())
2300                        }
2301                    }
2302                    None => None,
2303                };
2304
2305                let closer_peers = self.find_closest(&kbucket::Key::new(key), &source);
2306
2307                self.queued_events
2308                    .push_back(ToSwarm::GenerateEvent(Event::InboundRequest {
2309                        request: InboundRequest::GetRecord {
2310                            num_closer_peers: closer_peers.len(),
2311                            present_locally: record.is_some(),
2312                        },
2313                    }));
2314
2315                self.queued_events.push_back(ToSwarm::NotifyHandler {
2316                    peer_id: source,
2317                    handler: NotifyHandler::One(connection),
2318                    event: HandlerIn::GetRecordRes {
2319                        record,
2320                        closer_peers,
2321                        request_id,
2322                    },
2323                });
2324            }
2325
2326            HandlerEvent::GetRecordRes {
2327                record,
2328                closer_peers,
2329                query_id,
2330            } => {
2331                if let Some(query) = self.queries.get_mut(&query_id) {
2332                    let stats = query.stats().clone();
2333                    if let QueryInfo::GetRecord {
2334                        key,
2335                        ref mut step,
2336                        ref mut found_a_record,
2337                        cache_candidates,
2338                    } = &mut query.inner.info
2339                    {
2340                        if let Some(record) = record {
2341                            *found_a_record = true;
2342                            let record = PeerRecord {
2343                                peer: Some(source),
2344                                record,
2345                            };
2346
2347                            self.queued_events.push_back(ToSwarm::GenerateEvent(
2348                                Event::OutboundQueryProgressed {
2349                                    id: query_id,
2350                                    result: QueryResult::GetRecord(Ok(GetRecordOk::FoundRecord(
2351                                        record,
2352                                    ))),
2353                                    step: step.clone(),
2354                                    stats,
2355                                },
2356                            ));
2357
2358                            *step = step.next();
2359                        } else {
2360                            tracing::trace!(record=?key, %source, "Record not found at source");
2361                            if let Caching::Enabled { max_peers } = self.caching {
2362                                let source_key = kbucket::Key::from(source);
2363                                let target_key = kbucket::Key::from(key.clone());
2364                                let distance = source_key.distance(&target_key);
2365                                cache_candidates.insert(distance, source);
2366                                if cache_candidates.len() > max_peers as usize {
2367                                    // TODO: `pop_last()` would be nice once stabilised.
2368                                    // See https://github.com/rust-lang/rust/issues/62924.
2369                                    let last =
2370                                        *cache_candidates.keys().next_back().expect("len > 0");
2371                                    cache_candidates.remove(&last);
2372                                }
2373                            }
2374                        }
2375                    }
2376                }
2377
2378                self.discovered(&query_id, &source, closer_peers.iter());
2379            }
2380
2381            HandlerEvent::PutRecord { record, request_id } => {
2382                self.record_received(source, connection, request_id, record);
2383            }
2384
2385            HandlerEvent::PutRecordRes { query_id, .. } => {
2386                if let Some(query) = self.queries.get_mut(&query_id) {
2387                    query.on_success(&source, vec![]);
2388                    if let QueryInfo::PutRecord {
2389                        phase: PutRecordPhase::PutRecord { success, .. },
2390                        quorum,
2391                        ..
2392                    } = &mut query.inner.info
2393                    {
2394                        success.push(source);
2395
2396                        let quorum = quorum.get();
2397                        if success.len() >= quorum {
2398                            let peers = success.clone();
2399                            let finished = query.try_finish(peers.iter());
2400                            if !finished {
2401                                tracing::debug!(
2402                                    peer=%source,
2403                                    query=?query_id,
2404                                    "PutRecord query reached quorum ({}/{}) with response \
2405                                     from peer but could not yet finish.",
2406                                    peers.len(),
2407                                    quorum,
2408                                );
2409                            }
2410                        }
2411                    }
2412                }
2413            }
2414        };
2415    }
2416
2417    #[tracing::instrument(level = "trace", name = "NetworkBehaviour::poll", skip(self, cx))]
2418    fn poll(
2419        &mut self,
2420        cx: &mut Context<'_>,
2421    ) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
2422        let now = Instant::now();
2423
2424        // Calculate the available capacity for queries triggered by background jobs.
2425        let mut jobs_query_capacity = JOBS_MAX_QUERIES.saturating_sub(self.queries.size());
2426
2427        // Run the periodic provider announcement job.
2428        if let Some(mut job) = self.add_provider_job.take() {
2429            let num = usize::min(JOBS_MAX_NEW_QUERIES, jobs_query_capacity);
2430            for _ in 0..num {
2431                if let Poll::Ready(r) = job.poll(cx, &mut self.store, now) {
2432                    self.start_add_provider(r.key, AddProviderContext::Republish)
2433                } else {
2434                    break;
2435                }
2436            }
2437            jobs_query_capacity -= num;
2438            self.add_provider_job = Some(job);
2439        }
2440
2441        // Run the periodic record replication / publication job.
2442        if let Some(mut job) = self.put_record_job.take() {
2443            let num = usize::min(JOBS_MAX_NEW_QUERIES, jobs_query_capacity);
2444            for _ in 0..num {
2445                if let Poll::Ready(r) = job.poll(cx, &mut self.store, now) {
2446                    let context =
2447                        if r.publisher.as_ref() == Some(self.kbuckets.local_key().preimage()) {
2448                            PutRecordContext::Republish
2449                        } else {
2450                            PutRecordContext::Replicate
2451                        };
2452                    self.start_put_record(r, Quorum::All, context)
2453                } else {
2454                    break;
2455                }
2456            }
2457            self.put_record_job = Some(job);
2458        }
2459
2460        loop {
2461            // Drain queued events first.
2462            if let Some(event) = self.queued_events.pop_front() {
2463                return Poll::Ready(event);
2464            }
2465
2466            // Drain applied pending entries from the routing table.
2467            if let Some(entry) = self.kbuckets.take_applied_pending() {
2468                let kbucket::Node { key, value } = entry.inserted;
2469                let event = Event::RoutingUpdated {
2470                    bucket_range: self
2471                        .kbuckets
2472                        .bucket(&key)
2473                        .map(|b| b.range())
2474                        .expect("Self to never be applied from pending."),
2475                    peer: key.into_preimage(),
2476                    is_new_peer: true,
2477                    addresses: value,
2478                    old_peer: entry.evicted.map(|n| n.key.into_preimage()),
2479                };
2480                return Poll::Ready(ToSwarm::GenerateEvent(event));
2481            }
2482
2483            // Look for a finished query.
2484            loop {
2485                match self.queries.poll(now) {
2486                    QueryPoolState::Finished(q) => {
2487                        if let Some(event) = self.query_finished(q) {
2488                            return Poll::Ready(ToSwarm::GenerateEvent(event));
2489                        }
2490                    }
2491                    QueryPoolState::Timeout(q) => {
2492                        if let Some(event) = self.query_timeout(q) {
2493                            return Poll::Ready(ToSwarm::GenerateEvent(event));
2494                        }
2495                    }
2496                    QueryPoolState::Waiting(Some((query, peer_id))) => {
2497                        let event = query.inner.info.to_request(query.id());
2498                        // TODO: AddProvider requests yield no response, so the query completes
2499                        // as soon as all requests have been sent. However, the handler should
2500                        // better emit an event when the request has been sent (and report
2501                        // an error if sending fails), instead of immediately reporting
2502                        // "success" somewhat prematurely here.
2503                        if let QueryInfo::AddProvider {
2504                            phase: AddProviderPhase::AddProvider { .. },
2505                            ..
2506                        } = &query.inner.info
2507                        {
2508                            query.on_success(&peer_id, vec![])
2509                        }
2510
2511                        if self.connected_peers.contains(&peer_id) {
2512                            self.queued_events.push_back(ToSwarm::NotifyHandler {
2513                                peer_id,
2514                                event,
2515                                handler: NotifyHandler::Any,
2516                            });
2517                        } else if &peer_id != self.kbuckets.local_key().preimage() {
2518                            query.inner.pending_rpcs.push((peer_id, event));
2519                            self.queued_events.push_back(ToSwarm::Dial {
2520                                opts: DialOpts::peer_id(peer_id)
2521                                    .condition(dial_opts::PeerCondition::NotDialing)
2522                                    .build(),
2523                            });
2524                        }
2525                    }
2526                    QueryPoolState::Waiting(None) | QueryPoolState::Idle => break,
2527                }
2528            }
2529
2530            // No immediate event was produced as a result of a finished query.
2531            // If no new events have been queued either, signal `NotReady` to
2532            // be polled again later.
2533            if self.queued_events.is_empty() {
2534                self.no_events_waker = Some(cx.waker().clone());
2535
2536                return Poll::Pending;
2537            }
2538        }
2539    }
2540
2541    fn on_swarm_event(&mut self, event: FromSwarm) {
2542        self.listen_addresses.on_swarm_event(&event);
2543        let external_addresses_changed = self.external_addresses.on_swarm_event(&event);
2544
2545        if self.auto_mode && external_addresses_changed {
2546            self.determine_mode_from_external_addresses();
2547        }
2548
2549        match event {
2550            FromSwarm::ConnectionEstablished(connection_established) => {
2551                self.on_connection_established(connection_established)
2552            }
2553            FromSwarm::ConnectionClosed(connection_closed) => {
2554                self.on_connection_closed(connection_closed)
2555            }
2556            FromSwarm::DialFailure(dial_failure) => self.on_dial_failure(dial_failure),
2557            FromSwarm::AddressChange(address_change) => self.on_address_change(address_change),
2558            _ => {}
2559        }
2560    }
2561}
2562
2563/// A quorum w.r.t. the configured replication factor specifies the minimum
2564/// number of distinct nodes that must be successfully contacted in order
2565/// for a query to succeed.
2566#[derive(Debug, Copy, Clone, PartialEq, Eq)]
2567pub enum Quorum {
2568    One,
2569    Majority,
2570    All,
2571    N(NonZeroUsize),
2572}
2573
2574impl Quorum {
2575    /// Evaluate the quorum w.r.t a given total (number of peers).
2576    fn eval(&self, total: NonZeroUsize) -> NonZeroUsize {
2577        match self {
2578            Quorum::One => NonZeroUsize::new(1).expect("1 != 0"),
2579            Quorum::Majority => NonZeroUsize::new(total.get() / 2 + 1).expect("n + 1 != 0"),
2580            Quorum::All => total,
2581            Quorum::N(n) => NonZeroUsize::min(total, *n),
2582        }
2583    }
2584}
2585
2586/// A record either received by the given peer or retrieved from the local
2587/// record store.
2588#[derive(Debug, Clone, PartialEq, Eq)]
2589pub struct PeerRecord {
2590    /// The peer from whom the record was received. `None` if the record was
2591    /// retrieved from local storage.
2592    pub peer: Option<PeerId>,
2593    pub record: Record,
2594}
2595
2596//////////////////////////////////////////////////////////////////////////////
2597// Events
2598
2599/// The events produced by the `Kademlia` behaviour.
2600///
2601/// See [`NetworkBehaviour::poll`].
2602#[derive(Debug, Clone)]
2603#[allow(clippy::large_enum_variant)]
2604pub enum Event {
2605    /// An inbound request has been received and handled.
2606    //
2607    // Note on the difference between 'request' and 'query': A request is a
2608    // single request-response style exchange with a single remote peer. A query
2609    // is made of multiple requests across multiple remote peers.
2610    InboundRequest { request: InboundRequest },
2611
2612    /// An outbound query has made progress.
2613    OutboundQueryProgressed {
2614        /// The ID of the query that finished.
2615        id: QueryId,
2616        /// The intermediate result of the query.
2617        result: QueryResult,
2618        /// Execution statistics from the query.
2619        stats: QueryStats,
2620        /// Indicates which event this is, if therer are multiple responses for a single query.
2621        step: ProgressStep,
2622    },
2623
2624    /// The routing table has been updated with a new peer and / or
2625    /// address, thereby possibly evicting another peer.
2626    RoutingUpdated {
2627        /// The ID of the peer that was added or updated.
2628        peer: PeerId,
2629        /// Whether this is a new peer and was thus just added to the routing
2630        /// table, or whether it is an existing peer who's addresses changed.
2631        is_new_peer: bool,
2632        /// The full list of known addresses of `peer`.
2633        addresses: Addresses,
2634        /// Returns the minimum inclusive and maximum inclusive distance for
2635        /// the bucket of the peer.
2636        bucket_range: (Distance, Distance),
2637        /// The ID of the peer that was evicted from the routing table to make
2638        /// room for the new peer, if any.
2639        old_peer: Option<PeerId>,
2640    },
2641
2642    /// A peer has connected for whom no listen address is known.
2643    ///
2644    /// If the peer is to be added to the routing table, a known
2645    /// listen address for the peer must be provided via [`Behaviour::add_address`].
2646    UnroutablePeer { peer: PeerId },
2647
2648    /// A connection to a peer has been established for whom a listen address
2649    /// is known but the peer has not been added to the routing table either
2650    /// because [`BucketInserts::Manual`] is configured or because
2651    /// the corresponding bucket is full.
2652    ///
2653    /// If the peer is to be included in the routing table, it must
2654    /// must be explicitly added via [`Behaviour::add_address`], possibly after
2655    /// removing another peer.
2656    ///
2657    /// See [`Behaviour::kbucket`] for insight into the contents of
2658    /// the k-bucket of `peer`.
2659    RoutablePeer { peer: PeerId, address: Multiaddr },
2660
2661    /// A connection to a peer has been established for whom a listen address
2662    /// is known but the peer is only pending insertion into the routing table
2663    /// if the least-recently disconnected peer is unresponsive, i.e. the peer
2664    /// may not make it into the routing table.
2665    ///
2666    /// If the peer is to be unconditionally included in the routing table,
2667    /// it should be explicitly added via [`Behaviour::add_address`] after
2668    /// removing another peer.
2669    ///
2670    /// See [`Behaviour::kbucket`] for insight into the contents of
2671    /// the k-bucket of `peer`.
2672    PendingRoutablePeer { peer: PeerId, address: Multiaddr },
2673
2674    /// This peer's mode has been updated automatically.
2675    ///
2676    /// This happens in response to an external
2677    /// address being added or removed.
2678    ModeChanged { new_mode: Mode },
2679}
2680
2681/// Information about progress events.
2682#[derive(Debug, Clone)]
2683pub struct ProgressStep {
2684    /// The index into the event
2685    pub count: NonZeroUsize,
2686    /// Is this the final event?
2687    pub last: bool,
2688}
2689
2690impl ProgressStep {
2691    fn first() -> Self {
2692        Self {
2693            count: NonZeroUsize::new(1).expect("1 to be greater than 0."),
2694            last: false,
2695        }
2696    }
2697
2698    fn first_and_last() -> Self {
2699        let mut first = ProgressStep::first();
2700        first.last = true;
2701        first
2702    }
2703
2704    fn next(&self) -> Self {
2705        assert!(!self.last);
2706        let count = NonZeroUsize::new(self.count.get() + 1).expect("Adding 1 not to result in 0.");
2707
2708        Self { count, last: false }
2709    }
2710}
2711
2712/// Information about a received and handled inbound request.
2713#[derive(Debug, Clone)]
2714pub enum InboundRequest {
2715    /// Request for the list of nodes whose IDs are the closest to `key`.
2716    FindNode { num_closer_peers: usize },
2717    /// Same as `FindNode`, but should also return the entries of the local
2718    /// providers list for this key.
2719    GetProvider {
2720        num_closer_peers: usize,
2721        num_provider_peers: usize,
2722    },
2723    /// A peer sent an add provider request.
2724    /// If filtering [`StoreInserts::FilterBoth`] is enabled, the [`ProviderRecord`] is
2725    /// included.
2726    ///
2727    /// See [`StoreInserts`] and [`Config::set_record_filtering`] for details..
2728    AddProvider { record: Option<ProviderRecord> },
2729    /// Request to retrieve a record.
2730    GetRecord {
2731        num_closer_peers: usize,
2732        present_locally: bool,
2733    },
2734    /// A peer sent a put record request.
2735    /// If filtering [`StoreInserts::FilterBoth`] is enabled, the [`Record`] is included.
2736    ///
2737    /// See [`StoreInserts`] and [`Config::set_record_filtering`].
2738    PutRecord {
2739        source: PeerId,
2740        connection: ConnectionId,
2741        record: Option<Record>,
2742    },
2743}
2744
2745/// The results of Kademlia queries.
2746#[derive(Debug, Clone)]
2747pub enum QueryResult {
2748    /// The result of [`Behaviour::bootstrap`].
2749    Bootstrap(BootstrapResult),
2750
2751    /// The result of [`Behaviour::get_closest_peers`].
2752    GetClosestPeers(GetClosestPeersResult),
2753
2754    /// The result of [`Behaviour::get_providers`].
2755    GetProviders(GetProvidersResult),
2756
2757    /// The result of [`Behaviour::start_providing`].
2758    StartProviding(AddProviderResult),
2759
2760    /// The result of a (automatic) republishing of a provider record.
2761    RepublishProvider(AddProviderResult),
2762
2763    /// The result of [`Behaviour::get_record`].
2764    GetRecord(GetRecordResult),
2765
2766    /// The result of [`Behaviour::put_record`].
2767    PutRecord(PutRecordResult),
2768
2769    /// The result of a (automatic) republishing of a (value-)record.
2770    RepublishRecord(PutRecordResult),
2771}
2772
2773/// The result of [`Behaviour::get_record`].
2774pub type GetRecordResult = Result<GetRecordOk, GetRecordError>;
2775
2776/// The successful result of [`Behaviour::get_record`].
2777#[derive(Debug, Clone)]
2778pub enum GetRecordOk {
2779    FoundRecord(PeerRecord),
2780    FinishedWithNoAdditionalRecord {
2781        /// If caching is enabled, these are the peers closest
2782        /// _to the record key_ (not the local node) that were queried but
2783        /// did not return the record, sorted by distance to the record key
2784        /// from closest to farthest. How many of these are tracked is configured
2785        /// by [`Config::set_caching`].
2786        ///
2787        /// Writing back the cache at these peers is a manual operation.
2788        /// ie. you may wish to use these candidates with [`Behaviour::put_record_to`]
2789        /// after selecting one of the returned records.
2790        cache_candidates: BTreeMap<kbucket::Distance, PeerId>,
2791    },
2792}
2793
2794/// The error result of [`Behaviour::get_record`].
2795#[derive(Debug, Clone, Error)]
2796pub enum GetRecordError {
2797    #[error("the record was not found")]
2798    NotFound {
2799        key: record::Key,
2800        closest_peers: Vec<PeerId>,
2801    },
2802    #[error("the quorum failed; needed {quorum} peers")]
2803    QuorumFailed {
2804        key: record::Key,
2805        records: Vec<PeerRecord>,
2806        quorum: NonZeroUsize,
2807    },
2808    #[error("the request timed out")]
2809    Timeout { key: record::Key },
2810}
2811
2812impl GetRecordError {
2813    /// Gets the key of the record for which the operation failed.
2814    pub fn key(&self) -> &record::Key {
2815        match self {
2816            GetRecordError::QuorumFailed { key, .. } => key,
2817            GetRecordError::Timeout { key, .. } => key,
2818            GetRecordError::NotFound { key, .. } => key,
2819        }
2820    }
2821
2822    /// Extracts the key of the record for which the operation failed,
2823    /// consuming the error.
2824    pub fn into_key(self) -> record::Key {
2825        match self {
2826            GetRecordError::QuorumFailed { key, .. } => key,
2827            GetRecordError::Timeout { key, .. } => key,
2828            GetRecordError::NotFound { key, .. } => key,
2829        }
2830    }
2831}
2832
2833/// The result of [`Behaviour::put_record`].
2834pub type PutRecordResult = Result<PutRecordOk, PutRecordError>;
2835
2836/// The successful result of [`Behaviour::put_record`].
2837#[derive(Debug, Clone)]
2838pub struct PutRecordOk {
2839    pub key: record::Key,
2840}
2841
2842/// The error result of [`Behaviour::put_record`].
2843#[derive(Debug, Clone, Error)]
2844pub enum PutRecordError {
2845    #[error("the quorum failed; needed {quorum} peers")]
2846    QuorumFailed {
2847        key: record::Key,
2848        /// [`PeerId`]s of the peers the record was successfully stored on.
2849        success: Vec<PeerId>,
2850        quorum: NonZeroUsize,
2851    },
2852    #[error("the request timed out")]
2853    Timeout {
2854        key: record::Key,
2855        /// [`PeerId`]s of the peers the record was successfully stored on.
2856        success: Vec<PeerId>,
2857        quorum: NonZeroUsize,
2858    },
2859}
2860
2861impl PutRecordError {
2862    /// Gets the key of the record for which the operation failed.
2863    pub fn key(&self) -> &record::Key {
2864        match self {
2865            PutRecordError::QuorumFailed { key, .. } => key,
2866            PutRecordError::Timeout { key, .. } => key,
2867        }
2868    }
2869
2870    /// Extracts the key of the record for which the operation failed,
2871    /// consuming the error.
2872    pub fn into_key(self) -> record::Key {
2873        match self {
2874            PutRecordError::QuorumFailed { key, .. } => key,
2875            PutRecordError::Timeout { key, .. } => key,
2876        }
2877    }
2878}
2879
2880/// The result of [`Behaviour::bootstrap`].
2881pub type BootstrapResult = Result<BootstrapOk, BootstrapError>;
2882
2883/// The successful result of [`Behaviour::bootstrap`].
2884#[derive(Debug, Clone)]
2885pub struct BootstrapOk {
2886    pub peer: PeerId,
2887    pub num_remaining: u32,
2888}
2889
2890/// The error result of [`Behaviour::bootstrap`].
2891#[derive(Debug, Clone, Error)]
2892pub enum BootstrapError {
2893    #[error("the request timed out")]
2894    Timeout {
2895        peer: PeerId,
2896        num_remaining: Option<u32>,
2897    },
2898}
2899
2900/// The result of [`Behaviour::get_closest_peers`].
2901pub type GetClosestPeersResult = Result<GetClosestPeersOk, GetClosestPeersError>;
2902
2903/// The successful result of [`Behaviour::get_closest_peers`].
2904#[derive(Debug, Clone)]
2905pub struct GetClosestPeersOk {
2906    pub key: Vec<u8>,
2907    pub peers: Vec<PeerId>,
2908}
2909
2910/// The error result of [`Behaviour::get_closest_peers`].
2911#[derive(Debug, Clone, Error)]
2912pub enum GetClosestPeersError {
2913    #[error("the request timed out")]
2914    Timeout { key: Vec<u8>, peers: Vec<PeerId> },
2915}
2916
2917impl GetClosestPeersError {
2918    /// Gets the key for which the operation failed.
2919    pub fn key(&self) -> &Vec<u8> {
2920        match self {
2921            GetClosestPeersError::Timeout { key, .. } => key,
2922        }
2923    }
2924
2925    /// Extracts the key for which the operation failed,
2926    /// consuming the error.
2927    pub fn into_key(self) -> Vec<u8> {
2928        match self {
2929            GetClosestPeersError::Timeout { key, .. } => key,
2930        }
2931    }
2932}
2933
2934/// The result of [`Behaviour::get_providers`].
2935pub type GetProvidersResult = Result<GetProvidersOk, GetProvidersError>;
2936
2937/// The successful result of [`Behaviour::get_providers`].
2938#[derive(Debug, Clone)]
2939pub enum GetProvidersOk {
2940    FoundProviders {
2941        key: record::Key,
2942        /// The new set of providers discovered.
2943        providers: HashSet<PeerId>,
2944    },
2945    FinishedWithNoAdditionalRecord {
2946        closest_peers: Vec<PeerId>,
2947    },
2948}
2949
2950/// The error result of [`Behaviour::get_providers`].
2951#[derive(Debug, Clone, Error)]
2952pub enum GetProvidersError {
2953    #[error("the request timed out")]
2954    Timeout {
2955        key: record::Key,
2956        closest_peers: Vec<PeerId>,
2957    },
2958}
2959
2960impl GetProvidersError {
2961    /// Gets the key for which the operation failed.
2962    pub fn key(&self) -> &record::Key {
2963        match self {
2964            GetProvidersError::Timeout { key, .. } => key,
2965        }
2966    }
2967
2968    /// Extracts the key for which the operation failed,
2969    /// consuming the error.
2970    pub fn into_key(self) -> record::Key {
2971        match self {
2972            GetProvidersError::Timeout { key, .. } => key,
2973        }
2974    }
2975}
2976
2977/// The result of publishing a provider record.
2978pub type AddProviderResult = Result<AddProviderOk, AddProviderError>;
2979
2980/// The successful result of publishing a provider record.
2981#[derive(Debug, Clone)]
2982pub struct AddProviderOk {
2983    pub key: record::Key,
2984}
2985
2986/// The possible errors when publishing a provider record.
2987#[derive(Debug, Clone, Error)]
2988pub enum AddProviderError {
2989    #[error("the request timed out")]
2990    Timeout { key: record::Key },
2991}
2992
2993impl AddProviderError {
2994    /// Gets the key for which the operation failed.
2995    pub fn key(&self) -> &record::Key {
2996        match self {
2997            AddProviderError::Timeout { key, .. } => key,
2998        }
2999    }
3000
3001    /// Extracts the key for which the operation failed,
3002    pub fn into_key(self) -> record::Key {
3003        match self {
3004            AddProviderError::Timeout { key, .. } => key,
3005        }
3006    }
3007}
3008
3009impl From<kbucket::EntryView<kbucket::Key<PeerId>, Addresses>> for KadPeer {
3010    fn from(e: kbucket::EntryView<kbucket::Key<PeerId>, Addresses>) -> KadPeer {
3011        KadPeer {
3012            node_id: e.node.key.into_preimage(),
3013            multiaddrs: e.node.value.into_vec(),
3014            connection_ty: match e.status {
3015                NodeStatus::Connected => ConnectionType::Connected,
3016                NodeStatus::Disconnected => ConnectionType::NotConnected,
3017            },
3018        }
3019    }
3020}
3021
3022//////////////////////////////////////////////////////////////////////////////
3023// Internal query state
3024
3025struct QueryInner {
3026    /// The query-specific state.
3027    info: QueryInfo,
3028    /// Addresses of peers discovered during a query.
3029    addresses: FnvHashMap<PeerId, SmallVec<[Multiaddr; 8]>>,
3030    /// A map of pending requests to peers.
3031    ///
3032    /// A request is pending if the targeted peer is not currently connected
3033    /// and these requests are sent as soon as a connection to the peer is established.
3034    pending_rpcs: SmallVec<[(PeerId, HandlerIn); K_VALUE.get()]>,
3035}
3036
3037impl QueryInner {
3038    fn new(info: QueryInfo) -> Self {
3039        QueryInner {
3040            info,
3041            addresses: Default::default(),
3042            pending_rpcs: SmallVec::default(),
3043        }
3044    }
3045}
3046
3047/// The context of a [`QueryInfo::AddProvider`] query.
3048#[derive(Debug, Copy, Clone, PartialEq, Eq)]
3049pub enum AddProviderContext {
3050    /// The context is a [`Behaviour::start_providing`] operation.
3051    Publish,
3052    /// The context is periodic republishing of provider announcements
3053    /// initiated earlier via [`Behaviour::start_providing`].
3054    Republish,
3055}
3056
3057/// The context of a [`QueryInfo::PutRecord`] query.
3058#[derive(Debug, Copy, Clone, PartialEq, Eq)]
3059pub enum PutRecordContext {
3060    /// The context is a [`Behaviour::put_record`] operation.
3061    Publish,
3062    /// The context is periodic republishing of records stored
3063    /// earlier via [`Behaviour::put_record`].
3064    Republish,
3065    /// The context is periodic replication (i.e. without extending
3066    /// the record TTL) of stored records received earlier from another peer.
3067    Replicate,
3068    /// The context is a custom store operation targeting specific
3069    /// peers initiated by [`Behaviour::put_record_to`].
3070    Custom,
3071}
3072
3073/// Information about a running query.
3074#[derive(Debug, Clone)]
3075pub enum QueryInfo {
3076    /// A query initiated by [`Behaviour::bootstrap`].
3077    Bootstrap {
3078        /// The targeted peer ID.
3079        peer: PeerId,
3080        /// The remaining random peer IDs to query, one per
3081        /// bucket that still needs refreshing.
3082        ///
3083        /// This is `None` if the initial self-lookup has not
3084        /// yet completed and `Some` with an exhausted iterator
3085        /// if bootstrapping is complete.
3086        remaining: Option<vec::IntoIter<kbucket::Key<PeerId>>>,
3087        step: ProgressStep,
3088    },
3089
3090    /// A (repeated) query initiated by [`Behaviour::get_closest_peers`].
3091    GetClosestPeers {
3092        /// The key being queried (the preimage).
3093        key: Vec<u8>,
3094        /// Current index of events.
3095        step: ProgressStep,
3096    },
3097
3098    /// A (repeated) query initiated by [`Behaviour::get_providers`].
3099    GetProviders {
3100        /// The key for which to search for providers.
3101        key: record::Key,
3102        /// The number of providers found so far.
3103        providers_found: usize,
3104        /// Current index of events.
3105        step: ProgressStep,
3106    },
3107
3108    /// A (repeated) query initiated by [`Behaviour::start_providing`].
3109    AddProvider {
3110        /// The record key.
3111        key: record::Key,
3112        /// The current phase of the query.
3113        phase: AddProviderPhase,
3114        /// The execution context of the query.
3115        context: AddProviderContext,
3116    },
3117
3118    /// A (repeated) query initiated by [`Behaviour::put_record`].
3119    PutRecord {
3120        record: Record,
3121        /// The expected quorum of responses w.r.t. the replication factor.
3122        quorum: NonZeroUsize,
3123        /// The current phase of the query.
3124        phase: PutRecordPhase,
3125        /// The execution context of the query.
3126        context: PutRecordContext,
3127    },
3128
3129    /// A (repeated) query initiated by [`Behaviour::get_record`].
3130    GetRecord {
3131        /// The key to look for.
3132        key: record::Key,
3133        /// Current index of events.
3134        step: ProgressStep,
3135        /// Did we find at least one record?
3136        found_a_record: bool,
3137        /// The peers closest to the `key` that were queried but did not return a record,
3138        /// i.e. the peers that are candidates for caching the record.
3139        cache_candidates: BTreeMap<kbucket::Distance, PeerId>,
3140    },
3141}
3142
3143impl QueryInfo {
3144    /// Creates an event for a handler to issue an outgoing request in the
3145    /// context of a query.
3146    fn to_request(&self, query_id: QueryId) -> HandlerIn {
3147        match &self {
3148            QueryInfo::Bootstrap { peer, .. } => HandlerIn::FindNodeReq {
3149                key: peer.to_bytes(),
3150                query_id,
3151            },
3152            QueryInfo::GetClosestPeers { key, .. } => HandlerIn::FindNodeReq {
3153                key: key.clone(),
3154                query_id,
3155            },
3156            QueryInfo::GetProviders { key, .. } => HandlerIn::GetProvidersReq {
3157                key: key.clone(),
3158                query_id,
3159            },
3160            QueryInfo::AddProvider { key, phase, .. } => match phase {
3161                AddProviderPhase::GetClosestPeers => HandlerIn::FindNodeReq {
3162                    key: key.to_vec(),
3163                    query_id,
3164                },
3165                AddProviderPhase::AddProvider {
3166                    provider_id,
3167                    external_addresses,
3168                    ..
3169                } => HandlerIn::AddProvider {
3170                    key: key.clone(),
3171                    provider: crate::protocol::KadPeer {
3172                        node_id: *provider_id,
3173                        multiaddrs: external_addresses.clone(),
3174                        connection_ty: crate::protocol::ConnectionType::Connected,
3175                    },
3176                    query_id,
3177                },
3178            },
3179            QueryInfo::GetRecord { key, .. } => HandlerIn::GetRecord {
3180                key: key.clone(),
3181                query_id,
3182            },
3183            QueryInfo::PutRecord { record, phase, .. } => match phase {
3184                PutRecordPhase::GetClosestPeers => HandlerIn::FindNodeReq {
3185                    key: record.key.to_vec(),
3186                    query_id,
3187                },
3188                PutRecordPhase::PutRecord { .. } => HandlerIn::PutRecord {
3189                    record: record.clone(),
3190                    query_id,
3191                },
3192            },
3193        }
3194    }
3195}
3196
3197/// The phases of a [`QueryInfo::AddProvider`] query.
3198#[derive(Debug, Clone)]
3199pub enum AddProviderPhase {
3200    /// The query is searching for the closest nodes to the record key.
3201    GetClosestPeers,
3202
3203    /// The query advertises the local node as a provider for the key to
3204    /// the closest nodes to the key.
3205    AddProvider {
3206        /// The local peer ID that is advertised as a provider.
3207        provider_id: PeerId,
3208        /// The external addresses of the provider being advertised.
3209        external_addresses: Vec<Multiaddr>,
3210        /// Query statistics from the finished `GetClosestPeers` phase.
3211        get_closest_peers_stats: QueryStats,
3212    },
3213}
3214
3215/// The phases of a [`QueryInfo::PutRecord`] query.
3216#[derive(Debug, Clone, PartialEq, Eq)]
3217pub enum PutRecordPhase {
3218    /// The query is searching for the closest nodes to the record key.
3219    GetClosestPeers,
3220
3221    /// The query is replicating the record to the closest nodes to the key.
3222    PutRecord {
3223        /// A list of peers the given record has been successfully replicated to.
3224        success: Vec<PeerId>,
3225        /// Query statistics from the finished `GetClosestPeers` phase.
3226        get_closest_peers_stats: QueryStats,
3227    },
3228}
3229
3230/// A mutable reference to a running query.
3231pub struct QueryMut<'a> {
3232    query: &'a mut Query<QueryInner>,
3233}
3234
3235impl<'a> QueryMut<'a> {
3236    pub fn id(&self) -> QueryId {
3237        self.query.id()
3238    }
3239
3240    /// Gets information about the type and state of the query.
3241    pub fn info(&self) -> &QueryInfo {
3242        &self.query.inner.info
3243    }
3244
3245    /// Gets execution statistics about the query.
3246    ///
3247    /// For a multi-phase query such as `put_record`, these are the
3248    /// statistics of the current phase.
3249    pub fn stats(&self) -> &QueryStats {
3250        self.query.stats()
3251    }
3252
3253    /// Finishes the query asap, without waiting for the
3254    /// regular termination conditions.
3255    pub fn finish(&mut self) {
3256        self.query.finish()
3257    }
3258}
3259
3260/// An immutable reference to a running query.
3261pub struct QueryRef<'a> {
3262    query: &'a Query<QueryInner>,
3263}
3264
3265impl<'a> QueryRef<'a> {
3266    pub fn id(&self) -> QueryId {
3267        self.query.id()
3268    }
3269
3270    /// Gets information about the type and state of the query.
3271    pub fn info(&self) -> &QueryInfo {
3272        &self.query.inner.info
3273    }
3274
3275    /// Gets execution statistics about the query.
3276    ///
3277    /// For a multi-phase query such as `put_record`, these are the
3278    /// statistics of the current phase.
3279    pub fn stats(&self) -> &QueryStats {
3280        self.query.stats()
3281    }
3282}
3283
3284/// An operation failed to due no known peers in the routing table.
3285#[derive(Debug, Clone)]
3286pub struct NoKnownPeers();
3287
3288impl fmt::Display for NoKnownPeers {
3289    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3290        write!(f, "No known peers.")
3291    }
3292}
3293
3294impl std::error::Error for NoKnownPeers {}
3295
3296/// The possible outcomes of [`Behaviour::add_address`].
3297#[derive(Debug, Clone, Copy, PartialEq, Eq)]
3298pub enum RoutingUpdate {
3299    /// The given peer and address has been added to the routing
3300    /// table.
3301    Success,
3302    /// The peer and address is pending insertion into
3303    /// the routing table, if a disconnected peer fails
3304    /// to respond. If the given peer and address ends up
3305    /// in the routing table, [`Event::RoutingUpdated`]
3306    /// is eventually emitted.
3307    Pending,
3308    /// The routing table update failed, either because the
3309    /// corresponding bucket for the peer is full and the
3310    /// pending slot(s) are occupied, or because the given
3311    /// peer ID is deemed invalid (e.g. refers to the local
3312    /// peer ID).
3313    Failed,
3314}
3315
3316#[derive(PartialEq, Copy, Clone, Debug)]
3317pub enum Mode {
3318    Client,
3319    Server,
3320}
3321
3322impl fmt::Display for Mode {
3323    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3324        match self {
3325            Mode::Client => write!(f, "client"),
3326            Mode::Server => write!(f, "server"),
3327        }
3328    }
3329}
3330
3331fn to_comma_separated_list<T>(confirmed_external_addresses: &[T]) -> String
3332where
3333    T: ToString,
3334{
3335    confirmed_external_addresses
3336        .iter()
3337        .map(|addr| addr.to_string())
3338        .collect::<Vec<_>>()
3339        .join(", ")
3340}