libp2p_kad/query/peers/closest.rs
1// Copyright 2019 Parity Technologies (UK) Ltd.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21use super::*;
22
23use crate::kbucket::{Distance, Key, KeyBytes};
24use crate::{ALPHA_VALUE, K_VALUE};
25use instant::Instant;
26use libp2p_identity::PeerId;
27use std::collections::btree_map::{BTreeMap, Entry};
28use std::{iter::FromIterator, num::NonZeroUsize, time::Duration};
29
30pub(crate) mod disjoint;
/// A peer iterator for a dynamically changing list of peers, sorted by increasing
/// distance to a chosen target.
///
/// The iterator is driven by calling `next` to obtain the next peer to contact
/// and by reporting the outcome of each contact through `on_success` or
/// `on_failure`.
#[derive(Debug, Clone)]
pub struct ClosestPeersIter {
    /// The configuration: parallelism, number of results and per-peer timeout.
    config: ClosestPeersIterConfig,

    /// The target whose distance to any peer determines the position of
    /// the peer in the iterator.
    target: KeyBytes,

    /// The internal iterator state.
    state: State,

    /// The closest peers to the target, ordered by increasing distance.
    ///
    /// The map is keyed by the distance of each peer to `target`, so iterating
    /// over it visits the peers from closest to farthest.
    closest_peers: BTreeMap<Distance, Peer>,

    /// The number of peers for which the iterator is currently waiting for results.
    ///
    /// Peers that were marked as unresponsive after their timeout expired are
    /// no longer included in this count.
    num_waiting: usize,
}
50
/// Configuration for a `ClosestPeersIter`.
#[derive(Debug, Clone)]
pub struct ClosestPeersIterConfig {
    /// Allowed level of parallelism.
    ///
    /// The `α` parameter in the Kademlia paper. The maximum number of peers that
    /// the iterator is allowed to wait for in parallel while iterating towards the closest
    /// nodes to a target. Defaults to `ALPHA_VALUE`.
    pub parallelism: NonZeroUsize,

    /// Number of results (closest peers) to search for.
    ///
    /// The number of closest peers for which the iterator must obtain successful results
    /// in order to finish successfully. Defaults to `K_VALUE`.
    ///
    /// Note that this value does not limit how many initially known peers the
    /// iterator is seeded with: `ClosestPeersIter::with_config` takes at most
    /// `K_VALUE` of them, regardless of this setting.
    pub num_results: NonZeroUsize,

