libp2p_kad/behaviour.rs
1// Copyright 2018 Parity Technologies (UK) Ltd.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21//! Implementation of the `Kademlia` network behaviour.
22
23mod test;
24
25use crate::addresses::Addresses;
26use crate::handler::{Handler, HandlerEvent, HandlerIn, RequestId};
27use crate::jobs::*;
28use crate::kbucket::{self, Distance, KBucketsTable, NodeStatus};
29use crate::protocol::{ConnectionType, KadPeer, ProtocolConfig};
30use crate::query::{Query, QueryConfig, QueryId, QueryPool, QueryPoolState};
31use crate::record::{
32 self,
33 store::{self, RecordStore},
34 ProviderRecord, Record,
35};
36use crate::K_VALUE;
37use fnv::{FnvHashMap, FnvHashSet};
38use instant::Instant;
39use libp2p_core::{ConnectedPoint, Endpoint, Multiaddr};
40use libp2p_identity::PeerId;
41use libp2p_swarm::behaviour::{
42 AddressChange, ConnectionClosed, ConnectionEstablished, DialFailure, FromSwarm,
43};
44use libp2p_swarm::{
45 dial_opts::{self, DialOpts},
46 ConnectionDenied, ConnectionHandler, ConnectionId, DialError, ExternalAddresses,
47 ListenAddresses, NetworkBehaviour, NotifyHandler, StreamProtocol, THandler, THandlerInEvent,
48 THandlerOutEvent, ToSwarm,
49};
50use smallvec::SmallVec;
51use std::collections::{BTreeMap, HashMap, HashSet, VecDeque};
52use std::fmt;
53use std::num::NonZeroUsize;
54use std::task::{Context, Poll, Waker};
55use std::time::Duration;
56use std::vec;
57use thiserror::Error;
58use tracing::Level;
59
60pub use crate::query::QueryStats;
61
/// `Behaviour` is a `NetworkBehaviour` that implements the libp2p
/// Kademlia protocol.
pub struct Behaviour<TStore> {
    /// The Kademlia routing table.
    kbuckets: KBucketsTable<kbucket::Key<PeerId>, Addresses>,

    /// The k-bucket insertion strategy.
    kbucket_inserts: BucketInserts,

    /// Configuration of the wire protocol.
    protocol_config: ProtocolConfig,

    /// Configuration of [`RecordStore`] filtering.
    record_filtering: StoreInserts,

    /// The currently active (i.e. in-progress) queries.
    queries: QueryPool<QueryInner>,

    /// The currently connected peers.
    ///
    /// This is a superset of the connected peers currently in the routing table.
    connected_peers: FnvHashSet<PeerId>,

    /// Periodic job for re-publication of provider records for keys
    /// provided by the local node.
    add_provider_job: Option<AddProviderJob>,

    /// Periodic job for (re-)replication and (re-)publishing of
    /// regular (value-)records.
    put_record_job: Option<PutRecordJob>,

    /// The TTL of regular (value-)records.
    record_ttl: Option<Duration>,

    /// The TTL of provider records.
    provider_record_ttl: Option<Duration>,

    /// Queued events to return when the behaviour is being polled.
    queued_events: VecDeque<ToSwarm<Event, HandlerIn>>,

    /// The addresses the local node is currently listening on.
    listen_addresses: ListenAddresses,

    /// The known external addresses of the local node.
    external_addresses: ExternalAddresses,

    /// Maps each established connection to the peer it belongs to.
    connections: HashMap<ConnectionId, PeerId>,

    /// See [`Config::caching`].
    caching: Caching,

    /// The identity of the local node.
    local_peer_id: PeerId,

    /// The current operating mode (initialized to [`Mode::Client`]).
    mode: Mode,
    /// Whether `mode` is managed automatically rather than set explicitly
    /// (initialized to `true`).
    auto_mode: bool,
    /// Waker registered when polling produced no events, to be woken
    /// once new events are queued.
    no_events_waker: Option<Waker>,

    /// The record storage.
    store: TStore,
}
120
/// The configurable strategies for the insertion of peers
/// and their addresses into the k-buckets of the Kademlia
/// routing table.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum BucketInserts {
    /// Whenever a connection to a peer is established as a
    /// result of a dialing attempt and that peer is not yet
    /// in the routing table, it is inserted as long as there
    /// is a free slot in the corresponding k-bucket. If the
    /// k-bucket is full but still has a free pending slot,
    /// it may be inserted into the routing table at a later time if an unresponsive
    /// disconnected peer is evicted from the bucket.
    OnConnected,
    /// New peers and addresses are only added to the routing table via
    /// explicit calls to [`Behaviour::add_address`].
    ///
    /// > **Note**: Even though peers can only get into the
    /// > routing table as a result of [`Behaviour::add_address`],
    /// > routing table entries are still updated as peers
    /// > connect and disconnect (i.e. the order of the entries
    /// > as well as the network addresses).
    Manual,
}
144
/// The configurable filtering strategies for the acceptance of
/// incoming records.
///
/// This can be used for e.g. signature verification or validating
/// the accompanying [`Key`].
///
/// [`Key`]: crate::record::Key
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum StoreInserts {
    /// Whenever a (provider) record is received,
    /// the record is forwarded immediately to the [`RecordStore`].
    Unfiltered,
    /// Whenever a (provider) record is received, an event is emitted.
    /// Provider records generate a [`InboundRequest::AddProvider`] under [`Event::InboundRequest`],
    /// normal records generate a [`InboundRequest::PutRecord`] under [`Event::InboundRequest`].
    ///
    /// When deemed valid, a (provider) record needs to be explicitly stored in
    /// the [`RecordStore`] via [`RecordStore::put`] or [`RecordStore::add_provider`],
    /// whichever is applicable. A mutable reference to the [`RecordStore`] can
    /// be retrieved via [`Behaviour::store_mut`].
    FilterBoth,
}
167
/// The configuration for the `Kademlia` behaviour.
///
/// The configuration is consumed by [`Behaviour::new`].
#[derive(Debug, Clone)]
pub struct Config {
    /// Timeout for pending k-bucket entries; see [`KBucketsTable::new`].
    kbucket_pending_timeout: Duration,
    /// See [`Config::set_query_timeout`] and related query settings.
    query_config: QueryConfig,
    /// See [`Config::set_protocol_names`] and [`Config::set_max_packet_size`].
    protocol_config: ProtocolConfig,
    /// See [`Config::set_record_ttl`].
    record_ttl: Option<Duration>,
    /// See [`Config::set_replication_interval`].
    record_replication_interval: Option<Duration>,
    /// See [`Config::set_publication_interval`].
    record_publication_interval: Option<Duration>,
    /// See [`Config::set_record_filtering`].
    record_filtering: StoreInserts,
    /// See [`Config::set_provider_record_ttl`].
    provider_record_ttl: Option<Duration>,
    /// See [`Config::set_provider_publication_interval`].
    provider_publication_interval: Option<Duration>,
    /// See [`Config::set_kbucket_inserts`].
    kbucket_inserts: BucketInserts,
    /// See [`Config::set_caching`].
    caching: Caching,
}
185
186impl Default for Config {
187 fn default() -> Self {
188 Config {
189 kbucket_pending_timeout: Duration::from_secs(60),
190 query_config: QueryConfig::default(),
191 protocol_config: Default::default(),
192 record_ttl: Some(Duration::from_secs(36 * 60 * 60)),
193 record_replication_interval: Some(Duration::from_secs(60 * 60)),
194 record_publication_interval: Some(Duration::from_secs(24 * 60 * 60)),
195 record_filtering: StoreInserts::Unfiltered,
196 provider_publication_interval: Some(Duration::from_secs(12 * 60 * 60)),
197 provider_record_ttl: Some(Duration::from_secs(24 * 60 * 60)),
198 kbucket_inserts: BucketInserts::OnConnected,
199 caching: Caching::Enabled { max_peers: 1 },
200 }
201 }
202}
203
/// The configuration for Kademlia "write-back" caching after successful
/// lookups via [`Behaviour::get_record`].
#[derive(Debug, Clone)]
pub enum Caching {
    /// Caching is disabled and the peers closest to records being looked up
    /// that do not return a record are not tracked, i.e.
    /// [`GetRecordOk::FinishedWithNoAdditionalRecord`] is always empty.
    Disabled,
    /// Up to `max_peers` peers not returning a record that are closest to the key
    /// being looked up are tracked and returned in [`GetRecordOk::FinishedWithNoAdditionalRecord`].
    /// The write-back operation must be performed explicitly, if
    /// desired and after choosing a record from the results, via [`Behaviour::put_record_to`].
    Enabled { max_peers: u16 },
}
218
impl Config {
    /// Sets custom protocol names.
    ///
    /// Kademlia nodes only communicate with other nodes using the same protocol
    /// name. Using custom name(s) therefore allows to segregate the DHT from
    /// others, if that is desired.
    ///
    /// More than one protocol name can be supplied. In this case the node will
    /// be able to talk to other nodes supporting any of the provided names.
    /// Multiple names must be used with caution to avoid network partitioning.
    pub fn set_protocol_names(&mut self, names: Vec<StreamProtocol>) -> &mut Self {
        self.protocol_config.set_protocol_names(names);
        self
    }

    /// Sets the timeout for a single query.
    ///
    /// > **Note**: A single query usually comprises at least as many requests
    /// > as the replication factor, i.e. this is not a request timeout.
    ///
    /// The default is 60 seconds.
    pub fn set_query_timeout(&mut self, timeout: Duration) -> &mut Self {
        self.query_config.timeout = timeout;
        self
    }

    /// Sets the replication factor to use.
    ///
    /// The replication factor determines to how many closest peers
    /// a record is replicated. The default is [`K_VALUE`].
    pub fn set_replication_factor(&mut self, replication_factor: NonZeroUsize) -> &mut Self {
        self.query_config.replication_factor = replication_factor;
        self
    }

    /// Sets the allowed level of parallelism for iterative queries.
    ///
    /// The `α` parameter in the Kademlia paper. The maximum number of peers
    /// that an iterative query is allowed to wait for in parallel while
    /// iterating towards the closest nodes to a target. Defaults to
    /// `ALPHA_VALUE`.
    ///
    /// This only controls the level of parallelism of an iterative query, not
    /// the level of parallelism of a query to a fixed set of peers.
    ///
    /// When used with [`Config::disjoint_query_paths`] it equals
    /// the amount of disjoint paths used.
    pub fn set_parallelism(&mut self, parallelism: NonZeroUsize) -> &mut Self {
        self.query_config.parallelism = parallelism;
        self
    }

    /// Require iterative queries to use disjoint paths for increased resiliency
    /// in the presence of potentially adversarial nodes.
    ///
    /// When enabled the number of disjoint paths used equals the configured
    /// parallelism.
    ///
    /// See the S/Kademlia paper for more information on the high level design
    /// as well as its security improvements.
    pub fn disjoint_query_paths(&mut self, enabled: bool) -> &mut Self {
        self.query_config.disjoint_query_paths = enabled;
        self
    }

    /// Sets the TTL for stored records.
    ///
    /// The TTL should be significantly longer than the (re-)publication
    /// interval, to avoid premature expiration of records. The default is 36
    /// hours.
    ///
    /// `None` means records never expire.
    ///
    /// Does not apply to provider records.
    pub fn set_record_ttl(&mut self, record_ttl: Option<Duration>) -> &mut Self {
        self.record_ttl = record_ttl;
        self
    }

    /// Sets whether or not records should be filtered before being stored.
    ///
    /// See [`StoreInserts`] for the different values.
    /// Defaults to [`StoreInserts::Unfiltered`].
    pub fn set_record_filtering(&mut self, filtering: StoreInserts) -> &mut Self {
        self.record_filtering = filtering;
        self
    }

    /// Sets the (re-)replication interval for stored records.
    ///
    /// Periodic replication of stored records ensures that the records
    /// are always replicated to the available nodes closest to the key in the
    /// context of DHT topology changes (i.e. nodes joining and leaving), thus
    /// ensuring persistence until the record expires. Replication does not
    /// prolong the regular lifetime of a record (for otherwise it would live
    /// forever regardless of the configured TTL). The expiry of a record
    /// is only extended through re-publication.
    ///
    /// This interval should be significantly shorter than the publication
    /// interval, to ensure persistence between re-publications. The default
    /// is 1 hour.
    ///
    /// `None` means that stored records are never re-replicated.
    ///
    /// Does not apply to provider records.
    pub fn set_replication_interval(&mut self, interval: Option<Duration>) -> &mut Self {
        self.record_replication_interval = interval;
        self
    }

    /// Sets the (re-)publication interval of stored records.
    ///
    /// Records persist in the DHT until they expire. By default, published
    /// records are re-published in regular intervals for as long as the record
    /// exists in the local storage of the original publisher, thereby extending
    /// the records lifetime.
    ///
    /// This interval should be significantly shorter than the record TTL, to
    /// ensure records do not expire prematurely. The default is 24 hours.
    ///
    /// `None` means that stored records are never automatically re-published.
    ///
    /// Does not apply to provider records.
    pub fn set_publication_interval(&mut self, interval: Option<Duration>) -> &mut Self {
        self.record_publication_interval = interval;
        self
    }

    /// Sets the TTL for provider records.
    ///
    /// `None` means that stored provider records never expire.
    ///
    /// Must be significantly larger than the provider publication interval.
    /// The default is 24 hours.
    pub fn set_provider_record_ttl(&mut self, ttl: Option<Duration>) -> &mut Self {
        self.provider_record_ttl = ttl;
        self
    }

    /// Sets the interval at which provider records for keys provided
    /// by the local node are re-published.
    ///
    /// `None` means that stored provider records are never automatically
    /// re-published.
    ///
    /// Must be significantly less than the provider record TTL.
    /// The default is 12 hours.
    pub fn set_provider_publication_interval(&mut self, interval: Option<Duration>) -> &mut Self {
        self.provider_publication_interval = interval;
        self
    }

    /// Modifies the maximum allowed size of individual Kademlia packets.
    ///
    /// It might be necessary to increase this value if trying to put large
    /// records.
    pub fn set_max_packet_size(&mut self, size: usize) -> &mut Self {
        self.protocol_config.set_max_packet_size(size);
        self
    }

    /// Sets the k-bucket insertion strategy for the Kademlia routing table.
    ///
    /// The default is [`BucketInserts::OnConnected`].
    pub fn set_kbucket_inserts(&mut self, inserts: BucketInserts) -> &mut Self {
        self.kbucket_inserts = inserts;
        self
    }

