libp2p_kad/query/peers/closest/
disjoint.rs

1// Copyright 2020 Parity Technologies (UK) Ltd.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21use super::*;
22use crate::kbucket::{Key, KeyBytes};
23use instant::Instant;
24use libp2p_identity::PeerId;
25use std::{
26    collections::HashMap,
27    iter::{Cycle, Map, Peekable},
28    ops::{Index, IndexMut, Range},
29};
30
/// Wraps around a set of [`ClosestPeersIter`], enforcing a disjoint discovery
/// path per configured parallelism according to the S/Kademlia paper.
///
/// One [`ClosestPeersIter`] is created per unit of configured parallelism
/// (see [`ClosestDisjointPeersIter::with_config`]). Closer peers reported by
/// a contacted peer are only ever handed to the iterator that first yielded
/// that peer, which keeps the discovery paths disjoint.
pub(crate) struct ClosestDisjointPeersIter {
    /// The lookup target. Passed to the `ResultIter` in
    /// [`ClosestDisjointPeersIter::into_result`] to order the merged result
    /// by distance.
    target: KeyBytes,

    /// The set of wrapped [`ClosestPeersIter`].
    iters: Vec<ClosestPeersIter>,
    /// Order in which to query the iterators ensuring fairness across
    /// [`ClosestPeersIter::next`] calls.
    iter_order: Cycle<Map<Range<usize>, fn(usize) -> IteratorIndex>>,

