libp2p_swarm/connection/
pool.rs

1// Copyright 2021 Protocol Labs.
2// Copyright 2018 Parity Technologies (UK) Ltd.
3//
4// Permission is hereby granted, free of charge, to any person obtaining a
5// copy of this software and associated documentation files (the "Software"),
6// to deal in the Software without restriction, including without limitation
7// the rights to use, copy, modify, merge, publish, distribute, sublicense,
8// and/or sell copies of the Software, and to permit persons to whom the
9// Software is furnished to do so, subject to the following conditions:
10//
11// The above copyright notice and this permission notice shall be included in
12// all copies or substantial portions of the Software.
13//
14// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
15// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
19// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
20// DEALINGS IN THE SOFTWARE.
21use crate::connection::{Connection, ConnectionId, PendingPoint};
22use crate::{
23    connection::{
24        Connected, ConnectionError, IncomingInfo, PendingConnectionError,
25        PendingInboundConnectionError, PendingOutboundConnectionError,
26    },
27    transport::TransportError,
28    ConnectedPoint, ConnectionHandler, Executor, Multiaddr, PeerId,
29};
30use concurrent_dial::ConcurrentDial;
31use fnv::FnvHashMap;
32use futures::prelude::*;
33use futures::stream::SelectAll;
34use futures::{
35    channel::{mpsc, oneshot},
36    future::{poll_fn, BoxFuture, Either},
37    ready,
38    stream::FuturesUnordered,
39};
40use instant::{Duration, Instant};
41use libp2p_core::connection::Endpoint;
42use libp2p_core::muxing::{StreamMuxerBox, StreamMuxerExt};
43use std::task::Waker;
44use std::{
45    collections::HashMap,
46    fmt,
47    num::{NonZeroU8, NonZeroUsize},
48    pin::Pin,
49    task::Context,
50    task::Poll,
51};
52use tracing::Instrument;
53use void::Void;
54
55mod concurrent_dial;
56mod task;
57
/// Strategy used by the pool to run the futures it spawns.
enum ExecSwitch {
    /// Spawned tasks are handed to the wrapped [`Executor`] via `Executor::exec`.
    Executor(Box<dyn Executor + Send>),
    /// Spawned tasks are stored in a local queue and only make progress when
    /// `ExecSwitch::advance_local` polls that queue.
    LocalSpawn(FuturesUnordered<Pin<Box<dyn Future<Output = ()> + Send>>>),
}
62
63impl ExecSwitch {
64    fn advance_local(&mut self, cx: &mut Context) {
65        match self {
66            ExecSwitch::Executor(_) => {}
67            ExecSwitch::LocalSpawn(local) => {
68                while let Poll::Ready(Some(())) = local.poll_next_unpin(cx) {}
69            }
70        }
71    }
72
73    fn spawn(&mut self, task: impl Future<Output = ()> + Send + 'static) {
74        let task = task.boxed();
75
76        match self {
77            Self::Executor(executor) => executor.exec(task),
78            Self::LocalSpawn(local) => local.push(task),
79        }
80    }
81}
82
/// A connection `Pool` manages a set of connections for each peer.
///
/// The pool tracks both pending (still negotiating) and established
/// connections. Each connection runs as a separate task that reports back
/// to the pool through channels, which are drained in `Pool::poll`.
pub(crate) struct Pool<THandler>
where
    THandler: ConnectionHandler,
{
    /// The ID of the local peer. A negotiated connection whose remote reports
    /// this same ID is rejected in `Pool::poll`.
    local_id: PeerId,

    /// The connection counter(s).
    counters: ConnectionCounters,

    /// The managed connections of each peer that are currently considered established.
    established: FnvHashMap<
        PeerId,
        FnvHashMap<ConnectionId, EstablishedConnection<THandler::FromBehaviour>>,
    >,

    /// The pending connections that are currently being negotiated.
    pending: HashMap<ConnectionId, PendingConnection>,

    /// Size of the task command buffer (per task).
    task_command_buffer_size: usize,

    /// Number of addresses concurrently dialed for a single outbound connection attempt.
    dial_concurrency_factor: NonZeroU8,

    /// The configured override for substream protocol upgrades, if any.
    substream_upgrade_protocol_override: Option<libp2p_core::upgrade::Version>,

    /// The maximum number of inbound streams concurrently negotiating on a connection.
    ///
    /// See [`Connection::max_negotiating_inbound_streams`].
    max_negotiating_inbound_streams: usize,

    /// How many [`task::EstablishedConnectionEvent`]s can be buffered before the connection is back-pressured.
    per_connection_event_buffer_size: usize,

    /// The executor to use for running connection tasks. Can either be a global executor
    /// or a local queue.
    executor: ExecSwitch,

    /// Sender distributed to pending tasks for reporting events back
    /// to the pool.
    pending_connection_events_tx: mpsc::Sender<task::PendingConnectionEvent>,

    /// Receiver for events reported from pending tasks.
    pending_connection_events_rx: mpsc::Receiver<task::PendingConnectionEvent>,

    /// Waker stored by `Pool::poll` when `established_connection_events` has no
    /// receivers left to poll; it is woken by `Pool::spawn_connection` once a
    /// new established connection is added.
    no_established_connections_waker: Option<Waker>,

    /// Receivers for events reported from established connections.
    established_connection_events:
        SelectAll<mpsc::Receiver<task::EstablishedConnectionEvent<THandler::ToBehaviour>>>,

    /// Receivers for [`NewConnection`] objects that are dropped.
    new_connection_dropped_listeners: FuturesUnordered<oneshot::Receiver<StreamMuxerBox>>,

    /// How long a connection should be kept alive once it starts idling.
    idle_connection_timeout: Duration,
}
143
/// The pool's handle to an established connection that runs in its own task.
#[derive(Debug)]
pub(crate) struct EstablishedConnection<TInEvent> {
    /// The endpoint of the connection. Replaced by `Pool::poll` when the task
    /// reports an address change.
    endpoint: ConnectedPoint,
    /// Channel endpoint to send commands to the task.
    sender: mpsc::Sender<task::Command<TInEvent>>,
}
150
151impl<TInEvent> EstablishedConnection<TInEvent> {
152    /// (Asynchronously) sends an event to the connection handler.
153    ///
154    /// If the handler is not ready to receive the event, either because
155    /// it is busy or the connection is about to close, the given event
156    /// is returned with an `Err`.
157    ///
158    /// If execution of this method is preceded by successful execution of
159    /// `poll_ready_notify_handler` without another intervening execution
160    /// of `notify_handler`, it only fails if the connection is now about
161    /// to close.
162    pub(crate) fn notify_handler(&mut self, event: TInEvent) -> Result<(), TInEvent> {
163        let cmd = task::Command::NotifyHandler(event);
164        self.sender.try_send(cmd).map_err(|e| match e.into_inner() {
165            task::Command::NotifyHandler(event) => event,
166            _ => unreachable!("Expect failed send to return initial event."),
167        })
168    }
169
170    /// Checks if `notify_handler` is ready to accept an event.
171    ///
172    /// Returns `Ok(())` if the handler is ready to receive an event via `notify_handler`.
173    ///
174    /// Returns `Err(())` if the background task associated with the connection
175    /// is terminating and the connection is about to close.
176    pub(crate) fn poll_ready_notify_handler(
177        &mut self,
178        cx: &mut Context<'_>,
179    ) -> Poll<Result<(), ()>> {
180        self.sender.poll_ready(cx).map_err(|_| ())
181    }
182
183    /// Initiates a graceful close of the connection.
184    ///
185    /// Has no effect if the connection is already closing.
186    pub(crate) fn start_close(&mut self) {
187        // Clone the sender so that we are guaranteed to have
188        // capacity for the close command (every sender gets a slot).
189        match self.sender.clone().try_send(task::Command::Close) {
190            Ok(()) => {}
191            Err(e) => assert!(e.is_disconnected(), "No capacity for close command."),
192        };
193    }
194}
195
/// Bookkeeping kept by the pool for a connection that is still being negotiated.
struct PendingConnection {
    /// [`PeerId`] of the remote peer.
    ///
    /// `None` when no particular peer is expected; `Pool::add_incoming` always
    /// stores `None` here.
    peer_id: Option<PeerId>,
    /// Whether this attempt is a dial or an incoming connection, plus the
    /// data needed to build the final `ConnectedPoint`.
    endpoint: PendingPoint,
    /// When dropped, notifies the task which then knows to terminate.
    abort_notifier: Option<oneshot::Sender<Void>>,
    /// The moment we became aware of this possible connection, useful for timing metrics.
    accepted_at: Instant,
}
205
206impl PendingConnection {
207    fn is_for_same_remote_as(&self, other: PeerId) -> bool {
208        self.peer_id.map_or(false, |peer| peer == other)
209    }
210
211    /// Aborts the connection attempt, closing the connection.
212    fn abort(&mut self) {
213        if let Some(notifier) = self.abort_notifier.take() {
214            drop(notifier);
215        }
216    }
217}
218
219impl<THandler: ConnectionHandler> fmt::Debug for Pool<THandler> {
220    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
221        f.debug_struct("Pool")
222            .field("counters", &self.counters)
223            .finish()
224    }
225}
226
/// Event that can happen on the `Pool`.
///
/// Values of this type are returned from `Pool::poll`.
#[derive(Debug)]
pub(crate) enum PoolEvent<ToBehaviour> {
    /// A new connection has been established.
    ConnectionEstablished {
        /// The ID of the newly established connection.
        id: ConnectionId,
        /// The peer ID obtained from the remote during negotiation.
        peer_id: PeerId,
        /// The endpoint of the new connection.
        endpoint: ConnectedPoint,
        /// The connection itself, not yet running as part of the pool.
        /// See [`NewConnection`].
        connection: NewConnection,
        /// [`Some`] when the new connection is an outgoing connection.
        /// Addresses are dialed in parallel. Contains the addresses and errors
        /// of dial attempts that failed before the one successful dial.
        concurrent_dial_errors: Option<Vec<(Multiaddr, TransportError<std::io::Error>)>>,
        /// How long it took to establish this connection.
        established_in: std::time::Duration,
    },