    /// Sets the [`Caching`] strategy to use for successful lookups.
    ///
    /// The default is [`Caching::Enabled`] with a `max_peers` of 1.
    /// Hence, with default settings and a lookup quorum of 1, a successful lookup
    /// will result in the record being cached at the closest node to the key that
    /// did not return the record, i.e. the standard Kademlia behaviour.
    pub fn set_caching(&mut self, c: Caching) -> &mut Self {
        self.caching = c;
        self
    }
}
395
396impl<TStore> Behaviour<TStore>
397where
398 TStore: RecordStore + Send + 'static,
399{
400 /// Creates a new `Kademlia` network behaviour with a default configuration.
401 pub fn new(id: PeerId, store: TStore) -> Self {
402 Self::with_config(id, store, Default::default())
403 }
404
    /// Get the protocol name of this kademlia instance.
    pub fn protocol_names(&self) -> &[StreamProtocol] {
        self.protocol_config.protocol_names()
    }
409
410 /// Creates a new `Kademlia` network behaviour with the given configuration.
411 pub fn with_config(id: PeerId, store: TStore, config: Config) -> Self {
412 let local_key = kbucket::Key::from(id);
413
414 let put_record_job = config
415 .record_replication_interval
416 .or(config.record_publication_interval)
417 .map(|interval| {
418 PutRecordJob::new(
419 id,
420 interval,
421 config.record_publication_interval,
422 config.record_ttl,
423 )
424 });
425
426 let add_provider_job = config
427 .provider_publication_interval
428 .map(AddProviderJob::new);
429
430 Behaviour {
431 store,
432 caching: config.caching,
433 kbuckets: KBucketsTable::new(local_key, config.kbucket_pending_timeout),
434 kbucket_inserts: config.kbucket_inserts,
435 protocol_config: config.protocol_config,
436 record_filtering: config.record_filtering,
437 queued_events: VecDeque::with_capacity(config.query_config.replication_factor.get()),
438 listen_addresses: Default::default(),
439 queries: QueryPool::new(config.query_config),
440 connected_peers: Default::default(),
441 add_provider_job,
442 put_record_job,
443 record_ttl: config.record_ttl,
444 provider_record_ttl: config.provider_record_ttl,
445 external_addresses: Default::default(),
446 local_peer_id: id,
447 connections: Default::default(),
448 mode: Mode::Client,
449 auto_mode: true,
450 no_events_waker: None,
451 }
452 }
453
454 /// Gets an iterator over immutable references to all running queries.
455 pub fn iter_queries(&self) -> impl Iterator<Item = QueryRef<'_>> {
456 self.queries.iter().filter_map(|query| {
457 if !query.is_finished() {
458 Some(QueryRef { query })
459 } else {
460 None
461 }
462 })
463 }
464
465 /// Gets an iterator over mutable references to all running queries.
466 pub fn iter_queries_mut(&mut self) -> impl Iterator<Item = QueryMut<'_>> {
467 self.queries.iter_mut().filter_map(|query| {
468 if !query.is_finished() {
469 Some(QueryMut { query })
470 } else {
471 None
472 }
473 })
474 }
475
476 /// Gets an immutable reference to a running query, if it exists.
477 pub fn query(&self, id: &QueryId) -> Option<QueryRef<'_>> {
478 self.queries.get(id).and_then(|query| {
479 if !query.is_finished() {
480 Some(QueryRef { query })
481 } else {
482 None
483 }
484 })
485 }
486
487 /// Gets a mutable reference to a running query, if it exists.
488 pub fn query_mut<'a>(&'a mut self, id: &QueryId) -> Option<QueryMut<'a>> {
489 self.queries.get_mut(id).and_then(|query| {
490 if !query.is_finished() {
491 Some(QueryMut { query })
492 } else {
493 None
494 }
495 })
496 }
497
    /// Adds a known listen address of a peer participating in the DHT to the
    /// routing table.
    ///
    /// Explicitly adding addresses of peers serves two purposes:
    ///
    /// 1. In order for a node to join the DHT, it must know about at least
    ///    one other node of the DHT.
    ///
    /// 2. When a remote peer initiates a connection and that peer is not
    ///    yet in the routing table, the `Kademlia` behaviour must be
    ///    informed of an address on which that peer is listening for
    ///    connections before it can be added to the routing table
    ///    from where it can subsequently be discovered by all peers
    ///    in the DHT.
    ///
    /// If the routing table has been updated as a result of this operation,
    /// a [`Event::RoutingUpdated`] event is emitted.
    pub fn add_address(&mut self, peer: &PeerId, address: Multiaddr) -> RoutingUpdate {
        // ensuring address is a fully-qualified /p2p multiaddr
        let Ok(address) = address.with_p2p(*peer) else {
            // `with_p2p` fails if the address already carries a different peer id.
            return RoutingUpdate::Failed;
        };
        let key = kbucket::Key::from(*peer);
        match self.kbuckets.entry(&key) {
            // Peer is already in the routing table: merge in the new address.
            kbucket::Entry::Present(mut entry, _) => {
                // `insert` returns `true` only if the address was not yet known,
                // in which case an update event is emitted.
                if entry.value().insert(address) {
                    self.queued_events
                        .push_back(ToSwarm::GenerateEvent(Event::RoutingUpdated {
                            peer: *peer,
                            is_new_peer: false,
                            addresses: entry.value().clone(),
                            old_peer: None,
                            bucket_range: self
                                .kbuckets
                                .bucket(&key)
                                .map(|b| b.range())
                                .expect("Not kbucket::Entry::SelfEntry."),
                        }))
                }
                RoutingUpdate::Success
            }
            // Peer is pending insertion: record the address, no event yet.
            kbucket::Entry::Pending(mut entry, _) => {
                entry.value().insert(address);
                RoutingUpdate::Pending
            }
            // Peer is not in the routing table: try to insert it.
            kbucket::Entry::Absent(entry) => {
                let addresses = Addresses::new(address);
                let status = if self.connected_peers.contains(peer) {
                    NodeStatus::Connected
                } else {
                    NodeStatus::Disconnected
                };
                match entry.insert(addresses.clone(), status) {
                    kbucket::InsertResult::Inserted => {
                        self.queued_events.push_back(ToSwarm::GenerateEvent(
                            Event::RoutingUpdated {
                                peer: *peer,
                                is_new_peer: true,
                                addresses,
                                old_peer: None,
                                bucket_range: self
                                    .kbuckets
                                    .bucket(&key)
                                    .map(|b| b.range())
                                    .expect("Not kbucket::Entry::SelfEntry."),
                            },
                        ));
                        RoutingUpdate::Success
                    }
                    kbucket::InsertResult::Full => {
                        tracing::debug!(%peer, "Bucket full. Peer not added to routing table");
                        RoutingUpdate::Failed
                    }
                    kbucket::InsertResult::Pending { disconnected } => {
                        // Dial the disconnected peer whose eviction would make
                        // room, so its liveness can be (re-)established.
                        self.queued_events.push_back(ToSwarm::Dial {
                            opts: DialOpts::peer_id(disconnected.into_preimage())
                                .condition(dial_opts::PeerCondition::NotDialing)
                                .build(),
                        });
                        RoutingUpdate::Pending
                    }
                }
            }
            // The local peer itself is never inserted into the routing table.
            kbucket::Entry::SelfEntry => RoutingUpdate::Failed,
        }
    }
584
585 /// Removes an address of a peer from the routing table.
586 ///
587 /// If the given address is the last address of the peer in the
588 /// routing table, the peer is removed from the routing table
589 /// and `Some` is returned with a view of the removed entry.
590 /// The same applies if the peer is currently pending insertion
591 /// into the routing table.
592 ///
593 /// If the given peer or address is not in the routing table,
594 /// this is a no-op.
595 pub fn remove_address(
596 &mut self,
597 peer: &PeerId,
598 address: &Multiaddr,
599 ) -> Option<kbucket::EntryView<kbucket::Key<PeerId>, Addresses>> {
600 let address = &address.to_owned().with_p2p(*peer).ok()?;
601 let key = kbucket::Key::from(*peer);
602 match self.kbuckets.entry(&key) {
603 kbucket::Entry::Present(mut entry, _) => {
604 if entry.value().remove(address).is_err() {
605 Some(entry.remove()) // it is the last address, thus remove the peer.
606 } else {
607 None
608 }
609 }
610 kbucket::Entry::Pending(mut entry, _) => {
611 if entry.value().remove(address).is_err() {
612 Some(entry.remove()) // it is the last address, thus remove the peer.
613 } else {
614 None
615 }
616 }
617 kbucket::Entry::Absent(..) | kbucket::Entry::SelfEntry => None,
618 }
619 }
620
    /// Removes a peer from the routing table.
    ///
    /// Returns `None` if the peer was not in the routing table,
    /// not even pending insertion.
    pub fn remove_peer(
        &mut self,
        peer: &PeerId,
    ) -> Option<kbucket::EntryView<kbucket::Key<PeerId>, Addresses>> {
        let key = kbucket::Key::from(*peer);
        match self.kbuckets.entry(&key) {
            // Both present and merely pending entries are removed.
            kbucket::Entry::Present(entry, _) => Some(entry.remove()),
            kbucket::Entry::Pending(entry, _) => Some(entry.remove()),
            kbucket::Entry::Absent(..) | kbucket::Entry::SelfEntry => None,
        }
    }
636
    /// Returns an iterator over all non-empty buckets in the routing table.
    pub fn kbuckets(
        &mut self,
    ) -> impl Iterator<Item = kbucket::KBucketRef<'_, kbucket::Key<PeerId>, Addresses>> {
        self.kbuckets.iter().filter(|b| !b.is_empty())
    }
643
    /// Returns the k-bucket for the distance to the given key.
    ///
    /// Returns `None` if the given key refers to the local key.
    pub fn kbucket<K>(
        &mut self,
        key: K,
    ) -> Option<kbucket::KBucketRef<'_, kbucket::Key<PeerId>, Addresses>>
    where
        K: Into<kbucket::Key<K>> + Clone,
    {
        self.kbuckets.bucket(&key.into())
    }
656
657 /// Initiates an iterative query for the closest peers to the given key.
658 ///
659 /// The result of the query is delivered in a
660 /// [`Event::OutboundQueryProgressed{QueryResult::GetClosestPeers}`].
661 pub fn get_closest_peers<K>(&mut self, key: K) -> QueryId
662 where
663 K: Into<kbucket::Key<K>> + Into<Vec<u8>> + Clone,
664 {
665 let target: kbucket::Key<K> = key.clone().into();
666 let key: Vec<u8> = key.into();
667 let info = QueryInfo::GetClosestPeers {
668 key,
669 step: ProgressStep::first(),
670 };
671 let peer_keys: Vec<kbucket::Key<PeerId>> = self.kbuckets.closest_keys(&target).collect();
672 let inner = QueryInner::new(info);
673 self.queries.add_iter_closest(target, peer_keys, inner)
674 }
675
    /// Returns closest peers to the given key; takes peers from local routing table only.
    pub fn get_closest_local_peers<'a, K: Clone>(
        &'a mut self,
        key: &'a kbucket::Key<K>,
    ) -> impl Iterator<Item = kbucket::Key<PeerId>> + 'a {
        self.kbuckets.closest_keys(key)
    }
683
684 /// Performs a lookup for a record in the DHT.
685 ///
686 /// The result of this operation is delivered in a
687 /// [`Event::OutboundQueryProgressed{QueryResult::GetRecord}`].
688 pub fn get_record(&mut self, key: record::Key) -> QueryId {
689 let record = if let Some(record) = self.store.get(&key) {
690 if record.is_expired(Instant::now()) {
691 self.store.remove(&key);
692 None
693 } else {
694 Some(PeerRecord {
695 peer: None,
696 record: record.into_owned(),
697 })
698 }
699 } else {
700 None
701 };
702
703 let step = ProgressStep::first();
704
705 let target = kbucket::Key::new(key.clone());
706 let info = if record.is_some() {
707 QueryInfo::GetRecord {
708 key,
709 step: step.next(),
710 found_a_record: true,
711 cache_candidates: BTreeMap::new(),
712 }
713 } else {
714 QueryInfo::GetRecord {
715 key,
716 step: step.clone(),
717 found_a_record: false,
718 cache_candidates: BTreeMap::new(),
719 }
720 };
721 let peers = self.kbuckets.closest_keys(&target);
722 let inner = QueryInner::new(info);
723 let id = self.queries.add_iter_closest(target.clone(), peers, inner);
724
725 // No queries were actually done for the results yet.
726 let stats = QueryStats::empty();
727
728 if let Some(record) = record {
729 self.queued_events
730 .push_back(ToSwarm::GenerateEvent(Event::OutboundQueryProgressed {
731 id,
732 result: QueryResult::GetRecord(Ok(GetRecordOk::FoundRecord(record))),
733 step,
734 stats,
735 }));
736 }
737
738 id
739 }
740
    /// Stores a record in the DHT, locally as well as at the nodes
    /// closest to the key as per the xor distance metric.
    ///
    /// Returns `Ok` if a record has been stored locally, providing the
    /// `QueryId` of the initial query that replicates the record in the DHT.
    /// The result of the query is eventually reported as a
    /// [`Event::OutboundQueryProgressed{QueryResult::PutRecord}`].
    ///
    /// The record is always stored locally with the given expiration. If the record's
    /// expiration is `None`, the common case, it does not expire in local storage
    /// but is still replicated with the configured record TTL. To remove the record
    /// locally and stop it from being re-published in the DHT, see [`Behaviour::remove_record`].
    ///
    /// After the initial publication of the record, it is subject to (re-)replication
    /// and (re-)publication as per the configured intervals. Periodic (re-)publication
    /// does not update the record's expiration in local storage, thus a given record
    /// with an explicit expiration will always expire at that instant and until then
    /// is subject to regular (re-)replication and (re-)publication.
    pub fn put_record(
        &mut self,
        mut record: Record,
        quorum: Quorum,
    ) -> Result<QueryId, store::Error> {
        record.publisher = Some(*self.kbuckets.local_key().preimage());
        // Store locally *before* applying the TTL-derived expiry below, so the
        // local copy keeps the caller-provided expiration (possibly `None`).
        self.store.put(record.clone())?;
        // The replicated copy gets the configured record TTL if the caller
        // did not set an explicit expiration.
        record.expires = record
            .expires
            .or_else(|| self.record_ttl.map(|ttl| Instant::now() + ttl));
        let quorum = quorum.eval(self.queries.config().replication_factor);
        let target = kbucket::Key::new(record.key.clone());
        let peers = self.kbuckets.closest_keys(&target);
        let context = PutRecordContext::Publish;
        let info = QueryInfo::PutRecord {
            context,
            record,
            quorum,
            // First find the closest peers, then send them the record.
            phase: PutRecordPhase::GetClosestPeers,
        };
        let inner = QueryInner::new(info);
        Ok(self.queries.add_iter_closest(target.clone(), peers, inner))
    }