    /// The timeout for a single peer.
    ///
    /// If a successful result is not reported for a peer within this timeout
    /// window, the iterator considers the peer unresponsive and will not wait for
    /// the peer when evaluating the termination conditions, until and unless a
    /// result is delivered. Defaults to `10` seconds.
    pub peer_timeout: Duration,
}
75
76impl Default for ClosestPeersIterConfig {
77 fn default() -> Self {
78 ClosestPeersIterConfig {
79 parallelism: ALPHA_VALUE,
80 num_results: K_VALUE,
81 peer_timeout: Duration::from_secs(10),
82 }
83 }
84}
85
86impl ClosestPeersIter {
87 /// Creates a new iterator with a default configuration.
88 pub fn new<I>(target: KeyBytes, known_closest_peers: I) -> Self
89 where
90 I: IntoIterator<Item = Key<PeerId>>,
91 {
92 Self::with_config(
93 ClosestPeersIterConfig::default(),
94 target,
95 known_closest_peers,
96 )
97 }
98
99 /// Creates a new iterator with the given configuration.
100 pub fn with_config<I, T>(
101 config: ClosestPeersIterConfig,
102 target: T,
103 known_closest_peers: I,
104 ) -> Self
105 where
106 I: IntoIterator<Item = Key<PeerId>>,
107 T: Into<KeyBytes>,
108 {
109 let target = target.into();
110
111 // Initialise the closest peers to start the iterator with.
112 let closest_peers = BTreeMap::from_iter(
113 known_closest_peers
114 .into_iter()
115 .map(|key| {
116 let distance = key.distance(&target);
117 let state = PeerState::NotContacted;
118 (distance, Peer { key, state })
119 })
120 .take(K_VALUE.into()),
121 );
122
123 // The iterator initially makes progress by iterating towards the target.
124 let state = State::Iterating { no_progress: 0 };
125
126 ClosestPeersIter {
127 config,
128 target,
129 state,
130 closest_peers,
131 num_waiting: 0,
132 }
133 }
134
135 /// Callback for delivering the result of a successful request to a peer.
136 ///
137 /// Delivering results of requests back to the iterator allows the iterator to make
138 /// progress. The iterator is said to make progress either when the given
139 /// `closer_peers` contain a peer closer to the target than any peer seen so far,
140 /// or when the iterator did not yet accumulate `num_results` closest peers and
141 /// `closer_peers` contains a new peer, regardless of its distance to the target.
142 ///
143 /// If the iterator is currently waiting for a result from `peer`,
144 /// the iterator state is updated and `true` is returned. In that
145 /// case, after calling this function, `next` should eventually be
146 /// called again to obtain the new state of the iterator.
147 ///
148 /// If the iterator is finished, it is not currently waiting for a
149 /// result from `peer`, or a result for `peer` has already been reported,
150 /// calling this function has no effect and `false` is returned.
151 pub fn on_success<I>(&mut self, peer: &PeerId, closer_peers: I) -> bool
152 where
153 I: IntoIterator<Item = PeerId>,
154 {
155 if let State::Finished = self.state {
156 return false;
157 }
158
159 let key = Key::from(*peer);
160 let distance = key.distance(&self.target);
161
162 // Mark the peer as succeeded.
163 match self.closest_peers.entry(distance) {
164 Entry::Vacant(..) => return false,
165 Entry::Occupied(mut e) => match e.get().state {
166 PeerState::Waiting(..) => {
167 debug_assert!(self.num_waiting > 0);
168 self.num_waiting -= 1;
169 e.get_mut().state = PeerState::Succeeded;
170 }
171 PeerState::Unresponsive => {
172 e.get_mut().state = PeerState::Succeeded;
173 }
174 PeerState::NotContacted | PeerState::Failed | PeerState::Succeeded => return false,
175 },
176 }
177
178 // Incorporate the reported closer peers into the iterator.
179 //
180 // The iterator makes progress if:
181 // 1, the iterator did not yet accumulate enough closest peers.
182 // OR
183 // 2, any of the new peers is closer to the target than any peer seen so far
184 // (i.e. is the first entry after being incorporated)
185 let mut progress = self.closest_peers.len() < self.config.num_results.get();
186 for peer in closer_peers {
187 let key = peer.into();
188 let distance = self.target.distance(&key);
189 let peer = Peer {
190 key,
191 state: PeerState::NotContacted,
192 };
193 self.closest_peers.entry(distance).or_insert(peer);
194
195 progress = self.closest_peers.keys().next() == Some(&distance) || progress;
196 }
197
198 // Update the iterator state.
199 self.state = match self.state {
200 State::Iterating { no_progress } => {
201 let no_progress = if progress { 0 } else { no_progress + 1 };
202 if no_progress >= self.config.parallelism.get() {
203 State::Stalled
204 } else {
205 State::Iterating { no_progress }
206 }
207 }
208 State::Stalled => {
209 if progress {
210 State::Iterating { no_progress: 0 }
211 } else {
212 State::Stalled
213 }
214 }
215 State::Finished => State::Finished,
216 };
217
218 true
219 }
220
221 /// Callback for informing the iterator about a failed request to a peer.
222 ///
223 /// If the iterator is currently waiting for a result from `peer`,
224 /// the iterator state is updated and `true` is returned. In that
225 /// case, after calling this function, `next` should eventually be
226 /// called again to obtain the new state of the iterator.
227 ///
228 /// If the iterator is finished, it is not currently waiting for a
229 /// result from `peer`, or a result for `peer` has already been reported,
230 /// calling this function has no effect and `false` is returned.
231 pub fn on_failure(&mut self, peer: &PeerId) -> bool {
232 if let State::Finished = self.state {
233 return false;
234 }
235
236 let key = Key::from(*peer);
237 let distance = key.distance(&self.target);
238
239 match self.closest_peers.entry(distance) {
240 Entry::Vacant(_) => return false,
241 Entry::Occupied(mut e) => match e.get().state {
242 PeerState::Waiting(_) => {
243 debug_assert!(self.num_waiting > 0);
244 self.num_waiting -= 1;
245 e.get_mut().state = PeerState::Failed
246 }
247 PeerState::Unresponsive => e.get_mut().state = PeerState::Failed,
248 PeerState::NotContacted | PeerState::Failed | PeerState::Succeeded => return false,
249 },
250 }
251
252 true
253 }
254
255 /// Returns the list of peers for which the iterator is currently waiting
256 /// for results.
257 pub fn waiting(&self) -> impl Iterator<Item = &PeerId> {
258 self.closest_peers
259 .values()
260 .filter_map(|peer| match peer.state {
261 PeerState::Waiting(..) => Some(peer.key.preimage()),
262 _ => None,
263 })
264 }
265
266 /// Returns the number of peers for which the iterator is currently
267 /// waiting for results.
268 pub fn num_waiting(&self) -> usize {
269 self.num_waiting
270 }
271
272 /// Returns true if the iterator is waiting for a response from the given peer.
273 pub fn is_waiting(&self, peer: &PeerId) -> bool {
274 self.waiting().any(|p| peer == p)
275 }
276
277 /// Advances the state of the iterator, potentially getting a new peer to contact.
278 pub fn next(&mut self, now: Instant) -> PeersIterState<'_> {
279 if let State::Finished = self.state {
280 return PeersIterState::Finished;
281 }
282
283 // Count the number of peers that returned a result. If there is a
284 // request in progress to one of the `num_results` closest peers, the
285 // counter is set to `None` as the iterator can only finish once
286 // `num_results` closest peers have responded (or there are no more
287 // peers to contact, see `num_waiting`).
288 let mut result_counter = Some(0);
289
290 // Check if the iterator is at capacity w.r.t. the allowed parallelism.
291 let at_capacity = self.at_capacity();
292
293 for peer in self.closest_peers.values_mut() {
294 match peer.state {
295 PeerState::Waiting(timeout) => {
296 if now >= timeout {
297 // Unresponsive peers no longer count towards the limit for the
298 // bounded parallelism, though they might still be ongoing and
299 // their results can still be delivered to the iterator.
300 debug_assert!(self.num_waiting > 0);
301 self.num_waiting -= 1;
302 peer.state = PeerState::Unresponsive
303 } else if at_capacity {
304 // The iterator is still waiting for a result from a peer and is
305 // at capacity w.r.t. the maximum number of peers being waited on.
306 return PeersIterState::WaitingAtCapacity;
307 } else {
308 // The iterator is still waiting for a result from a peer and the
309 // `result_counter` did not yet reach `num_results`. Therefore
310 // the iterator is not yet done, regardless of already successful
311 // queries to peers farther from the target.
312 result_counter = None;
313 }
314 }
315
316 PeerState::Succeeded => {
317 if let Some(ref mut cnt) = result_counter {
318 *cnt += 1;
319 // If `num_results` successful results have been delivered for the
320 // closest peers, the iterator is done.
321 if *cnt >= self.config.num_results.get() {
322 self.state = State::Finished;
323 return PeersIterState::Finished;
324 }
325 }
326 }
327
328 PeerState::NotContacted => {
329 if !at_capacity {
330 let timeout = now + self.config.peer_timeout;
331 peer.state = PeerState::Waiting(timeout);
332 self.num_waiting += 1;
333 return PeersIterState::Waiting(Some(Cow::Borrowed(peer.key.preimage())));
334 } else {
335 return PeersIterState::WaitingAtCapacity;
336 }
337 }
338
339 PeerState::Unresponsive | PeerState::Failed => {
340 // Skip over unresponsive or failed peers.
341 }
342 }
343 }
344
345 if self.num_waiting > 0 {
346 // The iterator is still waiting for results and not at capacity w.r.t.
347 // the allowed parallelism, but there are no new peers to contact
348 // at the moment.
349 PeersIterState::Waiting(None)
350 } else {
351 // The iterator is finished because all available peers have been contacted
352 // and the iterator is not waiting for any more results.
353 self.state = State::Finished;
354 PeersIterState::Finished
355 }
356 }
357
358 /// Immediately transitions the iterator to [`PeersIterState::Finished`].
359 pub fn finish(&mut self) {
360 self.state = State::Finished
361 }
362
363 /// Checks whether the iterator has finished.
364 pub fn is_finished(&self) -> bool {
365 self.state == State::Finished
366 }
367
368 /// Consumes the iterator, returning the closest peers.
369 pub fn into_result(self) -> impl Iterator<Item = PeerId> {
370 self.closest_peers
371 .into_iter()
372 .filter_map(|(_, peer)| {
373 if let PeerState::Succeeded = peer.state {
374 Some(peer.key.into_preimage())
375 } else {
376 None
377 }
378 })
379 .take(self.config.num_results.get())
380 }
381
382 /// Checks if the iterator is at capacity w.r.t. the permitted parallelism.
383 ///
384 /// While the iterator is stalled, up to `num_results` parallel requests
385 /// are allowed. This is a slightly more permissive variant of the
386 /// requirement that the initiator "resends the FIND_NODE to all of the
387 /// k closest nodes it has not already queried".
388 fn at_capacity(&self) -> bool {
389 match self.state {
390 State::Stalled => {
391 self.num_waiting
392 >= usize::max(self.config.num_results.get(), self.config.parallelism.get())
393 }
394 State::Iterating { .. } => self.num_waiting >= self.config.parallelism.get(),
395 State::Finished => true,
396 }
397 }
398}
399
400////////////////////////////////////////////////////////////////////////////////
401// Private state
402
/// Internal state of the iterator.
#[derive(Debug, PartialEq, Eq, Copy, Clone)]
enum State {
    /// The iterator is making progress by iterating towards `num_results` closest
    /// peers to the target with a maximum of `parallelism` peers for which the
    /// iterator is waiting for results at a time.
    ///
    /// > **Note**: When the iterator switches back to `Iterating` after being
    /// > `Stalled`, it may temporarily be waiting for more than `parallelism`
    /// > results from peers, with new peers only being considered once
    /// > the number of pending results drops below `parallelism`.
    Iterating {
        /// The number of consecutive successful results that did not count as
        /// progress (see `on_success`). It is reset to zero by every result
        /// that does count as progress. When this number reaches
        /// `parallelism`, the iterator is considered `Stalled`.
        no_progress: usize,
    },