    /// An established connection was closed.
    ///
    /// A connection may close if
    ///
    ///   * it encounters an error, which includes the connection being
    ///     closed by the remote. In this case `error` is `Some`.
    ///   * it was actively closed by [`EstablishedConnection::start_close`],
    ///     i.e. a successful, orderly close.
    ///   * it was actively closed by [`Pool::disconnect`], i.e.
    ///     dropped without an orderly close.
    ///
    // NOTE(review): `Pool::disconnect` calls `start_close` on established
    // connections in this file - confirm the last bullet is still accurate.
    ConnectionClosed {
        /// The ID of the closed connection.
        id: ConnectionId,
        /// Information about the connection that errored.
        connected: Connected,
        /// The error that occurred, if any. If `None`, the connection
        /// was closed by the local peer.
        error: Option<ConnectionError>,
        /// The remaining established connections to the same peer.
        remaining_established_connection_ids: Vec<ConnectionId>,
    },

    /// An outbound connection attempt failed.
    PendingOutboundConnectionError {
        /// The ID of the failed connection.
        id: ConnectionId,
        /// The error that occurred.
        error: PendingOutboundConnectionError,
        /// The (expected) peer of the failed connection.
        peer: Option<PeerId>,
    },

    /// An inbound connection attempt failed.
    PendingInboundConnectionError {
        /// The ID of the failed connection.
        id: ConnectionId,
        /// Address used to send back data to the remote.
        send_back_addr: Multiaddr,
        /// Local connection address.
        local_addr: Multiaddr,
        /// The error that occurred.
        error: PendingInboundConnectionError,
    },