782
783 /// Stores a record at specific peers, without storing it locally.
784 ///
785 /// The given [`Quorum`] is understood in the context of the total
786 /// number of distinct peers given.
787 ///
788 /// If the record's expiration is `None`, the configured record TTL is used.
789 ///
790 /// > **Note**: This is not a regular Kademlia DHT operation. It needs to be
791 /// > used to selectively update or store a record to specific peers
792 /// > for the purpose of e.g. making sure these peers have the latest
793 /// > "version" of a record or to "cache" a record at further peers
794 /// > to increase the lookup success rate on the DHT for other peers.
795 /// >
796 /// > In particular, there is no automatic storing of records performed, and this
797 /// > method must be used to ensure the standard Kademlia
798 /// > procedure of "caching" (i.e. storing) a found record at the closest
799 /// > node to the key that _did not_ return it.
800 pub fn put_record_to<I>(&mut self, mut record: Record, peers: I, quorum: Quorum) -> QueryId
801 where
802 I: ExactSizeIterator<Item = PeerId>,
803 {
804 let quorum = if peers.len() > 0 {
805 quorum.eval(NonZeroUsize::new(peers.len()).expect("> 0"))
806 } else {
807 // If no peers are given, we just let the query fail immediately
808 // due to the fact that the quorum must be at least one, instead of
809 // introducing a new kind of error.
810 NonZeroUsize::new(1).expect("1 > 0")
811 };
812 record.expires = record
813 .expires
814 .or_else(|| self.record_ttl.map(|ttl| Instant::now() + ttl));
815 let context = PutRecordContext::Custom;
816 let info = QueryInfo::PutRecord {
817 context,
818 record,
819 quorum,
820 phase: PutRecordPhase::PutRecord {
821 success: Vec::new(),
822 get_closest_peers_stats: QueryStats::empty(),
823 },
824 };
825 let inner = QueryInner::new(info);
826 self.queries.add_fixed(peers, inner)
827 }
828
    /// Removes the record with the given key from _local_ storage,
    /// if the local node is the publisher of the record.
    ///
    /// Has no effect if a record for the given key is stored locally but
    /// the local node is not a publisher of the record.
    ///
    /// This is a _local_ operation. However, it also has the effect that
    /// the record will no longer be periodically re-published, allowing the
    /// record to eventually expire throughout the DHT.
    pub fn remove_record(&mut self, key: &record::Key) {
        if let Some(r) = self.store.get(key) {
            // Only remove records that this node itself published.
            if r.publisher.as_ref() == Some(self.kbuckets.local_key().preimage()) {
                self.store.remove(key)
            }
        }
    }
845
    /// Gets a mutable reference to the record store.
    pub fn store_mut(&mut self) -> &mut TStore {
        &mut self.store
    }
850
851 /// Bootstraps the local node to join the DHT.
852 ///
853 /// Bootstrapping is a multi-step operation that starts with a lookup of the local node's
854 /// own ID in the DHT. This introduces the local node to the other nodes
855 /// in the DHT and populates its routing table with the closest neighbours.
856 ///
857 /// Subsequently, all buckets farther from the bucket of the closest neighbour are
858 /// refreshed by initiating an additional bootstrapping query for each such
859 /// bucket with random keys.
860 ///
861 /// Returns `Ok` if bootstrapping has been initiated with a self-lookup, providing the
862 /// `QueryId` for the entire bootstrapping process. The progress of bootstrapping is
863 /// reported via [`Event::OutboundQueryProgressed{QueryResult::Bootstrap}`] events,
864 /// with one such event per bootstrapping query.
865 ///
866 /// Returns `Err` if bootstrapping is impossible due an empty routing table.
867 ///
868 /// > **Note**: Bootstrapping requires at least one node of the DHT to be known.
869 /// > See [`Behaviour::add_address`].
870 pub fn bootstrap(&mut self) -> Result<QueryId, NoKnownPeers> {
871 let local_key = self.kbuckets.local_key().clone();
872 let info = QueryInfo::Bootstrap {
873 peer: *local_key.preimage(),
874 remaining: None,
875 step: ProgressStep::first(),
876 };
877 let peers = self.kbuckets.closest_keys(&local_key).collect::<Vec<_>>();
878 if peers.is_empty() {
879 Err(NoKnownPeers())
880 } else {
881 let inner = QueryInner::new(info);
882 Ok(self.queries.add_iter_closest(local_key, peers, inner))
883 }
884 }
885
886 /// Establishes the local node as a provider of a value for the given key.
887 ///
888 /// This operation publishes a provider record with the given key and
889 /// identity of the local node to the peers closest to the key, thus establishing
890 /// the local node as a provider.
891 ///
892 /// Returns `Ok` if a provider record has been stored locally, providing the
893 /// `QueryId` of the initial query that announces the local node as a provider.
894 ///
895 /// The publication of the provider records is periodically repeated as per the
896 /// configured interval, to renew the expiry and account for changes to the DHT
897 /// topology. A provider record may be removed from local storage and
898 /// thus no longer re-published by calling [`Behaviour::stop_providing`].
899 ///
900 /// In contrast to the standard Kademlia push-based model for content distribution
901 /// implemented by [`Behaviour::put_record`], the provider API implements a
902 /// pull-based model that may be used in addition or as an alternative.
903 /// The means by which the actual value is obtained from a provider is out of scope
904 /// of the libp2p Kademlia provider API.
905 ///
906 /// The results of the (repeated) provider announcements sent by this node are
907 /// reported via [`Event::OutboundQueryProgressed{QueryResult::StartProviding}`].
908 pub fn start_providing(&mut self, key: record::Key) -> Result<QueryId, store::Error> {
909 // Note: We store our own provider records locally without local addresses
910 // to avoid redundant storage and outdated addresses. Instead these are
911 // acquired on demand when returning a `ProviderRecord` for the local node.
912 let local_addrs = Vec::new();
913 let record = ProviderRecord::new(
914 key.clone(),
915 *self.kbuckets.local_key().preimage(),
916 local_addrs,
917 );
918 self.store.add_provider(record)?;
919 let target = kbucket::Key::new(key.clone());
920 let peers = self.kbuckets.closest_keys(&target);
921 let context = AddProviderContext::Publish;
922 let info = QueryInfo::AddProvider {
923 context,
924 key,
925 phase: AddProviderPhase::GetClosestPeers,
926 };
927 let inner = QueryInner::new(info);
928 let id = self.queries.add_iter_closest(target.clone(), peers, inner);
929 Ok(id)
930 }
931
932 /// Stops the local node from announcing that it is a provider for the given key.
933 ///
934 /// This is a local operation. The local node will still be considered as a
935 /// provider for the key by other nodes until these provider records expire.
936 pub fn stop_providing(&mut self, key: &record::Key) {
937 self.store
938 .remove_provider(key, self.kbuckets.local_key().preimage());
939 }
940
    /// Performs a lookup for providers of a value to the given key.
    ///
    /// The results of this operation are reported via
    /// [`Event::OutboundQueryProgressed{QueryResult::GetProviders}`] events.
    pub fn get_providers(&mut self, key: record::Key) -> QueryId {
        // Providers already known locally (and not yet expired) are reported
        // immediately, before the iterative query produces any results.
        let providers: HashSet<_> = self
            .store
            .providers(&key)
            .into_iter()
            .filter(|p| !p.is_expired(Instant::now()))
            .map(|p| p.provider)
            .collect();

        let step = ProgressStep::first();

        let info = QueryInfo::GetProviders {
            key: key.clone(),
            providers_found: providers.len(),
            // If local providers are emitted below, that event consumes the
            // first progress step, so the query itself continues on the next.
            step: if providers.is_empty() {
                step.clone()
            } else {
                step.next()
            },
        };

        let target = kbucket::Key::new(key.clone());
        let peers = self.kbuckets.closest_keys(&target);
        let inner = QueryInner::new(info);
        let id = self.queries.add_iter_closest(target.clone(), peers, inner);

        // No queries were actually done for the results yet.
        let stats = QueryStats::empty();

        if !providers.is_empty() {
            self.queued_events
                .push_back(ToSwarm::GenerateEvent(Event::OutboundQueryProgressed {
                    id,
                    result: QueryResult::GetProviders(Ok(GetProvidersOk::FoundProviders {
                        key,
                        providers,
                    })),
                    step,
                    stats,
                }));
        }
        id
    }
988
989 /// Set the [`Mode`] in which we should operate.
990 ///
991 /// By default, we are in [`Mode::Client`] and will swap into [`Mode::Server`] as soon as we have a confirmed, external address via [`FromSwarm::ExternalAddrConfirmed`].
992 ///
993 /// Setting a mode via this function disables this automatic behaviour and unconditionally operates in the specified mode.
994 /// To reactivate the automatic configuration, pass [`None`] instead.
995 pub fn set_mode(&mut self, mode: Option<Mode>) {
996 match mode {
997 Some(mode) => {
998 self.mode = mode;
999 self.auto_mode = false;
1000 self.reconfigure_mode();
1001 }
1002 None => {
1003 self.auto_mode = true;
1004 self.determine_mode_from_external_addresses();
1005 }
1006 }
1007
1008 if let Some(waker) = self.no_events_waker.take() {
1009 waker.wake();
1010 }
1011 }
1012
1013 fn reconfigure_mode(&mut self) {
1014 if self.connections.is_empty() {
1015 return;
1016 }
1017
1018 let num_connections = self.connections.len();
1019
1020 tracing::debug!(
1021 "Re-configuring {} established connection{}",
1022 num_connections,
1023 if num_connections > 1 { "s" } else { "" }
1024 );
1025
1026 self.queued_events
1027 .extend(
1028 self.connections
1029 .iter()
1030 .map(|(conn_id, peer_id)| ToSwarm::NotifyHandler {
1031 peer_id: *peer_id,
1032 handler: NotifyHandler::One(*conn_id),
1033 event: HandlerIn::ReconfigureMode {
1034 new_mode: self.mode,
1035 },
1036 }),
1037 );
1038 }
1039
1040 fn determine_mode_from_external_addresses(&mut self) {
1041 let old_mode = self.mode;
1042
1043 self.mode = match (self.external_addresses.as_slice(), self.mode) {
1044 ([], Mode::Server) => {
1045 tracing::debug!("Switching to client-mode because we no longer have any confirmed external addresses");
1046
1047 Mode::Client
1048 }
1049 ([], Mode::Client) => {
1050 // Previously client-mode, now also client-mode because no external addresses.
1051
1052 Mode::Client
1053 }
1054 (confirmed_external_addresses, Mode::Client) => {
1055 if tracing::enabled!(Level::DEBUG) {
1056 let confirmed_external_addresses =
1057 to_comma_separated_list(confirmed_external_addresses);
1058
1059 tracing::debug!("Switching to server-mode assuming that one of [{confirmed_external_addresses}] is externally reachable");
1060 }
1061
1062 Mode::Server
1063 }
1064 (confirmed_external_addresses, Mode::Server) => {
1065 debug_assert!(
1066 !confirmed_external_addresses.is_empty(),
1067 "Previous match arm handled empty list"
1068 );
1069
1070 // Previously, server-mode, now also server-mode because > 1 external address. Don't log anything to avoid spam.
1071
1072 Mode::Server
1073 }
1074 };
1075
1076 self.reconfigure_mode();
1077
1078 if old_mode != self.mode {
1079 self.queued_events
1080 .push_back(ToSwarm::GenerateEvent(Event::ModeChanged {
1081 new_mode: self.mode,
1082 }));
1083 }
1084 }
1085
1086 /// Processes discovered peers from a successful request in an iterative `Query`.
1087 fn discovered<'a, I>(&'a mut self, query_id: &QueryId, source: &PeerId, peers: I)
1088 where
1089 I: Iterator<Item = &'a KadPeer> + Clone,
1090 {
1091 let local_id = self.kbuckets.local_key().preimage();
1092 let others_iter = peers.filter(|p| &p.node_id != local_id);
1093 if let Some(query) = self.queries.get_mut(query_id) {
1094 tracing::trace!(peer=%source, query=?query_id, "Request to peer in query succeeded");
1095 for peer in others_iter.clone() {
1096 tracing::trace!(
1097 ?peer,
1098 %source,
1099 query=?query_id,
1100 "Peer reported by source in query"
1101 );
1102 let addrs = peer.multiaddrs.iter().cloned().collect();
1103 query.inner.addresses.insert(peer.node_id, addrs);
1104 }
1105 query.on_success(source, others_iter.cloned().map(|kp| kp.node_id))
1106 }
1107 }
1108
1109 /// Finds the closest peers to a `target` in the context of a request by
1110 /// the `source` peer, such that the `source` peer is never included in the
1111 /// result.
1112 fn find_closest<T: Clone>(
1113 &mut self,
1114 target: &kbucket::Key<T>,
1115 source: &PeerId,
1116 ) -> Vec<KadPeer> {
1117 if target == self.kbuckets.local_key() {
1118 Vec::new()
1119 } else {
1120 self.kbuckets
1121 .closest(target)
1122 .filter(|e| e.node.key.preimage() != source)
1123 .take(self.queries.config().replication_factor.get())
1124 .map(KadPeer::from)
1125 .collect()
1126 }
1127 }
1128
    /// Collects all peers who are known to be providers of the value for a given `Multihash`.
    ///
    /// The requesting `source` is never included, and at most
    /// `replication_factor` providers are returned.
    fn provider_peers(&mut self, key: &record::Key, source: &PeerId) -> Vec<KadPeer> {
        // The individual fields are borrowed up-front so that the `move`
        // closure below captures these references instead of `self`, which is
        // also used by `self.store` / `self.queries` in the same chain.
        let kbuckets = &mut self.kbuckets;
        let connected = &mut self.connected_peers;
        let listen_addresses = &self.listen_addresses;
        let external_addresses = &self.external_addresses;

        self.store
            .providers(key)
            .into_iter()
            .filter_map(move |p| {
                if &p.provider != source {
                    let node_id = p.provider;
                    let multiaddrs = p.addresses;
                    let connection_ty = if connected.contains(&node_id) {
                        ConnectionType::Connected
                    } else {
                        ConnectionType::NotConnected
                    };
                    if multiaddrs.is_empty() {
                        // The provider is either the local node and we fill in
                        // the local addresses on demand, or it is a legacy
                        // provider record without addresses, in which case we
                        // try to find addresses in the routing table, as was
                        // done before provider records were stored along with
                        // their addresses.
                        if &node_id == kbuckets.local_key().preimage() {
                            Some(
                                listen_addresses
                                    .iter()
                                    .chain(external_addresses.iter())
                                    .cloned()
                                    .collect::<Vec<_>>(),
                            )
                        } else {
                            let key = kbucket::Key::from(node_id);
                            kbuckets
                                .entry(&key)
                                .view()
                                .map(|e| e.node.value.clone().into_vec())
                        }
                    } else {
                        Some(multiaddrs)
                    }
                    // Providers for which no address could be determined are
                    // dropped entirely (the `Option` stays `None`).
                    .map(|multiaddrs| KadPeer {
                        node_id,
                        multiaddrs,
                        connection_ty,
                    })
                } else {
                    None
                }
            })
            .take(self.queries.config().replication_factor.get())
            .collect()
    }