    /// An iterator is stalled when it did not make progress after `parallelism`
    /// consecutive successful results (see `on_success`).
    ///
    /// While the iterator is stalled, the maximum allowed parallelism for pending
    /// results is increased to the larger of `num_results` and `parallelism`
    /// in an attempt to finish the iterator.
    /// If the iterator can make progress again upon receiving the remaining
    /// results, it switches back to `Iterating`. Otherwise it will be finished.
    Stalled,

    /// The iterator is finished.
    ///
    /// An iterator finishes either when it has collected `num_results` results
    /// from the closest peers (not counting those that failed or are unresponsive)
    /// or because the iterator ran out of peers that have not yet delivered
    /// results (or failed).
    Finished,
}
439
/// Representation of a peer in the context of an iterator.
#[derive(Debug, Clone)]
struct Peer {
    /// The key of the peer, from which its distance to the target is computed.
    key: Key<PeerId>,
    /// The state of the peer with respect to the iterator.
    state: PeerState,
}
446
/// The state of a single `Peer`.
#[derive(Debug, Copy, Clone)]
enum PeerState {
    /// The peer has not yet been contacted.
    ///
    /// This is the starting state for every peer.
    NotContacted,

    /// The iterator is waiting for a result from the peer.
    ///
    /// The contained instant is the point in time from which on the peer
    /// is considered unresponsive if no result has been reported.
    Waiting(Instant),

