libp2p_kad/query/peers/
closest.rs

1// Copyright 2019 Parity Technologies (UK) Ltd.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21use super::*;
22
23use crate::kbucket::{Distance, Key, KeyBytes};
24use crate::{ALPHA_VALUE, K_VALUE};
25use instant::Instant;
26use libp2p_identity::PeerId;
27use std::collections::btree_map::{BTreeMap, Entry};
28use std::{iter::FromIterator, num::NonZeroUsize, time::Duration};
29
30pub(crate) mod disjoint;
/// A peer iterator for a dynamically changing list of peers, sorted by increasing
/// distance to a chosen target.
///
/// Peers to contact are handed out by `next`; the outcome of each request is fed
/// back through `on_success` (which may add newly discovered peers) or `on_failure`.
#[derive(Debug, Clone)]
pub struct ClosestPeersIter {
    /// The configuration (parallelism, number of results, per-peer timeout).
    config: ClosestPeersIterConfig,

    /// The target whose distance to any peer determines the position of
    /// the peer in the iterator.
    target: KeyBytes,

    /// The internal iterator state.
    state: State,

    /// All peers known to the iterator together with their per-peer state,
    /// keyed by (and therefore ordered by increasing) distance to the target.
    closest_peers: BTreeMap<Distance, Peer>,

    /// The number of peers for which the iterator is currently waiting for results.
    ///
    /// Incremented when `next` hands out a peer, decremented when a result is
    /// reported for a waiting peer or when a waiting peer times out.
    num_waiting: usize,
}
50
/// Configuration for a `ClosestPeersIter`.
///
/// The `Default` implementation yields `ALPHA_VALUE` for `parallelism`,
/// `K_VALUE` for `num_results` and 10 seconds for `peer_timeout`.
#[derive(Debug, Clone)]
pub struct ClosestPeersIterConfig {
    /// Allowed level of parallelism.
    ///
    /// The `α` parameter in the Kademlia paper. The maximum number of peers that
    /// the iterator is allowed to wait for in parallel while iterating towards the closest
    /// nodes to a target. Defaults to `ALPHA_VALUE`.
    pub parallelism: NonZeroUsize,

    /// Number of results (closest peers) to search for.
    ///
    /// The number of closest peers for which the iterator must obtain successful results
    /// in order to finish successfully. Defaults to `K_VALUE`.
    pub num_results: NonZeroUsize,