1185
1186 /// Starts an iterative `ADD_PROVIDER` query for the given key.
1187 fn start_add_provider(&mut self, key: record::Key, context: AddProviderContext) {
1188 let info = QueryInfo::AddProvider {
1189 context,
1190 key: key.clone(),
1191 phase: AddProviderPhase::GetClosestPeers,
1192 };
1193 let target = kbucket::Key::new(key);
1194 let peers = self.kbuckets.closest_keys(&target);
1195 let inner = QueryInner::new(info);
1196 self.queries.add_iter_closest(target.clone(), peers, inner);
1197 }
1198
1199 /// Starts an iterative `PUT_VALUE` query for the given record.
1200 fn start_put_record(&mut self, record: Record, quorum: Quorum, context: PutRecordContext) {
1201 let quorum = quorum.eval(self.queries.config().replication_factor);
1202 let target = kbucket::Key::new(record.key.clone());
1203 let peers = self.kbuckets.closest_keys(&target);
1204 let info = QueryInfo::PutRecord {
1205 record,
1206 quorum,
1207 context,
1208 phase: PutRecordPhase::GetClosestPeers,
1209 };
1210 let inner = QueryInner::new(info);
1211 self.queries.add_iter_closest(target.clone(), peers, inner);
1212 }
1213
    /// Updates the routing table with a new connection status and address of a peer.
    ///
    /// Depending on whether the peer is already present, pending or absent in
    /// the routing table, this refreshes its status and addresses, and emits
    /// `RoutingUpdated` / `RoutablePeer` / `PendingRoutablePeer` /
    /// `UnroutablePeer` events accordingly. It may also dial the peer whose
    /// slot a pending insertion is waiting on.
    fn connection_updated(
        &mut self,
        peer: PeerId,
        address: Option<Multiaddr>,
        new_status: NodeStatus,
    ) {
        let key = kbucket::Key::from(peer);
        match self.kbuckets.entry(&key) {
            // Peer already in the routing table: refresh status and, if a new
            // address was learned, record it and report the routing update.
            kbucket::Entry::Present(mut entry, old_status) => {
                if old_status != new_status {
                    entry.update(new_status)
                }
                if let Some(address) = address {
                    if entry.value().insert(address) {
                        self.queued_events.push_back(ToSwarm::GenerateEvent(
                            Event::RoutingUpdated {
                                peer,
                                is_new_peer: false,
                                addresses: entry.value().clone(),
                                old_peer: None,
                                bucket_range: self
                                    .kbuckets
                                    .bucket(&key)
                                    .map(|b| b.range())
                                    .expect("Not kbucket::Entry::SelfEntry."),
                            },
                        ))
                    }
                }
            }

            // Peer waiting for insertion: keep its addresses and status
            // up-to-date until the pending entry is applied.
            kbucket::Entry::Pending(mut entry, old_status) => {
                if let Some(address) = address {
                    entry.value().insert(address);
                }
                if old_status != new_status {
                    entry.update(new_status);
                }
            }

            kbucket::Entry::Absent(entry) => {
                // Only connected nodes with a known address are newly inserted.
                if new_status != NodeStatus::Connected {
                    return;
                }
                match (address, self.kbucket_inserts) {
                    (None, _) => {
                        self.queued_events
                            .push_back(ToSwarm::GenerateEvent(Event::UnroutablePeer { peer }));
                    }
                    (Some(a), BucketInserts::Manual) => {
                        self.queued_events
                            .push_back(ToSwarm::GenerateEvent(Event::RoutablePeer {
                                peer,
                                address: a,
                            }));
                    }
                    (Some(a), BucketInserts::OnConnected) => {
                        let addresses = Addresses::new(a);
                        match entry.insert(addresses.clone(), new_status) {
                            kbucket::InsertResult::Inserted => {
                                let event = Event::RoutingUpdated {
                                    peer,
                                    is_new_peer: true,
                                    addresses,
                                    old_peer: None,
                                    bucket_range: self
                                        .kbuckets
                                        .bucket(&key)
                                        .map(|b| b.range())
                                        .expect("Not kbucket::Entry::SelfEntry."),
                                };
                                self.queued_events.push_back(ToSwarm::GenerateEvent(event));
                            }
                            kbucket::InsertResult::Full => {
                                tracing::debug!(
                                    %peer,
                                    "Bucket full. Peer not added to routing table"
                                );
                                let address = addresses.first().clone();
                                self.queued_events.push_back(ToSwarm::GenerateEvent(
                                    Event::RoutablePeer { peer, address },
                                ));
                            }
                            kbucket::InsertResult::Pending { disconnected } => {
                                let address = addresses.first().clone();
                                self.queued_events.push_back(ToSwarm::GenerateEvent(
                                    Event::PendingRoutablePeer { peer, address },
                                ));

                                // `disconnected` might already be in the process of re-connecting.
                                // In other words `disconnected` might have already re-connected but
                                // is not yet confirmed to support the Kademlia protocol via
                                // [`HandlerEvent::ProtocolConfirmed`].
                                //
                                // Only try dialing peer if not currently connected.
                                if !self.connected_peers.contains(disconnected.preimage()) {
                                    self.queued_events.push_back(ToSwarm::Dial {
                                        opts: DialOpts::peer_id(disconnected.into_preimage())
                                            .condition(dial_opts::PeerCondition::NotDialing)
                                            .build(),
                                    })
                                }
                            }
                        }
                    }
                }
            }
            // Remaining case is `SelfEntry`: the local node is never inserted
            // into its own routing table.
            _ => {}
        }
    }
1326
    /// Handles a finished (i.e. successful) query.
    ///
    /// Depending on the query type, this either continues the operation with
    /// a follow-up query (the second phase of `PUT_VALUE` / `ADD_PROVIDER`,
    /// or the next bootstrap bucket refresh) and returns `None`, or it
    /// returns the final `OutboundQueryProgressed` event to emit.
    fn query_finished(&mut self, q: Query<QueryInner>) -> Option<Event> {
        let query_id = q.id();
        tracing::trace!(query=?query_id, "Query finished");
        let result = q.into_result();
        match result.inner.info {
            QueryInfo::Bootstrap {
                peer,
                remaining,
                mut step,
            } => {
                let local_key = self.kbuckets.local_key().clone();
                let mut remaining = remaining.unwrap_or_else(|| {
                    debug_assert_eq!(&peer, local_key.preimage());
                    // The lookup for the local key finished. To complete the bootstrap process,
                    // a bucket refresh should be performed for every bucket farther away than
                    // the first non-empty bucket (which are most likely no more than the last
                    // few, i.e. farthest, buckets).
                    self.kbuckets
                        .iter()
                        .skip_while(|b| b.is_empty())
                        .skip(1) // Skip the bucket with the closest neighbour.
                        .map(|b| {
                            // Try to find a key that falls into the bucket. While such keys can
                            // be generated fully deterministically, the current libp2p kademlia
                            // wire protocol requires transmission of the preimages of the actual
                            // keys in the DHT keyspace, hence for now this is just a "best effort"
                            // to find a key that hashes into a specific bucket. The probabilities
                            // of finding a key in the bucket `b` with as most 16 trials are as
                            // follows:
                            //
                            // Pr(bucket-255) = 1 - (1/2)^16 ~= 1
                            // Pr(bucket-254) = 1 - (3/4)^16 ~= 1
                            // Pr(bucket-253) = 1 - (7/8)^16 ~= 0.88
                            // Pr(bucket-252) = 1 - (15/16)^16 ~= 0.64
                            // ...
                            let mut target = kbucket::Key::from(PeerId::random());
                            for _ in 0..16 {
                                let d = local_key.distance(&target);
                                if b.contains(&d) {
                                    break;
                                }
                                target = kbucket::Key::from(PeerId::random());
                            }
                            target
                        })
                        .collect::<Vec<_>>()
                        .into_iter()
                });

                let num_remaining = remaining.len() as u32;

                // Kick off the next bucket-refresh lookup, or mark the whole
                // bootstrap as finished if none remain.
                if let Some(target) = remaining.next() {
                    let info = QueryInfo::Bootstrap {
                        peer: *target.preimage(),
                        remaining: Some(remaining),
                        step: step.next(),
                    };
                    let peers = self.kbuckets.closest_keys(&target);
                    let inner = QueryInner::new(info);
                    self.queries
                        .continue_iter_closest(query_id, target.clone(), peers, inner);
                } else {
                    step.last = true;
                };

                Some(Event::OutboundQueryProgressed {
                    id: query_id,
                    stats: result.stats,
                    result: QueryResult::Bootstrap(Ok(BootstrapOk {
                        peer,
                        num_remaining,
                    })),
                    step,
                })
            }

            QueryInfo::GetClosestPeers { key, mut step } => {
                step.last = true;

                Some(Event::OutboundQueryProgressed {
                    id: query_id,
                    stats: result.stats,
                    result: QueryResult::GetClosestPeers(Ok(GetClosestPeersOk {
                        key,
                        peers: result.peers.collect(),
                    })),
                    step,
                })
            }

            QueryInfo::GetProviders { mut step, .. } => {
                step.last = true;

                Some(Event::OutboundQueryProgressed {
                    id: query_id,
                    stats: result.stats,
                    result: QueryResult::GetProviders(Ok(
                        GetProvidersOk::FinishedWithNoAdditionalRecord {
                            closest_peers: result.peers.collect(),
                        },
                    )),
                    step,
                })
            }

            // Phase 1 of ADD_PROVIDER done: continue with phase 2, sending
            // the provider record to the peers found. No event yet.
            QueryInfo::AddProvider {
                context,
                key,
                phase: AddProviderPhase::GetClosestPeers,
            } => {
                let provider_id = self.local_peer_id;
                let external_addresses = self.external_addresses.iter().cloned().collect();
                let inner = QueryInner::new(QueryInfo::AddProvider {
                    context,
                    key,
                    phase: AddProviderPhase::AddProvider {
                        provider_id,
                        external_addresses,
                        get_closest_peers_stats: result.stats,
                    },
                });
                self.queries.continue_fixed(query_id, result.peers, inner);
                None
            }

            // Phase 2 of ADD_PROVIDER done: report the result, merging the
            // stats of both phases.
            QueryInfo::AddProvider {
                context,
                key,
                phase:
                    AddProviderPhase::AddProvider {
                        get_closest_peers_stats,
                        ..
                    },
            } => match context {
                AddProviderContext::Publish => Some(Event::OutboundQueryProgressed {
                    id: query_id,
                    stats: get_closest_peers_stats.merge(result.stats),
                    result: QueryResult::StartProviding(Ok(AddProviderOk { key })),
                    step: ProgressStep::first_and_last(),
                }),
                AddProviderContext::Republish => Some(Event::OutboundQueryProgressed {
                    id: query_id,
                    stats: get_closest_peers_stats.merge(result.stats),
                    result: QueryResult::RepublishProvider(Ok(AddProviderOk { key })),
                    step: ProgressStep::first_and_last(),
                }),
            },

            QueryInfo::GetRecord {
                key,
                mut step,
                found_a_record,
                cache_candidates,
            } => {
                step.last = true;

                let results = if found_a_record {
                    Ok(GetRecordOk::FinishedWithNoAdditionalRecord { cache_candidates })
                } else {
                    Err(GetRecordError::NotFound {
                        key,
                        closest_peers: result.peers.collect(),
                    })
                };
                Some(Event::OutboundQueryProgressed {
                    id: query_id,
                    stats: result.stats,
                    result: QueryResult::GetRecord(results),
                    step,
                })
            }

            // Phase 1 of PUT_VALUE done: continue with phase 2, storing the
            // record with the peers found. No event yet.
            QueryInfo::PutRecord {
                context,
                record,
                quorum,
                phase: PutRecordPhase::GetClosestPeers,
            } => {
                let info = QueryInfo::PutRecord {
                    context,
                    record,
                    quorum,
                    phase: PutRecordPhase::PutRecord {
                        success: vec![],
                        get_closest_peers_stats: result.stats,
                    },
                };
                let inner = QueryInner::new(info);
                self.queries.continue_fixed(query_id, result.peers, inner);
                None
            }

            // Phase 2 of PUT_VALUE done: success depends on whether the
            // quorum of confirmations was reached.
            QueryInfo::PutRecord {
                context,
                record,
                quorum,
                phase:
                    PutRecordPhase::PutRecord {
                        success,
                        get_closest_peers_stats,
                    },
            } => {
                let mk_result = |key: record::Key| {
                    if success.len() >= quorum.get() {
                        Ok(PutRecordOk { key })
                    } else {
                        Err(PutRecordError::QuorumFailed {
                            key,
                            quorum,
                            success,
                        })
                    }
                };
                match context {
                    PutRecordContext::Publish | PutRecordContext::Custom => {
                        Some(Event::OutboundQueryProgressed {
                            id: query_id,
                            stats: get_closest_peers_stats.merge(result.stats),
                            result: QueryResult::PutRecord(mk_result(record.key)),
                            step: ProgressStep::first_and_last(),
                        })
                    }
                    PutRecordContext::Republish => Some(Event::OutboundQueryProgressed {
                        id: query_id,
                        stats: get_closest_peers_stats.merge(result.stats),
                        result: QueryResult::RepublishRecord(mk_result(record.key)),
                        step: ProgressStep::first_and_last(),
                    }),
                    // Background replication does not surface an event.
                    PutRecordContext::Replicate => {
                        tracing::debug!(record=?record.key, "Record replicated");
                        None
                    }
                }
            }
        }
    }
