libp2p_kad/
query.rs

1// Copyright 2019 Parity Technologies (UK) Ltd.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21mod peers;
22
23use peers::closest::{
24    disjoint::ClosestDisjointPeersIter, ClosestPeersIter, ClosestPeersIterConfig,
25};
26use peers::fixed::FixedPeersIter;
27use peers::PeersIterState;
28
29use crate::kbucket::{Key, KeyBytes};
30use crate::{ALPHA_VALUE, K_VALUE};
31use either::Either;
32use fnv::FnvHashMap;
33use instant::Instant;
34use libp2p_identity::PeerId;
35use std::{num::NonZeroUsize, time::Duration};
36
/// A `QueryPool` provides an aggregate state machine for driving `Query`s to completion.
///
/// Internally, a `Query` is in turn driven by an underlying `QueryPeerIter`
/// that determines the peer selection strategy, i.e. the order in which the
/// peers involved in the query should be contacted.
///
/// `TInner` is the opaque per-query state that is stored alongside each
/// query and handed back to the caller together with the query.
pub(crate) struct QueryPool<TInner> {
    /// The raw value of the next `QueryId` to allocate. It starts at 0 and is
    /// advanced with wrap-around every time a fresh ID is handed out.
    next_id: usize,
    /// The configuration shared by all queries added to this pool.
    config: QueryConfig,
    /// The queries currently in the pool, keyed by their ID.
    queries: FnvHashMap<QueryId, Query<TInner>>,
}
47
/// The observable states emitted by [`QueryPool::poll`].
///
/// The lifetime `'a` ties the query borrowed in [`QueryPoolState::Waiting`]
/// to the pool it still belongs to.
pub(crate) enum QueryPoolState<'a, TInner> {
    /// The pool is idle, i.e. there are no queries to process.
    Idle,
    /// At least one query is waiting for results. `Some(request)` indicates
    /// that a new request is now being waited on.
    ///
    /// The request consists of the query (which stays in the pool and is
    /// only borrowed) and the peer that the query wants to be contacted next.
    Waiting(Option<(&'a mut Query<TInner>, PeerId)>),
    /// A query has finished.
    ///
    /// The query has been removed from the pool and its end instant recorded.
    Finished(Query<TInner>),
    /// A query has timed out.
    ///
    /// The query has been removed from the pool and its end instant recorded.
    Timeout(Query<TInner>),
}
60
61impl<TInner> QueryPool<TInner> {
62    /// Creates a new `QueryPool` with the given configuration.
63    pub(crate) fn new(config: QueryConfig) -> Self {
64        QueryPool {
65            next_id: 0,
66            config,
67            queries: Default::default(),
68        }
69    }
70
71    /// Gets a reference to the `QueryConfig` used by the pool.
72    pub(crate) fn config(&self) -> &QueryConfig {
73        &self.config
74    }
75
76    /// Returns an iterator over the queries in the pool.
77    pub(crate) fn iter(&self) -> impl Iterator<Item = &Query<TInner>> {
78        self.queries.values()
79    }
80
81    /// Gets the current size of the pool, i.e. the number of running queries.
82    pub(crate) fn size(&self) -> usize {
83        self.queries.len()
84    }
85
86    /// Returns an iterator that allows modifying each query in the pool.
87    pub(crate) fn iter_mut(&mut self) -> impl Iterator<Item = &mut Query<TInner>> {
88        self.queries.values_mut()
89    }
90
91    /// Adds a query to the pool that contacts a fixed set of peers.
92    pub(crate) fn add_fixed<I>(&mut self, peers: I, inner: TInner) -> QueryId
93    where
94        I: IntoIterator<Item = PeerId>,
95    {
96        let id = self.next_query_id();
97        self.continue_fixed(id, peers, inner);
98        id
99    }
100
101    /// Continues an earlier query with a fixed set of peers, reusing
102    /// the given query ID, which must be from a query that finished
103    /// earlier.
104    pub(crate) fn continue_fixed<I>(&mut self, id: QueryId, peers: I, inner: TInner)
105    where
106        I: IntoIterator<Item = PeerId>,
107    {
108        assert!(!self.queries.contains_key(&id));
109        let parallelism = self.config.replication_factor;
110        let peer_iter = QueryPeerIter::Fixed(FixedPeersIter::new(peers, parallelism));
111        let query = Query::new(id, peer_iter, inner);
112        self.queries.insert(id, query);
113    }
114
115    /// Adds a query to the pool that iterates towards the closest peers to the target.
116    pub(crate) fn add_iter_closest<T, I>(&mut self, target: T, peers: I, inner: TInner) -> QueryId
117    where
118        T: Into<KeyBytes> + Clone,
119        I: IntoIterator<Item = Key<PeerId>>,
120    {
121        let id = self.next_query_id();
122        self.continue_iter_closest(id, target, peers, inner);
123        id
124    }
125
126    /// Adds a query to the pool that iterates towards the closest peers to the target.
127    pub(crate) fn continue_iter_closest<T, I>(
128        &mut self,
129        id: QueryId,
130        target: T,
131        peers: I,
132        inner: TInner,
133    ) where
134        T: Into<KeyBytes> + Clone,
135        I: IntoIterator<Item = Key<PeerId>>,
136    {
137        let cfg = ClosestPeersIterConfig {
138            num_results: self.config.replication_factor,
139            parallelism: self.config.parallelism,
140            ..ClosestPeersIterConfig::default()
141        };
142
143        let peer_iter = if self.config.disjoint_query_paths {
144            QueryPeerIter::ClosestDisjoint(ClosestDisjointPeersIter::with_config(
145                cfg, target, peers,
146            ))
147        } else {
148            QueryPeerIter::Closest(ClosestPeersIter::with_config(cfg, target, peers))
149        };
150
151        let query = Query::new(id, peer_iter, inner);
152        self.queries.insert(id, query);
153    }
154
155    fn next_query_id(&mut self) -> QueryId {
156        let id = QueryId(self.next_id);
157        self.next_id = self.next_id.wrapping_add(1);
158        id
159    }
160
161    /// Returns a reference to a query with the given ID, if it is in the pool.
162    pub(crate) fn get(&self, id: &QueryId) -> Option<&Query<TInner>> {
163        self.queries.get(id)
164    }
165
166    /// Returns a mutablereference to a query with the given ID, if it is in the pool.
167    pub(crate) fn get_mut(&mut self, id: &QueryId) -> Option<&mut Query<TInner>> {
168        self.queries.get_mut(id)
169    }
170
171    /// Polls the pool to advance the queries.
172    pub(crate) fn poll(&mut self, now: Instant) -> QueryPoolState<'_, TInner> {
173        let mut finished = None;
174        let mut timeout = None;
175        let mut waiting = None;
176
177        for (&query_id, query) in self.queries.iter_mut() {
178            query.stats.start = query.stats.start.or(Some(now));
179            match query.next(now) {
180                PeersIterState::Finished => {
181                    finished = Some(query_id);
182                    break;
183                }
184                PeersIterState::Waiting(Some(peer_id)) => {
185                    let peer = peer_id.into_owned();
186                    waiting = Some((query_id, peer));
187                    break;
188                }
189                PeersIterState::Waiting(None) | PeersIterState::WaitingAtCapacity => {
190                    let elapsed = now - query.stats.start.unwrap_or(now);
191                    if elapsed >= self.config.timeout {
192                        timeout = Some(query_id);
193                        break;
194                    }
195                }
196            }
197        }
198
199        if let Some((query_id, peer_id)) = waiting {
200            let query = self.queries.get_mut(&query_id).expect("s.a.");
201            return QueryPoolState::Waiting(Some((query, peer_id)));
202        }
203
204        if let Some(query_id) = finished {
205            let mut query = self.queries.remove(&query_id).expect("s.a.");
206            query.stats.end = Some(now);
207            return QueryPoolState::Finished(query);
208        }
209
210        if let Some(query_id) = timeout {
211            let mut query = self.queries.remove(&query_id).expect("s.a.");
212            query.stats.end = Some(now);
213            return QueryPoolState::Timeout(query);
214        }
215
216        if self.queries.is_empty() {
217            QueryPoolState::Idle
218        } else {
219            QueryPoolState::Waiting(None)
220        }
221    }
222}
223
/// Unique identifier for an active query.
///
/// The identifier wraps a plain `usize` and is cheap to copy, compare and hash.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub struct QueryId(usize);