    /// The timeout for a single peer.
    ///
    /// If a successful result is not reported for a peer within this timeout
    /// window, the iterator considers the peer unresponsive and will not wait for
    /// the peer when evaluating the termination conditions, until and unless a
    /// result is delivered. Defaults to `10` seconds.
    pub peer_timeout: Duration,
}
75
76impl Default for ClosestPeersIterConfig {
77    fn default() -> Self {
78        ClosestPeersIterConfig {
79            parallelism: ALPHA_VALUE,
80            num_results: K_VALUE,
81            peer_timeout: Duration::from_secs(10),
82        }
83    }
84}
85
86impl ClosestPeersIter {
87    /// Creates a new iterator with a default configuration.
88    pub fn new<I>(target: KeyBytes, known_closest_peers: I) -> Self
89    where
90        I: IntoIterator<Item = Key<PeerId>>,
91    {
92        Self::with_config(
93            ClosestPeersIterConfig::default(),
94            target,
95            known_closest_peers,
96        )
97    }
98
99    /// Creates a new iterator with the given configuration.
100    pub fn with_config<I, T>(
101        config: ClosestPeersIterConfig,
102        target: T,
103        known_closest_peers: I,
104    ) -> Self
105    where
106        I: IntoIterator<Item = Key<PeerId>>,
107        T: Into<KeyBytes>,
108    {
109        let target = target.into();
110
111        // Initialise the closest peers to start the iterator with.
112        let closest_peers = BTreeMap::from_iter(
113            known_closest_peers
114                .into_iter()
115                .map(|key| {
116                    let distance = key.distance(&target);
117                    let state = PeerState::NotContacted;
118                    (distance, Peer { key, state })
119                })
120                .take(K_VALUE.into()),
121        );
122
123        // The iterator initially makes progress by iterating towards the target.
124        let state = State::Iterating { no_progress: 0 };
125
126        ClosestPeersIter {
127            config,
128            target,
129            state,
130            closest_peers,
131            num_waiting: 0,
132        }
133    }
134
135    /// Callback for delivering the result of a successful request to a peer.
136    ///
137    /// Delivering results of requests back to the iterator allows the iterator to make
138    /// progress. The iterator is said to make progress either when the given
139    /// `closer_peers` contain a peer closer to the target than any peer seen so far,
140    /// or when the iterator did not yet accumulate `num_results` closest peers and
141    /// `closer_peers` contains a new peer, regardless of its distance to the target.
142    ///
143    /// If the iterator is currently waiting for a result from `peer`,
144    /// the iterator state is updated and `true` is returned. In that
145    /// case, after calling this function, `next` should eventually be
146    /// called again to obtain the new state of the iterator.
147    ///
148    /// If the iterator is finished, it is not currently waiting for a
149    /// result from `peer`, or a result for `peer` has already been reported,
150    /// calling this function has no effect and `false` is returned.
151    pub fn on_success<I>(&mut self, peer: &PeerId, closer_peers: I) -> bool
152    where
153        I: IntoIterator<Item = PeerId>,
154    {
155        if let State::Finished = self.state {
156            return false;
157        }
158
159        let key = Key::from(*peer);
160        let distance = key.distance(&self.target);
161
162        // Mark the peer as succeeded.
163        match self.closest_peers.entry(distance) {
164            Entry::Vacant(..) => return false,
165            Entry::Occupied(mut e) => match e.get().state {
166                PeerState::Waiting(..) => {
167                    debug_assert!(self.num_waiting > 0);
168                    self.num_waiting -= 1;
169                    e.get_mut().state = PeerState::Succeeded;
170                }
171                PeerState::Unresponsive => {
172                    e.get_mut().state = PeerState::Succeeded;
173                }
174                PeerState::NotContacted | PeerState::Failed | PeerState::Succeeded => return false,
175            },
176        }
177
178        // Incorporate the reported closer peers into the iterator.
179        //
180        // The iterator makes progress if:
181        //     1, the iterator did not yet accumulate enough closest peers.
182        //   OR
183        //     2, any of the new peers is closer to the target than any peer seen so far
184        //        (i.e. is the first entry after being incorporated)
185        let mut progress = self.closest_peers.len() < self.config.num_results.get();
186        for peer in closer_peers {
187            let key = peer.into();
188            let distance = self.target.distance(&key);
189            let peer = Peer {
190                key,
191                state: PeerState::NotContacted,
192            };
193            self.closest_peers.entry(distance).or_insert(peer);
194
195            progress = self.closest_peers.keys().next() == Some(&distance) || progress;
196        }
197
198        // Update the iterator state.
199        self.state = match self.state {
200            State::Iterating { no_progress } => {
201                let no_progress = if progress { 0 } else { no_progress + 1 };
202                if no_progress >= self.config.parallelism.get() {
203                    State::Stalled
204                } else {
205                    State::Iterating { no_progress }
206                }
207            }
208            State::Stalled => {
209                if progress {
210                    State::Iterating { no_progress: 0 }
211                } else {
212                    State::Stalled
213                }
214            }
215            State::Finished => State::Finished,
216        };
217
218        true
219    }
220
221    /// Callback for informing the iterator about a failed request to a peer.
222    ///
223    /// If the iterator is currently waiting for a result from `peer`,
224    /// the iterator state is updated and `true` is returned. In that
225    /// case, after calling this function, `next` should eventually be
226    /// called again to obtain the new state of the iterator.
227    ///
228    /// If the iterator is finished, it is not currently waiting for a
229    /// result from `peer`, or a result for `peer` has already been reported,
230    /// calling this function has no effect and `false` is returned.
231    pub fn on_failure(&mut self, peer: &PeerId) -> bool {
232        if let State::Finished = self.state {
233            return false;
234        }
235
236        let key = Key::from(*peer);
237        let distance = key.distance(&self.target);
238
239        match self.closest_peers.entry(distance) {
240            Entry::Vacant(_) => return false,
241            Entry::Occupied(mut e) => match e.get().state {
242                PeerState::Waiting(_) => {
243                    debug_assert!(self.num_waiting > 0);
244                    self.num_waiting -= 1;
245                    e.get_mut().state = PeerState::Failed
246                }
247                PeerState::Unresponsive => e.get_mut().state = PeerState::Failed,
248                PeerState::NotContacted | PeerState::Failed | PeerState::Succeeded => return false,
249            },
250        }
251
252        true
253    }
254
255    /// Returns the list of peers for which the iterator is currently waiting
256    /// for results.
257    pub fn waiting(&self) -> impl Iterator<Item = &PeerId> {
258        self.closest_peers
259            .values()
260            .filter_map(|peer| match peer.state {
261                PeerState::Waiting(..) => Some(peer.key.preimage()),
262                _ => None,
263            })
264    }
265
    /// Returns the number of peers for which the iterator is currently
    /// waiting for results.
    ///
    /// This is the internally tracked counter; peers that timed out and were
    /// marked unresponsive are no longer included.
    pub fn num_waiting(&self) -> usize {
        self.num_waiting
    }
271
272    /// Returns true if the iterator is waiting for a response from the given peer.
273    pub fn is_waiting(&self, peer: &PeerId) -> bool {
274        self.waiting().any(|p| peer == p)
275    }
276
277    /// Advances the state of the iterator, potentially getting a new peer to contact.
278    pub fn next(&mut self, now: Instant) -> PeersIterState<'_> {
279        if let State::Finished = self.state {
280            return PeersIterState::Finished;
281        }
282
283        // Count the number of peers that returned a result. If there is a
284        // request in progress to one of the `num_results` closest peers, the
285        // counter is set to `None` as the iterator can only finish once
286        // `num_results` closest peers have responded (or there are no more
287        // peers to contact, see `num_waiting`).
288        let mut result_counter = Some(0);
289
290        // Check if the iterator is at capacity w.r.t. the allowed parallelism.
291        let at_capacity = self.at_capacity();
292
293        for peer in self.closest_peers.values_mut() {
294            match peer.state {
295                PeerState::Waiting(timeout) => {
296                    if now >= timeout {
297                        // Unresponsive peers no longer count towards the limit for the
298                        // bounded parallelism, though they might still be ongoing and
299                        // their results can still be delivered to the iterator.
300                        debug_assert!(self.num_waiting > 0);
301                        self.num_waiting -= 1;
302                        peer.state = PeerState::Unresponsive
303                    } else if at_capacity {
304                        // The iterator is still waiting for a result from a peer and is
305                        // at capacity w.r.t. the maximum number of peers being waited on.
306                        return PeersIterState::WaitingAtCapacity;
307                    } else {
308                        // The iterator is still waiting for a result from a peer and the
309                        // `result_counter` did not yet reach `num_results`. Therefore
310                        // the iterator is not yet done, regardless of already successful
311                        // queries to peers farther from the target.
312                        result_counter = None;
313                    }
314                }
315
316                PeerState::Succeeded => {
317                    if let Some(ref mut cnt) = result_counter {
318                        *cnt += 1;
319                        // If `num_results` successful results have been delivered for the
320                        // closest peers, the iterator is done.
321                        if *cnt >= self.config.num_results.get() {
322                            self.state = State::Finished;
323                            return PeersIterState::Finished;
324                        }
325                    }
326                }
327
328                PeerState::NotContacted => {
329                    if !at_capacity {
330                        let timeout = now + self.config.peer_timeout;
331                        peer.state = PeerState::Waiting(timeout);
332                        self.num_waiting += 1;
333                        return PeersIterState::Waiting(Some(Cow::Borrowed(peer.key.preimage())));
334                    } else {
335                        return PeersIterState::WaitingAtCapacity;
336                    }
337                }
338
339                PeerState::Unresponsive | PeerState::Failed => {
340                    // Skip over unresponsive or failed peers.
341                }
342            }
343        }
344
345        if self.num_waiting > 0 {
346            // The iterator is still waiting for results and not at capacity w.r.t.
347            // the allowed parallelism, but there are no new peers to contact
348            // at the moment.
349            PeersIterState::Waiting(None)
350        } else {
351            // The iterator is finished because all available peers have been contacted
352            // and the iterator is not waiting for any more results.
353            self.state = State::Finished;
354            PeersIterState::Finished
355        }
356    }
357
    /// Immediately transitions the iterator to [`PeersIterState::Finished`].
    ///
    /// Only the internal state is changed; per-peer states and the waiting
    /// counter are left untouched.
    pub fn finish(&mut self) {
        self.state = State::Finished
    }