1564
    /// Handles a query that timed out.
    ///
    /// Produces the corresponding `Timeout` error event for the query type;
    /// a timed-out bootstrap lookup still continues with the remaining
    /// bucket-refresh targets, and timed-out background replication only
    /// logs instead of emitting an event.
    fn query_timeout(&mut self, query: Query<QueryInner>) -> Option<Event> {
        let query_id = query.id();
        tracing::trace!(query=?query_id, "Query timed out");
        let result = query.into_result();
        match result.inner.info {
            QueryInfo::Bootstrap {
                peer,
                mut remaining,
                mut step,
            } => {
                let num_remaining = remaining.as_ref().map(|r| r.len().saturating_sub(1) as u32);

                // Continue with the next bootstrap query if `remaining` is not empty.
                if let Some((target, remaining)) =
                    remaining.take().and_then(|mut r| Some((r.next()?, r)))
                {
                    let info = QueryInfo::Bootstrap {
                        peer: target.clone().into_preimage(),
                        remaining: Some(remaining),
                        step: step.next(),
                    };
                    let peers = self.kbuckets.closest_keys(&target);
                    let inner = QueryInner::new(info);
                    self.queries
                        .continue_iter_closest(query_id, target.clone(), peers, inner);
                } else {
                    step.last = true;
                }

                Some(Event::OutboundQueryProgressed {
                    id: query_id,
                    stats: result.stats,
                    result: QueryResult::Bootstrap(Err(BootstrapError::Timeout {
                        peer,
                        num_remaining,
                    })),
                    step,
                })
            }

            QueryInfo::AddProvider { context, key, .. } => Some(match context {
                AddProviderContext::Publish => Event::OutboundQueryProgressed {
                    id: query_id,
                    stats: result.stats,
                    result: QueryResult::StartProviding(Err(AddProviderError::Timeout { key })),
                    step: ProgressStep::first_and_last(),
                },
                AddProviderContext::Republish => Event::OutboundQueryProgressed {
                    id: query_id,
                    stats: result.stats,
                    result: QueryResult::RepublishProvider(Err(AddProviderError::Timeout { key })),
                    step: ProgressStep::first_and_last(),
                },
            }),

            QueryInfo::GetClosestPeers { key, mut step } => {
                step.last = true;

                Some(Event::OutboundQueryProgressed {
                    id: query_id,
                    stats: result.stats,
                    result: QueryResult::GetClosestPeers(Err(GetClosestPeersError::Timeout {
                        key,
                        peers: result.peers.collect(),
                    })),
                    step,
                })
            }

            QueryInfo::PutRecord {
                record,
                quorum,
                context,
                phase,
            } => {
                // Peers that confirmed storage before the timeout are still
                // reported in the error.
                let err = Err(PutRecordError::Timeout {
                    key: record.key,
                    quorum,
                    success: match phase {
                        PutRecordPhase::GetClosestPeers => vec![],
                        PutRecordPhase::PutRecord { ref success, .. } => success.clone(),
                    },
                });
                match context {
                    PutRecordContext::Publish | PutRecordContext::Custom => {
                        Some(Event::OutboundQueryProgressed {
                            id: query_id,
                            stats: result.stats,
                            result: QueryResult::PutRecord(err),
                            step: ProgressStep::first_and_last(),
                        })
                    }
                    PutRecordContext::Republish => Some(Event::OutboundQueryProgressed {
                        id: query_id,
                        stats: result.stats,
                        result: QueryResult::RepublishRecord(err),
                        step: ProgressStep::first_and_last(),
                    }),
                    // Background replication failures are only logged.
                    PutRecordContext::Replicate => match phase {
                        PutRecordPhase::GetClosestPeers => {
                            tracing::warn!(
                                "Locating closest peers for replication failed: {:?}",
                                err
                            );
                            None
                        }
                        PutRecordPhase::PutRecord { .. } => {
                            tracing::debug!("Replicating record failed: {:?}", err);
                            None
                        }
                    },
                }
            }

            QueryInfo::GetRecord { key, mut step, .. } => {
                step.last = true;

                Some(Event::OutboundQueryProgressed {
                    id: query_id,
                    stats: result.stats,
                    result: QueryResult::GetRecord(Err(GetRecordError::Timeout { key })),
                    step,
                })
            }

            QueryInfo::GetProviders { key, mut step, .. } => {
                step.last = true;

                Some(Event::OutboundQueryProgressed {
                    id: query_id,
                    stats: result.stats,
                    result: QueryResult::GetProviders(Err(GetProvidersError::Timeout {
                        key,
                        closest_peers: result.peers.collect(),
                    })),
                    step,
                })
            }
        }
    }
1706
    /// Processes a record received from a peer.
    ///
    /// Depending on `record_filtering`, the record is either stored directly
    /// (possibly with a shortened TTL) or handed to the user via an
    /// `InboundRequest::PutRecord` event. A `PutRecordRes` is sent back to
    /// the remote in all cases except a store failure, where the stream is
    /// reset instead.
    fn record_received(
        &mut self,
        source: PeerId,
        connection: ConnectionId,
        request_id: RequestId,
        mut record: Record,
    ) {
        if record.publisher.as_ref() == Some(self.kbuckets.local_key().preimage()) {
            // If the (alleged) publisher is the local node, do nothing. The record of
            // the original publisher should never change as a result of replication
            // and the publisher is always assumed to have the "right" value.
            self.queued_events.push_back(ToSwarm::NotifyHandler {
                peer_id: source,
                handler: NotifyHandler::One(connection),
                event: HandlerIn::PutRecordRes {
                    key: record.key,
                    value: record.value,
                    request_id,
                },
            });
            return;
        }

        let now = Instant::now();

        // Calculate the expiration exponentially inversely proportional to the
        // number of nodes between the local node and the closest node to the key
        // (beyond the replication factor). This ensures avoiding over-caching
        // outside of the k closest nodes to a key.
        let target = kbucket::Key::new(record.key.clone());
        let num_between = self.kbuckets.count_nodes_between(&target);
        let k = self.queries.config().replication_factor.get();
        let num_beyond_k = (usize::max(k, num_between) - k) as u32;
        let expiration = self
            .record_ttl
            .map(|ttl| now + exp_decrease(ttl, num_beyond_k));
        // The smaller TTL prevails. Only if neither TTL is set is the record
        // stored "forever".
        record.expires = record.expires.or(expiration).min(expiration);

        if let Some(job) = self.put_record_job.as_mut() {
            // Ignore the record in the next run of the replication
            // job, since we can assume the sender replicated the
            // record to the k closest peers. Effectively, only
            // one of the k closest peers performs a replication
            // in the configured interval, assuming a shared interval.
            job.skip(record.key.clone())
        }

        // While records received from a publisher, as well as records that do
        // not exist locally should always (attempted to) be stored, there is a
        // choice here w.r.t. the handling of replicated records whose keys refer
        // to records that exist locally: The value and / or the publisher may
        // either be overridden or left unchanged. At the moment and in the
        // absence of a decisive argument for another option, both are always
        // overridden as it avoids having to load the existing record in the
        // first place.

        if !record.is_expired(now) {
            // The record is cloned because of the weird libp2p protocol
            // requirement to send back the value in the response, although this
            // is a waste of resources.
            match self.record_filtering {
                StoreInserts::Unfiltered => match self.store.put(record.clone()) {
                    Ok(()) => {
                        tracing::debug!(
                            record=?record.key,
                            "Record stored: {} bytes",
                            record.value.len()
                        );
                        self.queued_events.push_back(ToSwarm::GenerateEvent(
                            Event::InboundRequest {
                                request: InboundRequest::PutRecord {
                                    source,
                                    connection,
                                    record: None,
                                },
                            },
                        ));
                    }
                    Err(e) => {
                        tracing::info!("Record not stored: {:?}", e);
                        self.queued_events.push_back(ToSwarm::NotifyHandler {
                            peer_id: source,
                            handler: NotifyHandler::One(connection),
                            event: HandlerIn::Reset(request_id),
                        });

                        // A failed store resets the stream; no `PutRecordRes`
                        // is sent in this case.
                        return;
                    }
                },
                StoreInserts::FilterBoth => {
                    self.queued_events
                        .push_back(ToSwarm::GenerateEvent(Event::InboundRequest {
                            request: InboundRequest::PutRecord {
                                source,
                                connection,
                                record: Some(record.clone()),
                            },
                        }));
                }
            }
        }

        // The remote receives a [`HandlerIn::PutRecordRes`] even in the
        // case where the record is discarded due to being expired. Given that
        // the remote sent the local node a [`HandlerEvent::PutRecord`]
        // request, the remote perceives the local node as one node among the k
        // closest nodes to the target. In addition returning
        // [`HandlerIn::PutRecordRes`] does not reveal any internal
        // information to a possibly malicious remote node.
        self.queued_events.push_back(ToSwarm::NotifyHandler {
            peer_id: source,
            handler: NotifyHandler::One(connection),
            event: HandlerIn::PutRecordRes {
                key: record.key,
                value: record.value,
                request_id,
            },
        })
    }