    /// A result was not delivered for the peer within the configured timeout.
    ///
    /// The peer is not taken into account for the termination conditions
    /// of the iterator until and unless it responds. A late result can still
    /// move the peer to `Succeeded` or `Failed`.
    Unresponsive,

    /// Obtaining a result from the peer has failed.
    ///
    /// This is a final state, reached as a result of a call to `on_failure`.
    Failed,

    /// A successful result from the peer has been delivered.
    ///
    /// This is a final state, reached as a result of a call to `on_success`.
    Succeeded,
}
474
#[cfg(test)]
mod tests {
    use super::*;
    use crate::SHA_256_MH;
    use libp2p_core::multihash::Multihash;
    use libp2p_identity::PeerId;
    use quickcheck::*;
    use rand::{rngs::StdRng, Rng, SeedableRng};
    use std::{iter, time::Duration};

    /// Generates `n` peer IDs, each built from 32 random bytes wrapped in a
    /// multihash with the code `SHA_256_MH`.
    fn random_peers<R: Rng>(n: usize, g: &mut R) -> Vec<PeerId> {
        (0..n)
            .map(|_| {
                PeerId::from_multihash(Multihash::wrap(SHA_256_MH, &g.gen::<[u8; 32]>()).unwrap())
                    .unwrap()
            })
            .collect()
    }