362
363    /// Checks whether the iterator has finished.
364    pub fn is_finished(&self) -> bool {
365        self.state == State::Finished
366    }
367
368    /// Consumes the iterator, returning the closest peers.
369    pub fn into_result(self) -> impl Iterator<Item = PeerId> {
370        self.closest_peers
371            .into_iter()
372            .filter_map(|(_, peer)| {
373                if let PeerState::Succeeded = peer.state {
374                    Some(peer.key.into_preimage())
375                } else {
376                    None
377                }
378            })
379            .take(self.config.num_results.get())
380    }
381
382    /// Checks if the iterator is at capacity w.r.t. the permitted parallelism.
383    ///
384    /// While the iterator is stalled, up to `num_results` parallel requests
385    /// are allowed. This is a slightly more permissive variant of the
386    /// requirement that the initiator "resends the FIND_NODE to all of the
387    /// k closest nodes it has not already queried".
388    fn at_capacity(&self) -> bool {
389        match self.state {
390            State::Stalled => {
391                self.num_waiting
392                    >= usize::max(self.config.num_results.get(), self.config.parallelism.get())
393            }
394            State::Iterating { .. } => self.num_waiting >= self.config.parallelism.get(),
395            State::Finished => true,
396        }
397    }
398}
399
400////////////////////////////////////////////////////////////////////////////////
401// Private state
402
/// Internal state of the iterator.
#[derive(Debug, PartialEq, Eq, Copy, Clone)]
enum State {
    /// The iterator is making progress by iterating towards `num_results` closest
    /// peers to the target with a maximum of `parallelism` peers for which the
    /// iterator is waiting for results at a time.
    ///
    /// > **Note**: When the iterator switches back to `Iterating` after being
    /// > `Stalled`, it may temporarily be waiting for more than `parallelism`
    /// > results from peers, with new peers only being considered once
    /// > the number of pending results drops below `parallelism`.
    Iterating {
        /// The number of consecutive results that did not yield progress
        /// (see `on_success`). When this number reaches `parallelism`, the
        /// iterator is considered `Stalled`.
        no_progress: usize,
    },