1829
1830 /// Processes a provider record received from a peer.
1831 fn provider_received(&mut self, key: record::Key, provider: KadPeer) {
1832 if &provider.node_id != self.kbuckets.local_key().preimage() {
1833 let record = ProviderRecord {
1834 key,
1835 provider: provider.node_id,
1836 expires: self.provider_record_ttl.map(|ttl| Instant::now() + ttl),
1837 addresses: provider.multiaddrs,
1838 };
1839 match self.record_filtering {
1840 StoreInserts::Unfiltered => {
1841 if let Err(e) = self.store.add_provider(record) {
1842 tracing::info!("Provider record not stored: {:?}", e);
1843 return;
1844 }
1845
1846 self.queued_events
1847 .push_back(ToSwarm::GenerateEvent(Event::InboundRequest {
1848 request: InboundRequest::AddProvider { record: None },
1849 }));
1850 }
1851 StoreInserts::FilterBoth => {
1852 self.queued_events
1853 .push_back(ToSwarm::GenerateEvent(Event::InboundRequest {
1854 request: InboundRequest::AddProvider {
1855 record: Some(record),
1856 },
1857 }));
1858 }
1859 }
1860 }
1861 }
1862
1863 fn address_failed(&mut self, peer_id: PeerId, address: &Multiaddr) {
1864 let key = kbucket::Key::from(peer_id);
1865
1866 if let Some(addrs) = self.kbuckets.entry(&key).value() {
1867 // TODO: Ideally, the address should only be removed if the error can
1868 // be classified as "permanent" but since `err` is currently a borrowed
1869 // trait object without a `'static` bound, even downcasting for inspection
1870 // of the error is not possible (and also not truly desirable or ergonomic).
1871 // The error passed in should rather be a dedicated enum.
1872 if addrs.remove(address).is_ok() {
1873 tracing::debug!(
1874 peer=%peer_id,
1875 %address,
1876 "Address removed from peer due to error."
1877 );
1878 } else {
1879 // Despite apparently having no reachable address (any longer),
1880 // the peer is kept in the routing table with the last address to avoid
1881 // (temporary) loss of network connectivity to "flush" the routing
1882 // table. Once in, a peer is only removed from the routing table
1883 // if it is the least recently connected peer, currently disconnected
1884 // and is unreachable in the context of another peer pending insertion
1885 // into the same bucket. This is handled transparently by the
1886 // `KBucketsTable` and takes effect through `KBucketsTable::take_applied_pending`
1887 // within `Behaviour::poll`.
1888 tracing::debug!(
1889 peer=%peer_id,
1890 %address,
1891 "Last remaining address of peer is unreachable."
1892 );
1893 }
1894 }
1895
1896 for query in self.queries.iter_mut() {
1897 if let Some(addrs) = query.inner.addresses.get_mut(&peer_id) {
1898 addrs.retain(|a| a != address);
1899 }
1900 }
1901 }
1902
1903 fn on_connection_established(
1904 &mut self,
1905 ConnectionEstablished {
1906 peer_id,
1907 failed_addresses,
1908 other_established,
1909 ..
1910 }: ConnectionEstablished,
1911 ) {
1912 for addr in failed_addresses {
1913 self.address_failed(peer_id, addr);
1914 }
1915
1916 // Peer's first connection.
1917 if other_established == 0 {
1918 self.connected_peers.insert(peer_id);
1919 }
1920 }
1921
1922 fn on_address_change(
1923 &mut self,
1924 AddressChange {
1925 peer_id: peer,
1926 old,
1927 new,
1928 ..
1929 }: AddressChange,
1930 ) {
1931 let (old, new) = (old.get_remote_address(), new.get_remote_address());
1932
1933 // Update routing table.
1934 if let Some(addrs) = self.kbuckets.entry(&kbucket::Key::from(peer)).value() {
1935 if addrs.replace(old, new) {
1936 tracing::debug!(
1937 %peer,
1938 old_address=%old,
1939 new_address=%new,
1940 "Old address replaced with new address for peer."
1941 );
1942 } else {
1943 tracing::debug!(
1944 %peer,
1945 old_address=%old,
1946 new_address=%new,
1947 "Old address not replaced with new address for peer as old address wasn't present.",
1948 );
1949 }
1950 } else {
1951 tracing::debug!(
1952 %peer,
1953 old_address=%old,
1954 new_address=%new,
1955 "Old address not replaced with new address for peer as peer is not present in the \
1956 routing table."
1957 );
1958 }
1959
1960 // Update query address cache.
1961 //
1962 // Given two connected nodes: local node A and remote node B. Say node B
1963 // is not in node A's routing table. Additionally node B is part of the
1964 // `QueryInner::addresses` list of an ongoing query on node A. Say Node
1965 // B triggers an address change and then disconnects. Later on the
1966 // earlier mentioned query on node A would like to connect to node B.
1967 // Without replacing the address in the `QueryInner::addresses` set node
1968 // A would attempt to dial the old and not the new address.
1969 //
1970 // While upholding correctness, iterating through all discovered
1971 // addresses of a peer in all currently ongoing queries might have a
1972 // large performance impact. If so, the code below might be worth
1973 // revisiting.
1974 for query in self.queries.iter_mut() {
1975 if let Some(addrs) = query.inner.addresses.get_mut(&peer) {
1976 for addr in addrs.iter_mut() {
1977 if addr == old {
1978 *addr = new.clone();
1979 }
1980 }
1981 }
1982 }
1983 }
1984
1985 fn on_dial_failure(&mut self, DialFailure { peer_id, error, .. }: DialFailure) {
1986 let Some(peer_id) = peer_id else { return };
1987
1988 match error {
1989 DialError::LocalPeerId { .. }
1990 | DialError::WrongPeerId { .. }
1991 | DialError::Aborted
1992 | DialError::Denied { .. }
1993 | DialError::Transport(_)
1994 | DialError::NoAddresses => {
1995 if let DialError::Transport(addresses) = error {
1996 for (addr, _) in addresses {
1997 self.address_failed(peer_id, addr)
1998 }
1999 }
2000
2001 for query in self.queries.iter_mut() {
2002 query.on_failure(&peer_id);
2003 }
2004 }
2005 DialError::DialPeerConditionFalse(
2006 dial_opts::PeerCondition::Disconnected
2007 | dial_opts::PeerCondition::NotDialing
2008 | dial_opts::PeerCondition::DisconnectedAndNotDialing,
2009 ) => {
2010 // We might (still) be connected, or about to be connected, thus do not report the
2011 // failure to the queries.
2012 }
2013 DialError::DialPeerConditionFalse(dial_opts::PeerCondition::Always) => {
2014 unreachable!("DialPeerCondition::Always can not trigger DialPeerConditionFalse.");
2015 }
2016 }
2017 }
2018
2019 fn on_connection_closed(
2020 &mut self,
2021 ConnectionClosed {
2022 peer_id,
2023 remaining_established,
2024 connection_id,
2025 ..
2026 }: ConnectionClosed,
2027 ) {
2028 self.connections.remove(&connection_id);
2029
2030 if remaining_established == 0 {
2031 for query in self.queries.iter_mut() {
2032 query.on_failure(&peer_id);
2033 }
2034 self.connection_updated(peer_id, None, NodeStatus::Disconnected);
2035 self.connected_peers.remove(&peer_id);
2036 }
2037 }
2038
2039 /// Preloads a new [`Handler`] with requests that are waiting to be sent to the newly connected peer.
2040 fn preload_new_handler(
2041 &mut self,
2042 handler: &mut Handler,
2043 connection_id: ConnectionId,
2044 peer: PeerId,
2045 ) {
2046 self.connections.insert(connection_id, peer);
2047 // Queue events for sending pending RPCs to the connected peer.
2048 // There can be only one pending RPC for a particular peer and query per definition.
2049 for (_peer_id, event) in self.queries.iter_mut().filter_map(|q| {
2050 q.inner
2051 .pending_rpcs
2052 .iter()
2053 .position(|(p, _)| p == &peer)
2054 .map(|p| q.inner.pending_rpcs.remove(p))
2055 }) {
2056 handler.on_behaviour_event(event)
2057 }
2058 }
2059}
2060
/// Exponentially decrease the given duration (base 2).
///
/// Only whole seconds are considered; shifting by 64 or more bits
/// yields a zero duration.
fn exp_decrease(ttl: Duration, exp: u32) -> Duration {
    let secs = ttl.as_secs().checked_shr(exp).unwrap_or_default();
    Duration::from_secs(secs)
}
2065
impl<TStore> NetworkBehaviour for Behaviour<TStore>
where
    TStore: RecordStore + Send + 'static,
{
    type ConnectionHandler = Handler;
    type ToSwarm = Event;

    fn handle_established_inbound_connection(
        &mut self,
        connection_id: ConnectionId,
        peer: PeerId,
        local_addr: &Multiaddr,
        remote_addr: &Multiaddr,
    ) -> Result<THandler<Self>, ConnectionDenied> {
        let connected_point = ConnectedPoint::Listener {
            local_addr: local_addr.clone(),
            send_back_addr: remote_addr.clone(),
        };

        // The handler is preloaded with any RPCs already pending for `peer`
        // so they are sent as soon as the connection is usable.
        let mut handler = Handler::new(
            self.protocol_config.clone(),
            connected_point,
            peer,
            self.mode,
        );
        self.preload_new_handler(&mut handler, connection_id, peer);

        Ok(handler)
    }

    fn handle_established_outbound_connection(
        &mut self,
        connection_id: ConnectionId,
        peer: PeerId,
        addr: &Multiaddr,
        role_override: Endpoint,
    ) -> Result<THandler<Self>, ConnectionDenied> {
        let connected_point = ConnectedPoint::Dialer {
            address: addr.clone(),
            role_override,
        };

        // Same as for inbound connections: preload pending RPCs for `peer`.
        let mut handler = Handler::new(
            self.protocol_config.clone(),
            connected_point,
            peer,
            self.mode,
        );
        self.preload_new_handler(&mut handler, connection_id, peer);

        Ok(handler)
    }

    fn handle_pending_outbound_connection(
        &mut self,
        _connection_id: ConnectionId,
        maybe_peer: Option<PeerId>,
        _addresses: &[Multiaddr],
        _effective_role: Endpoint,
    ) -> Result<Vec<Multiaddr>, ConnectionDenied> {
        let peer_id = match maybe_peer {
            None => return Ok(vec![]),
            Some(peer) => peer,
        };

        // We should order addresses from decreasing likelihood of connectivity, so start with
        // the addresses of that peer in the k-buckets.
        let key = kbucket::Key::from(peer_id);
        let mut peer_addrs =
            if let kbucket::Entry::Present(mut entry, _) = self.kbuckets.entry(&key) {
                let addrs = entry.value().iter().cloned().collect::<Vec<_>>();
                debug_assert!(!addrs.is_empty(), "Empty peer addresses in routing table.");
                addrs
            } else {
                Vec::new()
            };

        // We add to that a temporary list of addresses from the ongoing queries.
        for query in self.queries.iter() {
            if let Some(addrs) = query.inner.addresses.get(&peer_id) {
                peer_addrs.extend(addrs.iter().cloned())
            }
        }

        Ok(peer_addrs)
    }

    // Dispatches events coming from the per-connection `Handler`s: inbound
    // Kademlia requests are answered from local state and outbound query
    // responses are fed back into the corresponding query in the pool.
    fn on_connection_handler_event(
        &mut self,
        source: PeerId,
        connection: ConnectionId,
        event: THandlerOutEvent<Self>,
    ) {
        match event {
            HandlerEvent::ProtocolConfirmed { endpoint } => {
                debug_assert!(self.connected_peers.contains(&source));
                // The remote's address can only be put into the routing table,
                // and thus shared with other nodes, if the local node is the dialer,
                // since the remote address on an inbound connection may be specific
                // to that connection (e.g. typically the TCP port numbers).
                let address = match endpoint {
                    ConnectedPoint::Dialer { address, .. } => Some(address),
                    ConnectedPoint::Listener { .. } => None,
                };

                self.connection_updated(source, address, NodeStatus::Connected);
            }

            HandlerEvent::ProtocolNotSupported { endpoint } => {
                let address = match endpoint {
                    ConnectedPoint::Dialer { address, .. } => Some(address),
                    ConnectedPoint::Listener { .. } => None,
                };
                self.connection_updated(source, address, NodeStatus::Disconnected);
            }

            HandlerEvent::FindNodeReq { key, request_id } => {
                // Answer FIND_NODE from the local routing table.
                let closer_peers = self.find_closest(&kbucket::Key::new(key), &source);

                self.queued_events
                    .push_back(ToSwarm::GenerateEvent(Event::InboundRequest {
                        request: InboundRequest::FindNode {
                            num_closer_peers: closer_peers.len(),
                        },
                    }));

                self.queued_events.push_back(ToSwarm::NotifyHandler {
                    peer_id: source,
                    handler: NotifyHandler::One(connection),
                    event: HandlerIn::FindNodeRes {
                        closer_peers,
                        request_id,
                    },
                });
            }

            HandlerEvent::FindNodeRes {
                closer_peers,
                query_id,
            } => {
                self.discovered(&query_id, &source, closer_peers.iter());
            }

            HandlerEvent::GetProvidersReq { key, request_id } => {
                // Answer GET_PROVIDERS with known providers plus closer peers.
                let provider_peers = self.provider_peers(&key, &source);
                let closer_peers = self.find_closest(&kbucket::Key::new(key), &source);

                self.queued_events
                    .push_back(ToSwarm::GenerateEvent(Event::InboundRequest {
                        request: InboundRequest::GetProvider {
                            num_closer_peers: closer_peers.len(),
                            num_provider_peers: provider_peers.len(),
                        },
                    }));

                self.queued_events.push_back(ToSwarm::NotifyHandler {
                    peer_id: source,
                    handler: NotifyHandler::One(connection),
                    event: HandlerIn::GetProvidersRes {
                        closer_peers,
                        provider_peers,
                        request_id,
                    },
                });
            }

            HandlerEvent::GetProvidersRes {
                closer_peers,
                provider_peers,
                query_id,
            } => {
                let peers = closer_peers.iter().chain(provider_peers.iter());
                self.discovered(&query_id, &source, peers);
                if let Some(query) = self.queries.get_mut(&query_id) {
                    let stats = query.stats().clone();
                    if let QueryInfo::GetProviders {
                        ref key,
                        ref mut providers_found,
                        ref mut step,
                        ..
                    } = query.inner.info
                    {
                        *providers_found += provider_peers.len();
                        let providers = provider_peers.iter().map(|p| p.node_id).collect();

                        // Report intermediate progress; the step counter is
                        // advanced after the event is queued.
                        self.queued_events.push_back(ToSwarm::GenerateEvent(
                            Event::OutboundQueryProgressed {
                                id: query_id,
                                result: QueryResult::GetProviders(Ok(
                                    GetProvidersOk::FoundProviders {
                                        key: key.clone(),
                                        providers,
                                    },
                                )),
                                step: step.clone(),
                                stats,
                            },
                        ));
                        *step = step.next();
                    }
                }
            }
            HandlerEvent::QueryError { query_id, error } => {
                tracing::debug!(
                    peer=%source,
                    query=?query_id,
                    "Request to peer in query failed with {:?}",
                    error
                );
                // If the query to which the error relates is still active,
                // signal the failure w.r.t. `source`.
                if let Some(query) = self.queries.get_mut(&query_id) {
                    query.on_failure(&source)
                }
            }

            HandlerEvent::AddProvider { key, provider } => {
                // Only accept a provider record from a legitimate peer.
                if provider.node_id != source {
                    return;
                }

                self.provider_received(key, provider);
            }

            HandlerEvent::GetRecord { key, request_id } => {
                // Lookup the record locally.
                let record = match self.store.get(&key) {
                    Some(record) => {
                        if record.is_expired(Instant::now()) {
                            // Lazily evict expired records on access.
                            self.store.remove(&key);
                            None
                        } else {
                            Some(record.into_owned())
                        }
                    }
                    None => None,
                };

                let closer_peers = self.find_closest(&kbucket::Key::new(key), &source);

                self.queued_events
                    .push_back(ToSwarm::GenerateEvent(Event::InboundRequest {
                        request: InboundRequest::GetRecord {
                            num_closer_peers: closer_peers.len(),
                            present_locally: record.is_some(),
                        },
                    }));

                self.queued_events.push_back(ToSwarm::NotifyHandler {
                    peer_id: source,
                    handler: NotifyHandler::One(connection),
                    event: HandlerIn::GetRecordRes {
                        record,
                        closer_peers,
                        request_id,
                    },
                });
            }

            HandlerEvent::GetRecordRes {
                record,
                closer_peers,
                query_id,
            } => {
                if let Some(query) = self.queries.get_mut(&query_id) {
                    let stats = query.stats().clone();
                    if let QueryInfo::GetRecord {
                        key,
                        ref mut step,
                        ref mut found_a_record,
                        cache_candidates,
                    } = &mut query.inner.info
                    {
                        if let Some(record) = record {
                            *found_a_record = true;
                            let record = PeerRecord {
                                peer: Some(source),
                                record,
                            };

                            self.queued_events.push_back(ToSwarm::GenerateEvent(
                                Event::OutboundQueryProgressed {
                                    id: query_id,
                                    result: QueryResult::GetRecord(Ok(GetRecordOk::FoundRecord(
                                        record,
                                    ))),
                                    step: step.clone(),
                                    stats,
                                },
                            ));

                            *step = step.next();
                        } else {
                            tracing::trace!(record=?key, %source, "Record not found at source");
                            if let Caching::Enabled { max_peers } = self.caching {
                                // Track `source` as a cache candidate, keyed by its
                                // distance to the record key; keep only the
                                // `max_peers` closest candidates.
                                let source_key = kbucket::Key::from(source);
                                let target_key = kbucket::Key::from(key.clone());
                                let distance = source_key.distance(&target_key);
                                cache_candidates.insert(distance, source);
                                if cache_candidates.len() > max_peers as usize {
                                    // TODO: `pop_last()` would be nice once stabilised.
                                    // See https://github.com/rust-lang/rust/issues/62924.
                                    let last =
                                        *cache_candidates.keys().next_back().expect("len > 0");
                                    cache_candidates.remove(&last);
                                }
                            }
                        }
                    }
                }

                self.discovered(&query_id, &source, closer_peers.iter());
            }

            HandlerEvent::PutRecord { record, request_id } => {
                self.record_received(source, connection, request_id, record);
            }

            HandlerEvent::PutRecordRes { query_id, .. } => {
                if let Some(query) = self.queries.get_mut(&query_id) {
                    query.on_success(&source, vec![]);
                    if let QueryInfo::PutRecord {
                        phase: PutRecordPhase::PutRecord { success, .. },
                        quorum,
                        ..
                    } = &mut query.inner.info
                    {
                        success.push(source);

                        let quorum = quorum.get();
                        if success.len() >= quorum {
                            let peers = success.clone();
                            let finished = query.try_finish(peers.iter());
                            if !finished {
                                tracing::debug!(
                                    peer=%source,
                                    query=?query_id,
                                    "PutRecord query reached quorum ({}/{}) with response \
                                     from peer but could not yet finish.",
                                    peers.len(),
                                    quorum,
                                );
                            }
                        }
                    }
                }
            }
        };
    }

    #[tracing::instrument(level = "trace", name = "NetworkBehaviour::poll", skip(self, cx))]
    fn poll(
        &mut self,
        cx: &mut Context<'_>,
    ) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
        let now = Instant::now();

        // Calculate the available capacity for queries triggered by background jobs.
        let mut jobs_query_capacity = JOBS_MAX_QUERIES.saturating_sub(self.queries.size());

        // Run the periodic provider announcement job.
        if let Some(mut job) = self.add_provider_job.take() {
            let num = usize::min(JOBS_MAX_NEW_QUERIES, jobs_query_capacity);
            for _ in 0..num {
                if let Poll::Ready(r) = job.poll(cx, &mut self.store, now) {
                    self.start_add_provider(r.key, AddProviderContext::Republish)
                } else {
                    break;
                }
            }
            // Note: the capacity is reduced by the full batch size `num`,
            // even if the job yielded fewer records before going pending.
            jobs_query_capacity -= num;
            self.add_provider_job = Some(job);
        }

        // Run the periodic record replication / publication job.
        if let Some(mut job) = self.put_record_job.take() {
            let num = usize::min(JOBS_MAX_NEW_QUERIES, jobs_query_capacity);
            for _ in 0..num {
                if let Poll::Ready(r) = job.poll(cx, &mut self.store, now) {
                    // Records published by the local node are republished;
                    // all others are replicated.
                    let context =
                        if r.publisher.as_ref() == Some(self.kbuckets.local_key().preimage()) {
                            PutRecordContext::Republish
                        } else {
                            PutRecordContext::Replicate
                        };
                    self.start_put_record(r, Quorum::All, context)
                } else {
                    break;
                }
            }
            self.put_record_job = Some(job);
        }

        loop {
            // Drain queued events first.
            if let Some(event) = self.queued_events.pop_front() {
                return Poll::Ready(event);
            }

            // Drain applied pending entries from the routing table.
            if let Some(entry) = self.kbuckets.take_applied_pending() {
                let kbucket::Node { key, value } = entry.inserted;
                let event = Event::RoutingUpdated {
                    bucket_range: self
                        .kbuckets
                        .bucket(&key)
                        .map(|b| b.range())
                        .expect("Self to never be applied from pending."),
                    peer: key.into_preimage(),
                    is_new_peer: true,
                    addresses: value,
                    old_peer: entry.evicted.map(|n| n.key.into_preimage()),
                };
                return Poll::Ready(ToSwarm::GenerateEvent(event));
            }

            // Look for a finished query.
            loop {
                match self.queries.poll(now) {
                    QueryPoolState::Finished(q) => {
                        if let Some(event) = self.query_finished(q) {
                            return Poll::Ready(ToSwarm::GenerateEvent(event));
                        }
                    }
                    QueryPoolState::Timeout(q) => {
                        if let Some(event) = self.query_timeout(q) {
                            return Poll::Ready(ToSwarm::GenerateEvent(event));
                        }
                    }
                    QueryPoolState::Waiting(Some((query, peer_id))) => {
                        let event = query.inner.info.to_request(query.id());
                        // TODO: AddProvider requests yield no response, so the query completes
                        // as soon as all requests have been sent. However, the handler should
                        // better emit an event when the request has been sent (and report
                        // an error if sending fails), instead of immediately reporting
                        // "success" somewhat prematurely here.
                        if let QueryInfo::AddProvider {
                            phase: AddProviderPhase::AddProvider { .. },
                            ..
                        } = &query.inner.info
                        {
                            query.on_success(&peer_id, vec![])
                        }

                        if self.connected_peers.contains(&peer_id) {
                            self.queued_events.push_back(ToSwarm::NotifyHandler {
                                peer_id,
                                event,
                                handler: NotifyHandler::Any,
                            });
                        } else if &peer_id != self.kbuckets.local_key().preimage() {
                            // Not yet connected: park the RPC on the query and
                            // dial the peer; `preload_new_handler` will pick the
                            // RPC up once a connection is established.
                            query.inner.pending_rpcs.push((peer_id, event));
                            self.queued_events.push_back(ToSwarm::Dial {
                                opts: DialOpts::peer_id(peer_id)
                                    .condition(dial_opts::PeerCondition::NotDialing)
                                    .build(),
                            });
                        }
                    }
                    QueryPoolState::Waiting(None) | QueryPoolState::Idle => break,
                }
            }

            // No immediate event was produced as a result of a finished query.
            // If no new events have been queued either, signal `NotReady` to
            // be polled again later.
            if self.queued_events.is_empty() {
                self.no_events_waker = Some(cx.waker().clone());

                return Poll::Pending;
            }
        }
    }

    fn on_swarm_event(&mut self, event: FromSwarm) {
        self.listen_addresses.on_swarm_event(&event);
        let external_addresses_changed = self.external_addresses.on_swarm_event(&event);

        // In automatic mode, the client/server mode is derived from the set
        // of confirmed external addresses.
        if self.auto_mode && external_addresses_changed {
            self.determine_mode_from_external_addresses();
        }

        match event {
            FromSwarm::ConnectionEstablished(connection_established) => {
                self.on_connection_established(connection_established)
            }
            FromSwarm::ConnectionClosed(connection_closed) => {
                self.on_connection_closed(connection_closed)
            }
            FromSwarm::DialFailure(dial_failure) => self.on_dial_failure(dial_failure),
            FromSwarm::AddressChange(address_change) => self.on_address_change(address_change),
            _ => {}
        }
    }
}
2562
/// A quorum w.r.t. the configured replication factor specifies the minimum
/// number of distinct nodes that must be successfully contacted in order
/// for a query to succeed.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum Quorum {
    One,
    Majority,
    All,
    N(NonZeroUsize),
}

