1mod peers;
22
23use peers::closest::{
24 disjoint::ClosestDisjointPeersIter, ClosestPeersIter, ClosestPeersIterConfig,
25};
26use peers::fixed::FixedPeersIter;
27use peers::PeersIterState;
28
29use crate::kbucket::{Key, KeyBytes};
30use crate::{ALPHA_VALUE, K_VALUE};
31use either::Either;
32use fnv::FnvHashMap;
33use instant::Instant;
34use libp2p_identity::PeerId;
35use std::{num::NonZeroUsize, time::Duration};
36
/// A pool of queries, keyed by their [`QueryId`].
///
/// All queries in the pool are built from the same [`QueryConfig`]. The pool
/// is driven by repeatedly calling `poll`, which advances the contained
/// queries and reports the first one that has something to say.
pub(crate) struct QueryPool<TInner> {
    /// Raw value of the next [`QueryId`] to hand out; advanced (with
    /// wrap-around) by `next_query_id`.
    next_id: usize,
    /// Configuration applied to every query created through this pool.
    config: QueryConfig,
    /// The queries currently held by the pool.
    queries: FnvHashMap<QueryId, Query<TInner>>,
}
47
/// The observable outcome of a single call to `QueryPool::poll`.
pub(crate) enum QueryPoolState<'a, TInner> {
    /// The pool holds no queries at all.
    Idle,
    /// The pool holds at least one query and none of them finished or timed
    /// out during this poll.
    ///
    /// `Some((query, peer))` means `query` just yielded `peer` from its peer
    /// iterator (presumably the next peer to contact — confirm against the
    /// iterator implementations). `None` means no query produced a new peer.
    Waiting(Option<(&'a mut Query<TInner>, PeerId)>),
    /// A query's peer iterator reported that it is finished. The query has
    /// been removed from the pool and its end time recorded.
    Finished(Query<TInner>),
    /// A query exceeded the configured timeout. The query has been removed
    /// from the pool and its end time recorded.
    Timeout(Query<TInner>),
}
60
61impl<TInner> QueryPool<TInner> {
62 pub(crate) fn new(config: QueryConfig) -> Self {
64 QueryPool {
65 next_id: 0,
66 config,
67 queries: Default::default(),
68 }
69 }
70
71 pub(crate) fn config(&self) -> &QueryConfig {
73 &self.config
74 }
75
76 pub(crate) fn iter(&self) -> impl Iterator<Item = &Query<TInner>> {
78 self.queries.values()
79 }
80
81 pub(crate) fn size(&self) -> usize {
83 self.queries.len()
84 }
85
86 pub(crate) fn iter_mut(&mut self) -> impl Iterator<Item = &mut Query<TInner>> {
88 self.queries.values_mut()
89 }
90
91 pub(crate) fn add_fixed<I>(&mut self, peers: I, inner: TInner) -> QueryId
93 where
94 I: IntoIterator<Item = PeerId>,
95 {
96 let id = self.next_query_id();
97 self.continue_fixed(id, peers, inner);
98 id
99 }
100
101 pub(crate) fn continue_fixed<I>(&mut self, id: QueryId, peers: I, inner: TInner)
105 where
106 I: IntoIterator<Item = PeerId>,
107 {
108 assert!(!self.queries.contains_key(&id));
109 let parallelism = self.config.replication_factor;
110 let peer_iter = QueryPeerIter::Fixed(FixedPeersIter::new(peers, parallelism));
111 let query = Query::new(id, peer_iter, inner);
112 self.queries.insert(id, query);
113 }
114
115 pub(crate) fn add_iter_closest<T, I>(&mut self, target: T, peers: I, inner: TInner) -> QueryId
117 where
118 T: Into<KeyBytes> + Clone,
119 I: IntoIterator<Item = Key<PeerId>>,
120 {
121 let id = self.next_query_id();
122 self.continue_iter_closest(id, target, peers, inner);
123 id
124 }
125
126 pub(crate) fn continue_iter_closest<T, I>(
128 &mut self,
129 id: QueryId,
130 target: T,
131 peers: I,
132 inner: TInner,
133 ) where
134 T: Into<KeyBytes> + Clone,
135 I: IntoIterator<Item = Key<PeerId>>,
136 {
137 let cfg = ClosestPeersIterConfig {
138 num_results: self.config.replication_factor,
139 parallelism: self.config.parallelism,
140 ..ClosestPeersIterConfig::default()
141 };
142
143 let peer_iter = if self.config.disjoint_query_paths {
144 QueryPeerIter::ClosestDisjoint(ClosestDisjointPeersIter::with_config(
145 cfg, target, peers,
146 ))
147 } else {
148 QueryPeerIter::Closest(ClosestPeersIter::with_config(cfg, target, peers))
149 };
150
151 let query = Query::new(id, peer_iter, inner);
152 self.queries.insert(id, query);
153 }
154
155 fn next_query_id(&mut self) -> QueryId {
156 let id = QueryId(self.next_id);
157 self.next_id = self.next_id.wrapping_add(1);
158 id
159 }
160
161 pub(crate) fn get(&self, id: &QueryId) -> Option<&Query<TInner>> {
163 self.queries.get(id)
164 }
165
166 pub(crate) fn get_mut(&mut self, id: &QueryId) -> Option<&mut Query<TInner>> {
168 self.queries.get_mut(id)
169 }
170
171 pub(crate) fn poll(&mut self, now: Instant) -> QueryPoolState<'_, TInner> {
173 let mut finished = None;
174 let mut timeout = None;
175 let mut waiting = None;
176
177 for (&query_id, query) in self.queries.iter_mut() {
178 query.stats.start = query.stats.start.or(Some(now));
179 match query.next(now) {
180 PeersIterState::Finished => {
181 finished = Some(query_id);
182 break;
183 }
184 PeersIterState::Waiting(Some(peer_id)) => {
185 let peer = peer_id.into_owned();
186 waiting = Some((query_id, peer));
187 break;
188 }
189 PeersIterState::Waiting(None) | PeersIterState::WaitingAtCapacity => {
190 let elapsed = now - query.stats.start.unwrap_or(now);
191 if elapsed >= self.config.timeout {
192 timeout = Some(query_id);
193 break;
194 }
195 }
196 }
197 }
198
199 if let Some((query_id, peer_id)) = waiting {
200 let query = self.queries.get_mut(&query_id).expect("s.a.");
201 return QueryPoolState::Waiting(Some((query, peer_id)));
202 }
203
204 if let Some(query_id) = finished {
205 let mut query = self.queries.remove(&query_id).expect("s.a.");
206 query.stats.end = Some(now);
207 return QueryPoolState::Finished(query);
208 }
209
210 if let Some(query_id) = timeout {
211 let mut query = self.queries.remove(&query_id).expect("s.a.");
212 query.stats.end = Some(now);
213 return QueryPoolState::Timeout(query);
214 }
215
216 if self.queries.is_empty() {
217 QueryPoolState::Idle
218 } else {
219 QueryPoolState::Waiting(None)
220 }
221 }
222}
223
/// Identifier of a query within a `QueryPool`.
#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq)]
pub struct QueryId(usize);