    /// An iterator is stalled when it did not make progress after `parallelism`
    /// consecutive successful results (see `on_success`).
    ///
    /// While the iterator is stalled, the maximum allowed parallelism for pending
    /// results is increased to `num_results` in an attempt to finish the iterator.
    /// If the iterator can make progress again upon receiving the remaining
    /// results, it switches back to `Iterating`. Otherwise it will be finished.
    Stalled,

    /// The iterator is finished.
    ///
    /// An iterator finishes either when it has collected `num_results` results
    /// from the closest peers (not counting those that failed or are unresponsive)
    /// or because the iterator ran out of peers that have not yet delivered
    /// results (or failed).
    Finished,
}
439
/// Representation of a peer in the context of an iterator.
#[derive(Debug, Clone)]
struct Peer {
    /// The key of the peer; its preimage is the `PeerId`.
    key: Key<PeerId>,
    /// The current state of the peer with respect to the iterator.
    state: PeerState,
}
446
/// The state of a single `Peer`.
#[derive(Debug, Copy, Clone)]
enum PeerState {
    /// The peer has not yet been contacted.
    ///
    /// This is the starting state for every peer.
    NotContacted,

    /// The iterator is waiting for a result from the peer.
    ///
    /// The contained `Instant` is the point in time at which the peer is
    /// considered unresponsive if no result has been reported by then.
    Waiting(Instant),

    /// A result was not delivered for the peer within the configured timeout.
    ///
    /// The peer is not taken into account for the termination conditions
    /// of the iterator until and unless it responds.
    Unresponsive,

    /// Obtaining a result from the peer has failed.
    ///
    /// This is a final state, reached as a result of a call to `on_failure`.
    Failed,