impl Quorum {
    /// Evaluate the quorum w.r.t a given total (number of peers).
    ///
    /// The result is always in `1..=total`.
    fn eval(&self, total: NonZeroUsize) -> NonZeroUsize {
        match self {
            Quorum::All => total,
            Quorum::One => NonZeroUsize::new(1).expect("1 != 0"),
            // A requested `n` larger than `total` is clamped to `total`.
            Quorum::N(n) => total.min(*n),
            Quorum::Majority => NonZeroUsize::new(total.get() / 2 + 1).expect("n + 1 != 0"),
        }
    }
}
2585
/// A record either received by the given peer or retrieved from the local
/// record store.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PeerRecord {
    /// The peer from whom the record was received. `None` if the record was
    /// retrieved from local storage.
    pub peer: Option<PeerId>,
    /// The record itself.
    pub record: Record,
}
2595
2596//////////////////////////////////////////////////////////////////////////////
2597// Events
2598
/// The events produced by the `Kademlia` behaviour.
///
/// See [`NetworkBehaviour::poll`].
#[derive(Debug, Clone)]
#[allow(clippy::large_enum_variant)]
pub enum Event {
    /// An inbound request has been received and handled.
    //
    // Note on the difference between 'request' and 'query': A request is a
    // single request-response style exchange with a single remote peer. A query
    // is made of multiple requests across multiple remote peers.
    InboundRequest { request: InboundRequest },

    /// An outbound query has made progress.
    OutboundQueryProgressed {
        /// The ID of the query that finished.
        id: QueryId,
        /// The intermediate result of the query.
        result: QueryResult,
        /// Execution statistics from the query.
        stats: QueryStats,
        /// Indicates which event this is, if there are multiple responses for a single query.
        step: ProgressStep,
    },

    /// The routing table has been updated with a new peer and / or
    /// address, thereby possibly evicting another peer.
    RoutingUpdated {
        /// The ID of the peer that was added or updated.
        peer: PeerId,
        /// Whether this is a new peer and was thus just added to the routing
        /// table, or whether it is an existing peer whose addresses changed.
        is_new_peer: bool,
        /// The full list of known addresses of `peer`.
        addresses: Addresses,
        /// Returns the minimum inclusive and maximum inclusive distance for
        /// the bucket of the peer.
        bucket_range: (Distance, Distance),
        /// The ID of the peer that was evicted from the routing table to make
        /// room for the new peer, if any.
        old_peer: Option<PeerId>,
    },

    /// A peer has connected for whom no listen address is known.
    ///
    /// If the peer is to be added to the routing table, a known
    /// listen address for the peer must be provided via [`Behaviour::add_address`].
    UnroutablePeer { peer: PeerId },

    /// A connection to a peer has been established for whom a listen address
    /// is known but the peer has not been added to the routing table either
    /// because [`BucketInserts::Manual`] is configured or because
    /// the corresponding bucket is full.
    ///
    /// If the peer is to be included in the routing table, it must
    /// be explicitly added via [`Behaviour::add_address`], possibly after
    /// removing another peer.
    ///
    /// See [`Behaviour::kbucket`] for insight into the contents of
    /// the k-bucket of `peer`.
    RoutablePeer { peer: PeerId, address: Multiaddr },

    /// A connection to a peer has been established for whom a listen address
    /// is known but the peer is only pending insertion into the routing table
    /// if the least-recently disconnected peer is unresponsive, i.e. the peer
    /// may not make it into the routing table.
    ///
    /// If the peer is to be unconditionally included in the routing table,
    /// it should be explicitly added via [`Behaviour::add_address`] after
    /// removing another peer.
    ///
    /// See [`Behaviour::kbucket`] for insight into the contents of
    /// the k-bucket of `peer`.
    PendingRoutablePeer { peer: PeerId, address: Multiaddr },

    /// This peer's mode has been updated automatically.
    ///
    /// This happens in response to an external
    /// address being added or removed.
    ModeChanged { new_mode: Mode },
}
2680
/// Information about progress events.
#[derive(Debug, Clone)]
pub struct ProgressStep {
    /// The index into the event
    pub count: NonZeroUsize,
    /// Is this the final event?
    pub last: bool,
}

impl ProgressStep {
    /// The first, non-final step of a query.
    fn first() -> Self {
        Self {
            count: NonZeroUsize::new(1).expect("1 to be greater than 0."),
            last: false,
        }
    }

    /// A single-step query: the first step is also the last.
    fn first_and_last() -> Self {
        Self {
            last: true,
            ..Self::first()
        }
    }

    /// The step following `self`.
    ///
    /// Panics if `self` was already marked as the last step.
    fn next(&self) -> Self {
        assert!(!self.last);

        Self {
            count: NonZeroUsize::new(self.count.get() + 1).expect("Adding 1 not to result in 0."),
            last: false,
        }
    }
}
2711
/// Information about a received and handled inbound request.
#[derive(Debug, Clone)]
pub enum InboundRequest {
    /// Request for the list of nodes whose IDs are the closest to `key`.
    FindNode { num_closer_peers: usize },
    /// Same as `FindNode`, but should also return the entries of the local
    /// providers list for this key.
    GetProvider {
        num_closer_peers: usize,
        num_provider_peers: usize,
    },
    /// A peer sent an add provider request.
    /// If filtering [`StoreInserts::FilterBoth`] is enabled, the [`ProviderRecord`] is
    /// included.
    ///
    /// See [`StoreInserts`] and [`Config::set_record_filtering`] for details.
    AddProvider { record: Option<ProviderRecord> },
    /// Request to retrieve a record.
    GetRecord {
        num_closer_peers: usize,
        present_locally: bool,
    },
    /// A peer sent a put record request.
    /// If filtering [`StoreInserts::FilterBoth`] is enabled, the [`Record`] is included.
    ///
    /// See [`StoreInserts`] and [`Config::set_record_filtering`].
    PutRecord {
        source: PeerId,
        connection: ConnectionId,
        record: Option<Record>,
    },
}
2744
/// The results of Kademlia queries.
#[derive(Debug, Clone)]
pub enum QueryResult {
    /// The result of [`Behaviour::bootstrap`].
    Bootstrap(BootstrapResult),

    /// The result of [`Behaviour::get_closest_peers`].
    GetClosestPeers(GetClosestPeersResult),

    /// The result of [`Behaviour::get_providers`].
    GetProviders(GetProvidersResult),

    /// The result of [`Behaviour::start_providing`].
    StartProviding(AddProviderResult),

    /// The result of an (automatic) republishing of a provider record.
    RepublishProvider(AddProviderResult),

    /// The result of [`Behaviour::get_record`].
    GetRecord(GetRecordResult),

    /// The result of [`Behaviour::put_record`].
    PutRecord(PutRecordResult),

    /// The result of an (automatic) republishing of a (value-)record.
    RepublishRecord(PutRecordResult),
}
2772
/// The result of [`Behaviour::get_record`].
pub type GetRecordResult = Result<GetRecordOk, GetRecordError>;

/// The successful result of [`Behaviour::get_record`].
#[derive(Debug, Clone)]
pub enum GetRecordOk {
    /// A record was found at one of the queried peers (or locally).
    FoundRecord(PeerRecord),
    FinishedWithNoAdditionalRecord {
        /// If caching is enabled, these are the peers closest
        /// _to the record key_ (not the local node) that were queried but
        /// did not return the record, sorted by distance to the record key
        /// from closest to farthest. How many of these are tracked is configured
        /// by [`Config::set_caching`].
        ///
        /// Writing back the cache at these peers is a manual operation.
        /// i.e. you may wish to use these candidates with [`Behaviour::put_record_to`]
        /// after selecting one of the returned records.
        cache_candidates: BTreeMap<kbucket::Distance, PeerId>,
    },
}
2793
/// The error result of [`Behaviour::get_record`].
#[derive(Debug, Clone, Error)]
pub enum GetRecordError {
    /// No peer returned the record.
    #[error("the record was not found")]
    NotFound {
        key: record::Key,
        closest_peers: Vec<PeerId>,
    },
    /// Fewer peers than the required quorum returned the record.
    #[error("the quorum failed; needed {quorum} peers")]
    QuorumFailed {
        key: record::Key,
        records: Vec<PeerRecord>,
        quorum: NonZeroUsize,
    },
    /// The query exceeded its configured timeout.
    #[error("the request timed out")]
    Timeout { key: record::Key },
}
2811
2812impl GetRecordError {
2813 /// Gets the key of the record for which the operation failed.
2814 pub fn key(&self) -> &record::Key {
2815 match self {
2816 GetRecordError::QuorumFailed { key, .. } => key,
2817 GetRecordError::Timeout { key, .. } => key,
2818 GetRecordError::NotFound { key, .. } => key,
2819 }
2820 }
2821
2822 /// Extracts the key of the record for which the operation failed,
2823 /// consuming the error.
2824 pub fn into_key(self) -> record::Key {
2825 match self {
2826 GetRecordError::QuorumFailed { key, .. } => key,
2827 GetRecordError::Timeout { key, .. } => key,
2828 GetRecordError::NotFound { key, .. } => key,
2829 }
2830 }
2831}
2832
/// The result of [`Behaviour::put_record`].
pub type PutRecordResult = Result<PutRecordOk, PutRecordError>;

/// The successful result of [`Behaviour::put_record`].
#[derive(Debug, Clone)]
pub struct PutRecordOk {
    /// The key of the record that was successfully stored.
    pub key: record::Key,
}
2841
/// The error result of [`Behaviour::put_record`].
#[derive(Debug, Clone, Error)]
pub enum PutRecordError {
    /// The record was stored on fewer peers than the required quorum.
    #[error("the quorum failed; needed {quorum} peers")]
    QuorumFailed {
        key: record::Key,
        /// [`PeerId`]s of the peers the record was successfully stored on.
        success: Vec<PeerId>,
        quorum: NonZeroUsize,
    },
    /// The query exceeded its configured timeout.
    #[error("the request timed out")]
    Timeout {
        key: record::Key,
        /// [`PeerId`]s of the peers the record was successfully stored on.
        success: Vec<PeerId>,
        quorum: NonZeroUsize,
    },
}
2860
2861impl PutRecordError {
2862 /// Gets the key of the record for which the operation failed.
2863 pub fn key(&self) -> &record::Key {
2864 match self {
2865 PutRecordError::QuorumFailed { key, .. } => key,
2866 PutRecordError::Timeout { key, .. } => key,
2867 }
2868 }
2869
2870 /// Extracts the key of the record for which the operation failed,
2871 /// consuming the error.
2872 pub fn into_key(self) -> record::Key {
2873 match self {
2874 PutRecordError::QuorumFailed { key, .. } => key,
2875 PutRecordError::Timeout { key, .. } => key,
2876 }
2877 }
2878}
2879
/// The result of [`Behaviour::bootstrap`].
pub type BootstrapResult = Result<BootstrapOk, BootstrapError>;

/// The successful result of [`Behaviour::bootstrap`].
#[derive(Debug, Clone)]
pub struct BootstrapOk {
    // The peer this bootstrap step relates to — presumably the lookup target
    // of the completed bucket refresh; TODO(review): confirm against the
    // bootstrap query implementation.
    pub peer: PeerId,
    // Number of bootstrap lookups still remaining.
    pub num_remaining: u32,
}
2889
/// The error result of [`Behaviour::bootstrap`].
#[derive(Debug, Clone, Error)]
pub enum BootstrapError {
    /// The bootstrap query exceeded its configured timeout.
    #[error("the request timed out")]
    Timeout {
        peer: PeerId,
        // `None` if the number of remaining lookups is unknown at timeout.
        num_remaining: Option<u32>,
    },
}
2899
/// The result of [`Behaviour::get_closest_peers`].
pub type GetClosestPeersResult = Result<GetClosestPeersOk, GetClosestPeersError>;