    /// Returns `true` if `peers` is ordered by strictly increasing distance
    /// to `target`.
    fn sorted<T: AsRef<KeyBytes>>(target: &T, peers: &[Key<PeerId>]) -> bool {
        peers
            .windows(2)
            .all(|w| w[0].distance(&target) < w[1].distance(&target))
    }

    /// Wrapper that allows quickcheck to generate arbitrary peer IDs.
    #[derive(Clone, Debug)]
    struct ArbitraryPeerId(PeerId);

    impl Arbitrary for ArbitraryPeerId {
        fn arbitrary(g: &mut Gen) -> ArbitraryPeerId {
            let hash: [u8; 32] = core::array::from_fn(|_| u8::arbitrary(g));
            let peer_id =
                PeerId::from_multihash(Multihash::wrap(SHA_256_MH, &hash).unwrap()).unwrap();
            ArbitraryPeerId(peer_id)
        }
    }

    impl Arbitrary for ClosestPeersIter {
        /// Generates an iterator over a random target with at least one known
        /// peer and a random configuration.
        fn arbitrary(g: &mut Gen) -> ClosestPeersIter {
            let known_closest_peers = (0..g.gen_range(1..60u8))
                .map(|_| Key::from(ArbitraryPeerId::arbitrary(g).0))
                .collect::<Vec<_>>();
            let target = Key::from(ArbitraryPeerId::arbitrary(g).0);
            let config = ClosestPeersIterConfig {
                parallelism: NonZeroUsize::new(g.gen_range(1..10)).unwrap(),
                num_results: NonZeroUsize::new(g.gen_range(1..25)).unwrap(),
                peer_timeout: Duration::from_secs(g.gen_range(10..30)),
            };
            ClosestPeersIter::with_config(config, target, known_closest_peers)
        }
    }

    /// Arbitrary seed for a deterministic random number generator.
    #[derive(Clone, Debug)]
    struct Seed([u8; 32]);

    impl Arbitrary for Seed {
        fn arbitrary(g: &mut Gen) -> Seed {
            let seed = core::array::from_fn(|_| u8::arbitrary(g));
            Seed(seed)
        }
    }

    /// A freshly created iterator has only uncontacted peers, sorted by
    /// distance to the target, no requests in progress and no results.
    #[test]
    fn new_iter() {
        fn prop(iter: ClosestPeersIter) {
            let target = iter.target.clone();

            let (keys, states): (Vec<_>, Vec<_>) = iter
                .closest_peers
                .values()
                .map(|e| (e.key.clone(), &e.state))
                .unzip();

            let none_contacted = states.iter().all(|s| matches!(s, PeerState::NotContacted));

            assert!(none_contacted, "Unexpected peer state in new iterator.");
            assert!(
                sorted(&target, &keys),
                "Closest peers in new iterator not sorted by distance to target."
            );
            assert_eq!(
                iter.num_waiting(),
                0,
                "Unexpected peers in progress in new iterator."
            );
            assert_eq!(
                iter.into_result().count(),
                0,
                "Unexpected closest peers in new iterator"
            );
        }

        QuickCheck::new().tests(10).quickcheck(prop as fn(_) -> _)
    }