    /// A successful result from the peer has been delivered.
    ///
    /// This is a final state, reached as a result of a call to `on_success`.
    Succeeded,
}
474
475#[cfg(test)]
476mod tests {
477    use super::*;
478    use crate::SHA_256_MH;
479    use libp2p_core::multihash::Multihash;
480    use libp2p_identity::PeerId;
481    use quickcheck::*;
482    use rand::{rngs::StdRng, Rng, SeedableRng};
483    use std::{iter, time::Duration};
484
485    fn random_peers<R: Rng>(n: usize, g: &mut R) -> Vec<PeerId> {
486        (0..n)
487            .map(|_| {
488                PeerId::from_multihash(Multihash::wrap(SHA_256_MH, &g.gen::<[u8; 32]>()).unwrap())
489                    .unwrap()
490            })
491            .collect()
492    }
493
494    fn sorted<T: AsRef<KeyBytes>>(target: &T, peers: &[Key<PeerId>]) -> bool {
495        peers
496            .windows(2)
497            .all(|w| w[0].distance(&target) < w[1].distance(&target))
498    }
499
500    #[derive(Clone, Debug)]
501    struct ArbitraryPeerId(PeerId);
502
503    impl Arbitrary for ArbitraryPeerId {
504        fn arbitrary(g: &mut Gen) -> ArbitraryPeerId {
505            let hash: [u8; 32] = core::array::from_fn(|_| u8::arbitrary(g));
506            let peer_id =
507                PeerId::from_multihash(Multihash::wrap(SHA_256_MH, &hash).unwrap()).unwrap();
508            ArbitraryPeerId(peer_id)
509        }
510    }
511
    impl Arbitrary for ClosestPeersIter {
        /// Generates a fresh iterator with a random target, between 1 and 59
        /// random initial peers, and a random configuration
        /// (`parallelism` in `1..10`, `num_results` in `1..25`,
        /// `peer_timeout` in `10..30` seconds).
        fn arbitrary(g: &mut Gen) -> ClosestPeersIter {
            let known_closest_peers = (0..g.gen_range(1..60u8))
                .map(|_| Key::from(ArbitraryPeerId::arbitrary(g).0))
                .collect::<Vec<_>>();
            let target = Key::from(ArbitraryPeerId::arbitrary(g).0);
            let config = ClosestPeersIterConfig {
                parallelism: NonZeroUsize::new(g.gen_range(1..10)).unwrap(),
                num_results: NonZeroUsize::new(g.gen_range(1..25)).unwrap(),
                peer_timeout: Duration::from_secs(g.gen_range(10..30)),
            };
            ClosestPeersIter::with_config(config, target, known_closest_peers)
        }
    }
526
    /// A 32-byte seed generated by quickcheck, used to seed a `StdRng`
    /// inside a property.
    #[derive(Clone, Debug)]
    struct Seed([u8; 32]);

    impl Arbitrary for Seed {
        /// Fills all 32 bytes with arbitrary values drawn from `g`.
        fn arbitrary(g: &mut Gen) -> Seed {
            let seed = core::array::from_fn(|_| u8::arbitrary(g));
            Seed(seed)
        }
    }
536
    /// A freshly constructed iterator has only not-yet-contacted peers, sorted
    /// by distance to the target, nothing in flight and no results.
    #[test]
    fn new_iter() {
        fn prop(iter: ClosestPeersIter) {
            let target = iter.target.clone();

            // Split the known peers into their keys and their states.
            let (keys, states): (Vec<_>, Vec<_>) = iter
                .closest_peers
                .values()
                .map(|e| (e.key.clone(), &e.state))
                .unzip();

            let none_contacted = states.iter().all(|s| matches!(s, PeerState::NotContacted));

            assert!(none_contacted, "Unexpected peer state in new iterator.");
            assert!(
                sorted(&target, &keys),
                "Closest peers in new iterator not sorted by distance to target."
            );
            assert_eq!(
                iter.num_waiting(),
                0,
                "Unexpected peers in progress in new iterator."
            );
            assert_eq!(
                iter.into_result().count(),
                0,
                "Unexpected closest peers in new iterator"
            );
        }

        QuickCheck::new().tests(10).quickcheck(prop as fn(_) -> _)
    }
569
    /// Drives an iterator to completion in "rounds" of up to `parallelism`
    /// peers, reporting a random success (with random closer peers) or a
    /// failure for each peer, and checks the bounded parallelism, the order in
    /// which peers are handed out, and the termination conditions.
    #[test]
    fn termination_and_parallelism() {
        fn prop(mut iter: ClosestPeersIter, seed: Seed) {
            // The clock never advances in this test, so no peer times out.
            let now = Instant::now();
            let mut rng = StdRng::from_seed(seed.0);

            // The peers the iterator is expected to hand out, in order.
            let mut expected = iter
                .closest_peers
                .values()
                .map(|e| e.key.clone())
                .collect::<Vec<_>>();
            let num_known = expected.len();
            let max_parallelism = usize::min(iter.config.parallelism.get(), num_known);

            let target = iter.target.clone();
            let mut remaining;
            let mut num_failures = 0;

            'finished: loop {
                if expected.is_empty() {
                    break;
                }
                // Split off the next up to `parallelism` expected peers.
                else if expected.len() < max_parallelism {
                    remaining = Vec::new();
                } else {
                    remaining = expected.split_off(max_parallelism);
                }

                // Advance for maximum parallelism.
                for k in expected.iter() {
                    match iter.next(now) {
                        PeersIterState::Finished => break 'finished,
                        PeersIterState::Waiting(Some(p)) => assert_eq!(&*p, k.preimage()),
                        PeersIterState::Waiting(None) => panic!("Expected another peer."),
                        PeersIterState::WaitingAtCapacity => {
                            panic!("Unexpectedly reached capacity.")
                        }
                    }
                }
                let num_waiting = iter.num_waiting();
                assert_eq!(num_waiting, expected.len());

                // Check the bounded parallelism.
                if iter.at_capacity() {
                    assert_eq!(iter.next(now), PeersIterState::WaitingAtCapacity)
                }

                // Report results back to the iterator with a random number of "closer"
                // peers or an error, thus finishing the "in-flight requests".
                for (i, k) in expected.iter().enumerate() {
                    if rng.gen_bool(0.75) {
                        let num_closer = rng.gen_range(0..iter.config.num_results.get() + 1);
                        let closer_peers = random_peers(num_closer, &mut rng);
                        remaining.extend(closer_peers.iter().cloned().map(Key::from));
                        iter.on_success(k.preimage(), closer_peers);
                    } else {
                        num_failures += 1;
                        iter.on_failure(k.preimage());
                    }
                    // Every reported result removes exactly one in-flight request.
                    assert_eq!(iter.num_waiting(), num_waiting - (i + 1));
                }

                // Re-sort the remaining expected peers for the next "round".
                remaining.sort_by_key(|k| target.distance(&k));

                expected = remaining
            }

            // The iterator must be finished.
            assert_eq!(iter.next(now), PeersIterState::Finished);
            assert_eq!(iter.state, State::Finished);

            // Determine if all peers have been contacted by the iterator. This _must_ be
            // the case if the iterator finished with fewer than the requested number
            // of results.
            let all_contacted = iter
                .closest_peers
                .values()
                .all(|e| !matches!(e.state, PeerState::NotContacted | PeerState::Waiting { .. }));

            let target = iter.target.clone();
            let num_results = iter.config.num_results;
            let result = iter.into_result();
            let closest = result.map(Key::from).collect::<Vec<_>>();

            assert!(sorted(&target, &closest));

            if closest.len() < num_results.get() {
                // The iterator returned fewer results than requested. Therefore
                // either the initial number of known peers must have been
                // less than the desired number of results, or there must
                // have been failures.
                assert!(num_known < num_results.get() || num_failures > 0);
                // All peers must have been contacted.
                assert!(all_contacted, "Not all peers have been contacted.");
            } else {
                assert_eq!(num_results.get(), closest.len(), "Too  many results.");
            }
        }

        QuickCheck::new()
            .tests(10)
            .quickcheck(prop as fn(_, _) -> _)
    }