    /// A node has produced an event.
    ConnectionEvent {
        /// The ID of the connection that produced the event.
        id: ConnectionId,
        /// The peer the connection belongs to.
        peer_id: PeerId,
        /// The produced event.
        event: ToBehaviour,
    },

    /// The connection to a node has changed its address.
    AddressChange {
        /// The ID of the connection whose address changed.
        id: ConnectionId,
        /// The peer the connection belongs to.
        peer_id: PeerId,
        /// The new endpoint.
        new_endpoint: ConnectedPoint,
        /// The old endpoint.
        old_endpoint: ConnectedPoint,
    },
}
306
307impl<THandler> Pool<THandler>
308where
309    THandler: ConnectionHandler,
310{
311    /// Creates a new empty `Pool`.
312    pub(crate) fn new(local_id: PeerId, config: PoolConfig) -> Self {
313        let (pending_connection_events_tx, pending_connection_events_rx) = mpsc::channel(0);
314        let executor = match config.executor {
315            Some(exec) => ExecSwitch::Executor(exec),
316            None => ExecSwitch::LocalSpawn(Default::default()),
317        };
318        Pool {
319            local_id,
320            counters: ConnectionCounters::new(),
321            established: Default::default(),
322            pending: Default::default(),
323            task_command_buffer_size: config.task_command_buffer_size,
324            dial_concurrency_factor: config.dial_concurrency_factor,
325            substream_upgrade_protocol_override: config.substream_upgrade_protocol_override,
326            max_negotiating_inbound_streams: config.max_negotiating_inbound_streams,
327            per_connection_event_buffer_size: config.per_connection_event_buffer_size,
328            idle_connection_timeout: config.idle_connection_timeout,
329            executor,
330            pending_connection_events_tx,
331            pending_connection_events_rx,
332            no_established_connections_waker: None,
333            established_connection_events: Default::default(),
334            new_connection_dropped_listeners: Default::default(),
335        }
336    }
337
    /// Gets the dedicated connection counters.
    ///
    /// Returns a shared reference to the counters that the pool updates as
    /// connections become pending, established or closed.
    pub(crate) fn counters(&self) -> &ConnectionCounters {
        &self.counters
    }
342
343    /// Gets an established connection from the pool by ID.
344    pub(crate) fn get_established(
345        &mut self,
346        id: ConnectionId,
347    ) -> Option<&mut EstablishedConnection<THandler::FromBehaviour>> {
348        self.established
349            .values_mut()
350            .find_map(|connections| connections.get_mut(&id))
351    }
352
    /// Returns true if we are connected to the given peer.
    ///
    /// A peer counts as connected while it has an entry in `established`.
    /// The entry is created by [`Pool::spawn_connection`] and removed by
    /// `poll()` once the last connection to the peer is reported closed.
    pub(crate) fn is_connected(&self, id: PeerId) -> bool {
        self.established.contains_key(&id)
    }
359
    /// Returns the number of connected peers, i.e. those with at least one
    /// established connection in the pool.
    ///
    /// This counts peers, not connections: several connections to the same
    /// peer count once.
    pub(crate) fn num_peers(&self) -> usize {
        self.established.len()
    }
365
366    /// (Forcefully) close all connections to the given peer.
367    ///
368    /// All connections to the peer, whether pending or established are
369    /// closed asap and no more events from these connections are emitted
370    /// by the pool effective immediately.
371    pub(crate) fn disconnect(&mut self, peer: PeerId) {
372        if let Some(conns) = self.established.get_mut(&peer) {
373            for (_, conn) in conns.iter_mut() {
374                conn.start_close();
375            }
376        }
377
378        for connection in self
379            .pending
380            .iter_mut()
381            .filter_map(|(_, info)| info.is_for_same_remote_as(peer).then_some(info))
382        {
383            connection.abort()
384        }
385    }
386
387    /// Returns an iterator over all established connections of `peer`.
388    pub(crate) fn iter_established_connections_of_peer(
389        &mut self,
390        peer: &PeerId,
391    ) -> impl Iterator<Item = ConnectionId> + '_ {
392        match self.established.get(peer) {
393            Some(conns) => either::Either::Left(conns.iter().map(|(id, _)| *id)),
394            None => either::Either::Right(std::iter::empty()),
395        }
396    }
397
398    /// Checks whether we are currently dialing the given peer.
399    pub(crate) fn is_dialing(&self, peer: PeerId) -> bool {
400        self.pending.iter().any(|(_, info)| {
401            matches!(info.endpoint, PendingPoint::Dialer { .. }) && info.is_for_same_remote_as(peer)
402        })
403    }
404
    /// Returns an iterator over all connected peers, i.e. those that have
    /// at least one established connection in the pool.
    ///
    /// Each peer is yielded once, regardless of how many connections it has.
    pub(crate) fn iter_connected(&self) -> impl Iterator<Item = &PeerId> {
        self.established.keys()
    }