/// The successful result of [`Behaviour::get_closest_peers`].
#[derive(Debug, Clone)]
pub struct GetClosestPeersOk {
    /// The key (preimage) whose closest peers were searched for.
    pub key: Vec<u8>,
    /// The peers found closest to the key.
    pub peers: Vec<PeerId>,
}

/// The error result of [`Behaviour::get_closest_peers`].
#[derive(Debug, Clone, Error)]
pub enum GetClosestPeersError {
    /// The query timed out; `peers` holds the closest peers found so far.
    #[error("the request timed out")]
    Timeout { key: Vec<u8>, peers: Vec<PeerId> },
}
2916
2917impl GetClosestPeersError {
2918 /// Gets the key for which the operation failed.
2919 pub fn key(&self) -> &Vec<u8> {
2920 match self {
2921 GetClosestPeersError::Timeout { key, .. } => key,
2922 }
2923 }
2924
2925 /// Extracts the key for which the operation failed,
2926 /// consuming the error.
2927 pub fn into_key(self) -> Vec<u8> {
2928 match self {
2929 GetClosestPeersError::Timeout { key, .. } => key,
2930 }
2931 }
2932}
2933
/// The result of [`Behaviour::get_providers`].
pub type GetProvidersResult = Result<GetProvidersOk, GetProvidersError>;

/// The successful result of [`Behaviour::get_providers`].
#[derive(Debug, Clone)]
pub enum GetProvidersOk {
    /// One step of the query discovered new providers.
    FoundProviders {
        /// The key for which providers were searched.
        key: record::Key,
        /// The new set of providers discovered.
        providers: HashSet<PeerId>,
    },
    /// The query finished without discovering any further providers.
    FinishedWithNoAdditionalRecord {
        /// The peers closest to the key that were queried.
        closest_peers: Vec<PeerId>,
    },
}

/// The error result of [`Behaviour::get_providers`].
#[derive(Debug, Clone, Error)]
pub enum GetProvidersError {
    /// The query timed out.
    #[error("the request timed out")]
    Timeout {
        /// The key for which providers were searched.
        key: record::Key,
        /// The peers closest to the key that were queried before the timeout.
        closest_peers: Vec<PeerId>,
    },
}
2959
2960impl GetProvidersError {
2961 /// Gets the key for which the operation failed.
2962 pub fn key(&self) -> &record::Key {
2963 match self {
2964 GetProvidersError::Timeout { key, .. } => key,
2965 }
2966 }
2967
2968 /// Extracts the key for which the operation failed,
2969 /// consuming the error.
2970 pub fn into_key(self) -> record::Key {
2971 match self {
2972 GetProvidersError::Timeout { key, .. } => key,
2973 }
2974 }
2975}
2976
/// The result of publishing a provider record.
pub type AddProviderResult = Result<AddProviderOk, AddProviderError>;

/// The successful result of publishing a provider record.
#[derive(Debug, Clone)]
pub struct AddProviderOk {
    /// The key for which the provider record was published.
    pub key: record::Key,
}

/// The possible errors when publishing a provider record.
#[derive(Debug, Clone, Error)]
pub enum AddProviderError {
    /// The query timed out.
    #[error("the request timed out")]
    Timeout {
        /// The key for which the provider record could not be published.
        key: record::Key,
    },
}
2992
2993impl AddProviderError {
2994 /// Gets the key for which the operation failed.
2995 pub fn key(&self) -> &record::Key {
2996 match self {
2997 AddProviderError::Timeout { key, .. } => key,
2998 }
2999 }
3000
3001 /// Extracts the key for which the operation failed,
3002 pub fn into_key(self) -> record::Key {
3003 match self {
3004 AddProviderError::Timeout { key, .. } => key,
3005 }
3006 }
3007}
3008
3009impl From<kbucket::EntryView<kbucket::Key<PeerId>, Addresses>> for KadPeer {
3010 fn from(e: kbucket::EntryView<kbucket::Key<PeerId>, Addresses>) -> KadPeer {
3011 KadPeer {
3012 node_id: e.node.key.into_preimage(),
3013 multiaddrs: e.node.value.into_vec(),
3014 connection_ty: match e.status {
3015 NodeStatus::Connected => ConnectionType::Connected,
3016 NodeStatus::Disconnected => ConnectionType::NotConnected,
3017 },
3018 }
3019 }
3020}
3021
3022//////////////////////////////////////////////////////////////////////////////
3023// Internal query state
3024
/// The internal, query-specific state kept for each running query.
struct QueryInner {
    /// The query-specific state.
    info: QueryInfo,
    /// Addresses of peers discovered during a query.
    addresses: FnvHashMap<PeerId, SmallVec<[Multiaddr; 8]>>,
    /// A map of pending requests to peers.
    ///
    /// A request is pending if the targeted peer is not currently connected
    /// and these requests are sent as soon as a connection to the peer is established.
    pending_rpcs: SmallVec<[(PeerId, HandlerIn); K_VALUE.get()]>,
}
3036
3037impl QueryInner {
3038 fn new(info: QueryInfo) -> Self {
3039 QueryInner {
3040 info,
3041 addresses: Default::default(),
3042 pending_rpcs: SmallVec::default(),
3043 }
3044 }
3045}
3046
/// The execution context of a [`QueryInfo::AddProvider`] query.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum AddProviderContext {
    /// The context is a [`Behaviour::start_providing`] operation.
    Publish,
    /// The context is periodic republishing of provider announcements
    /// initiated earlier via [`Behaviour::start_providing`].
    Republish,
}
3056
/// The execution context of a [`QueryInfo::PutRecord`] query.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum PutRecordContext {
    /// The context is a [`Behaviour::put_record`] operation.
    Publish,
    /// The context is periodic republishing of records stored
    /// earlier via [`Behaviour::put_record`].
    Republish,
    /// The context is periodic replication (i.e. without extending
    /// the record TTL) of stored records received earlier from another peer.
    Replicate,
    /// The context is a custom store operation targeting specific
    /// peers initiated by [`Behaviour::put_record_to`].
    Custom,
}
3072
/// Information about a running query.
#[derive(Debug, Clone)]
pub enum QueryInfo {
    /// A query initiated by [`Behaviour::bootstrap`].
    Bootstrap {
        /// The targeted peer ID.
        peer: PeerId,
        /// The remaining random peer IDs to query, one per
        /// bucket that still needs refreshing.
        ///
        /// This is `None` if the initial self-lookup has not
        /// yet completed and `Some` with an exhausted iterator
        /// if bootstrapping is complete.
        remaining: Option<vec::IntoIter<kbucket::Key<PeerId>>>,
        /// Current index of events.
        step: ProgressStep,
    },

    /// A (repeated) query initiated by [`Behaviour::get_closest_peers`].
    GetClosestPeers {
        /// The key being queried (the preimage).
        key: Vec<u8>,
        /// Current index of events.
        step: ProgressStep,
    },

    /// A (repeated) query initiated by [`Behaviour::get_providers`].
    GetProviders {
        /// The key for which to search for providers.
        key: record::Key,
        /// The number of providers found so far.
        providers_found: usize,
        /// Current index of events.
        step: ProgressStep,
    },

    /// A (repeated) query initiated by [`Behaviour::start_providing`].
    AddProvider {
        /// The record key.
        key: record::Key,
        /// The current phase of the query.
        phase: AddProviderPhase,
        /// The execution context of the query.
        context: AddProviderContext,
    },

    /// A (repeated) query initiated by [`Behaviour::put_record`].
    PutRecord {
        /// The record to store.
        record: Record,
        /// The expected quorum of responses w.r.t. the replication factor.
        quorum: NonZeroUsize,
        /// The current phase of the query.
        phase: PutRecordPhase,
        /// The execution context of the query.
        context: PutRecordContext,
    },

    /// A (repeated) query initiated by [`Behaviour::get_record`].
    GetRecord {
        /// The key to look for.
        key: record::Key,
        /// Current index of events.
        step: ProgressStep,
        /// Did we find at least one record?
        found_a_record: bool,
        /// The peers closest to the `key` that were queried but did not return a record,
        /// i.e. the peers that are candidates for caching the record.
        cache_candidates: BTreeMap<kbucket::Distance, PeerId>,
    },
}
3142
impl QueryInfo {
    /// Creates an event for a handler to issue an outgoing request in the
    /// context of a query.
    ///
    /// Each query type maps to exactly one request variant; multi-phase
    /// queries (`AddProvider`, `PutRecord`) issue a `FindNodeReq` while
    /// locating the closest peers and switch to their final request in the
    /// second phase.
    fn to_request(&self, query_id: QueryId) -> HandlerIn {
        match &self {
            // Bootstrapping looks up the target peer itself.
            QueryInfo::Bootstrap { peer, .. } => HandlerIn::FindNodeReq {
                key: peer.to_bytes(),
                query_id,
            },
            QueryInfo::GetClosestPeers { key, .. } => HandlerIn::FindNodeReq {
                key: key.clone(),
                query_id,
            },
            QueryInfo::GetProviders { key, .. } => HandlerIn::GetProvidersReq {
                key: key.clone(),
                query_id,
            },
            QueryInfo::AddProvider { key, phase, .. } => match phase {
                // Phase 1: find the peers closest to the key.
                AddProviderPhase::GetClosestPeers => HandlerIn::FindNodeReq {
                    key: key.to_vec(),
                    query_id,
                },
                // Phase 2: advertise the local node as a provider to them.
                AddProviderPhase::AddProvider {
                    provider_id,
                    external_addresses,
                    ..
                } => HandlerIn::AddProvider {
                    key: key.clone(),
                    provider: crate::protocol::KadPeer {
                        node_id: *provider_id,
                        multiaddrs: external_addresses.clone(),
                        connection_ty: crate::protocol::ConnectionType::Connected,
                    },
                    query_id,
                },
            },
            QueryInfo::GetRecord { key, .. } => HandlerIn::GetRecord {
                key: key.clone(),
                query_id,
            },
            QueryInfo::PutRecord { record, phase, .. } => match phase {
                // Phase 1: find the peers closest to the record key.
                PutRecordPhase::GetClosestPeers => HandlerIn::FindNodeReq {
                    key: record.key.to_vec(),
                    query_id,
                },
                // Phase 2: replicate the record to them.
                PutRecordPhase::PutRecord { .. } => HandlerIn::PutRecord {
                    record: record.clone(),
                    query_id,
                },
            },
        }
    }
}
3196
/// The phases of a [`QueryInfo::AddProvider`] query.
#[derive(Debug, Clone)]
pub enum AddProviderPhase {
    /// The query is searching for the closest nodes to the record key.
    GetClosestPeers,

    /// The query advertises the local node as a provider for the key to
    /// the closest nodes to the key.
    AddProvider {
        /// The local peer ID that is advertised as a provider.
        provider_id: PeerId,
        /// The external addresses of the provider being advertised.
        external_addresses: Vec<Multiaddr>,
        /// Query statistics from the finished `GetClosestPeers` phase.
        get_closest_peers_stats: QueryStats,
    },
}
3214
/// The phases of a [`QueryInfo::PutRecord`] query.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PutRecordPhase {
    /// The query is searching for the closest nodes to the record key.
    GetClosestPeers,

    /// The query is replicating the record to the closest nodes to the key.
    PutRecord {
        /// A list of peers the given record has been successfully replicated to.
        success: Vec<PeerId>,
        /// Query statistics from the finished `GetClosestPeers` phase.
        get_closest_peers_stats: QueryStats,
    },
}
3229
/// A mutable reference to a running query.
pub struct QueryMut<'a> {
    /// The underlying query in the query pool.
    query: &'a mut Query<QueryInner>,
}

impl<'a> QueryMut<'a> {
    /// Gets the unique ID of the query.
    pub fn id(&self) -> QueryId {
        self.query.id()
    }

    /// Gets information about the type and state of the query.
    pub fn info(&self) -> &QueryInfo {
        &self.query.inner.info
    }

    /// Gets execution statistics about the query.
    ///
    /// For a multi-phase query such as `put_record`, these are the
    /// statistics of the current phase.
    pub fn stats(&self) -> &QueryStats {
        self.query.stats()
    }

    /// Finishes the query asap, without waiting for the
    /// regular termination conditions.
    pub fn finish(&mut self) {
        self.query.finish()
    }
}
3259
/// An immutable reference to a running query.
pub struct QueryRef<'a> {
    /// The underlying query in the query pool.
    query: &'a Query<QueryInner>,
}

impl<'a> QueryRef<'a> {
    /// Gets the unique ID of the query.
    pub fn id(&self) -> QueryId {
        self.query.id()
    }

    /// Gets information about the type and state of the query.
    pub fn info(&self) -> &QueryInfo {
        &self.query.inner.info
    }

    /// Gets execution statistics about the query.
    ///
    /// For a multi-phase query such as `put_record`, these are the
    /// statistics of the current phase.
    pub fn stats(&self) -> &QueryStats {
        self.query.stats()
    }
}
3283
/// An operation failed due to no known peers in the routing table.
#[derive(Debug, Clone)]
pub struct NoKnownPeers();

impl fmt::Display for NoKnownPeers {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("No known peers.")
    }
}

impl std::error::Error for NoKnownPeers {}
3295
/// The possible outcomes of [`Behaviour::add_address`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RoutingUpdate {
    /// The given peer and address have been added to the routing
    /// table.
    Success,
    /// The peer and address are pending insertion into
    /// the routing table, if a disconnected peer fails
    /// to respond. If the given peer and address end up
    /// in the routing table, [`Event::RoutingUpdated`]
    /// is eventually emitted.
    Pending,
    /// The routing table update failed, either because the
    /// corresponding bucket for the peer is full and the
    /// pending slot(s) are occupied, or because the given
    /// peer ID is deemed invalid (e.g. refers to the local
    /// peer ID).
    Failed,
}
3315
/// The mode of operation of the Kademlia behaviour.
#[derive(PartialEq, Copy, Clone, Debug)]
pub enum Mode {
    /// Operate in client mode.
    Client,
    /// Operate in server mode.
    Server,
}

impl fmt::Display for Mode {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Render the mode as its lowercase wire/log label.
        let label = match self {
            Mode::Client => "client",
            Mode::Server => "server",
        };
        f.write_str(label)
    }
}
3330
/// Renders the given values as a single `", "`-separated string.
fn to_comma_separated_list<T>(confirmed_external_addresses: &[T]) -> String
where
    T: ToString,
{
    // Build the list in a single buffer, inserting the separator
    // before every element except the first.
    let mut list = String::new();
    for (i, addr) in confirmed_external_addresses.iter().enumerate() {
        if i > 0 {
            list.push_str(", ");
        }
        list.push_str(&addr.to_string());
    }
    list
}