675
    /// A peer that is reported as "closer" several times (by the same peer
    /// twice and by a second peer) must appear in the iterator exactly once.
    #[test]
    fn no_duplicates() {
        fn prop(mut iter: ClosestPeersIter, closer: ArbitraryPeerId) -> bool {
            let now = Instant::now();

            let closer = vec![closer.0];

            // A first peer reports a "closer" peer.
            let peer1 = match iter.next(now) {
                PeersIterState::Waiting(Some(p)) => p.into_owned(),
                _ => panic!("No peer."),
            };
            iter.on_success(&peer1, closer.clone());
            // Duplicate result from the same peer.
            iter.on_success(&peer1, closer.clone());

            // If there is a second peer, let it also report the same "closer" peer.
            match iter.next(now) {
                PeersIterState::Waiting(Some(p)) => {
                    let peer2 = p.into_owned();
                    assert!(iter.on_success(&peer2, closer.clone()))
                }
                PeersIterState::Finished => {}
                _ => panic!("Unexpectedly iter state."),
            };

            // The "closer" peer must only be in the iterator once.
            let n = iter
                .closest_peers
                .values()
                .filter(|e| e.key.preimage() == &closer[0])
                .count();
            assert_eq!(n, 1);

            true
        }

        QuickCheck::new()
            .tests(10)
            .quickcheck(prop as fn(_, _) -> _)
    }