    /// Drives an iterator to completion in rounds of up to `parallelism`
    /// requests, answering each request with random closer peers or a failure,
    /// and checks the bounded parallelism as well as the final result.
    #[test]
    fn termination_and_parallelism() {
        fn prop(mut iter: ClosestPeersIter, seed: Seed) {
            let now = Instant::now();
            let mut rng = StdRng::from_seed(seed.0);

            // The peers the iterator is expected to yield, in order.
            let mut expected = iter
                .closest_peers
                .values()
                .map(|e| e.key.clone())
                .collect::<Vec<_>>();
            let num_known = expected.len();
            let max_parallelism = usize::min(iter.config.parallelism.get(), num_known);

            let target = iter.target.clone();
            let mut remaining;
            let mut num_failures = 0;

            'finished: loop {
                if expected.is_empty() {
                    break;
                }
                // Split off the next up to `parallelism` expected peers.
                else if expected.len() < max_parallelism {
                    remaining = Vec::new();
                } else {
                    remaining = expected.split_off(max_parallelism);
                }

                // Advance for maximum parallelism.
                for k in expected.iter() {
                    match iter.next(now) {
                        PeersIterState::Finished => break 'finished,
                        PeersIterState::Waiting(Some(p)) => assert_eq!(&*p, k.preimage()),
                        PeersIterState::Waiting(None) => panic!("Expected another peer."),
                        PeersIterState::WaitingAtCapacity => {
                            panic!("Unexpectedly reached capacity.")
                        }
                    }
                }
                let num_waiting = iter.num_waiting();
                assert_eq!(num_waiting, expected.len());

                // Check the bounded parallelism.
                if iter.at_capacity() {
                    assert_eq!(iter.next(now), PeersIterState::WaitingAtCapacity)
                }

                // Report results back to the iterator with a random number of "closer"
                // peers or an error, thus finishing the "in-flight requests".
                for (i, k) in expected.iter().enumerate() {
                    if rng.gen_bool(0.75) {
                        let num_closer = rng.gen_range(0..iter.config.num_results.get() + 1);
                        let closer_peers = random_peers(num_closer, &mut rng);
                        remaining.extend(closer_peers.iter().cloned().map(Key::from));
                        iter.on_success(k.preimage(), closer_peers);
                    } else {
                        num_failures += 1;
                        iter.on_failure(k.preimage());
                    }
                    assert_eq!(iter.num_waiting(), num_waiting - (i + 1));
                }

                // Re-sort the remaining expected peers for the next "round".
                remaining.sort_by_key(|k| target.distance(&k));

                expected = remaining
            }

            // The iterator must be finished.
            assert_eq!(iter.next(now), PeersIterState::Finished);
            assert_eq!(iter.state, State::Finished);

            // Determine if all peers have been contacted by the iterator. This _must_ be
            // the case if the iterator finished with fewer than the requested number
            // of results.
            let all_contacted = iter
                .closest_peers
                .values()
                .all(|e| !matches!(e.state, PeerState::NotContacted | PeerState::Waiting { .. }));

            let target = iter.target.clone();
            let num_results = iter.config.num_results;
            let result = iter.into_result();
            let closest = result.map(Key::from).collect::<Vec<_>>();

            assert!(sorted(&target, &closest));

            if closest.len() < num_results.get() {
                // The iterator returned fewer results than requested. Therefore
                // either the initial number of known peers must have been
                // less than the desired number of results, or there must
                // have been failures.
                assert!(num_known < num_results.get() || num_failures > 0);
                // All peers must have been contacted.
                assert!(all_contacted, "Not all peers have been contacted.");
            } else {
                assert_eq!(num_results.get(), closest.len(), "Too many results.");
            }
        }