impl std::fmt::Display for QueryId {
    /// Writes the raw numeric identifier in decimal notation.
    ///
    /// The number is written with default formatting, i.e. width, fill and
    /// alignment flags given by the caller are not applied.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self(raw) = self;
        f.write_fmt(format_args!("{}", raw))
    }
}
233
/// The configuration for queries in a `QueryPool`.
///
/// The same configuration applies to every query that is added to the pool.
#[derive(Debug, Clone)]
pub(crate) struct QueryConfig {
    /// Timeout of a single query.
    ///
    /// The pool measures the elapsed time from the first time a query is
    /// polled and only checks it while the query has no new peer to contact.
    ///
    /// See [`crate::behaviour::Config::set_query_timeout`] for details.
    pub(crate) timeout: Duration,
    /// The replication factor to use.
    ///
    /// Iterative queries use it as the number of results to collect, while
    /// queries over a fixed set of peers use it as their parallelism.
    ///
    /// See [`crate::behaviour::Config::set_replication_factor`] for details.
    pub(crate) replication_factor: NonZeroUsize,
    /// Allowed level of parallelism for iterative queries.
    ///
    /// See [`crate::behaviour::Config::set_parallelism`] for details.
    pub(crate) parallelism: NonZeroUsize,
    /// Whether to use disjoint paths on iterative lookups.
    ///
    /// See [`crate::behaviour::Config::disjoint_query_paths`] for details.
    pub(crate) disjoint_query_paths: bool,
}
254
255impl Default for QueryConfig {
256    fn default() -> Self {
257        QueryConfig {
258            timeout: Duration::from_secs(60),
259            replication_factor: NonZeroUsize::new(K_VALUE.get()).expect("K_VALUE > 0"),
260            parallelism: ALPHA_VALUE,
261            disjoint_query_paths: false,
262        }
263    }
264}
265
/// A query in a `QueryPool`.
///
/// A query couples a peer iterator, which decides which peers to contact,
/// with execution statistics and an opaque caller-defined state.
pub(crate) struct Query<TInner> {
    /// The unique ID of the query.
    id: QueryId,
    /// The peer iterator that drives the query state.
    peer_iter: QueryPeerIter,
    /// Execution statistics of the query.
    ///
    /// The request, success and failure counters are updated by the query
    /// itself, the start and end instants by the pool while polling.
    stats: QueryStats,
    /// The opaque inner query state.
    pub(crate) inner: TInner,
}
277
/// The peer selection strategies that can be used by queries.
enum QueryPeerIter {
    /// Iterates towards the closest peers to a target; used for iterative
    /// queries when disjoint query paths are disabled.
    Closest(ClosestPeersIter),
    /// Iterates towards the closest peers to a target; used for iterative
    /// queries when disjoint query paths are enabled.
    ClosestDisjoint(ClosestDisjointPeersIter),
    /// Contacts a fixed set of peers.
    Fixed(FixedPeersIter),
}
284
285impl<TInner> Query<TInner> {
286    /// Creates a new query without starting it.
287    fn new(id: QueryId, peer_iter: QueryPeerIter, inner: TInner) -> Self {
288        Query {
289            id,
290            inner,
291            peer_iter,
292            stats: QueryStats::empty(),
293        }
294    }
295
296    /// Gets the unique ID of the query.
297    pub(crate) fn id(&self) -> QueryId {
298        self.id
299    }
300
301    /// Gets the current execution statistics of the query.
302    pub(crate) fn stats(&self) -> &QueryStats {
303        &self.stats
304    }
305
306    /// Informs the query that the attempt to contact `peer` failed.
307    pub(crate) fn on_failure(&mut self, peer: &PeerId) {
308        let updated = match &mut self.peer_iter {
309            QueryPeerIter::Closest(iter) => iter.on_failure(peer),
310            QueryPeerIter::ClosestDisjoint(iter) => iter.on_failure(peer),
311            QueryPeerIter::Fixed(iter) => iter.on_failure(peer),
312        };
313        if updated {
314            self.stats.failure += 1;
315        }
316    }
317
318    /// Informs the query that the attempt to contact `peer` succeeded,
319    /// possibly resulting in new peers that should be incorporated into
320    /// the query, if applicable.
321    pub(crate) fn on_success<I>(&mut self, peer: &PeerId, new_peers: I)
322    where
323        I: IntoIterator<Item = PeerId>,
324    {
325        let updated = match &mut self.peer_iter {
326            QueryPeerIter::Closest(iter) => iter.on_success(peer, new_peers),
327            QueryPeerIter::ClosestDisjoint(iter) => iter.on_success(peer, new_peers),
328            QueryPeerIter::Fixed(iter) => iter.on_success(peer),
329        };
330        if updated {
331            self.stats.success += 1;
332        }
333    }
334
335    /// Advances the state of the underlying peer iterator.
336    fn next(&mut self, now: Instant) -> PeersIterState<'_> {
337        let state = match &mut self.peer_iter {
338            QueryPeerIter::Closest(iter) => iter.next(now),
339            QueryPeerIter::ClosestDisjoint(iter) => iter.next(now),
340            QueryPeerIter::Fixed(iter) => iter.next(),
341        };
342
343        if let PeersIterState::Waiting(Some(_)) = state {
344            self.stats.requests += 1;
345        }
346
347        state
348    }
349
350    /// Tries to (gracefully) finish the query prematurely, providing the peers
351    /// that are no longer of interest for further progress of the query.
352    ///
353    /// A query may require that in order to finish gracefully a certain subset
354    /// of peers must be contacted. E.g. in the case of disjoint query paths a
355    /// query may only finish gracefully if every path contacted a peer whose
356    /// response permits termination of the query. The given peers are those for
357    /// which this is considered to be the case, i.e. for which a termination
358    /// condition is satisfied.
359    ///
360    /// Returns `true` if the query did indeed finish, `false` otherwise. In the
361    /// latter case, a new attempt at finishing the query may be made with new
362    /// `peers`.
363    ///
364    /// A finished query immediately stops yielding new peers to contact and
365    /// will be reported by [`QueryPool::poll`] via
366    /// [`QueryPoolState::Finished`].
367    pub(crate) fn try_finish<'a, I>(&mut self, peers: I) -> bool
368    where
369        I: IntoIterator<Item = &'a PeerId>,
370    {
371        match &mut self.peer_iter {
372            QueryPeerIter::Closest(iter) => {
373                iter.finish();
374                true
375            }
376            QueryPeerIter::ClosestDisjoint(iter) => iter.finish_paths(peers),
377            QueryPeerIter::Fixed(iter) => {
378                iter.finish();
379                true
380            }
381        }
382    }
383
384    /// Finishes the query prematurely.
385    ///
386    /// A finished query immediately stops yielding new peers to contact and will be
387    /// reported by [`QueryPool::poll`] via [`QueryPoolState::Finished`].
388    pub(crate) fn finish(&mut self) {
389        match &mut self.peer_iter {
390            QueryPeerIter::Closest(iter) => iter.finish(),
391            QueryPeerIter::ClosestDisjoint(iter) => iter.finish(),
392            QueryPeerIter::Fixed(iter) => iter.finish(),
393        }
394    }
395
396    /// Checks whether the query has finished.
397    ///
398    /// A finished query is eventually reported by `QueryPool::next()` and
399    /// removed from the pool.
400    pub(crate) fn is_finished(&self) -> bool {
401        match &self.peer_iter {
402            QueryPeerIter::Closest(iter) => iter.is_finished(),
403            QueryPeerIter::ClosestDisjoint(iter) => iter.is_finished(),
404            QueryPeerIter::Fixed(iter) => iter.is_finished(),
405        }
406    }
407
408    /// Consumes the query, producing the final `QueryResult`.
409    pub(crate) fn into_result(self) -> QueryResult<TInner, impl Iterator<Item = PeerId>> {
410        let peers = match self.peer_iter {
411            QueryPeerIter::Closest(iter) => Either::Left(Either::Left(iter.into_result())),
412            QueryPeerIter::ClosestDisjoint(iter) => Either::Left(Either::Right(iter.into_result())),
413            QueryPeerIter::Fixed(iter) => Either::Right(iter.into_result()),
414        };
415        QueryResult {
416            peers,
417            inner: self.inner,
418            stats: self.stats,
419        }
420    }
421}
422
/// The result of a `Query`.
///
/// `TPeers` is the type of the collection or iterator of peers that the
/// query produced.
pub(crate) struct QueryResult<TInner, TPeers> {
    /// The opaque inner query state.
    pub(crate) inner: TInner,
    /// The successfully contacted peers.
    pub(crate) peers: TPeers,
    /// The collected query statistics.
    pub(crate) stats: QueryStats,
}
432
/// Execution statistics of a query.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct QueryStats {
    /// The number of requests initiated by the query.
    requests: u32,
    /// The number of requests that succeeded.
    success: u32,
    /// The number of requests that failed.
    failure: u32,
    /// The instant at which the query was first polled, if it was.
    start: Option<Instant>,
    /// The instant at which the query finished or timed out, if it did.
    end: Option<Instant>,
}