717
    /// A waiting peer becomes unresponsive once its timeout elapses, and can
    /// still deliver a successful result as long as the iterator has not
    /// finished.
    #[test]
    fn timeout() {
        fn prop(mut iter: ClosestPeersIter) -> bool {
            let mut now = Instant::now();
            // The closest known peer, i.e. the first one the iterator hands out.
            let peer = iter
                .closest_peers
                .values()
                .next()
                .unwrap()
                .key
                .clone()
                .into_preimage();

            // Poll the iterator for the first peer to be in progress.
            match iter.next(now) {
                PeersIterState::Waiting(Some(id)) => assert_eq!(&*id, &peer),
                _ => panic!(),
            }

            // Artificially advance the clock.
            now += iter.config.peer_timeout;

            // Advancing the iterator again should mark the first peer as unresponsive.
            let _ = iter.next(now);
            match &iter.closest_peers.values().next().unwrap() {
                Peer {
                    key,
                    state: PeerState::Unresponsive,
                } => {
                    assert_eq!(key.preimage(), &peer);
                }
                Peer { state, .. } => panic!("Unexpected peer state: {state:?}"),
            }

            // Remember whether the iterator finished before the late result arrives.
            let finished = iter.is_finished();
            iter.on_success(&peer, iter::empty());
            let closest = iter.into_result().collect::<Vec<_>>();

            if finished {
                // Delivering results when the iterator already finished must have
                // no effect.
                assert_eq!(Vec::<PeerId>::new(), closest)
            } else {
                // Unresponsive peers can still deliver results while the iterator
                // is not finished.
                assert_eq!(vec![peer], closest)
            }
            true
        }

        QuickCheck::new().tests(10).quickcheck(prop as fn(_) -> _)
    }
770
    /// If every request fails, the iterator hands out each known peer (at most
    /// `K_VALUE` of them) one after another and then finishes.
    #[test]
    fn without_success_try_up_to_k_peers() {
        fn prop(mut iter: ClosestPeersIter) {
            let now = Instant::now();

            for _ in 0..(usize::min(iter.closest_peers.len(), K_VALUE.get())) {
                match iter.next(now) {
                    PeersIterState::Waiting(Some(p)) => {
                        let peer = p.clone().into_owned();
                        // Failing immediately frees the slot for the next peer.
                        iter.on_failure(&peer);
                    }
                    _ => panic!("Expected iterator to yield another peer to query."),
                }
            }

            assert_eq!(PeersIterState::Finished, iter.next(now));
        }

        QuickCheck::new().tests(10).quickcheck(prop as fn(_))
    }
791
    /// A stalled iterator is at capacity exactly when
    /// `max(parallelism, num_results)` requests are waiting.
    #[test]
    fn stalled_at_capacity() {
        fn prop(mut iter: ClosestPeersIter) {
            // Force the stalled state and set the waiting counter directly.
            iter.state = State::Stalled;

            for i in 0..usize::max(iter.config.parallelism.get(), iter.config.num_results.get()) {
                iter.num_waiting = i;
                assert!(
                    !iter.at_capacity(),
                    "Iterator should not be at capacity if less than \
                     `max(parallelism, num_results)` requests are waiting.",
                )
            }

            iter.num_waiting =
                usize::max(iter.config.parallelism.get(), iter.config.num_results.get());
            assert!(
                iter.at_capacity(),
                "Iterator should be at capacity if `max(parallelism, num_results)` requests are \
                 waiting.",
            )
        }

        QuickCheck::new().tests(10).quickcheck(prop as fn(_))
    }
817}