impl std::fmt::Display for QueryId {
    /// Writes the wrapped integer in plain decimal form.
    ///
    /// The value is rendered through a fresh `{}` placeholder, so width,
    /// fill and alignment flags given by the caller are not applied.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self(raw) = self;
        write!(f, "{}", raw)
    }
}
233
/// The configuration shared by all queries of a `QueryPool`.
#[derive(Debug, Clone)]
pub(crate) struct QueryConfig {
    /// How long a query may run before `QueryPool::poll` reports it as timed
    /// out. The time is measured from the query's first poll and is only
    /// checked while the query has no new peer to yield.
    pub(crate) timeout: Duration,
    /// Used as the number of results for closest-peers iterators and as the
    /// parallelism of fixed-peer iterators.
    pub(crate) replication_factor: NonZeroUsize,
    /// Parallelism handed to closest-peers iterators.
    pub(crate) parallelism: NonZeroUsize,
    /// When `true`, closest-peers queries are driven by a
    /// `ClosestDisjointPeersIter` instead of a `ClosestPeersIter`.
    pub(crate) disjoint_query_paths: bool,
}
254
255impl Default for QueryConfig {
256 fn default() -> Self {
257 QueryConfig {
258 timeout: Duration::from_secs(60),
259 replication_factor: NonZeroUsize::new(K_VALUE.get()).expect("K_VALUE > 0"),
260 parallelism: ALPHA_VALUE,
261 disjoint_query_paths: false,
262 }
263 }
264}
265
/// A single query held by a `QueryPool`.
pub(crate) struct Query<TInner> {
    /// The id under which the query is registered in its pool.
    id: QueryId,
    /// The iterator that decides which peers the query yields and when it is
    /// finished.
    peer_iter: QueryPeerIter,
    /// Counters and timestamps collected while the query runs.
    stats: QueryStats,
    /// Caller-supplied payload carried along with the query; it is handed
    /// back unchanged by `into_result`.
    pub(crate) inner: TInner,
}
277
/// The peer iterator driving a [`Query`]; one variant per iterator type.
enum QueryPeerIter {
    /// Created by `QueryPool::continue_iter_closest` when disjoint query
    /// paths are disabled.
    Closest(ClosestPeersIter),
    /// Created by `QueryPool::continue_iter_closest` when disjoint query
    /// paths are enabled.
    ClosestDisjoint(ClosestDisjointPeersIter),
    /// Created by `QueryPool::continue_fixed`.
    Fixed(FixedPeersIter),
}
284
285impl<TInner> Query<TInner> {
286 fn new(id: QueryId, peer_iter: QueryPeerIter, inner: TInner) -> Self {
288 Query {
289 id,
290 inner,
291 peer_iter,
292 stats: QueryStats::empty(),
293 }
294 }
295
296 pub(crate) fn id(&self) -> QueryId {
298 self.id
299 }
300
301 pub(crate) fn stats(&self) -> &QueryStats {
303 &self.stats
304 }
305
306 pub(crate) fn on_failure(&mut self, peer: &PeerId) {
308 let updated = match &mut self.peer_iter {
309 QueryPeerIter::Closest(iter) => iter.on_failure(peer),
310 QueryPeerIter::ClosestDisjoint(iter) => iter.on_failure(peer),
311 QueryPeerIter::Fixed(iter) => iter.on_failure(peer),
312 };
313 if updated {
314 self.stats.failure += 1;
315 }
316 }
317
318 pub(crate) fn on_success<I>(&mut self, peer: &PeerId, new_peers: I)
322 where
323 I: IntoIterator<Item = PeerId>,
324 {
325 let updated = match &mut self.peer_iter {
326 QueryPeerIter::Closest(iter) => iter.on_success(peer, new_peers),
327 QueryPeerIter::ClosestDisjoint(iter) => iter.on_success(peer, new_peers),
328 QueryPeerIter::Fixed(iter) => iter.on_success(peer),
329 };
330 if updated {
331 self.stats.success += 1;
332 }
333 }
334
335 fn next(&mut self, now: Instant) -> PeersIterState<'_> {
337 let state = match &mut self.peer_iter {
338 QueryPeerIter::Closest(iter) => iter.next(now),
339 QueryPeerIter::ClosestDisjoint(iter) => iter.next(now),
340 QueryPeerIter::Fixed(iter) => iter.next(),
341 };
342
343 if let PeersIterState::Waiting(Some(_)) = state {
344 self.stats.requests += 1;
345 }
346
347 state
348 }
349
350 pub(crate) fn try_finish<'a, I>(&mut self, peers: I) -> bool
368 where
369 I: IntoIterator<Item = &'a PeerId>,
370 {
371 match &mut self.peer_iter {
372 QueryPeerIter::Closest(iter) => {
373 iter.finish();
374 true
375 }
376 QueryPeerIter::ClosestDisjoint(iter) => iter.finish_paths(peers),
377 QueryPeerIter::Fixed(iter) => {
378 iter.finish();
379 true
380 }
381 }
382 }
383
384 pub(crate) fn finish(&mut self) {
389 match &mut self.peer_iter {
390 QueryPeerIter::Closest(iter) => iter.finish(),
391 QueryPeerIter::ClosestDisjoint(iter) => iter.finish(),
392 QueryPeerIter::Fixed(iter) => iter.finish(),
393 }
394 }
395
396 pub(crate) fn is_finished(&self) -> bool {
401 match &self.peer_iter {
402 QueryPeerIter::Closest(iter) => iter.is_finished(),
403 QueryPeerIter::ClosestDisjoint(iter) => iter.is_finished(),
404 QueryPeerIter::Fixed(iter) => iter.is_finished(),
405 }
406 }
407
408 pub(crate) fn into_result(self) -> QueryResult<TInner, impl Iterator<Item = PeerId>> {
410 let peers = match self.peer_iter {
411 QueryPeerIter::Closest(iter) => Either::Left(Either::Left(iter.into_result())),
412 QueryPeerIter::ClosestDisjoint(iter) => Either::Left(Either::Right(iter.into_result())),
413 QueryPeerIter::Fixed(iter) => Either::Right(iter.into_result()),
414 };
415 QueryResult {
416 peers,
417 inner: self.inner,
418 stats: self.stats,
419 }
420 }
421}
422
/// The outcome of a query, as produced by `Query::into_result`.
pub(crate) struct QueryResult<TInner, TPeers> {
    /// The payload that was attached to the query.
    pub(crate) inner: TInner,
    /// The peers returned by the query's peer iterator.
    pub(crate) peers: TPeers,
    /// The statistics collected while the query ran.
    pub(crate) stats: QueryStats,
}
432
/// Execution statistics of a query.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct QueryStats {
    /// Number of times the query's peer iterator yielded a peer.
    requests: u32,
    /// Number of success notifications accepted by the peer iterator.
    success: u32,
    /// Number of failure notifications accepted by the peer iterator.
    failure: u32,
    /// When the query was first polled, if it has been polled at all.
    start: Option<Instant>,
    /// When the query finished or timed out, if it has.
    end: Option<Instant>,
}