impl QueryStats {
    /// Creates statistics with all counters at zero and neither a start nor
    /// an end instant.
    pub fn empty() -> Self {
        QueryStats {
            requests: 0,
            success: 0,
            failure: 0,
            start: None,
            end: None,
        }
    }

    /// Gets the total number of requests initiated by the query.
    pub fn num_requests(&self) -> u32 {
        self.requests
    }

    /// Gets the number of successful requests.
    pub fn num_successes(&self) -> u32 {
        self.success
    }

    /// Gets the number of failed requests.
    pub fn num_failures(&self) -> u32 {
        self.failure
    }

    /// Gets the number of pending requests.
    ///
    /// The result is clamped at zero, so that inconsistent counters can never
    /// cause an arithmetic overflow in this getter.
    ///
    /// > **Note**: A query can finish while still having pending
    /// > requests, if the termination conditions are already met.
    pub fn num_pending(&self) -> u32 {
        let completed = self.success.saturating_add(self.failure);
        self.requests.saturating_sub(completed)
    }

    /// Gets the duration of the query.
    ///
    /// If the query has not yet finished, the duration is measured from the
    /// start of the query to the current instant.
    ///
    /// If the query did not yet start (i.e. no start instant has been
    /// recorded because it was never polled), `None` is returned.
    pub fn duration(&self) -> Option<Duration> {
        let start = self.start?;
        // The clock is only read for queries that are still running.
        let end = self.end.unwrap_or_else(Instant::now);
        Some(end - start)
    }

    /// Merges these stats with the given stats of another query,
    /// e.g. to accumulate statistics from a multi-phase query.
    ///
    /// Counters are merged cumulatively (saturating at `u32::MAX`) while the
    /// instants for start and end of the queries are taken as the minimum and
    /// maximum, respectively. A missing instant never overrides a present one.
    pub fn merge(self, other: QueryStats) -> Self {
        QueryStats {
            requests: self.requests.saturating_add(other.requests),
            success: self.success.saturating_add(other.success),
            failure: self.failure.saturating_add(other.failure),
            start: match (self.start, other.start) {
                (Some(a), Some(b)) => Some(std::cmp::min(a, b)),
                (a, b) => a.or(b),
            },
            // `None` orders before any `Some`, so a known end always wins.
            end: std::cmp::max(self.end, other.end),
        }
    }
}