410
411    /// Adds a pending outgoing connection to the pool in the form of a `Future`
412    /// that establishes and negotiates the connection.
413    pub(crate) fn add_outgoing(
414        &mut self,
415        dials: Vec<
416            BoxFuture<
417                'static,
418                (
419                    Multiaddr,
420                    Result<(PeerId, StreamMuxerBox), TransportError<std::io::Error>>,
421                ),
422            >,
423        >,
424        peer: Option<PeerId>,
425        role_override: Endpoint,
426        dial_concurrency_factor_override: Option<NonZeroU8>,
427        connection_id: ConnectionId,
428    ) {
429        let concurrency_factor =
430            dial_concurrency_factor_override.unwrap_or(self.dial_concurrency_factor);
431        let span = tracing::debug_span!(parent: tracing::Span::none(), "new_outgoing_connection", %concurrency_factor, num_dials=%dials.len(), id = %connection_id);
432        span.follows_from(tracing::Span::current());
433
434        let (abort_notifier, abort_receiver) = oneshot::channel();
435
436        self.executor.spawn(
437            task::new_for_pending_outgoing_connection(
438                connection_id,
439                ConcurrentDial::new(dials, concurrency_factor),
440                abort_receiver,
441                self.pending_connection_events_tx.clone(),
442            )
443            .instrument(span),
444        );
445
446        let endpoint = PendingPoint::Dialer { role_override };
447
448        self.counters.inc_pending(&endpoint);
449        self.pending.insert(
450            connection_id,
451            PendingConnection {
452                peer_id: peer,
453                endpoint,
454                abort_notifier: Some(abort_notifier),
455                accepted_at: Instant::now(),
456            },
457        );
458    }
459
460    /// Adds a pending incoming connection to the pool in the form of a
461    /// `Future` that establishes and negotiates the connection.
462    pub(crate) fn add_incoming<TFut>(
463        &mut self,
464        future: TFut,
465        info: IncomingInfo<'_>,
466        connection_id: ConnectionId,
467    ) where
468        TFut: Future<Output = Result<(PeerId, StreamMuxerBox), std::io::Error>> + Send + 'static,
469    {
470        let endpoint = info.create_connected_point();
471
472        let (abort_notifier, abort_receiver) = oneshot::channel();
473
474        let span = tracing::debug_span!(parent: tracing::Span::none(), "new_incoming_connection", remote_addr = %info.send_back_addr, id = %connection_id);
475        span.follows_from(tracing::Span::current());
476
477        self.executor.spawn(
478            task::new_for_pending_incoming_connection(
479                connection_id,
480                future,
481                abort_receiver,
482                self.pending_connection_events_tx.clone(),
483            )
484            .instrument(span),
485        );
486
487        self.counters.inc_pending_incoming();
488        self.pending.insert(
489            connection_id,
490            PendingConnection {
491                peer_id: None,
492                endpoint: endpoint.into(),
493                abort_notifier: Some(abort_notifier),
494                accepted_at: Instant::now(),
495            },
496        );
497    }
498
499    pub(crate) fn spawn_connection(
500        &mut self,
501        id: ConnectionId,
502        obtained_peer_id: PeerId,
503        endpoint: &ConnectedPoint,
504        connection: NewConnection,
505        handler: THandler,
506    ) {
507        let connection = connection.extract();
508        let conns = self.established.entry(obtained_peer_id).or_default();
509        self.counters.inc_established(endpoint);
510
511        let (command_sender, command_receiver) = mpsc::channel(self.task_command_buffer_size);
512        let (event_sender, event_receiver) = mpsc::channel(self.per_connection_event_buffer_size);
513
514        conns.insert(
515            id,
516            EstablishedConnection {
517                endpoint: endpoint.clone(),
518                sender: command_sender,
519            },
520        );
521        self.established_connection_events.push(event_receiver);
522        if let Some(waker) = self.no_established_connections_waker.take() {
523            waker.wake();
524        }
525
526        let connection = Connection::new(
527            connection,
528            handler,
529            self.substream_upgrade_protocol_override,
530            self.max_negotiating_inbound_streams,
531            self.idle_connection_timeout,
532        );
533
534        let span = tracing::debug_span!(parent: tracing::Span::none(), "new_established_connection", remote_addr = %endpoint.get_remote_address(), %id, peer = %obtained_peer_id);
535        span.follows_from(tracing::Span::current());
536
537        self.executor.spawn(
538            task::new_for_established_connection(
539                id,
540                obtained_peer_id,
541                connection,
542                command_receiver,
543                event_sender,
544            )
545            .instrument(span),
546        )
547    }
548
    /// Polls the connection pool for events.
    ///
    /// Events of established connections are looked at first, then dropped
    /// [`NewConnection`]s and events of pending connections. Returns
    /// `Poll::Ready` with the first event found. If nothing is ready, the
    /// local task queue (if any) is advanced and `Poll::Pending` is returned.
    #[tracing::instrument(level = "debug", name = "Pool::poll", skip(self, cx))]
    pub(crate) fn poll(&mut self, cx: &mut Context<'_>) -> Poll<PoolEvent<THandler::ToBehaviour>>
    where
        THandler: ConnectionHandler + 'static,
        <THandler as ConnectionHandler>::OutboundOpenInfo: Send,
    {
        // Poll for events of established connections.
        //
        // Note that established connections are polled before pending connections, thus
        // prioritizing established connections over pending connections.
        match self.established_connection_events.poll_next_unpin(cx) {
            Poll::Pending => {}
            Poll::Ready(None) => {
                // There is no receiver to poll right now. Keep the waker so that
                // `spawn_connection` can wake us once a connection is added.
                self.no_established_connections_waker = Some(cx.waker().clone());
            }

            Poll::Ready(Some(task::EstablishedConnectionEvent::Notify { id, peer_id, event })) => {
                return Poll::Ready(PoolEvent::ConnectionEvent { peer_id, id, event });
            }
            Poll::Ready(Some(task::EstablishedConnectionEvent::AddressChange {
                id,
                peer_id,
                new_address,
            })) => {
                let connection = self
                    .established
                    .get_mut(&peer_id)
                    .expect("Receive `AddressChange` event for established peer.")
                    .get_mut(&id)
                    .expect("Receive `AddressChange` event from established connection");
                // Store the updated endpoint and report both old and new to the caller.
                let mut new_endpoint = connection.endpoint.clone();
                new_endpoint.set_remote_address(new_address);
                let old_endpoint =
                    std::mem::replace(&mut connection.endpoint, new_endpoint.clone());

                return Poll::Ready(PoolEvent::AddressChange {
                    peer_id,
                    id,
                    new_endpoint,
                    old_endpoint,
                });
            }
            Poll::Ready(Some(task::EstablishedConnectionEvent::Closed { id, peer_id, error })) => {
                let connections = self
                    .established
                    .get_mut(&peer_id)
                    .expect("`Closed` event for established connection");
                let EstablishedConnection { endpoint, .. } =
                    connections.remove(&id).expect("Connection to be present");
                self.counters.dec_established(&endpoint);
                let remaining_established_connection_ids: Vec<ConnectionId> =
                    connections.keys().cloned().collect();
                // Drop the peer's entry with its last connection, so that the peer
                // no longer counts as connected.
                if remaining_established_connection_ids.is_empty() {
                    self.established.remove(&peer_id);
                }
                return Poll::Ready(PoolEvent::ConnectionClosed {
                    id,
                    connected: Connected { endpoint, peer_id },
                    error,
                    remaining_established_connection_ids,
                });
            }
        }

        // Poll for events of pending connections.
        loop {
            // First handle `NewConnection`s that were dropped: a muxer received
            // here is closed in a background task. An `Err` result carries no
            // muxer and is ignored.
            if let Poll::Ready(Some(result)) =
                self.new_connection_dropped_listeners.poll_next_unpin(cx)
            {
                if let Ok(dropped_connection) = result {
                    self.executor.spawn(async move {
                        let _ = dropped_connection.close().await;
                    });
                }
                continue;
            }

            let event = match self.pending_connection_events_rx.poll_next_unpin(cx) {
                Poll::Ready(Some(event)) => event,
                Poll::Pending => break,
                Poll::Ready(None) => unreachable!("Pool holds both sender and receiver."),
            };

            match event {
                task::PendingConnectionEvent::ConnectionEstablished {
                    id,
                    output: (obtained_peer_id, mut muxer),
                    outgoing,
                } => {
                    let PendingConnection {
                        peer_id: expected_peer_id,
                        endpoint,
                        abort_notifier: _,
                        accepted_at,
                    } = self
                        .pending
                        .remove(&id)
                        .expect("Entry in `self.pending` for previously pending connection.");

                    self.counters.dec_pending(&endpoint);

                    // Turn the pending endpoint into a connected one. `outgoing` is
                    // expected to be `Some` exactly for dialed connections.
                    let (endpoint, concurrent_dial_errors) = match (endpoint, outgoing) {
                        (PendingPoint::Dialer { role_override }, Some((address, errors))) => (
                            ConnectedPoint::Dialer {
                                address,
                                role_override,
                            },
                            Some(errors),
                        ),
                        (
                            PendingPoint::Listener {
                                local_addr,
                                send_back_addr,
                            },
                            None,
                        ) => (
                            ConnectedPoint::Listener {
                                local_addr,
                                send_back_addr,
                            },
                            None,
                        ),
                        (PendingPoint::Dialer { .. }, None) => unreachable!(
                            "Established incoming connection via pending outgoing connection."
                        ),
                        (PendingPoint::Listener { .. }, Some(_)) => unreachable!(
                            "Established outgoing connection via pending incoming connection."
                        ),
                    };

                    // Rejects the connection if the remote is not the expected peer
                    // or if it identifies as the local peer.
                    let check_peer_id = || {
                        if let Some(peer) = expected_peer_id {
                            if peer != obtained_peer_id {
                                return Err(PendingConnectionError::WrongPeerId {
                                    obtained: obtained_peer_id,
                                    endpoint: endpoint.clone(),
                                });
                            }
                        }

                        if self.local_id == obtained_peer_id {
                            return Err(PendingConnectionError::LocalPeerId {
                                endpoint: endpoint.clone(),
                            });
                        }

                        Ok(())
                    };

                    if let Err(error) = check_peer_id() {
                        // Close the rejected connection in the background; a failure
                        // to close is only logged.
                        self.executor.spawn(poll_fn(move |cx| {
                            if let Err(e) = ready!(muxer.poll_close_unpin(cx)) {
                                tracing::debug!(
                                    peer=%obtained_peer_id,
                                    connection=%id,
                                    "Failed to close connection to peer: {:?}",
                                    e
                                );
                            }
                            Poll::Ready(())
                        }));

                        // Report the rejection as a pending-connection error of the
                        // matching direction.
                        match endpoint {
                            ConnectedPoint::Dialer { .. } => {
                                return Poll::Ready(PoolEvent::PendingOutboundConnectionError {
                                    id,
                                    error: error
                                        .map(|t| vec![(endpoint.get_remote_address().clone(), t)]),
                                    peer: expected_peer_id.or(Some(obtained_peer_id)),
                                })
                            }
                            ConnectedPoint::Listener {
                                send_back_addr,
                                local_addr,
                            } => {
                                return Poll::Ready(PoolEvent::PendingInboundConnectionError {
                                    id,
                                    error,
                                    send_back_addr,
                                    local_addr,
                                })
                            }
                        };
                    }

                    let established_in = accepted_at.elapsed();

                    // Wrap the muxer and keep the drop listener, so that the muxer
                    // can be closed by the pool if the `NewConnection` is dropped.
                    let (connection, drop_listener) = NewConnection::new(muxer);
                    self.new_connection_dropped_listeners.push(drop_listener);

                    return Poll::Ready(PoolEvent::ConnectionEstablished {
                        peer_id: obtained_peer_id,
                        endpoint,
                        id,
                        connection,
                        concurrent_dial_errors,
                        established_in,
                    });
                }
                task::PendingConnectionEvent::PendingFailed { id, error } => {
                    // A failure for an ID that is not in `pending` produces no
                    // event; the loop simply moves on.
                    if let Some(PendingConnection {
                        peer_id,
                        endpoint,
                        abort_notifier: _,
                        accepted_at: _, // Ignoring the time it took for the connection to fail.
                    }) = self.pending.remove(&id)
                    {
                        self.counters.dec_pending(&endpoint);

                        match (endpoint, error) {
                            (PendingPoint::Dialer { .. }, Either::Left(error)) => {
                                return Poll::Ready(PoolEvent::PendingOutboundConnectionError {
                                    id,
                                    error,
                                    peer: peer_id,
                                });
                            }
                            (
                                PendingPoint::Listener {
                                    send_back_addr,
                                    local_addr,
                                },
                                Either::Right(error),
                            ) => {
                                return Poll::Ready(PoolEvent::PendingInboundConnectionError {
                                    id,
                                    error,
                                    send_back_addr,
                                    local_addr,
                                });
                            }
                            (PendingPoint::Dialer { .. }, Either::Right(_)) => {
                                unreachable!("Inbound error for outbound connection.")
                            }
                            (PendingPoint::Listener { .. }, Either::Left(_)) => {
                                unreachable!("Outbound error for inbound connection.")
                            }
                        }
                    }
                }
            }
        }

        // Nothing to report: give locally queued tasks a chance to make progress
        // (a no-op when an executor is configured).
        self.executor.advance_local(cx);

        Poll::Pending
    }
797}
798
799/// Opaque type for a new connection.
800///
801/// This connection has just been established but isn't part of the [`Pool`] yet.
802/// It either needs to be spawned via [`Pool::spawn_connection`] or dropped if undesired.
803///
804/// On drop, this type send the connection back to the [`Pool`] where it will be gracefully closed.
805#[derive(Debug)]
806pub(crate) struct NewConnection {
807    connection: Option<StreamMuxerBox>,
808    drop_sender: Option<oneshot::Sender<StreamMuxerBox>>,
809}
810
811impl NewConnection {
812    fn new(conn: StreamMuxerBox) -> (Self, oneshot::Receiver<StreamMuxerBox>) {
813        let (sender, receiver) = oneshot::channel();
814
815        (
816            Self {
817                connection: Some(conn),
818                drop_sender: Some(sender),
819            },
820            receiver,
821        )
822    }
823
824    fn extract(mut self) -> StreamMuxerBox {
825        self.connection.take().unwrap()
826    }
827}
828
829impl Drop for NewConnection {
830    fn drop(&mut self) {
831        if let Some(connection) = self.connection.take() {
832            let _ = self
833                .drop_sender
834                .take()
835                .expect("`drop_sender` to always be `Some`")
836                .send(connection);
837        }
838    }
839}
840
841/// Network connection information.
842#[derive(Debug, Clone)]
843pub struct ConnectionCounters {
844    /// The current number of incoming connections.
845    pending_incoming: u32,
846    /// The current number of outgoing connections.
847    pending_outgoing: u32,
848    /// The current number of established inbound connections.
849    established_incoming: u32,
850    /// The current number of established outbound connections.
851    established_outgoing: u32,
852}
853
854impl ConnectionCounters {
855    fn new() -> Self {
856        Self {
857            pending_incoming: 0,
858            pending_outgoing: 0,
859            established_incoming: 0,
860            established_outgoing: 0,
861        }
862    }
863
864    /// The total number of connections, both pending and established.
865    pub fn num_connections(&self) -> u32 {
866        self.num_pending() + self.num_established()
867    }
868
869    /// The total number of pending connections, both incoming and outgoing.
870    pub fn num_pending(&self) -> u32 {
871        self.pending_incoming + self.pending_outgoing
872    }
873
874    /// The number of incoming connections being established.
875    pub fn num_pending_incoming(&self) -> u32 {
876        self.pending_incoming
877    }
878
879    /// The number of outgoing connections being established.
880    pub fn num_pending_outgoing(&self) -> u32 {
881        self.pending_outgoing
882    }
883
884    /// The number of established incoming connections.
885    pub fn num_established_incoming(&self) -> u32 {
886        self.established_incoming
887    }
888
889    /// The number of established outgoing connections.
890    pub fn num_established_outgoing(&self) -> u32 {
891        self.established_outgoing
892    }
893
894    /// The total number of established connections.
895    pub fn num_established(&self) -> u32 {
896        self.established_outgoing + self.established_incoming
897    }
898
899    fn inc_pending(&mut self, endpoint: &PendingPoint) {
900        match endpoint {
901            PendingPoint::Dialer { .. } => {
902                self.pending_outgoing += 1;
903            }
904            PendingPoint::Listener { .. } => {
905                self.pending_incoming += 1;
906            }
907        }
908    }
909
910    fn inc_pending_incoming(&mut self) {
911        self.pending_incoming += 1;
912    }
913
914    fn dec_pending(&mut self, endpoint: &PendingPoint) {
915        match endpoint {
916            PendingPoint::Dialer { .. } => {
917                self.pending_outgoing -= 1;
918            }
919            PendingPoint::Listener { .. } => {
920                self.pending_incoming -= 1;
921            }
922        }
923    }
924
925    fn inc_established(&mut self, endpoint: &ConnectedPoint) {
926        match endpoint {
927            ConnectedPoint::Dialer { .. } => {
928                self.established_outgoing += 1;
929            }
930            ConnectedPoint::Listener { .. } => {
931                self.established_incoming += 1;
932            }
933        }
934    }
935
936    fn dec_established(&mut self, endpoint: &ConnectedPoint) {
937        match endpoint {
938            ConnectedPoint::Dialer { .. } => {
939                self.established_outgoing -= 1;
940            }
941            ConnectedPoint::Listener { .. } => {
942                self.established_incoming -= 1;
943            }
944        }
945    }
946}
947
948/// Configuration options when creating a [`Pool`].
949///
950/// The default configuration specifies no dedicated task executor, a
951/// task event buffer size of 32, and a task command buffer size of 7.
952pub(crate) struct PoolConfig {
953    /// Executor to use to spawn tasks.
954    pub(crate) executor: Option<Box<dyn Executor + Send>>,
955    /// Size of the task command buffer (per task).
956    pub(crate) task_command_buffer_size: usize,
957    /// Size of the pending connection task event buffer and the established connection task event
958    /// buffer.
959    pub(crate) per_connection_event_buffer_size: usize,
960    /// Number of addresses concurrently dialed for a single outbound connection attempt.
961    pub(crate) dial_concurrency_factor: NonZeroU8,
962    /// How long a connection should be kept alive once it is idling.
963    pub(crate) idle_connection_timeout: Duration,
964    /// The configured override for substream protocol upgrades, if any.
965    substream_upgrade_protocol_override: Option<libp2p_core::upgrade::Version>,
966
967    /// The maximum number of inbound streams concurrently negotiating on a connection.
968    ///
969    /// See [`Connection::max_negotiating_inbound_streams`].
970    max_negotiating_inbound_streams: usize,
971}
972
973impl PoolConfig {
974    pub(crate) fn new(executor: Option<Box<dyn Executor + Send>>) -> Self {
975        Self {
976            executor,
977            task_command_buffer_size: 32,
978            per_connection_event_buffer_size: 7,
979            dial_concurrency_factor: NonZeroU8::new(8).expect("8 > 0"),
980            idle_connection_timeout: Duration::ZERO,
981            substream_upgrade_protocol_override: None,
982            max_negotiating_inbound_streams: 128,
983        }
984    }
985
986    /// Sets the maximum number of events sent to a connection's background task
987    /// that may be buffered, if the task cannot keep up with their consumption and
988    /// delivery to the connection handler.
989    ///
990    /// When the buffer for a particular connection is full, `notify_handler` will no
991    /// longer be able to deliver events to the associated [`Connection`],
992    /// thus exerting back-pressure on the connection and peer API.
993    pub(crate) fn with_notify_handler_buffer_size(mut self, n: NonZeroUsize) -> Self {
994        self.task_command_buffer_size = n.get() - 1;
995        self
996    }
997
998    /// Sets the maximum number of buffered connection events (beyond a guaranteed
999    /// buffer of 1 event per connection).
1000    ///
1001    /// When the buffer is full, the background tasks of all connections will stall.
1002    /// In this way, the consumers of network events exert back-pressure on
1003    /// the network connection I/O.
1004    pub(crate) fn with_per_connection_event_buffer_size(mut self, n: usize) -> Self {
1005        self.per_connection_event_buffer_size = n;
1006        self
1007    }
1008
1009    /// Number of addresses concurrently dialed for a single outbound connection attempt.
1010    pub(crate) fn with_dial_concurrency_factor(mut self, factor: NonZeroU8) -> Self {
1011        self.dial_concurrency_factor = factor;
1012        self
1013    }
1014
1015    /// Configures an override for the substream upgrade protocol to use.
1016    pub(crate) fn with_substream_upgrade_protocol_override(
1017        mut self,
1018        v: libp2p_core::upgrade::Version,
1019    ) -> Self {
1020        self.substream_upgrade_protocol_override = Some(v);
1021        self
1022    }
1023
1024    /// The maximum number of inbound streams concurrently negotiating on a connection.
1025    ///
1026    /// See [`Connection::max_negotiating_inbound_streams`].
1027    pub(crate) fn with_max_negotiating_inbound_streams(mut self, v: usize) -> Self {
1028        self.max_negotiating_inbound_streams = v;
1029        self
1030    }
1031}