    /// Mapping of contacted peers by their [`PeerId`] to [`PeerState`]
    /// containing the corresponding iterator indices as well as the response
    /// state.
    ///
    /// Used to track which iterator contacted which peer. See [`PeerState`]
    /// for details.
    contacted_peers: HashMap<PeerId, PeerState>,
}
50
51impl ClosestDisjointPeersIter {
52    /// Creates a new iterator with a default configuration.
53    #[cfg(test)]
54    pub(crate) fn new<I>(target: KeyBytes, known_closest_peers: I) -> Self
55    where
56        I: IntoIterator<Item = Key<PeerId>>,
57    {
58        Self::with_config(
59            ClosestPeersIterConfig::default(),
60            target,
61            known_closest_peers,
62        )
63    }
64
65    /// Creates a new iterator with the given configuration.
66    pub(crate) fn with_config<I, T>(
67        config: ClosestPeersIterConfig,
68        target: T,
69        known_closest_peers: I,
70    ) -> Self
71    where
72        I: IntoIterator<Item = Key<PeerId>>,
73        T: Into<KeyBytes> + Clone,
74    {
75        let peers = known_closest_peers
76            .into_iter()
77            .take(K_VALUE.get())
78            .collect::<Vec<_>>();
79        let iters = (0..config.parallelism.get())
80            // NOTE: All [`ClosestPeersIter`] share the same set of peers at
81            // initialization. The [`ClosestDisjointPeersIter.contacted_peers`]
82            // mapping ensures that a successful response from a peer is only
83            // ever passed to a single [`ClosestPeersIter`]. See
84            // [`ClosestDisjointPeersIter::on_success`] for details.
85            .map(|_| ClosestPeersIter::with_config(config.clone(), target.clone(), peers.clone()))
86            .collect::<Vec<_>>();
87
88        let iters_len = iters.len();
89
90        ClosestDisjointPeersIter {
91            target: target.into(),
92            iters,
93            iter_order: (0..iters_len)
94                .map(IteratorIndex as fn(usize) -> IteratorIndex)
95                .cycle(),
96            contacted_peers: HashMap::new(),
97        }
98    }
99
100    /// Callback for informing the iterator about a failed request to a peer.
101    ///
102    /// If the iterator is currently waiting for a result from `peer`,
103    /// the iterator state is updated and `true` is returned. In that
104    /// case, after calling this function, `next` should eventually be
105    /// called again to obtain the new state of the iterator.
106    ///
107    /// If the iterator is finished, it is not currently waiting for a
108    /// result from `peer`, or a result for `peer` has already been reported,
109    /// calling this function has no effect and `false` is returned.
110    pub(crate) fn on_failure(&mut self, peer: &PeerId) -> bool {
111        let mut updated = false;
112
113        if let Some(PeerState {
114            initiated_by,
115            response,
116        }) = self.contacted_peers.get_mut(peer)
117        {
118            updated = self.iters[*initiated_by].on_failure(peer);
119
120            if updated {
121                *response = ResponseState::Failed;
122            }
123
124            for (i, iter) in &mut self.iters.iter_mut().enumerate() {
125                if IteratorIndex(i) != *initiated_by {
126                    // This iterator never triggered an actual request to the
127                    // given peer - thus ignore the returned boolean.
128                    iter.on_failure(peer);
129                }
130            }
131        }
132
133        updated
134    }
135
136    /// Callback for delivering the result of a successful request to a peer.
137    ///
138    /// Delivering results of requests back to the iterator allows the iterator
139    /// to make progress. The iterator is said to make progress either when the
140    /// given `closer_peers` contain a peer closer to the target than any peer
141    /// seen so far, or when the iterator did not yet accumulate `num_results`
142    /// closest peers and `closer_peers` contains a new peer, regardless of its
143    /// distance to the target.
144    ///
145    /// If the iterator is currently waiting for a result from `peer`,
146    /// the iterator state is updated and `true` is returned. In that
147    /// case, after calling this function, `next` should eventually be
148    /// called again to obtain the new state of the iterator.
149    ///
150    /// If the iterator is finished, it is not currently waiting for a
151    /// result from `peer`, or a result for `peer` has already been reported,
152    /// calling this function has no effect and `false` is returned.
153    pub(crate) fn on_success<I>(&mut self, peer: &PeerId, closer_peers: I) -> bool
154    where
155        I: IntoIterator<Item = PeerId>,
156    {
157        let mut updated = false;
158
159        if let Some(PeerState {
160            initiated_by,
161            response,
162        }) = self.contacted_peers.get_mut(peer)
163        {
164            // Pass the new `closer_peers` to the iterator that first yielded
165            // the peer.
166            updated = self.iters[*initiated_by].on_success(peer, closer_peers);
167
168            if updated {
169                // Mark the response as succeeded for future iterators yielding
170                // this peer. There is no need to keep the `closer_peers`
171                // around, given that they are only passed to the first
172                // iterator.
173                *response = ResponseState::Succeeded;
174            }
175
176            for (i, iter) in &mut self.iters.iter_mut().enumerate() {
177                if IteratorIndex(i) != *initiated_by {
178                    // Only report the success to all remaining not-first
179                    // iterators. Do not pass the `closer_peers` in order to
180                    // uphold the S/Kademlia disjoint paths guarantee.
181                    //
182                    // This iterator never triggered an actual request to the
183                    // given peer - thus ignore the returned boolean.
184                    iter.on_success(peer, std::iter::empty());
185                }
186            }
187        }
188
189        updated
190    }
191
    /// Advances the composite iterator and reports its aggregated state.
    ///
    /// Each wrapped [`ClosestPeersIter`] is visited at most once per call, in
    /// the round-robin order given by `iter_order`. The result is:
    ///
    /// - `Waiting(Some(peer))` as soon as a wrapped iterator yields a peer
    ///   that no other wrapped iterator has yielded before. The peer is
    ///   recorded in `contacted_peers` and the call returns immediately.
    /// - `Waiting(None)` if no new peer was yielded and at least one visited
    ///   iterator reported `Waiting(None)`.
    /// - `WaitingAtCapacity` if no new peer was yielded, no visited iterator
    ///   reported `Waiting(None)` and at least one reported
    ///   `WaitingAtCapacity`.
    /// - `Finished` if every visited iterator reported `Finished`.
    ///
    /// When a wrapped iterator yields a peer that was already contacted on
    /// behalf of another iterator, the known outcome (if any) is replayed to
    /// it without closer peers, and the same iterator is polled again.
    pub(crate) fn next(&mut self, now: Instant) -> PeersIterState<'_> {
        // Aggregated state over the iterators visited so far. Stays `None`
        // while only finished iterators have been seen.
        let mut state = None;

        // Ensure querying each iterator at most once.
        for _ in 0..self.iters.len() {
            let i = self.iter_order.next().expect("Cycle never ends.");
            let iter = &mut self.iters[i];

            // Keep polling the same iterator for as long as it yields peers
            // that were already contacted on behalf of another iterator.
            loop {
                match iter.next(now) {
                    PeersIterState::Waiting(None) => {
                        match state {
                            Some(PeersIterState::Waiting(Some(_))) => {
                                // [`ClosestDisjointPeersIter::next`] returns immediately once a
                                // [`ClosestPeersIter`] yielded a peer. Thus this state is
                                // unreachable.
                                unreachable!();
                            }
                            Some(PeersIterState::Waiting(None)) => {}
                            Some(PeersIterState::WaitingAtCapacity) => {
                                // At least one ClosestPeersIter is no longer at capacity, thus the
                                // composite ClosestDisjointPeersIter is no longer at capacity.
                                state = Some(PeersIterState::Waiting(None))
                            }
                            Some(PeersIterState::Finished) => {
                                // `state` is never set to `Finished`.
                                unreachable!();
                            }
                            None => state = Some(PeersIterState::Waiting(None)),
                        };

                        break;
                    }
                    PeersIterState::Waiting(Some(peer)) => {
                        match self.contacted_peers.get_mut(&*peer) {
                            Some(PeerState { response, .. }) => {
                                // Another iterator already contacted this peer.
                                let peer = peer.into_owned();

                                match response {
                                    // The iterator will be notified later whether the given node
                                    // was successfully contacted or not. See
                                    // [`ClosestDisjointPeersIter::on_success`] for details.
                                    ResponseState::Waiting => {}
                                    ResponseState::Succeeded => {
                                        // Given that iterator was not the first to contact the peer
                                        // it will not be made aware of the closer peers discovered
                                        // to uphold the S/Kademlia disjoint paths guarantee. See
                                        // [`ClosestDisjointPeersIter::on_success`] for details.
                                        iter.on_success(&peer, std::iter::empty());
                                    }
                                    ResponseState::Failed => {
                                        iter.on_failure(&peer);
                                    }
                                }
                            }
                            None => {
                                // The iterator is the first to contact this peer.
                                self.contacted_peers
                                    .insert(peer.clone().into_owned(), PeerState::new(i));
                                return PeersIterState::Waiting(Some(Cow::Owned(
                                    peer.into_owned(),
                                )));
                            }
                        }
                    }
                    PeersIterState::WaitingAtCapacity => {
                        match state {
                            Some(PeersIterState::Waiting(Some(_))) => {
                                // [`ClosestDisjointPeersIter::next`] returns immediately once a
                                // [`ClosestPeersIter`] yielded a peer. Thus this state is
                                // unreachable.
                                unreachable!();
                            }
                            Some(PeersIterState::Waiting(None)) => {}
                            Some(PeersIterState::WaitingAtCapacity) => {}
                            Some(PeersIterState::Finished) => {
                                // `state` is never set to `Finished`.
                                unreachable!();
                            }
                            None => state = Some(PeersIterState::WaitingAtCapacity),
                        };

                        break;
                    }
                    // A finished iterator does not influence the aggregated state.
                    PeersIterState::Finished => break,
                }
            }
        }

        // No visited iterator was waiting, hence all of them are finished.
        state.unwrap_or(PeersIterState::Finished)
    }
