1use crate::connection::{Connection, ConnectionId, PendingPoint};
22use crate::{
23 connection::{
24 Connected, ConnectionError, IncomingInfo, PendingConnectionError,
25 PendingInboundConnectionError, PendingOutboundConnectionError,
26 },
27 transport::TransportError,
28 ConnectedPoint, ConnectionHandler, Executor, Multiaddr, PeerId,
29};
30use concurrent_dial::ConcurrentDial;
31use fnv::FnvHashMap;
32use futures::prelude::*;
33use futures::stream::SelectAll;
34use futures::{
35 channel::{mpsc, oneshot},
36 future::{poll_fn, BoxFuture, Either},
37 ready,
38 stream::FuturesUnordered,
39};
40use instant::{Duration, Instant};
41use libp2p_core::connection::Endpoint;
42use libp2p_core::muxing::{StreamMuxerBox, StreamMuxerExt};
43use std::task::Waker;
44use std::{
45 collections::HashMap,
46 fmt,
47 num::{NonZeroU8, NonZeroUsize},
48 pin::Pin,
49 task::Context,
50 task::Poll,
51};
52use tracing::Instrument;
53use void::Void;
54
55mod concurrent_dial;
56mod task;
57
58enum ExecSwitch {
59 Executor(Box<dyn Executor + Send>),
60 LocalSpawn(FuturesUnordered<Pin<Box<dyn Future<Output = ()> + Send>>>),
61}
62
63impl ExecSwitch {
64 fn advance_local(&mut self, cx: &mut Context) {
65 match self {
66 ExecSwitch::Executor(_) => {}
67 ExecSwitch::LocalSpawn(local) => {
68 while let Poll::Ready(Some(())) = local.poll_next_unpin(cx) {}
69 }
70 }
71 }
72
73 fn spawn(&mut self, task: impl Future<Output = ()> + Send + 'static) {
74 let task = task.boxed();
75
76 match self {
77 Self::Executor(executor) => executor.exec(task),
78 Self::LocalSpawn(local) => local.push(task),
79 }
80 }
81}
82
83pub(crate) struct Pool<THandler>
85where
86 THandler: ConnectionHandler,
87{
88 local_id: PeerId,
89
90 counters: ConnectionCounters,
92
93 established: FnvHashMap<
95 PeerId,
96 FnvHashMap<ConnectionId, EstablishedConnection<THandler::FromBehaviour>>,
97 >,
98
99 pending: HashMap<ConnectionId, PendingConnection>,
101
102 task_command_buffer_size: usize,
104
105 dial_concurrency_factor: NonZeroU8,
107
108 substream_upgrade_protocol_override: Option<libp2p_core::upgrade::Version>,
110
111 max_negotiating_inbound_streams: usize,
115
116 per_connection_event_buffer_size: usize,
118
119 executor: ExecSwitch,
122
123 pending_connection_events_tx: mpsc::Sender<task::PendingConnectionEvent>,
126
127 pending_connection_events_rx: mpsc::Receiver<task::PendingConnectionEvent>,
129
130 no_established_connections_waker: Option<Waker>,
132
133 established_connection_events:
135 SelectAll<mpsc::Receiver<task::EstablishedConnectionEvent<THandler::ToBehaviour>>>,
136
137 new_connection_dropped_listeners: FuturesUnordered<oneshot::Receiver<StreamMuxerBox>>,
139
140 idle_connection_timeout: Duration,
142}
143
144#[derive(Debug)]
145pub(crate) struct EstablishedConnection<TInEvent> {
146 endpoint: ConnectedPoint,
147 sender: mpsc::Sender<task::Command<TInEvent>>,
149}
150
151impl<TInEvent> EstablishedConnection<TInEvent> {
152 pub(crate) fn notify_handler(&mut self, event: TInEvent) -> Result<(), TInEvent> {
163 let cmd = task::Command::NotifyHandler(event);
164 self.sender.try_send(cmd).map_err(|e| match e.into_inner() {
165 task::Command::NotifyHandler(event) => event,
166 _ => unreachable!("Expect failed send to return initial event."),
167 })
168 }
169
170 pub(crate) fn poll_ready_notify_handler(
177 &mut self,
178 cx: &mut Context<'_>,
179 ) -> Poll<Result<(), ()>> {
180 self.sender.poll_ready(cx).map_err(|_| ())
181 }
182
183 pub(crate) fn start_close(&mut self) {
187 match self.sender.clone().try_send(task::Command::Close) {
190 Ok(()) => {}
191 Err(e) => assert!(e.is_disconnected(), "No capacity for close command."),
192 };
193 }
194}
195
196struct PendingConnection {
197 peer_id: Option<PeerId>,
199 endpoint: PendingPoint,
200 abort_notifier: Option<oneshot::Sender<Void>>,
202 accepted_at: Instant,
204}
205
206impl PendingConnection {
207 fn is_for_same_remote_as(&self, other: PeerId) -> bool {
208 self.peer_id.map_or(false, |peer| peer == other)
209 }
210
211 fn abort(&mut self) {
213 if let Some(notifier) = self.abort_notifier.take() {
214 drop(notifier);
215 }
216 }
217}
218
219impl<THandler: ConnectionHandler> fmt::Debug for Pool<THandler> {
220 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
221 f.debug_struct("Pool")
222 .field("counters", &self.counters)
223 .finish()
224 }
225}
226
227#[derive(Debug)]
229pub(crate) enum PoolEvent<ToBehaviour> {
230 ConnectionEstablished {
232 id: ConnectionId,
233 peer_id: PeerId,
234 endpoint: ConnectedPoint,
235 connection: NewConnection,
236 concurrent_dial_errors: Option<Vec<(Multiaddr, TransportError<std::io::Error>)>>,
240 established_in: std::time::Duration,
242 },
243
244 ConnectionClosed {
256 id: ConnectionId,
257 connected: Connected,
259 error: Option<ConnectionError>,
262 remaining_established_connection_ids: Vec<ConnectionId>,
264 },
265
266 PendingOutboundConnectionError {
268 id: ConnectionId,
270 error: PendingOutboundConnectionError,
272 peer: Option<PeerId>,
274 },
275
276 PendingInboundConnectionError {
278 id: ConnectionId,
280 send_back_addr: Multiaddr,
282 local_addr: Multiaddr,
284 error: PendingInboundConnectionError,
286 },
287
288 ConnectionEvent {
290 id: ConnectionId,
291 peer_id: PeerId,
292 event: ToBehaviour,
294 },
295
296 AddressChange {
298 id: ConnectionId,
299 peer_id: PeerId,
300 new_endpoint: ConnectedPoint,
302 old_endpoint: ConnectedPoint,
304 },
305}
306
307impl<THandler> Pool<THandler>
308where
309 THandler: ConnectionHandler,
310{
311 pub(crate) fn new(local_id: PeerId, config: PoolConfig) -> Self {
313 let (pending_connection_events_tx, pending_connection_events_rx) = mpsc::channel(0);
314 let executor = match config.executor {
315 Some(exec) => ExecSwitch::Executor(exec),
316 None => ExecSwitch::LocalSpawn(Default::default()),
317 };
318 Pool {
319 local_id,
320 counters: ConnectionCounters::new(),
321 established: Default::default(),
322 pending: Default::default(),
323 task_command_buffer_size: config.task_command_buffer_size,
324 dial_concurrency_factor: config.dial_concurrency_factor,
325 substream_upgrade_protocol_override: config.substream_upgrade_protocol_override,
326 max_negotiating_inbound_streams: config.max_negotiating_inbound_streams,
327 per_connection_event_buffer_size: config.per_connection_event_buffer_size,
328 idle_connection_timeout: config.idle_connection_timeout,
329 executor,
330 pending_connection_events_tx,
331 pending_connection_events_rx,
332 no_established_connections_waker: None,
333 established_connection_events: Default::default(),
334 new_connection_dropped_listeners: Default::default(),
335 }
336 }
337
338 pub(crate) fn counters(&self) -> &ConnectionCounters {
340 &self.counters
341 }
342
343 pub(crate) fn get_established(
345 &mut self,
346 id: ConnectionId,
347 ) -> Option<&mut EstablishedConnection<THandler::FromBehaviour>> {
348 self.established
349 .values_mut()
350 .find_map(|connections| connections.get_mut(&id))
351 }
352
353 pub(crate) fn is_connected(&self, id: PeerId) -> bool {
357 self.established.contains_key(&id)
358 }
359
360 pub(crate) fn num_peers(&self) -> usize {
363 self.established.len()
364 }
365
366 pub(crate) fn disconnect(&mut self, peer: PeerId) {
372 if let Some(conns) = self.established.get_mut(&peer) {
373 for (_, conn) in conns.iter_mut() {
374 conn.start_close();
375 }
376 }
377
378 for connection in self
379 .pending
380 .iter_mut()
381 .filter_map(|(_, info)| info.is_for_same_remote_as(peer).then_some(info))
382 {
383 connection.abort()
384 }
385 }
386
387 pub(crate) fn iter_established_connections_of_peer(
389 &mut self,
390 peer: &PeerId,
391 ) -> impl Iterator<Item = ConnectionId> + '_ {
392 match self.established.get(peer) {
393 Some(conns) => either::Either::Left(conns.iter().map(|(id, _)| *id)),
394 None => either::Either::Right(std::iter::empty()),
395 }
396 }
397
398 pub(crate) fn is_dialing(&self, peer: PeerId) -> bool {
400 self.pending.iter().any(|(_, info)| {
401 matches!(info.endpoint, PendingPoint::Dialer { .. }) && info.is_for_same_remote_as(peer)
402 })
403 }
404
405 pub(crate) fn iter_connected(&self) -> impl Iterator<Item = &PeerId> {
408 self.established.keys()
409 }
410
411 pub(crate) fn add_outgoing(
414 &mut self,
415 dials: Vec<
416 BoxFuture<
417 'static,
418 (
419 Multiaddr,
420 Result<(PeerId, StreamMuxerBox), TransportError<std::io::Error>>,
421 ),
422 >,
423 >,
424 peer: Option<PeerId>,
425 role_override: Endpoint,
426 dial_concurrency_factor_override: Option<NonZeroU8>,
427 connection_id: ConnectionId,
428 ) {
429 let concurrency_factor =
430 dial_concurrency_factor_override.unwrap_or(self.dial_concurrency_factor);
431 let span = tracing::debug_span!(parent: tracing::Span::none(), "new_outgoing_connection", %concurrency_factor, num_dials=%dials.len(), id = %connection_id);
432 span.follows_from(tracing::Span::current());
433
434 let (abort_notifier, abort_receiver) = oneshot::channel();
435
436 self.executor.spawn(
437 task::new_for_pending_outgoing_connection(
438 connection_id,
439 ConcurrentDial::new(dials, concurrency_factor),
440 abort_receiver,
441 self.pending_connection_events_tx.clone(),
442 )
443 .instrument(span),
444 );
445
446 let endpoint = PendingPoint::Dialer { role_override };
447
448 self.counters.inc_pending(&endpoint);
449 self.pending.insert(
450 connection_id,
451 PendingConnection {
452 peer_id: peer,
453 endpoint,
454 abort_notifier: Some(abort_notifier),
455 accepted_at: Instant::now(),
456 },
457 );
458 }
459
460 pub(crate) fn add_incoming<TFut>(
463 &mut self,
464 future: TFut,
465 info: IncomingInfo<'_>,
466 connection_id: ConnectionId,
467 ) where
468 TFut: Future<Output = Result<(PeerId, StreamMuxerBox), std::io::Error>> + Send + 'static,
469 {
470 let endpoint = info.create_connected_point();
471
472 let (abort_notifier, abort_receiver) = oneshot::channel();
473
474 let span = tracing::debug_span!(parent: tracing::Span::none(), "new_incoming_connection", remote_addr = %info.send_back_addr, id = %connection_id);
475 span.follows_from(tracing::Span::current());
476
477 self.executor.spawn(
478 task::new_for_pending_incoming_connection(
479 connection_id,
480 future,
481 abort_receiver,
482 self.pending_connection_events_tx.clone(),
483 )
484 .instrument(span),
485 );
486
487 self.counters.inc_pending_incoming();
488 self.pending.insert(
489 connection_id,
490 PendingConnection {
491 peer_id: None,
492 endpoint: endpoint.into(),
493 abort_notifier: Some(abort_notifier),
494 accepted_at: Instant::now(),
495 },
496 );
497 }
498
499 pub(crate) fn spawn_connection(
500 &mut self,
501 id: ConnectionId,
502 obtained_peer_id: PeerId,
503 endpoint: &ConnectedPoint,
504 connection: NewConnection,
505 handler: THandler,
506 ) {
507 let connection = connection.extract();
508 let conns = self.established.entry(obtained_peer_id).or_default();
509 self.counters.inc_established(endpoint);
510
511 let (command_sender, command_receiver) = mpsc::channel(self.task_command_buffer_size);
512 let (event_sender, event_receiver) = mpsc::channel(self.per_connection_event_buffer_size);
513
514 conns.insert(
515 id,
516 EstablishedConnection {
517 endpoint: endpoint.clone(),
518 sender: command_sender,
519 },
520 );
521 self.established_connection_events.push(event_receiver);
522 if let Some(waker) = self.no_established_connections_waker.take() {
523 waker.wake();
524 }
525
526 let connection = Connection::new(
527 connection,
528 handler,
529 self.substream_upgrade_protocol_override,
530 self.max_negotiating_inbound_streams,
531 self.idle_connection_timeout,
532 );
533
534 let span = tracing::debug_span!(parent: tracing::Span::none(), "new_established_connection", remote_addr = %endpoint.get_remote_address(), %id, peer = %obtained_peer_id);
535 span.follows_from(tracing::Span::current());
536
537 self.executor.spawn(
538 task::new_for_established_connection(
539 id,
540 obtained_peer_id,
541 connection,
542 command_receiver,
543 event_sender,
544 )
545 .instrument(span),
546 )
547 }
548
549 #[tracing::instrument(level = "debug", name = "Pool::poll", skip(self, cx))]
551 pub(crate) fn poll(&mut self, cx: &mut Context<'_>) -> Poll<PoolEvent<THandler::ToBehaviour>>
552 where
553 THandler: ConnectionHandler + 'static,
554 <THandler as ConnectionHandler>::OutboundOpenInfo: Send,
555 {
556 match self.established_connection_events.poll_next_unpin(cx) {
561 Poll::Pending => {}
562 Poll::Ready(None) => {
563 self.no_established_connections_waker = Some(cx.waker().clone());
564 }
565
566 Poll::Ready(Some(task::EstablishedConnectionEvent::Notify { id, peer_id, event })) => {
567 return Poll::Ready(PoolEvent::ConnectionEvent { peer_id, id, event });
568 }
569 Poll::Ready(Some(task::EstablishedConnectionEvent::AddressChange {
570 id,
571 peer_id,
572 new_address,
573 })) => {
574 let connection = self
575 .established
576 .get_mut(&peer_id)
577 .expect("Receive `AddressChange` event for established peer.")
578 .get_mut(&id)
579 .expect("Receive `AddressChange` event from established connection");
580 let mut new_endpoint = connection.endpoint.clone();
581 new_endpoint.set_remote_address(new_address);
582 let old_endpoint =
583 std::mem::replace(&mut connection.endpoint, new_endpoint.clone());
584
585 return Poll::Ready(PoolEvent::AddressChange {
586 peer_id,
587 id,
588 new_endpoint,
589 old_endpoint,
590 });
591 }
592 Poll::Ready(Some(task::EstablishedConnectionEvent::Closed { id, peer_id, error })) => {
593 let connections = self
594 .established
595 .get_mut(&peer_id)
596 .expect("`Closed` event for established connection");
597 let EstablishedConnection { endpoint, .. } =
598 connections.remove(&id).expect("Connection to be present");
599 self.counters.dec_established(&endpoint);
600 let remaining_established_connection_ids: Vec<ConnectionId> =
601 connections.keys().cloned().collect();
602 if remaining_established_connection_ids.is_empty() {
603 self.established.remove(&peer_id);
604 }
605 return Poll::Ready(PoolEvent::ConnectionClosed {
606 id,
607 connected: Connected { endpoint, peer_id },
608 error,
609 remaining_established_connection_ids,
610 });
611 }
612 }
613
614 loop {
616 if let Poll::Ready(Some(result)) =
617 self.new_connection_dropped_listeners.poll_next_unpin(cx)
618 {
619 if let Ok(dropped_connection) = result {
620 self.executor.spawn(async move {
621 let _ = dropped_connection.close().await;
622 });
623 }
624 continue;
625 }
626
627 let event = match self.pending_connection_events_rx.poll_next_unpin(cx) {
628 Poll::Ready(Some(event)) => event,
629 Poll::Pending => break,
630 Poll::Ready(None) => unreachable!("Pool holds both sender and receiver."),
631 };
632
633 match event {
634 task::PendingConnectionEvent::ConnectionEstablished {
635 id,
636 output: (obtained_peer_id, mut muxer),
637 outgoing,
638 } => {
639 let PendingConnection {
640 peer_id: expected_peer_id,
641 endpoint,
642 abort_notifier: _,
643 accepted_at,
644 } = self
645 .pending
646 .remove(&id)
647 .expect("Entry in `self.pending` for previously pending connection.");
648
649 self.counters.dec_pending(&endpoint);
650
651 let (endpoint, concurrent_dial_errors) = match (endpoint, outgoing) {
652 (PendingPoint::Dialer { role_override }, Some((address, errors))) => (
653 ConnectedPoint::Dialer {
654 address,
655 role_override,
656 },
657 Some(errors),
658 ),
659 (
660 PendingPoint::Listener {
661 local_addr,
662 send_back_addr,
663 },
664 None,
665 ) => (
666 ConnectedPoint::Listener {
667 local_addr,
668 send_back_addr,
669 },
670 None,
671 ),
672 (PendingPoint::Dialer { .. }, None) => unreachable!(
673 "Established incoming connection via pending outgoing connection."
674 ),
675 (PendingPoint::Listener { .. }, Some(_)) => unreachable!(
676 "Established outgoing connection via pending incoming connection."
677 ),
678 };
679
680 let check_peer_id = || {
681 if let Some(peer) = expected_peer_id {
682 if peer != obtained_peer_id {
683 return Err(PendingConnectionError::WrongPeerId {
684 obtained: obtained_peer_id,
685 endpoint: endpoint.clone(),
686 });
687 }
688 }
689
690 if self.local_id == obtained_peer_id {
691 return Err(PendingConnectionError::LocalPeerId {
692 endpoint: endpoint.clone(),
693 });
694 }
695
696 Ok(())
697 };
698
699 if let Err(error) = check_peer_id() {
700 self.executor.spawn(poll_fn(move |cx| {
701 if let Err(e) = ready!(muxer.poll_close_unpin(cx)) {
702 tracing::debug!(
703 peer=%obtained_peer_id,
704 connection=%id,
705 "Failed to close connection to peer: {:?}",
706 e
707 );
708 }
709 Poll::Ready(())
710 }));
711
712 match endpoint {
713 ConnectedPoint::Dialer { .. } => {
714 return Poll::Ready(PoolEvent::PendingOutboundConnectionError {
715 id,
716 error: error
717 .map(|t| vec![(endpoint.get_remote_address().clone(), t)]),
718 peer: expected_peer_id.or(Some(obtained_peer_id)),
719 })
720 }
721 ConnectedPoint::Listener {
722 send_back_addr,
723 local_addr,
724 } => {
725 return Poll::Ready(PoolEvent::PendingInboundConnectionError {
726 id,
727 error,
728 send_back_addr,
729 local_addr,
730 })
731 }
732 };
733 }
734
735 let established_in = accepted_at.elapsed();
736
737 let (connection, drop_listener) = NewConnection::new(muxer);
738 self.new_connection_dropped_listeners.push(drop_listener);
739
740 return Poll::Ready(PoolEvent::ConnectionEstablished {
741 peer_id: obtained_peer_id,
742 endpoint,
743 id,
744 connection,
745 concurrent_dial_errors,
746 established_in,
747 });
748 }
749 task::PendingConnectionEvent::PendingFailed { id, error } => {
750 if let Some(PendingConnection {
751 peer_id,
752 endpoint,
753 abort_notifier: _,
754 accepted_at: _, }) = self.pending.remove(&id)
756 {
757 self.counters.dec_pending(&endpoint);
758
759 match (endpoint, error) {
760 (PendingPoint::Dialer { .. }, Either::Left(error)) => {
761 return Poll::Ready(PoolEvent::PendingOutboundConnectionError {
762 id,
763 error,
764 peer: peer_id,
765 });
766 }
767 (
768 PendingPoint::Listener {
769 send_back_addr,
770 local_addr,
771 },
772 Either::Right(error),
773 ) => {
774 return Poll::Ready(PoolEvent::PendingInboundConnectionError {
775 id,
776 error,
777 send_back_addr,
778 local_addr,
779 });
780 }
781 (PendingPoint::Dialer { .. }, Either::Right(_)) => {
782 unreachable!("Inbound error for outbound connection.")
783 }
784 (PendingPoint::Listener { .. }, Either::Left(_)) => {
785 unreachable!("Outbound error for inbound connection.")
786 }
787 }
788 }
789 }
790 }
791 }
792
793 self.executor.advance_local(cx);
794
795 Poll::Pending
796 }
797}
798
799#[derive(Debug)]
806pub(crate) struct NewConnection {
807 connection: Option<StreamMuxerBox>,
808 drop_sender: Option<oneshot::Sender<StreamMuxerBox>>,
809}
810
811impl NewConnection {
812 fn new(conn: StreamMuxerBox) -> (Self, oneshot::Receiver<StreamMuxerBox>) {
813 let (sender, receiver) = oneshot::channel();
814
815 (
816 Self {
817 connection: Some(conn),
818 drop_sender: Some(sender),
819 },
820 receiver,
821 )
822 }
823
824 fn extract(mut self) -> StreamMuxerBox {
825 self.connection.take().unwrap()
826 }
827}
828
829impl Drop for NewConnection {
830 fn drop(&mut self) {
831 if let Some(connection) = self.connection.take() {
832 let _ = self
833 .drop_sender
834 .take()
835 .expect("`drop_sender` to always be `Some`")
836 .send(connection);
837 }
838 }
839}
840
841#[derive(Debug, Clone)]
843pub struct ConnectionCounters {
844 pending_incoming: u32,
846 pending_outgoing: u32,
848 established_incoming: u32,
850 established_outgoing: u32,
852}
853
854impl ConnectionCounters {
855 fn new() -> Self {
856 Self {
857 pending_incoming: 0,
858 pending_outgoing: 0,
859 established_incoming: 0,
860 established_outgoing: 0,
861 }
862 }
863
864 pub fn num_connections(&self) -> u32 {
866 self.num_pending() + self.num_established()
867 }
868
869 pub fn num_pending(&self) -> u32 {
871 self.pending_incoming + self.pending_outgoing
872 }
873
874 pub fn num_pending_incoming(&self) -> u32 {
876 self.pending_incoming
877 }
878
879 pub fn num_pending_outgoing(&self) -> u32 {
881 self.pending_outgoing
882 }
883
884 pub fn num_established_incoming(&self) -> u32 {
886 self.established_incoming
887 }
888
889 pub fn num_established_outgoing(&self) -> u32 {
891 self.established_outgoing
892 }
893
894 pub fn num_established(&self) -> u32 {
896 self.established_outgoing + self.established_incoming
897 }
898
899 fn inc_pending(&mut self, endpoint: &PendingPoint) {
900 match endpoint {
901 PendingPoint::Dialer { .. } => {
902 self.pending_outgoing += 1;
903 }
904 PendingPoint::Listener { .. } => {
905 self.pending_incoming += 1;
906 }
907 }
908 }
909
910 fn inc_pending_incoming(&mut self) {
911 self.pending_incoming += 1;
912 }
913
914 fn dec_pending(&mut self, endpoint: &PendingPoint) {
915 match endpoint {
916 PendingPoint::Dialer { .. } => {
917 self.pending_outgoing -= 1;
918 }
919 PendingPoint::Listener { .. } => {
920 self.pending_incoming -= 1;
921 }
922 }
923 }
924
925 fn inc_established(&mut self, endpoint: &ConnectedPoint) {
926 match endpoint {
927 ConnectedPoint::Dialer { .. } => {
928 self.established_outgoing += 1;
929 }
930 ConnectedPoint::Listener { .. } => {
931 self.established_incoming += 1;
932 }
933 }
934 }
935
936 fn dec_established(&mut self, endpoint: &ConnectedPoint) {
937 match endpoint {
938 ConnectedPoint::Dialer { .. } => {
939 self.established_outgoing -= 1;
940 }
941 ConnectedPoint::Listener { .. } => {
942 self.established_incoming -= 1;
943 }
944 }
945 }
946}
947
948pub(crate) struct PoolConfig {
953 pub(crate) executor: Option<Box<dyn Executor + Send>>,
955 pub(crate) task_command_buffer_size: usize,
957 pub(crate) per_connection_event_buffer_size: usize,
960 pub(crate) dial_concurrency_factor: NonZeroU8,
962 pub(crate) idle_connection_timeout: Duration,
964 substream_upgrade_protocol_override: Option<libp2p_core::upgrade::Version>,
966
967 max_negotiating_inbound_streams: usize,
971}
972
973impl PoolConfig {
974 pub(crate) fn new(executor: Option<Box<dyn Executor + Send>>) -> Self {
975 Self {
976 executor,
977 task_command_buffer_size: 32,
978 per_connection_event_buffer_size: 7,
979 dial_concurrency_factor: NonZeroU8::new(8).expect("8 > 0"),
980 idle_connection_timeout: Duration::ZERO,
981 substream_upgrade_protocol_override: None,
982 max_negotiating_inbound_streams: 128,
983 }
984 }
985
986 pub(crate) fn with_notify_handler_buffer_size(mut self, n: NonZeroUsize) -> Self {
994 self.task_command_buffer_size = n.get() - 1;
995 self
996 }
997
998 pub(crate) fn with_per_connection_event_buffer_size(mut self, n: usize) -> Self {
1005 self.per_connection_event_buffer_size = n;
1006 self
1007 }
1008
1009 pub(crate) fn with_dial_concurrency_factor(mut self, factor: NonZeroU8) -> Self {
1011 self.dial_concurrency_factor = factor;
1012 self
1013 }
1014
1015 pub(crate) fn with_substream_upgrade_protocol_override(
1017 mut self,
1018 v: libp2p_core::upgrade::Version,
1019 ) -> Self {
1020 self.substream_upgrade_protocol_override = Some(v);
1021 self
1022 }
1023
1024 pub(crate) fn with_max_negotiating_inbound_streams(mut self, v: usize) -> Self {
1028 self.max_negotiating_inbound_streams = v;
1029 self
1030 }
1031}