impl QueryStats {
    /// Creates statistics with all counters at zero and no timestamps.
    pub fn empty() -> Self {
        QueryStats {
            requests: 0,
            success: 0,
            failure: 0,
            start: None,
            end: None,
        }
    }

    /// Returns the total number of requests counted for the query.
    pub fn num_requests(&self) -> u32 {
        self.requests
    }

    /// Returns the number of requests counted as successful.
    pub fn num_successes(&self) -> u32 {
        self.success
    }

    /// Returns the number of requests counted as failed.
    pub fn num_failures(&self) -> u32 {
        self.failure
    }

    /// Returns the number of requests that have neither succeeded nor failed
    /// yet.
    ///
    /// The computation saturates: if the counters are ever inconsistent
    /// (more completions than requests), the result is `0` instead of a
    /// panic in debug builds or a wrapped-around value in release builds.
    pub fn num_pending(&self) -> u32 {
        let completed = self.success.saturating_add(self.failure);
        self.requests.saturating_sub(completed)
    }

    /// Returns how long the query has been running.
    ///
    /// * `None` if the query has no start time yet.
    /// * The time between start and end if the query has ended.
    /// * The time between start and the current instant otherwise.
    pub fn duration(&self) -> Option<Duration> {
        let start = self.start?;
        let elapsed = match self.end {
            Some(end) => end - start,
            None => Instant::now() - start,
        };
        Some(elapsed)
    }

    /// Merges two sets of statistics into one.
    ///
    /// Counters are added (saturating at `u32::MAX`), the earlier of the two
    /// start times is kept, and the later of the two end times is kept. A
    /// missing timestamp on one side never overrides a present one on the
    /// other.
    pub fn merge(self, other: QueryStats) -> Self {
        let start = match (self.start, other.start) {
            (Some(a), Some(b)) => Some(std::cmp::min(a, b)),
            (a, b) => a.or(b),
        };
        QueryStats {
            requests: self.requests.saturating_add(other.requests),
            success: self.success.saturating_add(other.success),
            failure: self.failure.saturating_add(other.failure),
            start,
            // `None` orders before `Some(_)`, so a known end time wins.
            end: std::cmp::max(self.end, other.end),
        }
    }
}