1use super::*;
22use crate::kbucket::{Key, KeyBytes};
23use instant::Instant;
24use libp2p_identity::PeerId;
25use std::{
26 collections::HashMap,
27 iter::{Cycle, Map, Peekable},
28 ops::{Index, IndexMut, Range},
29};
30
31pub(crate) struct ClosestDisjointPeersIter {
34 target: KeyBytes,
35
36 iters: Vec<ClosestPeersIter>,
38 iter_order: Cycle<Map<Range<usize>, fn(usize) -> IteratorIndex>>,
41
42 contacted_peers: HashMap<PeerId, PeerState>,
49}
50
51impl ClosestDisjointPeersIter {
52 #[cfg(test)]
54 pub(crate) fn new<I>(target: KeyBytes, known_closest_peers: I) -> Self
55 where
56 I: IntoIterator<Item = Key<PeerId>>,
57 {
58 Self::with_config(
59 ClosestPeersIterConfig::default(),
60 target,
61 known_closest_peers,
62 )
63 }
64
65 pub(crate) fn with_config<I, T>(
67 config: ClosestPeersIterConfig,
68 target: T,
69 known_closest_peers: I,
70 ) -> Self
71 where
72 I: IntoIterator<Item = Key<PeerId>>,
73 T: Into<KeyBytes> + Clone,
74 {
75 let peers = known_closest_peers
76 .into_iter()
77 .take(K_VALUE.get())
78 .collect::<Vec<_>>();
79 let iters = (0..config.parallelism.get())
80 .map(|_| ClosestPeersIter::with_config(config.clone(), target.clone(), peers.clone()))
86 .collect::<Vec<_>>();
87
88 let iters_len = iters.len();
89
90 ClosestDisjointPeersIter {
91 target: target.into(),
92 iters,
93 iter_order: (0..iters_len)
94 .map(IteratorIndex as fn(usize) -> IteratorIndex)
95 .cycle(),
96 contacted_peers: HashMap::new(),
97 }
98 }
99
100 pub(crate) fn on_failure(&mut self, peer: &PeerId) -> bool {
111 let mut updated = false;
112
113 if let Some(PeerState {
114 initiated_by,
115 response,
116 }) = self.contacted_peers.get_mut(peer)
117 {
118 updated = self.iters[*initiated_by].on_failure(peer);
119
120 if updated {
121 *response = ResponseState::Failed;
122 }
123
124 for (i, iter) in &mut self.iters.iter_mut().enumerate() {
125 if IteratorIndex(i) != *initiated_by {
126 iter.on_failure(peer);
129 }
130 }
131 }
132
133 updated
134 }
135
136 pub(crate) fn on_success<I>(&mut self, peer: &PeerId, closer_peers: I) -> bool
154 where
155 I: IntoIterator<Item = PeerId>,
156 {
157 let mut updated = false;
158
159 if let Some(PeerState {
160 initiated_by,
161 response,
162 }) = self.contacted_peers.get_mut(peer)
163 {
164 updated = self.iters[*initiated_by].on_success(peer, closer_peers);
167
168 if updated {
169 *response = ResponseState::Succeeded;
174 }
175
176 for (i, iter) in &mut self.iters.iter_mut().enumerate() {
177 if IteratorIndex(i) != *initiated_by {
178 iter.on_success(peer, std::iter::empty());
185 }
186 }
187 }
188
189 updated
190 }
191
192 pub(crate) fn next(&mut self, now: Instant) -> PeersIterState<'_> {
193 let mut state = None;
194
195 for _ in 0..self.iters.len() {
197 let i = self.iter_order.next().expect("Cycle never ends.");
198 let iter = &mut self.iters[i];
199
200 loop {
201 match iter.next(now) {
202 PeersIterState::Waiting(None) => {
203 match state {
204 Some(PeersIterState::Waiting(Some(_))) => {
205 unreachable!();
209 }
210 Some(PeersIterState::Waiting(None)) => {}
211 Some(PeersIterState::WaitingAtCapacity) => {
212 state = Some(PeersIterState::Waiting(None))
215 }
216 Some(PeersIterState::Finished) => {
217 unreachable!();
219 }
220 None => state = Some(PeersIterState::Waiting(None)),
221 };
222
223 break;
224 }
225 PeersIterState::Waiting(Some(peer)) => {
226 match self.contacted_peers.get_mut(&*peer) {
227 Some(PeerState { response, .. }) => {
228 let peer = peer.into_owned();
230
231 match response {
232 ResponseState::Waiting => {}
236 ResponseState::Succeeded => {
237 iter.on_success(&peer, std::iter::empty());
242 }
243 ResponseState::Failed => {
244 iter.on_failure(&peer);
245 }
246 }
247 }
248 None => {
249 self.contacted_peers
251 .insert(peer.clone().into_owned(), PeerState::new(i));
252 return PeersIterState::Waiting(Some(Cow::Owned(
253 peer.into_owned(),
254 )));
255 }
256 }
257 }
258 PeersIterState::WaitingAtCapacity => {
259 match state {
260 Some(PeersIterState::Waiting(Some(_))) => {
261 unreachable!();
265 }
266 Some(PeersIterState::Waiting(None)) => {}
267 Some(PeersIterState::WaitingAtCapacity) => {}
268 Some(PeersIterState::Finished) => {
269 unreachable!();
271 }
272 None => state = Some(PeersIterState::WaitingAtCapacity),
273 };
274
275 break;
276 }
277 PeersIterState::Finished => break,
278 }
279 }
280 }
281
282 state.unwrap_or(PeersIterState::Finished)
283 }
284
285 pub(crate) fn finish_paths<'a, I>(&mut self, peers: I) -> bool
289 where
290 I: IntoIterator<Item = &'a PeerId>,
291 {
292 for peer in peers {
293 if let Some(PeerState { initiated_by, .. }) = self.contacted_peers.get_mut(peer) {
294 self.iters[*initiated_by].finish();
295 }
296 }
297
298 self.is_finished()
299 }
300
301 pub(crate) fn finish(&mut self) {
303 for iter in &mut self.iters {
304 iter.finish();
305 }
306 }
307
308 pub(crate) fn is_finished(&self) -> bool {
310 self.iters.iter().all(|i| i.is_finished())
311 }
312
313 pub(crate) fn into_result(self) -> impl Iterator<Item = PeerId> {
322 let result_per_path = self
323 .iters
324 .into_iter()
325 .map(|iter| iter.into_result().map(Key::from));
326
327 ResultIter::new(self.target, result_per_path).map(Key::into_preimage)
328 }
329}
330
331#[derive(Debug, Clone, Copy, PartialEq, Eq)]
333struct IteratorIndex(usize);
334
335impl Index<IteratorIndex> for Vec<ClosestPeersIter> {
336 type Output = ClosestPeersIter;
337
338 fn index(&self, index: IteratorIndex) -> &Self::Output {
339 &self[index.0]
340 }
341}
342
343impl IndexMut<IteratorIndex> for Vec<ClosestPeersIter> {
344 fn index_mut(&mut self, index: IteratorIndex) -> &mut Self::Output {
345 &mut self[index.0]
346 }
347}
348
349#[derive(Debug, PartialEq, Eq)]
352struct PeerState {
353 initiated_by: IteratorIndex,
356 response: ResponseState,
359}
360
361impl PeerState {
362 fn new(initiated_by: IteratorIndex) -> Self {
363 PeerState {
364 initiated_by,
365 response: ResponseState::Waiting,
366 }
367 }
368}
369
370#[derive(Debug, PartialEq, Eq)]
371enum ResponseState {
372 Waiting,
373 Succeeded,
374 Failed,
375}
376
377#[derive(Clone, Debug)]
382struct ResultIter<I>
383where
384 I: Iterator<Item = Key<PeerId>>,
385{
386 target: KeyBytes,
387 iters: Vec<Peekable<I>>,
388}
389
390impl<I: Iterator<Item = Key<PeerId>>> ResultIter<I> {
391 fn new(target: KeyBytes, iters: impl Iterator<Item = I>) -> Self {
392 ResultIter {
393 target,
394 iters: iters.map(Iterator::peekable).collect(),
395 }
396 }
397}
398
399impl<I: Iterator<Item = Key<PeerId>>> Iterator for ResultIter<I> {
400 type Item = I::Item;
401
402 fn next(&mut self) -> Option<Self::Item> {
403 let target = &self.target;
404
405 self.iters
406 .iter_mut()
407 .fold(Option::<&mut Peekable<_>>::None, |iter_a, iter_b| {
409 let Some(iter_a) = iter_a else {
410 return Some(iter_b);
411 };
412
413 match (iter_a.peek(), iter_b.peek()) {
414 (Some(next_a), Some(next_b)) => {
415 if next_a == next_b {
416 iter_b.next();
418 return Some(iter_a);
419 }
420
421 if target.distance(next_a) < target.distance(next_b) {
422 Some(iter_a)
423 } else {
424 Some(iter_b)
425 }
426 }
427 (Some(_), None) => Some(iter_a),
428 (None, Some(_)) => Some(iter_b),
429 (None, None) => None,
430 }
431 })
432 .and_then(Iterator::next)
434 }
435}
436
437#[cfg(test)]
438mod tests {
439 use super::*;
440
441 use crate::{K_VALUE, SHA_256_MH};
442 use libp2p_core::multihash::Multihash;
443 use quickcheck::*;
444 use std::collections::HashSet;
445 use std::iter;
446
447 impl Arbitrary for ResultIter<std::vec::IntoIter<Key<PeerId>>> {
448 fn arbitrary(g: &mut Gen) -> Self {
449 let target = Target::arbitrary(g).0;
450 let num_closest_iters = g.gen_range(0..20 + 1);
451 let peers = random_peers(g.gen_range(0..20 * num_closest_iters + 1), g);
452
453 let iters = (0..num_closest_iters).map(|_| {
454 let num_peers = g.gen_range(0..20 + 1);
455 let mut peers = g
456 .choose_multiple(&peers, num_peers)
457 .cloned()
458 .map(Key::from)
459 .collect::<Vec<_>>();
460
461 peers.sort_unstable_by_key(|a| target.distance(a));
462
463 peers.into_iter()
464 });
465
466 ResultIter::new(target.clone(), iters)
467 }
468
469 fn shrink(&self) -> Box<dyn Iterator<Item = Self>> {
470 let peers = self
471 .iters
472 .clone()
473 .into_iter()
474 .flatten()
475 .collect::<HashSet<_>>()
476 .into_iter()
477 .collect::<Vec<_>>();
478
479 let iters = self
480 .iters
481 .clone()
482 .into_iter()
483 .map(|iter| iter.collect::<Vec<_>>())
484 .collect();
485
486 Box::new(ResultIterShrinker {
487 target: self.target.clone(),
488 peers,
489 iters,
490 })
491 }
492 }
493
494 struct ResultIterShrinker {
495 target: KeyBytes,
496 peers: Vec<Key<PeerId>>,
497 iters: Vec<Vec<Key<PeerId>>>,
498 }
499
500 impl Iterator for ResultIterShrinker {
501 type Item = ResultIter<std::vec::IntoIter<Key<PeerId>>>;
502
503 fn next(&mut self) -> Option<Self::Item> {
506 let peer = self.peers.pop()?;
508
509 let iters = self.iters.clone().into_iter().filter_map(|mut iter| {
510 iter.retain(|p| p != &peer);
511 if iter.is_empty() {
512 return None;
513 }
514 Some(iter.into_iter())
515 });
516
517 Some(ResultIter::new(self.target.clone(), iters))
518 }
519 }
520
521 #[derive(Clone, Debug)]
522 struct ArbitraryPeerId(PeerId);
523
524 impl Arbitrary for ArbitraryPeerId {
525 fn arbitrary(g: &mut Gen) -> ArbitraryPeerId {
526 let hash: [u8; 32] = core::array::from_fn(|_| u8::arbitrary(g));
527 let peer_id =
528 PeerId::from_multihash(Multihash::wrap(SHA_256_MH, &hash).unwrap()).unwrap();
529 ArbitraryPeerId(peer_id)
530 }
531 }
532
533 #[derive(Clone, Debug)]
534 struct Target(KeyBytes);
535
536 impl Arbitrary for Target {
537 fn arbitrary(g: &mut Gen) -> Self {
538 let peer_id = ArbitraryPeerId::arbitrary(g).0;
539 Target(Key::from(peer_id).into())
540 }
541 }
542
543 fn random_peers(n: usize, g: &mut Gen) -> Vec<PeerId> {
544 (0..n).map(|_| ArbitraryPeerId::arbitrary(g).0).collect()
545 }
546
547 #[test]
548 fn result_iter_returns_deduplicated_ordered_peer_id_stream() {
549 fn prop(result_iter: ResultIter<std::vec::IntoIter<Key<PeerId>>>) {
550 let expected = {
551 let mut deduplicated = result_iter
552 .clone()
553 .iters
554 .into_iter()
555 .flatten()
556 .collect::<HashSet<_>>()
557 .into_iter()
558 .map(Key::from)
559 .collect::<Vec<_>>();
560
561 deduplicated.sort_unstable_by(|a, b| {
562 result_iter
563 .target
564 .distance(a)
565 .cmp(&result_iter.target.distance(b))
566 });
567
568 deduplicated
569 };
570
571 assert_eq!(expected, result_iter.collect::<Vec<_>>());
572 }
573
574 QuickCheck::new().quickcheck(prop as fn(_))
575 }
576
577 #[derive(Debug, Clone)]
578 struct Parallelism(NonZeroUsize);
579
580 impl Arbitrary for Parallelism {
581 fn arbitrary(g: &mut Gen) -> Self {
582 Parallelism(NonZeroUsize::new(g.gen_range(1..10)).unwrap())
583 }
584 }
585
586 #[derive(Debug, Clone)]
587 struct NumResults(NonZeroUsize);
588
589 impl Arbitrary for NumResults {
590 fn arbitrary(g: &mut Gen) -> Self {
591 NumResults(NonZeroUsize::new(g.gen_range(1..K_VALUE.get())).unwrap())
592 }
593 }
594
595 impl Arbitrary for ClosestPeersIterConfig {
596 fn arbitrary(g: &mut Gen) -> Self {
597 ClosestPeersIterConfig {
598 parallelism: Parallelism::arbitrary(g).0,
599 num_results: NumResults::arbitrary(g).0,
600 peer_timeout: Duration::from_secs(1),
601 }
602 }
603 }
604
605 #[derive(Debug, Clone)]
606 struct PeerVec(Vec<Key<PeerId>>);
607
608 impl Arbitrary for PeerVec {
609 fn arbitrary(g: &mut Gen) -> Self {
610 PeerVec(
611 (0..g.gen_range(1..60u8))
612 .map(|_| ArbitraryPeerId::arbitrary(g).0)
613 .map(Key::from)
614 .collect(),
615 )
616 }
617 }
618
619 #[test]
620 fn s_kademlia_disjoint_paths() {
621 let now = Instant::now();
622 let target: KeyBytes = Key::from(PeerId::random()).into();
623
624 let mut pool = [0; 12]
625 .iter()
626 .map(|_| Key::from(PeerId::random()))
627 .collect::<Vec<_>>();
628
629 pool.sort_unstable_by_key(|a| target.distance(a));
630
631 let known_closest_peers = pool.split_off(pool.len() - 3);
632
633 let config = ClosestPeersIterConfig {
634 parallelism: NonZeroUsize::new(3).unwrap(),
635 num_results: NonZeroUsize::new(3).unwrap(),
636 ..ClosestPeersIterConfig::default()
637 };
638
639 let mut peers_iter =
640 ClosestDisjointPeersIter::with_config(config, target, known_closest_peers.clone());
641
642 for _ in 0..3 {
646 if let PeersIterState::Waiting(Some(Cow::Owned(peer))) = peers_iter.next(now) {
647 assert!(known_closest_peers.contains(&Key::from(peer)));
648 } else {
649 panic!("Expected iterator to return peer to query.");
650 }
651 }
652
653 assert_eq!(PeersIterState::WaitingAtCapacity, peers_iter.next(now),);
654
655 let response_2 = pool.split_off(pool.len() - 3);
656 let response_3 = pool.split_off(pool.len() - 3);
657 let malicious_response_1 = pool.split_off(pool.len() - 3);
660
661 peers_iter.on_success(
663 known_closest_peers[0].preimage(),
664 malicious_response_1
665 .clone()
666 .into_iter()
667 .map(|k| *k.preimage()),
668 );
669
670 peers_iter.on_success(
672 known_closest_peers[1].preimage(),
673 response_2.clone().into_iter().map(|k| *k.preimage()),
674 );
675
676 peers_iter.on_success(
678 known_closest_peers[2].preimage(),
679 response_3.clone().into_iter().map(|k| *k.preimage()),
680 );
681
682 let mut next_to_query = vec![];
686 for _ in 0..3 {
687 if let PeersIterState::Waiting(Some(Cow::Owned(peer))) = peers_iter.next(now) {
688 next_to_query.push(peer)
689 } else {
690 panic!("Expected iterator to return peer to query.");
691 }
692 }
693
694 assert!(next_to_query.contains(malicious_response_1[0].preimage()));
696 assert!(next_to_query.contains(response_2[0].preimage()));
697 assert!(next_to_query.contains(response_3[0].preimage()));
698
699 for peer in next_to_query {
700 peers_iter.on_success(&peer, vec![]);
701 }
702
703 for _ in 0..6 {
705 if let PeersIterState::Waiting(Some(Cow::Owned(peer))) = peers_iter.next(now) {
706 peers_iter.on_success(&peer, vec![]);
707 } else {
708 panic!("Expected iterator to return peer to query.");
709 }
710 }
711
712 assert_eq!(PeersIterState::Finished, peers_iter.next(now),);
713
714 let final_peers: Vec<_> = peers_iter.into_result().collect();
715
716 assert!(final_peers.contains(malicious_response_1[0].preimage()));
719 assert!(final_peers.contains(response_2[0].preimage()));
720 assert!(final_peers.contains(response_3[0].preimage()));
721 }
722
723 #[derive(Clone)]
724 struct Graph(HashMap<PeerId, Peer>);
725
726 impl std::fmt::Debug for Graph {
727 fn fmt(&self, fmt: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
728 fmt.debug_list().entries(self.0.keys()).finish()
729 }
730 }
731
732 impl Arbitrary for Graph {
733 fn arbitrary(g: &mut Gen) -> Self {
734 let mut peer_ids = random_peers(g.gen_range(K_VALUE.get()..200), g)
735 .into_iter()
736 .map(|peer_id| (peer_id, Key::from(peer_id)))
737 .collect::<Vec<_>>();
738
739 let mut peers = peer_ids
741 .clone()
742 .into_iter()
743 .map(|(peer_id, key)| {
744 peer_ids
745 .sort_unstable_by(|(_, a), (_, b)| key.distance(a).cmp(&key.distance(b)));
746
747 assert_eq!(peer_id, peer_ids[0].0);
748
749 let known_peers = peer_ids
750 .iter()
751 .skip(1)
753 .take(K_VALUE.get())
754 .cloned()
755 .collect::<Vec<_>>();
756
757 (peer_id, Peer { known_peers })
758 })
759 .collect::<HashMap<_, _>>();
760
761 for (peer_id, peer) in peers.iter_mut() {
763 g.shuffle(&mut peer_ids);
764
765 let num_peers = g.gen_range(K_VALUE.get()..peer_ids.len() + 1);
766 let mut random_peer_ids = g
767 .choose_multiple(&peer_ids, num_peers)
768 .filter(|(id, _)| peer_id != id)
770 .cloned()
771 .collect::<Vec<_>>();
772
773 peer.known_peers.append(&mut random_peer_ids);
774 peer.known_peers = std::mem::take(&mut peer.known_peers)
775 .into_iter()
777 .collect::<HashSet<_>>()
778 .into_iter()
779 .collect();
780 }
781
782 Graph(peers)
783 }
784 }
785
786 impl Graph {
787 fn get_closest_peer(&self, target: &KeyBytes) -> PeerId {
788 *self
789 .0
790 .keys()
791 .map(|peer_id| (target.distance(&Key::from(*peer_id)), peer_id))
792 .fold(None, |acc, (distance_b, peer_id_b)| match acc {
793 None => Some((distance_b, peer_id_b)),
794 Some((distance_a, peer_id_a)) => {
795 if distance_a < distance_b {
796 Some((distance_a, peer_id_a))
797 } else {
798 Some((distance_b, peer_id_b))
799 }
800 }
801 })
802 .expect("Graph to have at least one peer.")
803 .1
804 }
805 }
806
807 #[derive(Debug, Clone)]
808 struct Peer {
809 known_peers: Vec<(PeerId, Key<PeerId>)>,
810 }
811
812 impl Peer {
813 fn get_closest_peers(&mut self, target: &KeyBytes) -> Vec<PeerId> {
814 self.known_peers
815 .sort_unstable_by(|(_, a), (_, b)| target.distance(a).cmp(&target.distance(b)));
816
817 self.known_peers
818 .iter()
819 .take(K_VALUE.get())
820 .map(|(id, _)| id)
821 .cloned()
822 .collect()
823 }
824 }
825
826 enum PeerIterator {
827 Disjoint(ClosestDisjointPeersIter),
828 Closest(ClosestPeersIter),
829 }
830
831 impl PeerIterator {
832 fn next(&mut self, now: Instant) -> PeersIterState<'_> {
833 match self {
834 PeerIterator::Disjoint(iter) => iter.next(now),
835 PeerIterator::Closest(iter) => iter.next(now),
836 }
837 }
838
839 fn on_success(&mut self, peer: &PeerId, closer_peers: Vec<PeerId>) {
840 match self {
841 PeerIterator::Disjoint(iter) => iter.on_success(peer, closer_peers),
842 PeerIterator::Closest(iter) => iter.on_success(peer, closer_peers),
843 };
844 }
845
846 fn into_result(self) -> Vec<PeerId> {
847 match self {
848 PeerIterator::Disjoint(iter) => iter.into_result().collect(),
849 PeerIterator::Closest(iter) => iter.into_result().collect(),
850 }
851 }
852 }
853
854 #[test]
856 fn closest_and_disjoint_closest_yield_same_result() {
857 fn prop(
858 target: Target,
859 graph: Graph,
860 parallelism: Parallelism,
861 num_results: NumResults,
862 ) -> TestResult {
863 if parallelism.0 > num_results.0 {
864 return TestResult::discard();
865 }
866
867 let target: KeyBytes = target.0;
868 let closest_peer = graph.get_closest_peer(&target);
869
870 let mut known_closest_peers = graph
871 .0
872 .iter()
873 .take(K_VALUE.get())
874 .map(|(key, _peers)| Key::from(*key))
875 .collect::<Vec<_>>();
876 known_closest_peers.sort_unstable_by_key(|a| target.distance(a));
877
878 let cfg = ClosestPeersIterConfig {
879 parallelism: parallelism.0,
880 num_results: num_results.0,
881 ..ClosestPeersIterConfig::default()
882 };
883
884 let closest = drive_to_finish(
885 PeerIterator::Closest(ClosestPeersIter::with_config(
886 cfg.clone(),
887 target.clone(),
888 known_closest_peers.clone(),
889 )),
890 graph.clone(),
891 &target,
892 );
893
894 let disjoint = drive_to_finish(
895 PeerIterator::Disjoint(ClosestDisjointPeersIter::with_config(
896 cfg,
897 target.clone(),
898 known_closest_peers.clone(),
899 )),
900 graph,
901 &target,
902 );
903
904 assert!(
905 closest.contains(&closest_peer),
906 "Expected `ClosestPeersIter` to find closest peer.",
907 );
908 assert!(
909 disjoint.contains(&closest_peer),
910 "Expected `ClosestDisjointPeersIter` to find closest peer.",
911 );
912
913 assert!(
914 closest.len() == num_results.0.get(),
915 "Expected `ClosestPeersIter` to find `num_results` closest \
916 peers."
917 );
918 assert!(
919 disjoint.len() >= num_results.0.get(),
920 "Expected `ClosestDisjointPeersIter` to find at least \
921 `num_results` closest peers."
922 );
923
924 if closest.len() > disjoint.len() {
925 let closest_only = closest.difference(&disjoint).collect::<Vec<_>>();
926
927 panic!(
928 "Expected `ClosestDisjointPeersIter` to find all peers \
929 found by `ClosestPeersIter`, but it did not find {closest_only:?}.",
930 );
931 };
932
933 TestResult::passed()
934 }
935
936 fn drive_to_finish(
937 mut iter: PeerIterator,
938 mut graph: Graph,
939 target: &KeyBytes,
940 ) -> HashSet<PeerId> {
941 let now = Instant::now();
942 loop {
943 match iter.next(now) {
944 PeersIterState::Waiting(Some(peer_id)) => {
945 let peer_id = peer_id.clone().into_owned();
946 let closest_peers =
947 graph.0.get_mut(&peer_id).unwrap().get_closest_peers(target);
948 iter.on_success(&peer_id, closest_peers);
949 }
950 PeersIterState::WaitingAtCapacity | PeersIterState::Waiting(None) => {
951 panic!("There is never more than one request in flight.")
952 }
953 PeersIterState::Finished => break,
954 }
955 }
956
957 let mut result = iter
958 .into_result()
959 .into_iter()
960 .map(Key::from)
961 .collect::<Vec<_>>();
962 result.sort_unstable_by_key(|a| target.distance(a));
963 result.into_iter().map(|k| k.into_preimage()).collect()
964 }
965
966 QuickCheck::new()
967 .tests(10)
968 .quickcheck(prop as fn(_, _, _, _) -> _)
969 }
970
971 #[test]
972 fn failure_can_not_overwrite_previous_success() {
973 let now = Instant::now();
974 let peer = PeerId::random();
975 let mut iter = ClosestDisjointPeersIter::new(
976 Key::from(PeerId::random()).into(),
977 iter::once(Key::from(peer)),
978 );
979
980 assert!(matches!(iter.next(now), PeersIterState::Waiting(Some(_))));
981
982 assert!(iter.on_success(&peer, iter::empty()));
984 assert_eq!(
985 iter.contacted_peers.get(&peer),
986 Some(&PeerState {
987 initiated_by: IteratorIndex(0),
988 response: ResponseState::Succeeded,
989 })
990 );
991
992 assert!(!iter.on_failure(&peer));
994 assert_eq!(
995 iter.contacted_peers.get(&peer),
996 Some(&PeerState {
997 initiated_by: IteratorIndex(0),
998 response: ResponseState::Succeeded,
999 })
1000 );
1001 }
1002}