284
285    /// Finishes all paths containing one of the given peers.
286    ///
287    /// See [`crate::query::Query::try_finish`] for details.
288    pub(crate) fn finish_paths<'a, I>(&mut self, peers: I) -> bool
289    where
290        I: IntoIterator<Item = &'a PeerId>,
291    {
292        for peer in peers {
293            if let Some(PeerState { initiated_by, .. }) = self.contacted_peers.get_mut(peer) {
294                self.iters[*initiated_by].finish();
295            }
296        }
297
298        self.is_finished()
299    }
300
301    /// Immediately transitions the iterator to [`PeersIterState::Finished`].
302    pub(crate) fn finish(&mut self) {
303        for iter in &mut self.iters {
304            iter.finish();
305        }
306    }
307
308    /// Checks whether the iterator has finished.
309    pub(crate) fn is_finished(&self) -> bool {
310        self.iters.iter().all(|i| i.is_finished())
311    }
312
313    /// Note: In the case of no adversarial peers or connectivity issues along
314    ///       any path, all paths return the same result, deduplicated through
315    ///       the `ResultIter`, thus overall `into_result` returns
316    ///       `num_results`. In the case of adversarial peers or connectivity
317    ///       issues `ClosestDisjointPeersIter` tries to return the
318    ///       `num_results` closest benign peers, but as it can not
319    ///       differentiate benign from faulty paths it as well returns faulty
320    ///       peers and thus overall returns more than `num_results` peers.
321    pub(crate) fn into_result(self) -> impl Iterator<Item = PeerId> {
322        let result_per_path = self
323            .iters
324            .into_iter()
325            .map(|iter| iter.into_result().map(Key::from));
326
327        ResultIter::new(self.target, result_per_path).map(Key::into_preimage)
328    }
329}
330
/// Index into the [`ClosestDisjointPeersIter`] `iters` vector.
///
/// A newtype around `usize`; `Vec<ClosestPeersIter>` implements `Index` and
/// `IndexMut` for it so it can be used directly as a subscript.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct IteratorIndex(usize);
334
335impl Index<IteratorIndex> for Vec<ClosestPeersIter> {
336    type Output = ClosestPeersIter;
337
338    fn index(&self, index: IteratorIndex) -> &Self::Output {
339        &self[index.0]
340    }
341}
342
343impl IndexMut<IteratorIndex> for Vec<ClosestPeersIter> {
344    fn index_mut(&mut self, index: IteratorIndex) -> &mut Self::Output {
345        &mut self[index.0]
346    }
347}
348
349/// State tracking the iterator that yielded (i.e. tried to contact) a peer. See
350/// [`ClosestDisjointPeersIter::on_success`] for details.
351#[derive(Debug, PartialEq, Eq)]
352struct PeerState {
353    /// First iterator to yield the peer. Will be notified both of the outcome
354    /// (success/failure) as well as the closer peers.
355    initiated_by: IteratorIndex,
356    /// Keeping track of the response state. In case other iterators later on
357    /// yield the same peer, they can be notified of the response outcome.
358    response: ResponseState,
359}
360
361impl PeerState {
362    fn new(initiated_by: IteratorIndex) -> Self {
363        PeerState {
364            initiated_by,
365            response: ResponseState::Waiting,
366        }
367    }
368}
369
/// Outcome of the request to a contacted peer, as tracked in [`PeerState`].
#[derive(Debug, PartialEq, Eq)]
enum ResponseState {
    /// No outcome recorded yet. Initial state set by [`PeerState::new`].
    Waiting,
    /// Set by [`ClosestDisjointPeersIter::on_success`] once the initiating
    /// iterator accepted the success.
    Succeeded,
    /// Set by [`ClosestDisjointPeersIter::on_failure`] once the initiating
    /// iterator accepted the failure.
    Failed,
}
376
377/// Iterator combining the result of multiple [`ClosestPeersIter`] into a single
378/// deduplicated ordered iterator.
379//
380// Note: This operates under the assumption that `I` is ordered.
381#[derive(Clone, Debug)]
382struct ResultIter<I>
383where
384    I: Iterator<Item = Key<PeerId>>,
385{
386    target: KeyBytes,
387    iters: Vec<Peekable<I>>,
388}
389
390impl<I: Iterator<Item = Key<PeerId>>> ResultIter<I> {
391    fn new(target: KeyBytes, iters: impl Iterator<Item = I>) -> Self {
392        ResultIter {
393            target,
394            iters: iters.map(Iterator::peekable).collect(),
395        }
396    }
397}
398
399impl<I: Iterator<Item = Key<PeerId>>> Iterator for ResultIter<I> {
400    type Item = I::Item;
401
402    fn next(&mut self) -> Option<Self::Item> {
403        let target = &self.target;
404
405        self.iters
406            .iter_mut()
407            // Find the iterator with the next closest peer.
408            .fold(Option::<&mut Peekable<_>>::None, |iter_a, iter_b| {
409                let Some(iter_a) = iter_a else {
410                    return Some(iter_b);
411                };
412
413                match (iter_a.peek(), iter_b.peek()) {
414                    (Some(next_a), Some(next_b)) => {
415                        if next_a == next_b {
416                            // Remove from one for deduplication.
417                            iter_b.next();
418                            return Some(iter_a);
419                        }
420
421                        if target.distance(next_a) < target.distance(next_b) {
422                            Some(iter_a)
423                        } else {
424                            Some(iter_b)
425                        }
426                    }
427                    (Some(_), None) => Some(iter_a),
428                    (None, Some(_)) => Some(iter_b),
429                    (None, None) => None,
430                }
431            })
432            // Pop off the next closest peer from that iterator.
433            .and_then(Iterator::next)
434    }
435}
436
437#[cfg(test)]
438mod tests {
439    use super::*;
440
441    use crate::{K_VALUE, SHA_256_MH};
442    use libp2p_core::multihash::Multihash;
443    use quickcheck::*;
444    use std::collections::HashSet;
445    use std::iter;
446
447    impl Arbitrary for ResultIter<std::vec::IntoIter<Key<PeerId>>> {
448        fn arbitrary(g: &mut Gen) -> Self {
449            let target = Target::arbitrary(g).0;
450            let num_closest_iters = g.gen_range(0..20 + 1);
451            let peers = random_peers(g.gen_range(0..20 * num_closest_iters + 1), g);
452
453            let iters = (0..num_closest_iters).map(|_| {
454                let num_peers = g.gen_range(0..20 + 1);
455                let mut peers = g
456                    .choose_multiple(&peers, num_peers)
457                    .cloned()
458                    .map(Key::from)
459                    .collect::<Vec<_>>();
460
461                peers.sort_unstable_by_key(|a| target.distance(a));
462
463                peers.into_iter()
464            });
465
466            ResultIter::new(target.clone(), iters)
467        }
468
469        fn shrink(&self) -> Box<dyn Iterator<Item = Self>> {
470            let peers = self
471                .iters
472                .clone()
473                .into_iter()
474                .flatten()
475                .collect::<HashSet<_>>()
476                .into_iter()
477                .collect::<Vec<_>>();
478
479            let iters = self
480                .iters
481                .clone()
482                .into_iter()
483                .map(|iter| iter.collect::<Vec<_>>())
484                .collect();
485
486            Box::new(ResultIterShrinker {
487                target: self.target.clone(),
488                peers,
489                iters,
490            })
491        }
492    }
493
494    struct ResultIterShrinker {
495        target: KeyBytes,
496        peers: Vec<Key<PeerId>>,
497        iters: Vec<Vec<Key<PeerId>>>,
498    }
499
500    impl Iterator for ResultIterShrinker {
501        type Item = ResultIter<std::vec::IntoIter<Key<PeerId>>>;
502
503        /// Return an iterator of [`ResultIter`]s with each of them missing a
504        /// different peer from the original set.
505        fn next(&mut self) -> Option<Self::Item> {
506            // The peer that should not be included.
507            let peer = self.peers.pop()?;
508
509            let iters = self.iters.clone().into_iter().filter_map(|mut iter| {
510                iter.retain(|p| p != &peer);
511                if iter.is_empty() {
512                    return None;
513                }
514                Some(iter.into_iter())
515            });
516
517            Some(ResultIter::new(self.target.clone(), iters))
518        }
519    }
520
521    #[derive(Clone, Debug)]
522    struct ArbitraryPeerId(PeerId);
523
524    impl Arbitrary for ArbitraryPeerId {
525        fn arbitrary(g: &mut Gen) -> ArbitraryPeerId {
526            let hash: [u8; 32] = core::array::from_fn(|_| u8::arbitrary(g));
527            let peer_id =
528                PeerId::from_multihash(Multihash::wrap(SHA_256_MH, &hash).unwrap()).unwrap();
529            ArbitraryPeerId(peer_id)
530        }
531    }
532
533    #[derive(Clone, Debug)]
534    struct Target(KeyBytes);
535
536    impl Arbitrary for Target {
537        fn arbitrary(g: &mut Gen) -> Self {
538            let peer_id = ArbitraryPeerId::arbitrary(g).0;
539            Target(Key::from(peer_id).into())
540        }
541    }
542
543    fn random_peers(n: usize, g: &mut Gen) -> Vec<PeerId> {
544        (0..n).map(|_| ArbitraryPeerId::arbitrary(g).0).collect()
545    }
546
547    #[test]
548    fn result_iter_returns_deduplicated_ordered_peer_id_stream() {
549        fn prop(result_iter: ResultIter<std::vec::IntoIter<Key<PeerId>>>) {
550            let expected = {
551                let mut deduplicated = result_iter
552                    .clone()
553                    .iters
554                    .into_iter()
555                    .flatten()
556                    .collect::<HashSet<_>>()
557                    .into_iter()
558                    .map(Key::from)
559                    .collect::<Vec<_>>();
560
561                deduplicated.sort_unstable_by(|a, b| {
562                    result_iter
563                        .target
564                        .distance(a)
565                        .cmp(&result_iter.target.distance(b))
566                });
567
568                deduplicated
569            };
570
571            assert_eq!(expected, result_iter.collect::<Vec<_>>());
572        }
573
574        QuickCheck::new().quickcheck(prop as fn(_))
575    }
576
577    #[derive(Debug, Clone)]
578    struct Parallelism(NonZeroUsize);
579
580    impl Arbitrary for Parallelism {
581        fn arbitrary(g: &mut Gen) -> Self {
582            Parallelism(NonZeroUsize::new(g.gen_range(1..10)).unwrap())
583        }
584    }
585
586    #[derive(Debug, Clone)]
587    struct NumResults(NonZeroUsize);
588
589    impl Arbitrary for NumResults {
590        fn arbitrary(g: &mut Gen) -> Self {
591            NumResults(NonZeroUsize::new(g.gen_range(1..K_VALUE.get())).unwrap())
592        }
593    }
594
595    impl Arbitrary for ClosestPeersIterConfig {
596        fn arbitrary(g: &mut Gen) -> Self {
597            ClosestPeersIterConfig {
598                parallelism: Parallelism::arbitrary(g).0,
599                num_results: NumResults::arbitrary(g).0,
600                peer_timeout: Duration::from_secs(1),
601            }
602        }
603    }
604
605    #[derive(Debug, Clone)]
606    struct PeerVec(Vec<Key<PeerId>>);
607
608    impl Arbitrary for PeerVec {
609        fn arbitrary(g: &mut Gen) -> Self {
610            PeerVec(
611                (0..g.gen_range(1..60u8))
612                    .map(|_| ArbitraryPeerId::arbitrary(g).0)
613                    .map(Key::from)
614                    .collect(),
615            )
616        }
617    }
618
    /// Scenario test for the S/Kademlia disjoint paths guarantee.
    ///
    /// Three paths are seeded with the same three known peers. Peer 1 answers
    /// with peers closer to the target than those returned by peers 2 and 3.
    /// The test checks that each path still follows the response it received
    /// and that the final result holds a peer from every path.
    #[test]
    fn s_kademlia_disjoint_paths() {
        let now = Instant::now();
        let target: KeyBytes = Key::from(PeerId::random()).into();

        let mut pool = [0; 12]
            .iter()
            .map(|_| Key::from(PeerId::random()))
            .collect::<Vec<_>>();

        // Sorted ascending by distance to the target, so every `split_off`
        // below removes the three peers currently farthest from the target.
        pool.sort_unstable_by_key(|a| target.distance(a));

        let known_closest_peers = pool.split_off(pool.len() - 3);

        let config = ClosestPeersIterConfig {
            parallelism: NonZeroUsize::new(3).unwrap(),
            num_results: NonZeroUsize::new(3).unwrap(),
            ..ClosestPeersIterConfig::default()
        };

        let mut peers_iter =
            ClosestDisjointPeersIter::with_config(config, target, known_closest_peers.clone());

        ////////////////////////////////////////////////////////////////////////
        // First round.

        // Each of the three known peers is yielded exactly once.
        for _ in 0..3 {
            if let PeersIterState::Waiting(Some(Cow::Owned(peer))) = peers_iter.next(now) {
                assert!(known_closest_peers.contains(&Key::from(peer)));
            } else {
                panic!("Expected iterator to return peer to query.");
            }
        }

        assert_eq!(PeersIterState::WaitingAtCapacity, peers_iter.next(now),);

        let response_2 = pool.split_off(pool.len() - 3);
        let response_3 = pool.split_off(pool.len() - 3);
        // Keys are closer than any of the previous two responses from honest
        // peers 2 and 3.
        let malicious_response_1 = pool.split_off(pool.len() - 3);

        // Response from malicious peer 1.
        peers_iter.on_success(
            known_closest_peers[0].preimage(),
            malicious_response_1
                .clone()
                .into_iter()
                .map(|k| *k.preimage()),
        );

        // Response from peer 2.
        peers_iter.on_success(
            known_closest_peers[1].preimage(),
            response_2.clone().into_iter().map(|k| *k.preimage()),
        );

        // Response from peer 3.
        peers_iter.on_success(
            known_closest_peers[2].preimage(),
            response_3.clone().into_iter().map(|k| *k.preimage()),
        );

        ////////////////////////////////////////////////////////////////////////
        // Second round.

        let mut next_to_query = vec![];
        for _ in 0..3 {
            if let PeersIterState::Waiting(Some(Cow::Owned(peer))) = peers_iter.next(now) {
                next_to_query.push(peer)
            } else {
                panic!("Expected iterator to return peer to query.");
            }
        }

        // Expect a peer from each disjoint path.
        assert!(next_to_query.contains(malicious_response_1[0].preimage()));
        assert!(next_to_query.contains(response_2[0].preimage()));
        assert!(next_to_query.contains(response_3[0].preimage()));

        for peer in next_to_query {
            peers_iter.on_success(&peer, vec![]);
        }

        // Mark all remaining peers as succeeded.
        for _ in 0..6 {
            if let PeersIterState::Waiting(Some(Cow::Owned(peer))) = peers_iter.next(now) {
                peers_iter.on_success(&peer, vec![]);
            } else {
                panic!("Expected iterator to return peer to query.");
            }
        }

        assert_eq!(PeersIterState::Finished, peers_iter.next(now),);

        let final_peers: Vec<_> = peers_iter.into_result().collect();

        // Expect final result to contain peer from each disjoint path, even
        // though not all are among the best ones.
        assert!(final_peers.contains(malicious_response_1[0].preimage()));
        assert!(final_peers.contains(response_2[0].preimage()));
        assert!(final_peers.contains(response_3[0].preimage()));
    }
722
723    #[derive(Clone)]
724    struct Graph(HashMap<PeerId, Peer>);
725
726    impl std::fmt::Debug for Graph {
727        fn fmt(&self, fmt: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
728            fmt.debug_list().entries(self.0.keys()).finish()
729        }
730    }
731
    impl Arbitrary for Graph {
        /// Generates a graph of at least `K_VALUE` and fewer than 200 peers.
        ///
        /// Every peer knows up to `K_VALUE` of the peers closest to itself
        /// plus a random selection of other peers, without duplicates and
        /// without itself.
        fn arbitrary(g: &mut Gen) -> Self {
            let mut peer_ids = random_peers(g.gen_range(K_VALUE.get()..200), g)
                .into_iter()
                .map(|peer_id| (peer_id, Key::from(peer_id)))
                .collect::<Vec<_>>();

            // Make each peer aware of its direct neighborhood.
            let mut peers = peer_ids
                .clone()
                .into_iter()
                .map(|(peer_id, key)| {
                    // Re-sort the shared list by distance to the current peer.
                    peer_ids
                        .sort_unstable_by(|(_, a), (_, b)| key.distance(a).cmp(&key.distance(b)));

                    // The first entry after sorting must be the peer itself.
                    assert_eq!(peer_id, peer_ids[0].0);

                    let known_peers = peer_ids
                        .iter()
                        // Skip itself.
                        .skip(1)
                        .take(K_VALUE.get())
                        .cloned()
                        .collect::<Vec<_>>();

                    (peer_id, Peer { known_peers })
                })
                .collect::<HashMap<_, _>>();

            // Make each peer aware of a random set of other peers within the graph.
            for (peer_id, peer) in peers.iter_mut() {
                g.shuffle(&mut peer_ids);

                let num_peers = g.gen_range(K_VALUE.get()..peer_ids.len() + 1);
                let mut random_peer_ids = g
                    .choose_multiple(&peer_ids, num_peers)
                    // Make sure not to include itself.
                    .filter(|(id, _)| peer_id != id)
                    .cloned()
                    .collect::<Vec<_>>();

                peer.known_peers.append(&mut random_peer_ids);
                peer.known_peers = std::mem::take(&mut peer.known_peers)
                    // Deduplicate peer ids.
                    .into_iter()
                    .collect::<HashSet<_>>()
                    .into_iter()
                    .collect();
            }

            Graph(peers)
        }
    }
785
786    impl Graph {
787        fn get_closest_peer(&self, target: &KeyBytes) -> PeerId {
788            *self
789                .0
790                .keys()
791                .map(|peer_id| (target.distance(&Key::from(*peer_id)), peer_id))
792                .fold(None, |acc, (distance_b, peer_id_b)| match acc {
793                    None => Some((distance_b, peer_id_b)),
794                    Some((distance_a, peer_id_a)) => {
795                        if distance_a < distance_b {
796                            Some((distance_a, peer_id_a))
797                        } else {
798                            Some((distance_b, peer_id_b))
799                        }
800                    }
801                })
802                .expect("Graph to have at least one peer.")
803                .1
804        }
805    }
806
807    #[derive(Debug, Clone)]
808    struct Peer {
809        known_peers: Vec<(PeerId, Key<PeerId>)>,
810    }
811
812    impl Peer {
813        fn get_closest_peers(&mut self, target: &KeyBytes) -> Vec<PeerId> {
814            self.known_peers
815                .sort_unstable_by(|(_, a), (_, b)| target.distance(a).cmp(&target.distance(b)));
816
817            self.known_peers
818                .iter()
819                .take(K_VALUE.get())
820                .map(|(id, _)| id)
821                .cloned()
822                .collect()
823        }
824    }
825
826    enum PeerIterator {
827        Disjoint(ClosestDisjointPeersIter),
828        Closest(ClosestPeersIter),
829    }
830
831    impl PeerIterator {
832        fn next(&mut self, now: Instant) -> PeersIterState<'_> {
833            match self {
834                PeerIterator::Disjoint(iter) => iter.next(now),
835                PeerIterator::Closest(iter) => iter.next(now),
836            }
837        }
838
839        fn on_success(&mut self, peer: &PeerId, closer_peers: Vec<PeerId>) {
840            match self {
841                PeerIterator::Disjoint(iter) => iter.on_success(peer, closer_peers),
842                PeerIterator::Closest(iter) => iter.on_success(peer, closer_peers),
843            };
844        }
845
846        fn into_result(self) -> Vec<PeerId> {
847            match self {
848                PeerIterator::Disjoint(iter) => iter.into_result().collect(),
849                PeerIterator::Closest(iter) => iter.into_result().collect(),
850            }
851        }
852    }
853
854    /// Ensure [`ClosestPeersIter`] and [`ClosestDisjointPeersIter`] yield same closest peers.
855    #[test]
856    fn closest_and_disjoint_closest_yield_same_result() {
857        fn prop(
858            target: Target,
859            graph: Graph,
860            parallelism: Parallelism,
861            num_results: NumResults,
862        ) -> TestResult {
863            if parallelism.0 > num_results.0 {
864                return TestResult::discard();
865            }
866
867            let target: KeyBytes = target.0;
868            let closest_peer = graph.get_closest_peer(&target);
869
870            let mut known_closest_peers = graph
871                .0
872                .iter()
873                .take(K_VALUE.get())
874                .map(|(key, _peers)| Key::from(*key))
875                .collect::<Vec<_>>();
876            known_closest_peers.sort_unstable_by_key(|a| target.distance(a));
877
878            let cfg = ClosestPeersIterConfig {
879                parallelism: parallelism.0,
880                num_results: num_results.0,
881                ..ClosestPeersIterConfig::default()
882            };
883
884            let closest = drive_to_finish(
885                PeerIterator::Closest(ClosestPeersIter::with_config(
886                    cfg.clone(),
887                    target.clone(),
888                    known_closest_peers.clone(),
889                )),
890                graph.clone(),
891                &target,
892            );
893
894            let disjoint = drive_to_finish(
895                PeerIterator::Disjoint(ClosestDisjointPeersIter::with_config(
896                    cfg,
897                    target.clone(),
898                    known_closest_peers.clone(),
899                )),
900                graph,
901                &target,
902            );
903
904            assert!(
905                closest.contains(&closest_peer),
906                "Expected `ClosestPeersIter` to find closest peer.",
907            );
908            assert!(
909                disjoint.contains(&closest_peer),
910                "Expected `ClosestDisjointPeersIter` to find closest peer.",
911            );
912
913            assert!(
914                closest.len() == num_results.0.get(),
915                "Expected `ClosestPeersIter` to find `num_results` closest \
916                 peers."
917            );
918            assert!(
919                disjoint.len() >= num_results.0.get(),
920                "Expected `ClosestDisjointPeersIter` to find at least \
921                 `num_results` closest peers."
922            );
923
924            if closest.len() > disjoint.len() {
925                let closest_only = closest.difference(&disjoint).collect::<Vec<_>>();
926
927                panic!(
928                    "Expected `ClosestDisjointPeersIter` to find all peers \
929                     found by `ClosestPeersIter`, but it did not find {closest_only:?}.",
930                );
931            };
932
933            TestResult::passed()
934        }
935
936        fn drive_to_finish(
937            mut iter: PeerIterator,
938            mut graph: Graph,
939            target: &KeyBytes,
940        ) -> HashSet<PeerId> {
941            let now = Instant::now();
942            loop {
943                match iter.next(now) {
944                    PeersIterState::Waiting(Some(peer_id)) => {
945                        let peer_id = peer_id.clone().into_owned();
946                        let closest_peers =
947                            graph.0.get_mut(&peer_id).unwrap().get_closest_peers(target);
948                        iter.on_success(&peer_id, closest_peers);
949                    }
950                    PeersIterState::WaitingAtCapacity | PeersIterState::Waiting(None) => {
951                        panic!("There is never more than one request in flight.")
952                    }
953                    PeersIterState::Finished => break,
954                }
955            }
956
957            let mut result = iter
958                .into_result()
959                .into_iter()
960                .map(Key::from)
961                .collect::<Vec<_>>();
962            result.sort_unstable_by_key(|a| target.distance(a));
963            result.into_iter().map(|k| k.into_preimage()).collect()
964        }
965
966        QuickCheck::new()
967            .tests(10)
968            .quickcheck(prop as fn(_, _, _, _) -> _)
969    }
970
971    #[test]
972    fn failure_can_not_overwrite_previous_success() {
973        let now = Instant::now();
974        let peer = PeerId::random();
975        let mut iter = ClosestDisjointPeersIter::new(
976            Key::from(PeerId::random()).into(),
977            iter::once(Key::from(peer)),
978        );
979
980        assert!(matches!(iter.next(now), PeersIterState::Waiting(Some(_))));
981
982        // Expect peer to be marked as succeeded.
983        assert!(iter.on_success(&peer, iter::empty()));
984        assert_eq!(
985            iter.contacted_peers.get(&peer),
986            Some(&PeerState {
987                initiated_by: IteratorIndex(0),
988                response: ResponseState::Succeeded,
989            })
990        );
991
992        // Expect peer to stay marked as succeeded.
993        assert!(!iter.on_failure(&peer));
994        assert_eq!(
995            iter.contacted_peers.get(&peer),
996            Some(&PeerState {
997                initiated_by: IteratorIndex(0),
998                response: ResponseState::Succeeded,
999            })
1000        );
1001    }
1002}