        QuickCheck::new()
            .tests(10)
            .quickcheck(prop as fn(_, _) -> _)
    }

    /// A peer that is reported as "closer" several times must only be
    /// tracked once by the iterator.
    #[test]
    fn no_duplicates() {
        fn prop(mut iter: ClosestPeersIter, closer: ArbitraryPeerId) -> bool {
            let now = Instant::now();

            let closer = vec![closer.0];

            // A first peer reports a "closer" peer.
            let peer1 = match iter.next(now) {
                PeersIterState::Waiting(Some(p)) => p.into_owned(),
                _ => panic!("No peer."),
            };
            iter.on_success(&peer1, closer.clone());
            // Duplicate result from the same peer.
            iter.on_success(&peer1, closer.clone());

            // If there is a second peer, let it also report the same "closer" peer.
            match iter.next(now) {
                PeersIterState::Waiting(Some(p)) => {
                    let peer2 = p.into_owned();
                    assert!(iter.on_success(&peer2, closer.clone()))
                }
                PeersIterState::Finished => {}
                _ => panic!("Unexpectedly iter state."),
            };

            // The "closer" peer must only be in the iterator once.
            let n = iter
                .closest_peers
                .values()
                .filter(|e| e.key.preimage() == &closer[0])
                .count();
            assert_eq!(n, 1);

            true
        }

        QuickCheck::new()
            .tests(10)
            .quickcheck(prop as fn(_, _) -> _)
    }

    /// A peer that does not respond within the timeout is marked as
    /// unresponsive, but can still deliver a result while the iterator is
    /// not finished.
    #[test]
    fn timeout() {
        fn prop(mut iter: ClosestPeersIter) -> bool {
            let mut now = Instant::now();
            let peer = iter
                .closest_peers
                .values()
                .next()
                .unwrap()
                .key
                .clone()
                .into_preimage();

            // Poll the iterator for the first peer to be in progress.
            match iter.next(now) {
                PeersIterState::Waiting(Some(id)) => assert_eq!(&*id, &peer),
                _ => panic!(),
            }

            // Artificially advance the clock.
            now += iter.config.peer_timeout;

            // Advancing the iterator again should mark the first peer as unresponsive.
            let _ = iter.next(now);
            match &iter.closest_peers.values().next().unwrap() {
                Peer {
                    key,
                    state: PeerState::Unresponsive,
                } => {
                    assert_eq!(key.preimage(), &peer);
                }
                Peer { state, .. } => panic!("Unexpected peer state: {state:?}"),
            }

            let finished = iter.is_finished();
            iter.on_success(&peer, iter::empty());
            let closest = iter.into_result().collect::<Vec<_>>();

            if finished {
                // Delivering results when the iterator already finished must have
                // no effect.
                assert_eq!(Vec::<PeerId>::new(), closest)
            } else {
                // Unresponsive peers can still deliver results while the iterator
                // is not finished.
                assert_eq!(vec![peer], closest)
            }
            true
        }

        QuickCheck::new().tests(10).quickcheck(prop as fn(_) -> _)
    }

    /// If every request fails, the iterator hands out all initially known
    /// peers (at most `K_VALUE`) one after another and then finishes.
    #[test]
    fn without_success_try_up_to_k_peers() {
        fn prop(mut iter: ClosestPeersIter) {
            let now = Instant::now();

            for _ in 0..(usize::min(iter.closest_peers.len(), K_VALUE.get())) {
                match iter.next(now) {
                    PeersIterState::Waiting(Some(p)) => {
                        let peer = p.clone().into_owned();
                        iter.on_failure(&peer);
                    }
                    _ => panic!("Expected iterator to yield another peer to query."),
                }
            }

            assert_eq!(PeersIterState::Finished, iter.next(now));
        }

        QuickCheck::new().tests(10).quickcheck(prop as fn(_))
    }

    /// A stalled iterator is at capacity exactly when
    /// `max(parallelism, num_results)` requests are waiting.
    #[test]
    fn stalled_at_capacity() {
        fn prop(mut iter: ClosestPeersIter) {
            iter.state = State::Stalled;

            for i in 0..usize::max(iter.config.parallelism.get(), iter.config.num_results.get()) {
                iter.num_waiting = i;
                assert!(
                    !iter.at_capacity(),
                    "Iterator should not be at capacity if less than \
                     `max(parallelism, num_results)` requests are waiting.",
                )
            }

            iter.num_waiting =
                usize::max(iter.config.parallelism.get(), iter.config.num_results.get());
            assert!(
                iter.at_capacity(),
                "Iterator should be at capacity if `max(parallelism, num_results)` requests are \
                 waiting.",
            )
        }

        QuickCheck::new().tests(10).quickcheck(prop as fn(_))
    }
}
817}