libp2p_swarm/
lib.rs

1// Copyright 2019 Parity Technologies (UK) Ltd.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21//! High-level network manager.
22//!
23//! A [`Swarm`] contains the state of the network as a whole. The entire
24//! behaviour of a libp2p network can be controlled through the `Swarm`.
25//! The `Swarm` struct contains all active and pending connections to
26//! remotes and manages the state of all the substreams that have been
27//! opened, and all the upgrades that were built upon these substreams.
28//!
29//! # Initializing a Swarm
30//!
31//! Creating a `Swarm` requires three things:
32//!
33//!  1. A network identity of the local node in form of a [`PeerId`].
34//!  2. An implementation of the [`Transport`] trait. This is the type that
35//!     will be used in order to reach nodes on the network based on their
36//!     address. See the `transport` module for more information.
37//!  3. An implementation of the [`NetworkBehaviour`] trait. This is a state
38//!     machine that defines how the swarm should behave once it is connected
39//!     to a node.
40//!
41//! # Network Behaviour
42//!
43//! The [`NetworkBehaviour`] trait is implemented on types that indicate to
44//! the swarm how it should behave. This includes which protocols are supported
45//! and which nodes to try to connect to. It is the `NetworkBehaviour` that
46//! controls what happens on the network. Multiple types that implement
47//! `NetworkBehaviour` can be composed into a single behaviour.
48//!
//! # Connection Handler
50//!
51//! The [`ConnectionHandler`] trait defines how each active connection to a
52//! remote should behave: how to handle incoming substreams, which protocols
53//! are supported, when to open a new outbound substream, etc.
54//!
55
56#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]
57
58mod connection;
59mod executor;
60mod stream;
61mod stream_protocol;
62#[cfg(test)]
63mod test;
64mod upgrade;
65
66pub mod behaviour;
67pub mod dial_opts;
68pub mod dummy;
69pub mod handler;
70mod listen_opts;
71
72/// Bundles all symbols required for the [`libp2p_swarm_derive::NetworkBehaviour`] macro.
73#[doc(hidden)]
74pub mod derive_prelude {
75    pub use crate::behaviour::AddressChange;
76    pub use crate::behaviour::ConnectionClosed;
77    pub use crate::behaviour::ConnectionEstablished;
78    pub use crate::behaviour::DialFailure;
79    pub use crate::behaviour::ExpiredListenAddr;
80    pub use crate::behaviour::ExternalAddrConfirmed;
81    pub use crate::behaviour::ExternalAddrExpired;
82    pub use crate::behaviour::FromSwarm;
83    pub use crate::behaviour::ListenFailure;
84    pub use crate::behaviour::ListenerClosed;
85    pub use crate::behaviour::ListenerError;
86    pub use crate::behaviour::NewExternalAddrCandidate;
87    pub use crate::behaviour::NewExternalAddrOfPeer;
88    pub use crate::behaviour::NewListenAddr;
89    pub use crate::behaviour::NewListener;
90    pub use crate::connection::ConnectionId;
91    pub use crate::ConnectionDenied;
92    pub use crate::ConnectionHandler;
93    pub use crate::ConnectionHandlerSelect;
94    pub use crate::DialError;
95    pub use crate::NetworkBehaviour;
96    pub use crate::THandler;
97    pub use crate::THandlerInEvent;
98    pub use crate::THandlerOutEvent;
99    pub use crate::ToSwarm;
100    pub use either::Either;
101    pub use futures::prelude as futures;
102    pub use libp2p_core::transport::ListenerId;
103    pub use libp2p_core::ConnectedPoint;
104    pub use libp2p_core::Endpoint;
105    pub use libp2p_core::Multiaddr;
106    pub use libp2p_identity::PeerId;
107}
108
109pub use behaviour::{
110    AddressChange, CloseConnection, ConnectionClosed, DialFailure, ExpiredListenAddr,
111    ExternalAddrExpired, ExternalAddresses, FromSwarm, ListenAddresses, ListenFailure,
112    ListenerClosed, ListenerError, NetworkBehaviour, NewExternalAddrCandidate,
113    NewExternalAddrOfPeer, NewListenAddr, NotifyHandler, PeerAddresses, ToSwarm,
114};
115pub use connection::pool::ConnectionCounters;
116pub use connection::{ConnectionError, ConnectionId, SupportedProtocols};
117pub use executor::Executor;
118pub use handler::{
119    ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerSelect, OneShotHandler,
120    OneShotHandlerConfig, StreamUpgradeError, SubstreamProtocol,
121};
122#[cfg(feature = "macros")]
123pub use libp2p_swarm_derive::NetworkBehaviour;
124pub use listen_opts::ListenOpts;
125pub use stream::Stream;
126pub use stream_protocol::{InvalidProtocol, StreamProtocol};
127
128use crate::behaviour::ExternalAddrConfirmed;
129use crate::handler::UpgradeInfoSend;
130use connection::pool::{EstablishedConnection, Pool, PoolConfig, PoolEvent};
131use connection::IncomingInfo;
132use connection::{
133    PendingConnectionError, PendingInboundConnectionError, PendingOutboundConnectionError,
134};
135use dial_opts::{DialOpts, PeerCondition};
136use futures::{prelude::*, stream::FusedStream};
137use libp2p_core::{
138    connection::ConnectedPoint,
139    muxing::StreamMuxerBox,
140    transport::{self, ListenerId, TransportError, TransportEvent},
141    Endpoint, Multiaddr, Transport,
142};
143use libp2p_identity::PeerId;
144use smallvec::SmallVec;
145use std::collections::{HashMap, HashSet, VecDeque};
146use std::num::{NonZeroU32, NonZeroU8, NonZeroUsize};
147use std::time::Duration;
148use std::{
149    error, fmt, io,
150    pin::Pin,
151    task::{Context, Poll},
152};
153use tracing::Instrument;
154
/// Event generated by the [`NetworkBehaviour`] that the swarm will report back.
///
/// Shorthand for the behaviour's `ToSwarm` associated type.
type TBehaviourOutEvent<TBehaviour> = <TBehaviour as NetworkBehaviour>::ToSwarm;

/// [`ConnectionHandler`] of the [`NetworkBehaviour`] for all the protocols the [`NetworkBehaviour`]
/// supports.
///
/// Shorthand for the behaviour's `ConnectionHandler` associated type.
pub type THandler<TBehaviour> = <TBehaviour as NetworkBehaviour>::ConnectionHandler;

/// Custom event that can be received by the [`ConnectionHandler`] of the
/// [`NetworkBehaviour`].
///
/// Shorthand for the `FromBehaviour` associated type of [`THandler`].
pub type THandlerInEvent<TBehaviour> = <THandler<TBehaviour> as ConnectionHandler>::FromBehaviour;

/// Custom event that can be produced by the [`ConnectionHandler`] of the [`NetworkBehaviour`].
///
/// Shorthand for the `ToBehaviour` associated type of [`THandler`].
pub type THandlerOutEvent<TBehaviour> = <THandler<TBehaviour> as ConnectionHandler>::ToBehaviour;
168
/// Event generated by the `Swarm`.
///
/// The enum is `#[non_exhaustive]`: matches outside this crate need a wildcard arm.
#[derive(Debug)]
#[non_exhaustive]
pub enum SwarmEvent<TBehaviourOutEvent> {
    /// Event generated by the `NetworkBehaviour`.
    Behaviour(TBehaviourOutEvent),
    /// A connection to the given peer has been opened.
    ConnectionEstablished {
        /// Identity of the peer that we have connected to.
        peer_id: PeerId,
        /// Identifier of the connection.
        connection_id: ConnectionId,
        /// Endpoint of the connection that has been opened.
        endpoint: ConnectedPoint,
        /// Number of established connections to this peer, including the one that has just been
        /// opened.
        num_established: NonZeroU32,
        /// [`Some`] when the new connection is an outgoing connection.
        /// Addresses are dialed concurrently. Contains the addresses and errors
        /// of dial attempts that failed before the one successful dial.
        concurrent_dial_errors: Option<Vec<(Multiaddr, TransportError<io::Error>)>>,
        /// How long it took to establish this connection.
        established_in: std::time::Duration,
    },
    /// A connection with the given peer has been closed,
    /// possibly as a result of an error.
    ConnectionClosed {
        /// Identity of the peer that we were connected to.
        peer_id: PeerId,
        /// Identifier of the connection.
        connection_id: ConnectionId,
        /// Endpoint of the connection that has been closed.
        endpoint: ConnectedPoint,
        /// Number of other remaining connections to this same peer.
        num_established: u32,
        /// Reason for the disconnection, if it was not a successful
        /// active close.
        cause: Option<ConnectionError>,
    },
    /// A new connection arrived on a listener and is in the process of protocol negotiation.
    ///
    /// A corresponding [`ConnectionEstablished`](SwarmEvent::ConnectionEstablished) or
    /// [`IncomingConnectionError`](SwarmEvent::IncomingConnectionError) event will later be
    /// generated for this connection.
    IncomingConnection {
        /// Identifier of the connection.
        connection_id: ConnectionId,
        /// Local connection address.
        /// This address has been earlier reported with a [`NewListenAddr`](SwarmEvent::NewListenAddr)
        /// event.
        local_addr: Multiaddr,
        /// Address used to send back data to the remote.
        send_back_addr: Multiaddr,
    },
    /// An error happened on an inbound connection during its initial handshake.
    ///
    /// This can include, for example, an error during the handshake of the encryption layer, or
    /// the connection unexpectedly closed.
    IncomingConnectionError {
        /// Identifier of the connection.
        connection_id: ConnectionId,
        /// Local connection address.
        /// This address has been earlier reported with a [`NewListenAddr`](SwarmEvent::NewListenAddr)
        /// event.
        local_addr: Multiaddr,
        /// Address used to send back data to the remote.
        send_back_addr: Multiaddr,
        /// The error that happened.
        error: ListenError,
    },
    /// An error happened on an outbound connection.
    OutgoingConnectionError {
        /// Identifier of the connection.
        connection_id: ConnectionId,
        /// If known, [`PeerId`] of the peer we tried to reach.
        peer_id: Option<PeerId>,
        /// Error that has been encountered.
        error: DialError,
    },
    /// One of our listeners has reported a new local listening address.
    NewListenAddr {
        /// The listener that is listening on the new address.
        listener_id: ListenerId,
        /// The new address that is being listened on.
        address: Multiaddr,
    },
    /// One of our listeners has reported the expiration of a listening address.
    ExpiredListenAddr {
        /// The listener that is no longer listening on the address.
        listener_id: ListenerId,
        /// The expired address.
        address: Multiaddr,
    },
    /// One of the listeners gracefully closed.
    ListenerClosed {
        /// The listener that closed.
        listener_id: ListenerId,
        /// The addresses that the listener was listening on. These addresses are now considered
        /// expired, similar to if a [`ExpiredListenAddr`](SwarmEvent::ExpiredListenAddr) event
        /// has been generated for each of them.
        addresses: Vec<Multiaddr>,
        /// Reason for the closure. Contains `Ok(())` if the stream produced `None`, or `Err`
        /// if the stream produced an error.
        reason: Result<(), io::Error>,
    },
    /// One of the listeners reported a non-fatal error.
    ListenerError {
        /// The listener that errored.
        listener_id: ListenerId,
        /// The listener error.
        error: io::Error,
    },
    /// A new dialing attempt has been initiated by the [`NetworkBehaviour`]
    /// implementation.
    ///
    /// A [`ConnectionEstablished`](SwarmEvent::ConnectionEstablished) event is
    /// reported if the dialing attempt succeeds, otherwise a
    /// [`OutgoingConnectionError`](SwarmEvent::OutgoingConnectionError) event
    /// is reported.
    Dialing {
        /// Identity of the peer that we are connecting to, if known.
        peer_id: Option<PeerId>,

        /// Identifier of the connection.
        connection_id: ConnectionId,
    },
    /// We have discovered a new candidate for an external address for us.
    NewExternalAddrCandidate { address: Multiaddr },
    /// An external address of the local node was confirmed.
    ExternalAddrConfirmed { address: Multiaddr },
    /// An external address of the local node expired, i.e. is no longer confirmed.
    ExternalAddrExpired { address: Multiaddr },
    /// We have discovered a new address of a peer.
    NewExternalAddrOfPeer { peer_id: PeerId, address: Multiaddr },
}
304
305impl<TBehaviourOutEvent> SwarmEvent<TBehaviourOutEvent> {
306    /// Extract the `TBehaviourOutEvent` from this [`SwarmEvent`] in case it is the `Behaviour` variant, otherwise fail.
307    #[allow(clippy::result_large_err)]
308    pub fn try_into_behaviour_event(self) -> Result<TBehaviourOutEvent, Self> {
309        match self {
310            SwarmEvent::Behaviour(inner) => Ok(inner),
311            other => Err(other),
312        }
313    }
314}
315
/// Contains the state of the network, plus the way it should behave.
///
/// Note: Needs to be polled via `<Swarm as Stream>` in order to make
/// progress.
pub struct Swarm<TBehaviour>
where
    TBehaviour: NetworkBehaviour,
{
    /// [`Transport`] for dialing remote peers and listening for incoming connection.
    transport: transport::Boxed<(PeerId, StreamMuxerBox)>,

    /// The nodes currently active.
    pool: Pool<THandler<TBehaviour>>,

    /// The local peer ID.
    local_peer_id: PeerId,

    /// Handles which nodes to connect to and how to handle the events sent back by the protocol
    /// handlers.
    behaviour: TBehaviour,

    /// List of protocols that the behaviour says it supports.
    supported_protocols: SmallVec<[Vec<u8>; 16]>,

    /// External addresses of the local node that have been confirmed.
    ///
    /// Filled by [`Swarm::add_external_address`], emptied by
    /// [`Swarm::remove_external_address`] and exposed via [`Swarm::external_addresses`].
    confirmed_external_addr: HashSet<Multiaddr>,

    /// Multiaddresses that our listeners are listening on, keyed by the ID of the listener.
    listened_addrs: HashMap<ListenerId, SmallVec<[Multiaddr; 1]>>,

    /// Pending event to be delivered to connection handlers
    /// (or dropped if the peer disconnected) before the `behaviour`
    /// can be polled again.
    pending_handler_event: Option<(PeerId, PendingNotifyHandler, THandlerInEvent<TBehaviour>)>,

    /// Queue of [`SwarmEvent`]s buffered by the swarm, e.g. while handling events from the
    /// connection pool.
    pending_swarm_events: VecDeque<SwarmEvent<TBehaviour::ToSwarm>>,
}

// `Swarm` is declared `Unpin` unconditionally, i.e. without requiring `TBehaviour: Unpin`.
impl<TBehaviour> Unpin for Swarm<TBehaviour> where TBehaviour: NetworkBehaviour {}
354
355impl<TBehaviour> Swarm<TBehaviour>
356where
357    TBehaviour: NetworkBehaviour,
358{
359    /// Creates a new [`Swarm`] from the given [`Transport`], [`NetworkBehaviour`], [`PeerId`] and
360    /// [`Config`].
361    pub fn new(
362        transport: transport::Boxed<(PeerId, StreamMuxerBox)>,
363        behaviour: TBehaviour,
364        local_peer_id: PeerId,
365        config: Config,
366    ) -> Self {
367        tracing::info!(%local_peer_id);
368
369        Swarm {
370            local_peer_id,
371            transport,
372            pool: Pool::new(local_peer_id, config.pool_config),
373            behaviour,
374            supported_protocols: Default::default(),
375            confirmed_external_addr: Default::default(),
376            listened_addrs: HashMap::new(),
377            pending_handler_event: None,
378            pending_swarm_events: VecDeque::default(),
379        }
380    }
381
382    /// Returns information about the connections underlying the [`Swarm`].
383    pub fn network_info(&self) -> NetworkInfo {
384        let num_peers = self.pool.num_peers();
385        let connection_counters = self.pool.counters().clone();
386        NetworkInfo {
387            num_peers,
388            connection_counters,
389        }
390    }
391
392    /// Starts listening on the given address.
393    /// Returns an error if the address is not supported.
394    ///
395    /// Listeners report their new listening addresses as [`SwarmEvent::NewListenAddr`].
396    /// Depending on the underlying transport, one listener may have multiple listening addresses.
397    pub fn listen_on(&mut self, addr: Multiaddr) -> Result<ListenerId, TransportError<io::Error>> {
398        let opts = ListenOpts::new(addr);
399        let id = opts.listener_id();
400        self.add_listener(opts)?;
401        Ok(id)
402    }
403
    /// Removes the listener with the given ID.
    ///
    /// The call is forwarded to the underlying transport.
    ///
    /// Returns `true` if there was a listener with this ID, `false`
    /// otherwise.
    pub fn remove_listener(&mut self, listener_id: ListenerId) -> bool {
        self.transport.remove_listener(listener_id)
    }
411
    /// Dial a known or unknown peer.
    ///
    /// See also [`DialOpts`].
    ///
    /// On success the dial attempts (one per candidate address) are handed to the connection
    /// pool and `Ok(())` is returned immediately; the connection itself is established
    /// asynchronously.
    ///
    /// # Errors
    ///
    /// In every error case the [`NetworkBehaviour`] is notified through
    /// [`FromSwarm::DialFailure`] before the error is returned:
    ///
    /// - [`DialError::DialPeerConditionFalse`] if the [`PeerCondition`] of the options is not
    ///   met for the given peer.
    /// - [`DialError::Denied`] if the behaviour rejects the pending outbound connection.
    /// - [`DialError::NoAddresses`] if no address is left after removing our own listen
    ///   addresses and duplicates.
    ///
    /// ```
    /// # use libp2p_swarm::Swarm;
    /// # use libp2p_swarm::dial_opts::{DialOpts, PeerCondition};
    /// # use libp2p_core::{Multiaddr, Transport};
    /// # use libp2p_core::transport::dummy::DummyTransport;
    /// # use libp2p_swarm::dummy;
    /// # use libp2p_identity::PeerId;
    /// #
    /// # #[tokio::main]
    /// # async fn main() {
    /// let mut swarm = build_swarm();
    ///
    /// // Dial a known peer.
    /// swarm.dial(PeerId::random());
    ///
    /// // Dial an unknown peer.
    /// swarm.dial("/ip6/::1/tcp/12345".parse::<Multiaddr>().unwrap());
    /// # }
    ///
    /// # fn build_swarm() -> Swarm<dummy::Behaviour> {
    /// #     Swarm::new(DummyTransport::new().boxed(), dummy::Behaviour, PeerId::random(), libp2p_swarm::Config::with_tokio_executor())
    /// # }
    /// ```
    pub fn dial(&mut self, opts: impl Into<DialOpts>) -> Result<(), DialError> {
        let dial_opts = opts.into();

        let peer_id = dial_opts.get_peer_id();
        let condition = dial_opts.peer_condition();
        let connection_id = dial_opts.connection_id();

        // The peer condition can only be evaluated when the peer is known; dials without a
        // peer ID always go ahead.
        let should_dial = match (condition, peer_id) {
            (_, None) => true,
            (PeerCondition::Always, _) => true,
            (PeerCondition::Disconnected, Some(peer_id)) => !self.pool.is_connected(peer_id),
            (PeerCondition::NotDialing, Some(peer_id)) => !self.pool.is_dialing(peer_id),
            (PeerCondition::DisconnectedAndNotDialing, Some(peer_id)) => {
                !self.pool.is_dialing(peer_id) && !self.pool.is_connected(peer_id)
            }
        };

        if !should_dial {
            let e = DialError::DialPeerConditionFalse(condition);

            // Tell the behaviour about the refused dial before reporting it to the caller.
            self.behaviour
                .on_swarm_event(FromSwarm::DialFailure(DialFailure {
                    peer_id,
                    error: &e,
                    connection_id,
                }));

            return Err(e);
        }

        let addresses = {
            let mut addresses_from_opts = dial_opts.get_addresses();

            // Give the behaviour the chance to deny the dial or to contribute addresses.
            match self.behaviour.handle_pending_outbound_connection(
                connection_id,
                peer_id,
                addresses_from_opts.as_slice(),
                dial_opts.role_override(),
            ) {
                Ok(addresses) => {
                    if dial_opts.extend_addresses_through_behaviour() {
                        addresses_from_opts.extend(addresses)
                    } else {
                        // The options opted out of behaviour-provided addresses; only log how
                        // many were ignored.
                        let num_addresses = addresses.len();

                        if num_addresses > 0 {
                            tracing::debug!(
                                connection=%connection_id,
                                discarded_addresses_count=%num_addresses,
                                "discarding addresses from `NetworkBehaviour` because `DialOpts::extend_addresses_through_behaviour is `false` for connection"
                            )
                        }
                    }
                }
                Err(cause) => {
                    let error = DialError::Denied { cause };

                    self.behaviour
                        .on_swarm_event(FromSwarm::DialFailure(DialFailure {
                            peer_id,
                            error: &error,
                            connection_id,
                        }));

                    return Err(error);
                }
            }

            // Drop addresses that one of our own listeners is listening on, and keep only the
            // first occurrence of every remaining address.
            let mut unique_addresses = HashSet::new();
            addresses_from_opts.retain(|addr| {
                !self.listened_addrs.values().flatten().any(|a| a == addr)
                    && unique_addresses.insert(addr.clone())
            });

            if addresses_from_opts.is_empty() {
                let error = DialError::NoAddresses;
                self.behaviour
                    .on_swarm_event(FromSwarm::DialFailure(DialFailure {
                        peer_id,
                        error: &error,
                        connection_id,
                    }));
                return Err(error);
            };

            addresses_from_opts
        };

        // Build one boxed future per address. Each resolves to the dialed address paired with
        // the outcome of the attempt. When the peer is known, `with_p2p` is applied to the
        // address first.
        let dials = addresses
            .into_iter()
            .map(|a| match peer_id.map_or(Ok(a.clone()), |p| a.with_p2p(p)) {
                Ok(address) => {
                    let (dial, span) = match dial_opts.role_override() {
                        Endpoint::Dialer => (
                            self.transport.dial(address.clone()),
                            tracing::debug_span!(parent: tracing::Span::none(), "Transport::dial", %address),
                        ),
                        Endpoint::Listener => (
                            self.transport.dial_as_listener(address.clone()),
                            tracing::debug_span!(parent: tracing::Span::none(), "Transport::dial_as_listener", %address),
                        ),
                    };
                    // The span is created without a parent and only linked to the current span.
                    span.follows_from(tracing::Span::current());

                    match dial {
                        Ok(fut) => fut
                            .map(|r| (address, r.map_err(TransportError::Other)))
                            .instrument(span)
                            .boxed(),
                        // The transport refused the dial synchronously.
                        Err(err) => futures::future::ready((address, Err(err))).boxed(),
                    }
                }
                // `with_p2p` returned the address as an error; report it as unsupported.
                Err(address) => futures::future::ready((
                    address.clone(),
                    Err(TransportError::MultiaddrNotSupported(address)),
                ))
                .boxed(),
            })
            .collect();

        self.pool.add_outgoing(
            dials,
            peer_id,
            dial_opts.role_override(),
            dial_opts.dial_concurrency_override(),
            connection_id,
        );

        Ok(())
    }
569
    /// Returns an iterator that produces the list of addresses we're listening on.
    ///
    /// The addresses of all listeners are yielded as one flat sequence.
    pub fn listeners(&self) -> impl Iterator<Item = &Multiaddr> {
        self.listened_addrs.values().flatten()
    }
574
    /// Returns the peer ID of the local node, i.e. the one this swarm was created with.
    pub fn local_peer_id(&self) -> &PeerId {
        &self.local_peer_id
    }
579
    /// Lists all **confirmed** external addresses of the local node.
    ///
    /// The addresses are stored in a hash set, so the iteration order is unspecified.
    pub fn external_addresses(&self) -> impl Iterator<Item = &Multiaddr> {
        self.confirmed_external_addr.iter()
    }
584
585    fn add_listener(&mut self, opts: ListenOpts) -> Result<(), TransportError<io::Error>> {
586        let addr = opts.address();
587        let listener_id = opts.listener_id();
588
589        if let Err(e) = self.transport.listen_on(listener_id, addr.clone()) {
590            self.behaviour
591                .on_swarm_event(FromSwarm::ListenerError(behaviour::ListenerError {
592                    listener_id,
593                    err: &e,
594                }));
595
596            return Err(e);
597        }
598
599        self.behaviour
600            .on_swarm_event(FromSwarm::NewListener(behaviour::NewListener {
601                listener_id,
602            }));
603
604        Ok(())
605    }
606
607    /// Add a **confirmed** external address for the local node.
608    ///
609    /// This function should only be called with addresses that are guaranteed to be reachable.
610    /// The address is broadcast to all [`NetworkBehaviour`]s via [`FromSwarm::ExternalAddrConfirmed`].
611    pub fn add_external_address(&mut self, a: Multiaddr) {
612        self.behaviour
613            .on_swarm_event(FromSwarm::ExternalAddrConfirmed(ExternalAddrConfirmed {
614                addr: &a,
615            }));
616        self.confirmed_external_addr.insert(a);
617    }
618
619    /// Remove an external address for the local node.
620    ///
621    /// The address is broadcast to all [`NetworkBehaviour`]s via [`FromSwarm::ExternalAddrExpired`].
622    pub fn remove_external_address(&mut self, addr: &Multiaddr) {
623        self.behaviour
624            .on_swarm_event(FromSwarm::ExternalAddrExpired(ExternalAddrExpired { addr }));
625        self.confirmed_external_addr.remove(addr);
626    }
627
628    /// Add a new external address of a remote peer.
629    ///
630    /// The address is broadcast to all [`NetworkBehaviour`]s via [`FromSwarm::NewExternalAddrOfPeer`].
631    pub fn add_peer_address(&mut self, peer_id: PeerId, addr: Multiaddr) {
632        self.behaviour
633            .on_swarm_event(FromSwarm::NewExternalAddrOfPeer(NewExternalAddrOfPeer {
634                peer_id,
635                addr: &addr,
636            }))
637    }
638
639    /// Disconnects a peer by its peer ID, closing all connections to said peer.
640    ///
641    /// Returns `Ok(())` if there was one or more established connections to the peer.
642    ///
643    /// Closing a connection via [`Swarm::disconnect_peer_id`] will poll [`ConnectionHandler::poll_close`] to completion.
644    /// Use this function if you want to close a connection _despite_ it still being in use by one or more handlers.
645    #[allow(clippy::result_unit_err)]
646    pub fn disconnect_peer_id(&mut self, peer_id: PeerId) -> Result<(), ()> {
647        let was_connected = self.pool.is_connected(peer_id);
648        self.pool.disconnect(peer_id);
649
650        if was_connected {
651            Ok(())
652        } else {
653            Err(())
654        }
655    }
656
657    /// Attempt to gracefully close a connection.
658    ///
659    /// Closing a connection is asynchronous but this function will return immediately.
660    /// A [`SwarmEvent::ConnectionClosed`] event will be emitted once the connection is actually closed.
661    ///
662    /// # Returns
663    ///
664    /// - `true` if the connection was established and is now being closed.
665    /// - `false` if the connection was not found or is no longer established.
666    pub fn close_connection(&mut self, connection_id: ConnectionId) -> bool {
667        if let Some(established) = self.pool.get_established(connection_id) {
668            established.start_close();
669            return true;
670        }
671
672        false
673    }
674
    /// Checks whether there is an established connection to a peer.
    ///
    /// The answer comes from the connection pool.
    pub fn is_connected(&self, peer_id: &PeerId) -> bool {
        self.pool.is_connected(*peer_id)
    }
679
    /// Returns an iterator over the IDs of the currently connected peers, as tracked by the
    /// connection pool.
    pub fn connected_peers(&self) -> impl Iterator<Item = &PeerId> {
        self.pool.iter_connected()
    }
684
    /// Returns a shared reference to the [`NetworkBehaviour`] this swarm was created with.
    pub fn behaviour(&self) -> &TBehaviour {
        &self.behaviour
    }
689
    /// Returns a mutable reference to the [`NetworkBehaviour`] this swarm was created with.
    pub fn behaviour_mut(&mut self) -> &mut TBehaviour {
        &mut self.behaviour
    }
694
695    fn handle_pool_event(&mut self, event: PoolEvent<THandlerOutEvent<TBehaviour>>) {
696        match event {
697            PoolEvent::ConnectionEstablished {
698                peer_id,
699                id,
700                endpoint,
701                connection,
702                concurrent_dial_errors,
703                established_in,
704            } => {
705                let handler = match endpoint.clone() {
706                    ConnectedPoint::Dialer {
707                        address,
708                        role_override,
709                    } => {
710                        match self.behaviour.handle_established_outbound_connection(
711                            id,
712                            peer_id,
713                            &address,
714                            role_override,
715                        ) {
716                            Ok(handler) => handler,
717                            Err(cause) => {
718                                let dial_error = DialError::Denied { cause };
719                                self.behaviour.on_swarm_event(FromSwarm::DialFailure(
720                                    DialFailure {
721                                        connection_id: id,
722                                        error: &dial_error,
723                                        peer_id: Some(peer_id),
724                                    },
725                                ));
726
727                                self.pending_swarm_events.push_back(
728                                    SwarmEvent::OutgoingConnectionError {
729                                        peer_id: Some(peer_id),
730                                        connection_id: id,
731                                        error: dial_error,
732                                    },
733                                );
734                                return;
735                            }
736                        }
737                    }
738                    ConnectedPoint::Listener {
739                        local_addr,
740                        send_back_addr,
741                    } => {
742                        match self.behaviour.handle_established_inbound_connection(
743                            id,
744                            peer_id,
745                            &local_addr,
746                            &send_back_addr,
747                        ) {
748                            Ok(handler) => handler,
749                            Err(cause) => {
750                                let listen_error = ListenError::Denied { cause };
751                                self.behaviour.on_swarm_event(FromSwarm::ListenFailure(
752                                    ListenFailure {
753                                        local_addr: &local_addr,
754                                        send_back_addr: &send_back_addr,
755                                        error: &listen_error,
756                                        connection_id: id,
757                                    },
758                                ));
759
760                                self.pending_swarm_events.push_back(
761                                    SwarmEvent::IncomingConnectionError {
762                                        connection_id: id,
763                                        send_back_addr,
764                                        local_addr,
765                                        error: listen_error,
766                                    },
767                                );
768                                return;
769                            }
770                        }
771                    }
772                };
773
774                let supported_protocols = handler
775                    .listen_protocol()
776                    .upgrade()
777                    .protocol_info()
778                    .map(|p| p.as_ref().as_bytes().to_vec())
779                    .collect();
780                let other_established_connection_ids = self
781                    .pool
782                    .iter_established_connections_of_peer(&peer_id)
783                    .collect::<Vec<_>>();
784                let num_established = NonZeroU32::new(
785                    u32::try_from(other_established_connection_ids.len() + 1).unwrap(),
786                )
787                .expect("n + 1 is always non-zero; qed");
788
789                self.pool
790                    .spawn_connection(id, peer_id, &endpoint, connection, handler);
791
792                tracing::debug!(
793                    peer=%peer_id,
794                    ?endpoint,
795                    total_peers=%num_established,
796                    "Connection established"
797                );
798                let failed_addresses = concurrent_dial_errors
799                    .as_ref()
800                    .map(|es| {
801                        es.iter()
802                            .map(|(a, _)| a)
803                            .cloned()
804                            .collect::<Vec<Multiaddr>>()
805                    })
806                    .unwrap_or_default();
807                self.behaviour
808                    .on_swarm_event(FromSwarm::ConnectionEstablished(
809                        behaviour::ConnectionEstablished {
810                            peer_id,
811                            connection_id: id,
812                            endpoint: &endpoint,
813                            failed_addresses: &failed_addresses,
814                            other_established: other_established_connection_ids.len(),
815                        },
816                    ));
817                self.supported_protocols = supported_protocols;
818                self.pending_swarm_events
819                    .push_back(SwarmEvent::ConnectionEstablished {
820                        peer_id,
821                        connection_id: id,
822                        num_established,
823                        endpoint,
824                        concurrent_dial_errors,
825                        established_in,
826                    });
827            }
828            PoolEvent::PendingOutboundConnectionError {
829                id: connection_id,
830                error,
831                peer,
832            } => {
833                let error = error.into();
834
835                self.behaviour
836                    .on_swarm_event(FromSwarm::DialFailure(DialFailure {
837                        peer_id: peer,
838                        error: &error,
839                        connection_id,
840                    }));
841
842                if let Some(peer) = peer {
843                    tracing::debug!(%peer, "Connection attempt to peer failed with {:?}.", error,);
844                } else {
845                    tracing::debug!("Connection attempt to unknown peer failed with {:?}", error);
846                }
847
848                self.pending_swarm_events
849                    .push_back(SwarmEvent::OutgoingConnectionError {
850                        peer_id: peer,
851                        connection_id,
852                        error,
853                    });
854            }
855            PoolEvent::PendingInboundConnectionError {
856                id,
857                send_back_addr,
858                local_addr,
859                error,
860            } => {
861                let error = error.into();
862
863                tracing::debug!("Incoming connection failed: {:?}", error);
864                self.behaviour
865                    .on_swarm_event(FromSwarm::ListenFailure(ListenFailure {
866                        local_addr: &local_addr,
867                        send_back_addr: &send_back_addr,
868                        error: &error,
869                        connection_id: id,
870                    }));
871                self.pending_swarm_events
872                    .push_back(SwarmEvent::IncomingConnectionError {
873                        connection_id: id,
874                        local_addr,
875                        send_back_addr,
876                        error,
877                    });
878            }
879            PoolEvent::ConnectionClosed {
880                id,
881                connected,
882                error,
883                remaining_established_connection_ids,
884                ..
885            } => {
886                if let Some(error) = error.as_ref() {
887                    tracing::debug!(
888                        total_peers=%remaining_established_connection_ids.len(),
889                        "Connection closed with error {:?}: {:?}",
890                        error,
891                        connected,
892                    );
893                } else {
894                    tracing::debug!(
895                        total_peers=%remaining_established_connection_ids.len(),
896                        "Connection closed: {:?}",
897                        connected
898                    );
899                }
900                let peer_id = connected.peer_id;
901                let endpoint = connected.endpoint;
902                let num_established =
903                    u32::try_from(remaining_established_connection_ids.len()).unwrap();
904
905                self.behaviour
906                    .on_swarm_event(FromSwarm::ConnectionClosed(ConnectionClosed {
907                        peer_id,
908                        connection_id: id,
909                        endpoint: &endpoint,
910                        remaining_established: num_established as usize,
911                    }));
912                self.pending_swarm_events
913                    .push_back(SwarmEvent::ConnectionClosed {
914                        peer_id,
915                        connection_id: id,
916                        endpoint,
917                        cause: error,
918                        num_established,
919                    });
920            }
921            PoolEvent::ConnectionEvent { peer_id, id, event } => {
922                self.behaviour
923                    .on_connection_handler_event(peer_id, id, event);
924            }
925            PoolEvent::AddressChange {
926                peer_id,
927                id,
928                new_endpoint,
929                old_endpoint,
930            } => {
931                self.behaviour
932                    .on_swarm_event(FromSwarm::AddressChange(AddressChange {
933                        peer_id,
934                        connection_id: id,
935                        old: &old_endpoint,
936                        new: &new_endpoint,
937                    }));
938            }
939        }
940    }
941
942    fn handle_transport_event(
943        &mut self,
944        event: TransportEvent<
945            <transport::Boxed<(PeerId, StreamMuxerBox)> as Transport>::ListenerUpgrade,
946            io::Error,
947        >,
948    ) {
949        match event {
950            TransportEvent::Incoming {
951                listener_id: _,
952                upgrade,
953                local_addr,
954                send_back_addr,
955            } => {
956                let connection_id = ConnectionId::next();
957
958                match self.behaviour.handle_pending_inbound_connection(
959                    connection_id,
960                    &local_addr,
961                    &send_back_addr,
962                ) {
963                    Ok(()) => {}
964                    Err(cause) => {
965                        let listen_error = ListenError::Denied { cause };
966
967                        self.behaviour
968                            .on_swarm_event(FromSwarm::ListenFailure(ListenFailure {
969                                local_addr: &local_addr,
970                                send_back_addr: &send_back_addr,
971                                error: &listen_error,
972                                connection_id,
973                            }));
974
975                        self.pending_swarm_events
976                            .push_back(SwarmEvent::IncomingConnectionError {
977                                connection_id,
978                                local_addr,
979                                send_back_addr,
980                                error: listen_error,
981                            });
982                        return;
983                    }
984                }
985
986                self.pool.add_incoming(
987                    upgrade,
988                    IncomingInfo {
989                        local_addr: &local_addr,
990                        send_back_addr: &send_back_addr,
991                    },
992                    connection_id,
993                );
994
995                self.pending_swarm_events
996                    .push_back(SwarmEvent::IncomingConnection {
997                        connection_id,
998                        local_addr,
999                        send_back_addr,
1000                    })
1001            }
1002            TransportEvent::NewAddress {
1003                listener_id,
1004                listen_addr,
1005            } => {
1006                tracing::debug!(
1007                    listener=?listener_id,
1008                    address=%listen_addr,
1009                    "New listener address"
1010                );
1011                let addrs = self.listened_addrs.entry(listener_id).or_default();
1012                if !addrs.contains(&listen_addr) {
1013                    addrs.push(listen_addr.clone())
1014                }
1015                self.behaviour
1016                    .on_swarm_event(FromSwarm::NewListenAddr(NewListenAddr {
1017                        listener_id,
1018                        addr: &listen_addr,
1019                    }));
1020                self.pending_swarm_events
1021                    .push_back(SwarmEvent::NewListenAddr {
1022                        listener_id,
1023                        address: listen_addr,
1024                    })
1025            }
1026            TransportEvent::AddressExpired {
1027                listener_id,
1028                listen_addr,
1029            } => {
1030                tracing::debug!(
1031                    listener=?listener_id,
1032                    address=%listen_addr,
1033                    "Expired listener address"
1034                );
1035                if let Some(addrs) = self.listened_addrs.get_mut(&listener_id) {
1036                    addrs.retain(|a| a != &listen_addr);
1037                }
1038                self.behaviour
1039                    .on_swarm_event(FromSwarm::ExpiredListenAddr(ExpiredListenAddr {
1040                        listener_id,
1041                        addr: &listen_addr,
1042                    }));
1043                self.pending_swarm_events
1044                    .push_back(SwarmEvent::ExpiredListenAddr {
1045                        listener_id,
1046                        address: listen_addr,
1047                    })
1048            }
1049            TransportEvent::ListenerClosed {
1050                listener_id,
1051                reason,
1052            } => {
1053                tracing::debug!(
1054                    listener=?listener_id,
1055                    ?reason,
1056                    "Listener closed"
1057                );
1058                let addrs = self.listened_addrs.remove(&listener_id).unwrap_or_default();
1059                for addr in addrs.iter() {
1060                    self.behaviour.on_swarm_event(FromSwarm::ExpiredListenAddr(
1061                        ExpiredListenAddr { listener_id, addr },
1062                    ));
1063                }
1064                self.behaviour
1065                    .on_swarm_event(FromSwarm::ListenerClosed(ListenerClosed {
1066                        listener_id,
1067                        reason: reason.as_ref().copied(),
1068                    }));
1069                self.pending_swarm_events
1070                    .push_back(SwarmEvent::ListenerClosed {
1071                        listener_id,
1072                        addresses: addrs.to_vec(),
1073                        reason,
1074                    })
1075            }
1076            TransportEvent::ListenerError { listener_id, error } => {
1077                self.behaviour
1078                    .on_swarm_event(FromSwarm::ListenerError(ListenerError {
1079                        listener_id,
1080                        err: &error,
1081                    }));
1082                self.pending_swarm_events
1083                    .push_back(SwarmEvent::ListenerError { listener_id, error })
1084            }
1085        }
1086    }
1087
1088    fn handle_behaviour_event(
1089        &mut self,
1090        event: ToSwarm<TBehaviour::ToSwarm, THandlerInEvent<TBehaviour>>,
1091    ) {
1092        match event {
1093            ToSwarm::GenerateEvent(event) => {
1094                self.pending_swarm_events
1095                    .push_back(SwarmEvent::Behaviour(event));
1096            }
1097            ToSwarm::Dial { opts } => {
1098                let peer_id = opts.get_peer_id();
1099                let connection_id = opts.connection_id();
1100                if let Ok(()) = self.dial(opts) {
1101                    self.pending_swarm_events.push_back(SwarmEvent::Dialing {
1102                        peer_id,
1103                        connection_id,
1104                    });
1105                }
1106            }
1107            ToSwarm::ListenOn { opts } => {
1108                // Error is dispatched internally, safe to ignore.
1109                let _ = self.add_listener(opts);
1110            }
1111            ToSwarm::RemoveListener { id } => {
1112                self.remove_listener(id);
1113            }
1114            ToSwarm::NotifyHandler {
1115                peer_id,
1116                handler,
1117                event,
1118            } => {
1119                assert!(self.pending_handler_event.is_none());
1120                let handler = match handler {
1121                    NotifyHandler::One(connection) => PendingNotifyHandler::One(connection),
1122                    NotifyHandler::Any => {
1123                        let ids = self
1124                            .pool
1125                            .iter_established_connections_of_peer(&peer_id)
1126                            .collect();
1127                        PendingNotifyHandler::Any(ids)
1128                    }
1129                };
1130
1131                self.pending_handler_event = Some((peer_id, handler, event));
1132            }
1133            ToSwarm::NewExternalAddrCandidate(addr) => {
1134                // Apply address translation to the candidate address.
1135                // For TCP without port-reuse, the observed address contains an ephemeral port which needs to be replaced by the port of a listen address.
1136                let translated_addresses = {
1137                    let mut addrs: Vec<_> = self
1138                        .listened_addrs
1139                        .values()
1140                        .flatten()
1141                        .filter_map(|server| self.transport.address_translation(server, &addr))
1142                        .collect();
1143
1144                    // remove duplicates
1145                    addrs.sort_unstable();
1146                    addrs.dedup();
1147                    addrs
1148                };
1149
1150                // If address translation yielded nothing, broadcast the original candidate address.
1151                if translated_addresses.is_empty() {
1152                    self.behaviour
1153                        .on_swarm_event(FromSwarm::NewExternalAddrCandidate(
1154                            NewExternalAddrCandidate { addr: &addr },
1155                        ));
1156                    self.pending_swarm_events
1157                        .push_back(SwarmEvent::NewExternalAddrCandidate { address: addr });
1158                } else {
1159                    for addr in translated_addresses {
1160                        self.behaviour
1161                            .on_swarm_event(FromSwarm::NewExternalAddrCandidate(
1162                                NewExternalAddrCandidate { addr: &addr },
1163                            ));
1164                        self.pending_swarm_events
1165                            .push_back(SwarmEvent::NewExternalAddrCandidate { address: addr });
1166                    }
1167                }
1168            }
1169            ToSwarm::ExternalAddrConfirmed(addr) => {
1170                self.add_external_address(addr.clone());
1171                self.pending_swarm_events
1172                    .push_back(SwarmEvent::ExternalAddrConfirmed { address: addr });
1173            }
1174            ToSwarm::ExternalAddrExpired(addr) => {
1175                self.remove_external_address(&addr);
1176                self.pending_swarm_events
1177                    .push_back(SwarmEvent::ExternalAddrExpired { address: addr });
1178            }
1179            ToSwarm::CloseConnection {
1180                peer_id,
1181                connection,
1182            } => match connection {
1183                CloseConnection::One(connection_id) => {
1184                    if let Some(conn) = self.pool.get_established(connection_id) {
1185                        conn.start_close();
1186                    }
1187                }
1188                CloseConnection::All => {
1189                    self.pool.disconnect(peer_id);
1190                }
1191            },
1192            ToSwarm::NewExternalAddrOfPeer { peer_id, address } => {
1193                self.behaviour
1194                    .on_swarm_event(FromSwarm::NewExternalAddrOfPeer(NewExternalAddrOfPeer {
1195                        peer_id,
1196                        addr: &address,
1197                    }));
1198                self.pending_swarm_events
1199                    .push_back(SwarmEvent::NewExternalAddrOfPeer { peer_id, address });
1200            }
1201        }
1202    }
1203
1204    /// Internal function used by everything event-related.
1205    ///
1206    /// Polls the `Swarm` for the next event.
1207    #[tracing::instrument(level = "debug", name = "Swarm::poll", skip(self, cx))]
1208    fn poll_next_event(
1209        mut self: Pin<&mut Self>,
1210        cx: &mut Context<'_>,
1211    ) -> Poll<SwarmEvent<TBehaviour::ToSwarm>> {
1212        // We use a `this` variable because the compiler can't mutably borrow multiple times
1213        // across a `Deref`.
1214        let this = &mut *self;
1215
1216        // This loop polls the components below in a prioritized order.
1217        //
1218        // 1. [`NetworkBehaviour`]
1219        // 2. Connection [`Pool`]
1220        // 3. [`ListenersStream`]
1221        //
1222        // (1) is polled before (2) to prioritize local work over work coming from a remote.
1223        //
1224        // (2) is polled before (3) to prioritize existing connections over upgrading new incoming connections.
1225        loop {
1226            if let Some(swarm_event) = this.pending_swarm_events.pop_front() {
1227                return Poll::Ready(swarm_event);
1228            }
1229
1230            match this.pending_handler_event.take() {
1231                // Try to deliver the pending event emitted by the [`NetworkBehaviour`] in the previous
1232                // iteration to the connection handler(s).
1233                Some((peer_id, handler, event)) => match handler {
1234                    PendingNotifyHandler::One(conn_id) => {
1235                        match this.pool.get_established(conn_id) {
1236                            Some(conn) => match notify_one(conn, event, cx) {
1237                                None => continue,
1238                                Some(event) => {
1239                                    this.pending_handler_event = Some((peer_id, handler, event));
1240                                }
1241                            },
1242                            None => continue,
1243                        }
1244                    }
1245                    PendingNotifyHandler::Any(ids) => {
1246                        match notify_any::<_, TBehaviour>(ids, &mut this.pool, event, cx) {
1247                            None => continue,
1248                            Some((event, ids)) => {
1249                                let handler = PendingNotifyHandler::Any(ids);
1250                                this.pending_handler_event = Some((peer_id, handler, event));
1251                            }
1252                        }
1253                    }
1254                },
1255                // No pending event. Allow the [`NetworkBehaviour`] to make progress.
1256                None => match this.behaviour.poll(cx) {
1257                    Poll::Pending => {}
1258                    Poll::Ready(behaviour_event) => {
1259                        this.handle_behaviour_event(behaviour_event);
1260
1261                        continue;
1262                    }
1263                },
1264            }
1265
1266            // Poll the known peers.
1267            match this.pool.poll(cx) {
1268                Poll::Pending => {}
1269                Poll::Ready(pool_event) => {
1270                    this.handle_pool_event(pool_event);
1271                    continue;
1272                }
1273            }
1274
1275            // Poll the listener(s) for new connections.
1276            match Pin::new(&mut this.transport).poll(cx) {
1277                Poll::Pending => {}
1278                Poll::Ready(transport_event) => {
1279                    this.handle_transport_event(transport_event);
1280                    continue;
1281                }
1282            }
1283
1284            return Poll::Pending;
1285        }
1286    }
1287}
1288
/// Connection to notify of a pending event.
///
/// The connection IDs out of which to notify one of an event are captured at
/// the time the behaviour emits the event, in order not to forward the event to
/// a new connection which the behaviour may not have been aware of at the time
/// it issued the request for sending it.
enum PendingNotifyHandler {
    /// Deliver the event to exactly this connection.
    One(ConnectionId),
    /// Deliver the event to any one of these connections.
    Any(SmallVec<[ConnectionId; 10]>),
}
1299
1300/// Notify a single connection of an event.
1301///
1302/// Returns `Some` with the given event if the connection is not currently
1303/// ready to receive another event, in which case the current task is
1304/// scheduled to be woken up.
1305///
1306/// Returns `None` if the connection is closing or the event has been
1307/// successfully sent, in either case the event is consumed.
1308fn notify_one<THandlerInEvent>(
1309    conn: &mut EstablishedConnection<THandlerInEvent>,
1310    event: THandlerInEvent,
1311    cx: &mut Context<'_>,
1312) -> Option<THandlerInEvent> {
1313    match conn.poll_ready_notify_handler(cx) {
1314        Poll::Pending => Some(event),
1315        Poll::Ready(Err(())) => None, // connection is closing
1316        Poll::Ready(Ok(())) => {
1317            // Can now only fail if connection is closing.
1318            let _ = conn.notify_handler(event);
1319            None
1320        }
1321    }
1322}
1323
1324/// Notify any one of a given list of connections of a peer of an event.
1325///
1326/// Returns `Some` with the given event and a new list of connections if
1327/// none of the given connections was able to receive the event but at
1328/// least one of them is not closing, in which case the current task
1329/// is scheduled to be woken up. The returned connections are those which
1330/// may still become ready to receive another event.
1331///
1332/// Returns `None` if either all connections are closing or the event
1333/// was successfully sent to a handler, in either case the event is consumed.
1334fn notify_any<THandler, TBehaviour>(
1335    ids: SmallVec<[ConnectionId; 10]>,
1336    pool: &mut Pool<THandler>,
1337    event: THandlerInEvent<TBehaviour>,
1338    cx: &mut Context<'_>,
1339) -> Option<(THandlerInEvent<TBehaviour>, SmallVec<[ConnectionId; 10]>)>
1340where
1341    TBehaviour: NetworkBehaviour,
1342    THandler: ConnectionHandler<
1343        FromBehaviour = THandlerInEvent<TBehaviour>,
1344        ToBehaviour = THandlerOutEvent<TBehaviour>,
1345    >,
1346{
1347    let mut pending = SmallVec::new();
1348    let mut event = Some(event); // (1)
1349    for id in ids.into_iter() {
1350        if let Some(conn) = pool.get_established(id) {
1351            match conn.poll_ready_notify_handler(cx) {
1352                Poll::Pending => pending.push(id),
1353                Poll::Ready(Err(())) => {} // connection is closing
1354                Poll::Ready(Ok(())) => {
1355                    let e = event.take().expect("by (1),(2)");
1356                    if let Err(e) = conn.notify_handler(e) {
1357                        event = Some(e) // (2)
1358                    } else {
1359                        break;
1360                    }
1361                }
1362            }
1363        }
1364    }
1365
1366    event.and_then(|e| {
1367        if !pending.is_empty() {
1368            Some((e, pending))
1369        } else {
1370            None
1371        }
1372    })
1373}
1374
1375/// Stream of events returned by [`Swarm`].
1376///
1377/// Includes events from the [`NetworkBehaviour`] as well as events about
1378/// connection and listener status. See [`SwarmEvent`] for details.
1379///
1380/// Note: This stream is infinite and it is guaranteed that
1381/// [`futures::Stream::poll_next`] will never return `Poll::Ready(None)`.
1382impl<TBehaviour> futures::Stream for Swarm<TBehaviour>
1383where
1384    TBehaviour: NetworkBehaviour,
1385{
1386    type Item = SwarmEvent<TBehaviourOutEvent<TBehaviour>>;
1387
1388    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
1389        self.as_mut().poll_next_event(cx).map(Some)
1390    }
1391}
1392
/// The stream of swarm events never terminates, so we can implement fused for it.
impl<TBehaviour> FusedStream for Swarm<TBehaviour>
where
    TBehaviour: NetworkBehaviour,
{
    /// Always returns `false`.
    fn is_terminated(&self) -> bool {
        false
    }
}
1402
/// Configuration from which a [`Swarm`] is built.
///
/// Start from one of the `with_*_executor` constructors and refine the
/// result with the remaining `with_*` builder methods.
pub struct Config {
    /// Pool settings; every builder method on [`Config`] stores its value here.
    pool_config: PoolConfig,
}
1406
1407impl Config {
1408    /// Creates a new [`Config`] from the given executor. The [`Swarm`] is obtained via
1409    /// [`Swarm::new`].
1410    pub fn with_executor(executor: impl Executor + Send + 'static) -> Self {
1411        Self {
1412            pool_config: PoolConfig::new(Some(Box::new(executor))),
1413        }
1414    }
1415
1416    /// Sets executor to the `wasm` executor.
1417    /// Background tasks will be executed by the browser on the next micro-tick.
1418    ///
1419    /// Spawning a task is similar too:
1420    /// ```typescript
1421    /// function spawn(task: () => Promise<void>) {
1422    ///     task()
1423    /// }
1424    /// ```
1425    #[cfg(feature = "wasm-bindgen")]
1426    pub fn with_wasm_executor() -> Self {
1427        Self::with_executor(crate::executor::WasmBindgenExecutor)
1428    }
1429
1430    /// Builds a new [`Config`] from the given `tokio` executor.
1431    #[cfg(all(
1432        feature = "tokio",
1433        not(any(target_os = "emscripten", target_os = "wasi", target_os = "unknown"))
1434    ))]
1435    pub fn with_tokio_executor() -> Self {
1436        Self::with_executor(crate::executor::TokioExecutor)
1437    }
1438
1439    /// Builds a new [`Config`] from the given `async-std` executor.
1440    #[cfg(all(
1441        feature = "async-std",
1442        not(any(target_os = "emscripten", target_os = "wasi", target_os = "unknown"))
1443    ))]
1444    pub fn with_async_std_executor() -> Self {
1445        Self::with_executor(crate::executor::AsyncStdExecutor)
1446    }
1447
1448    /// Configures the number of events from the [`NetworkBehaviour`] in
1449    /// destination to the [`ConnectionHandler`] that can be buffered before
1450    /// the [`Swarm`] has to wait. An individual buffer with this number of
1451    /// events exists for each individual connection.
1452    ///
1453    /// The ideal value depends on the executor used, the CPU speed, and the
1454    /// volume of events. If this value is too low, then the [`Swarm`] will
1455    /// be sleeping more often than necessary. Increasing this value increases
1456    /// the overall memory usage.
1457    pub fn with_notify_handler_buffer_size(mut self, n: NonZeroUsize) -> Self {
1458        self.pool_config = self.pool_config.with_notify_handler_buffer_size(n);
1459        self
1460    }
1461
1462    /// Configures the size of the buffer for events sent by a [`ConnectionHandler`] to the
1463    /// [`NetworkBehaviour`].
1464    ///
1465    /// Each connection has its own buffer.
1466    ///
1467    /// The ideal value depends on the executor used, the CPU speed and the volume of events.
1468    /// If this value is too low, then the [`ConnectionHandler`]s will be sleeping more often
1469    /// than necessary. Increasing this value increases the overall memory
1470    /// usage, and more importantly the latency between the moment when an
1471    /// event is emitted and the moment when it is received by the
1472    /// [`NetworkBehaviour`].
1473    pub fn with_per_connection_event_buffer_size(mut self, n: usize) -> Self {
1474        self.pool_config = self.pool_config.with_per_connection_event_buffer_size(n);
1475        self
1476    }
1477
1478    /// Number of addresses concurrently dialed for a single outbound connection attempt.
1479    pub fn with_dial_concurrency_factor(mut self, factor: NonZeroU8) -> Self {
1480        self.pool_config = self.pool_config.with_dial_concurrency_factor(factor);
1481        self
1482    }
1483
1484    /// Configures an override for the substream upgrade protocol to use.
1485    ///
1486    /// The subtream upgrade protocol is the multistream-select protocol
1487    /// used for protocol negotiation on substreams. Since a listener
1488    /// supports all existing versions, the choice of upgrade protocol
1489    /// only effects the "dialer", i.e. the peer opening a substream.
1490    ///
1491    /// > **Note**: If configured, specific upgrade protocols for
1492    /// > individual [`SubstreamProtocol`]s emitted by the `NetworkBehaviour`
1493    /// > are ignored.
1494    pub fn with_substream_upgrade_protocol_override(
1495        mut self,
1496        v: libp2p_core::upgrade::Version,
1497    ) -> Self {
1498        self.pool_config = self.pool_config.with_substream_upgrade_protocol_override(v);
1499        self
1500    }
1501
1502    /// The maximum number of inbound streams concurrently negotiating on a
1503    /// connection. New inbound streams exceeding the limit are dropped and thus
1504    /// reset.
1505    ///
1506    /// Note: This only enforces a limit on the number of concurrently
1507    /// negotiating inbound streams. The total number of inbound streams on a
1508    /// connection is the sum of negotiating and negotiated streams. A limit on
1509    /// the total number of streams can be enforced at the
1510    /// [`StreamMuxerBox`] level.
1511    pub fn with_max_negotiating_inbound_streams(mut self, v: usize) -> Self {
1512        self.pool_config = self.pool_config.with_max_negotiating_inbound_streams(v);
1513        self
1514    }
1515
1516    /// How long to keep a connection alive once it is idling.
1517    ///
1518    /// Defaults to 0.
1519    pub fn with_idle_connection_timeout(mut self, timeout: Duration) -> Self {
1520        self.pool_config.idle_connection_timeout = timeout;
1521        self
1522    }
1523}
1524
1525/// Possible errors when trying to establish or upgrade an outbound connection.
1526#[derive(Debug)]
1527pub enum DialError {
1528    /// The peer identity obtained on the connection matches the local peer.
1529    LocalPeerId {
1530        endpoint: ConnectedPoint,
1531    },
1532    /// No addresses have been provided by [`NetworkBehaviour::handle_pending_outbound_connection`] and [`DialOpts`].
1533    NoAddresses,
1534    /// The provided [`dial_opts::PeerCondition`] evaluated to false and thus
1535    /// the dial was aborted.
1536    DialPeerConditionFalse(dial_opts::PeerCondition),
1537    /// Pending connection attempt has been aborted.
1538    Aborted,
1539    /// The peer identity obtained on the connection did not match the one that was expected.
1540    WrongPeerId {
1541        obtained: PeerId,
1542        endpoint: ConnectedPoint,
1543    },
1544    Denied {
1545        cause: ConnectionDenied,
1546    },
1547    /// An error occurred while negotiating the transport protocol(s) on a connection.
1548    Transport(Vec<(Multiaddr, TransportError<io::Error>)>),
1549}
1550
1551impl From<PendingOutboundConnectionError> for DialError {
1552    fn from(error: PendingOutboundConnectionError) -> Self {
1553        match error {
1554            PendingConnectionError::Aborted => DialError::Aborted,
1555            PendingConnectionError::WrongPeerId { obtained, endpoint } => {
1556                DialError::WrongPeerId { obtained, endpoint }
1557            }
1558            PendingConnectionError::LocalPeerId { endpoint } => DialError::LocalPeerId { endpoint },
1559            PendingConnectionError::Transport(e) => DialError::Transport(e),
1560        }
1561    }
1562}
1563
1564impl fmt::Display for DialError {
1565    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1566        match self {
1567            DialError::NoAddresses => write!(f, "Dial error: no addresses for peer."),
1568            DialError::LocalPeerId { endpoint } => write!(
1569                f,
1570                "Dial error: tried to dial local peer id at {endpoint:?}."
1571            ),
1572            DialError::DialPeerConditionFalse(PeerCondition::Disconnected) => write!(f, "Dial error: dial condition was configured to only happen when disconnected (`PeerCondition::Disconnected`), but node is already connected, thus cancelling new dial."),
1573            DialError::DialPeerConditionFalse(PeerCondition::NotDialing) => write!(f, "Dial error: dial condition was configured to only happen if there is currently no ongoing dialing attempt (`PeerCondition::NotDialing`), but a dial is in progress, thus cancelling new dial."),
1574            DialError::DialPeerConditionFalse(PeerCondition::DisconnectedAndNotDialing) => write!(f, "Dial error: dial condition was configured to only happen when both disconnected (`PeerCondition::Disconnected`) and there is currently no ongoing dialing attempt (`PeerCondition::NotDialing`), but node is already connected or dial is in progress, thus cancelling new dial."),
1575            DialError::DialPeerConditionFalse(PeerCondition::Always) => unreachable!("Dial peer condition is by definition true."),
1576            DialError::Aborted => write!(
1577                f,
1578                "Dial error: Pending connection attempt has been aborted."
1579            ),
1580            DialError::WrongPeerId { obtained, endpoint } => write!(
1581                f,
1582                "Dial error: Unexpected peer ID {obtained} at {endpoint:?}."
1583            ),
1584            DialError::Transport(errors) => {
1585                write!(f, "Failed to negotiate transport protocol(s): [")?;
1586
1587                for (addr, error) in errors {
1588                    write!(f, "({addr}")?;
1589                    print_error_chain(f, error)?;
1590                    write!(f, ")")?;
1591                }
1592                write!(f, "]")?;
1593
1594                Ok(())
1595            }
1596            DialError::Denied { .. } => {
1597                write!(f, "Dial error")
1598            }
1599        }
1600    }
1601}
1602
/// Writes `e` and then every error reachable through [`error::Error::source`], each one
/// prefixed with `": "`.
///
/// Stops and returns the formatter error as soon as a write fails.
fn print_error_chain(f: &mut fmt::Formatter<'_>, e: &dyn error::Error) -> fmt::Result {
    write!(f, ": {e}")?;

    // Walk the chain of sources iteratively, outermost cause first.
    let mut cause = e.source();
    while let Some(inner) = cause {
        write!(f, ": {inner}")?;
        cause = inner.source();
    }

    Ok(())
}
1612
1613impl error::Error for DialError {
1614    fn source(&self) -> Option<&(dyn error::Error + 'static)> {
1615        match self {
1616            DialError::LocalPeerId { .. } => None,
1617            DialError::NoAddresses => None,
1618            DialError::DialPeerConditionFalse(_) => None,
1619            DialError::Aborted => None,
1620            DialError::WrongPeerId { .. } => None,
1621            DialError::Transport(_) => None,
1622            DialError::Denied { cause } => Some(cause),
1623        }
1624    }
1625}
1626
1627/// Possible errors when upgrading an inbound connection.
1628#[derive(Debug)]
1629pub enum ListenError {
1630    /// Pending connection attempt has been aborted.
1631    Aborted,
1632    /// The peer identity obtained on the connection did not match the one that was expected.
1633    WrongPeerId {
1634        obtained: PeerId,
1635        endpoint: ConnectedPoint,
1636    },
1637    /// The connection was dropped because it resolved to our own [`PeerId`].
1638    LocalPeerId {
1639        endpoint: ConnectedPoint,
1640    },
1641    Denied {
1642        cause: ConnectionDenied,
1643    },
1644    /// An error occurred while negotiating the transport protocol(s) on a connection.
1645    Transport(TransportError<io::Error>),
1646}
1647
1648impl From<PendingInboundConnectionError> for ListenError {
1649    fn from(error: PendingInboundConnectionError) -> Self {
1650        match error {
1651            PendingInboundConnectionError::Transport(inner) => ListenError::Transport(inner),
1652            PendingInboundConnectionError::Aborted => ListenError::Aborted,
1653            PendingInboundConnectionError::WrongPeerId { obtained, endpoint } => {
1654                ListenError::WrongPeerId { obtained, endpoint }
1655            }
1656            PendingInboundConnectionError::LocalPeerId { endpoint } => {
1657                ListenError::LocalPeerId { endpoint }
1658            }
1659        }
1660    }
1661}
1662
1663impl fmt::Display for ListenError {
1664    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1665        match self {
1666            ListenError::Aborted => write!(
1667                f,
1668                "Listen error: Pending connection attempt has been aborted."
1669            ),
1670            ListenError::WrongPeerId { obtained, endpoint } => write!(
1671                f,
1672                "Listen error: Unexpected peer ID {obtained} at {endpoint:?}."
1673            ),
1674            ListenError::Transport(_) => {
1675                write!(f, "Listen error: Failed to negotiate transport protocol(s)")
1676            }
1677            ListenError::Denied { cause } => {
1678                write!(f, "Listen error: Denied: {cause}")
1679            }
1680            ListenError::LocalPeerId { endpoint } => {
1681                write!(f, "Listen error: Local peer ID at {endpoint:?}.")
1682            }
1683        }
1684    }
1685}
1686
1687impl error::Error for ListenError {
1688    fn source(&self) -> Option<&(dyn error::Error + 'static)> {
1689        match self {
1690            ListenError::WrongPeerId { .. } => None,
1691            ListenError::Transport(err) => Some(err),
1692            ListenError::Aborted => None,
1693            ListenError::Denied { cause } => Some(cause),
1694            ListenError::LocalPeerId { .. } => None,
1695        }
1696    }
1697}
1698
/// A connection was denied.
///
/// To figure out which [`NetworkBehaviour`] denied the connection, use [`ConnectionDenied::downcast`].
#[derive(Debug)]
pub struct ConnectionDenied {
    /// The type-erased reason for the denial.
    inner: Box<dyn error::Error + Send + Sync + 'static>,
}

impl ConnectionDenied {
    /// Wraps `cause` as the reason why a connection was denied.
    pub fn new(cause: impl Into<Box<dyn error::Error + Send + Sync + 'static>>) -> Self {
        let inner = cause.into();
        ConnectionDenied { inner }
    }

    /// Tries to recover the concrete reason of type `E`, consuming `self`.
    ///
    /// If the stored reason is not an `E`, the untouched [`ConnectionDenied`] is handed
    /// back in the `Err` variant so that another type can be tried.
    pub fn downcast<E>(self) -> Result<E, Self>
    where
        E: error::Error + Send + Sync + 'static,
    {
        match self.inner.downcast::<E>() {
            Ok(reason) => Ok(*reason),
            Err(inner) => Err(ConnectionDenied { inner }),
        }
    }

    /// Borrows the concrete reason as an `E`, or returns `None` if the stored reason has
    /// a different type.
    pub fn downcast_ref<E>(&self) -> Option<&E>
    where
        E: error::Error + Send + Sync + 'static,
    {
        let reason: &(dyn error::Error + Send + Sync + 'static) = &*self.inner;
        reason.downcast_ref::<E>()
    }
}

impl fmt::Display for ConnectionDenied {
    /// Writes a fixed message; the concrete reason is available through `source()`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("connection denied")
    }
}

impl error::Error for ConnectionDenied {
    /// Returns the wrapped reason.
    fn source(&self) -> Option<&(dyn error::Error + 'static)> {
        let cause: &(dyn error::Error + 'static) = &*self.inner;
        Some(cause)
    }
}
1747
1748/// Information about the connections obtained by [`Swarm::network_info()`].
1749#[derive(Clone, Debug)]
1750pub struct NetworkInfo {
1751    /// The total number of connected peers.
1752    num_peers: usize,
1753    /// Counters of ongoing network connections.
1754    connection_counters: ConnectionCounters,
1755}
1756
1757impl NetworkInfo {
1758    /// The number of connected peers, i.e. peers with whom at least
1759    /// one established connection exists.
1760    pub fn num_peers(&self) -> usize {
1761        self.num_peers
1762    }
1763
1764    /// Gets counters for ongoing network connections.
1765    pub fn connection_counters(&self) -> &ConnectionCounters {
1766        &self.connection_counters
1767    }
1768}
1769
1770#[cfg(test)]
1771mod tests {
1772    use super::*;
1773    use crate::test::{CallTraceBehaviour, MockBehaviour};
1774    use libp2p_core::multiaddr::multiaddr;
1775    use libp2p_core::transport::memory::MemoryTransportError;
1776    use libp2p_core::{multiaddr, upgrade};
1777    use libp2p_identity as identity;
1778    use libp2p_plaintext as plaintext;
1779    use libp2p_yamux as yamux;
1780    use quickcheck::*;
1781
/// Phase of the connect / disconnect tests.
///
/// The tests that go through a full cycle move through
/// `Connecting => Disconnecting => Connecting`.
enum State {
    /// Waiting until the expected number of connections is established.
    Connecting,
    /// Waiting until the requested disconnect has been observed.
    Disconnecting,
}
1788
1789    fn new_test_swarm(
1790        config: Config,
1791    ) -> Swarm<CallTraceBehaviour<MockBehaviour<dummy::ConnectionHandler, ()>>> {
1792        let id_keys = identity::Keypair::generate_ed25519();
1793        let local_public_key = id_keys.public();
1794        let transport = transport::MemoryTransport::default()
1795            .upgrade(upgrade::Version::V1)
1796            .authenticate(plaintext::Config::new(&id_keys))
1797            .multiplex(yamux::Config::default())
1798            .boxed();
1799        let behaviour = CallTraceBehaviour::new(MockBehaviour::new(dummy::ConnectionHandler));
1800
1801        Swarm::new(
1802            transport,
1803            behaviour,
1804            local_public_key.into(),
1805            config.with_idle_connection_timeout(Duration::from_secs(5)),
1806        )
1807    }
1808
1809    fn swarms_connected<TBehaviour>(
1810        swarm1: &Swarm<CallTraceBehaviour<TBehaviour>>,
1811        swarm2: &Swarm<CallTraceBehaviour<TBehaviour>>,
1812        num_connections: usize,
1813    ) -> bool
1814    where
1815        TBehaviour: NetworkBehaviour,
1816        THandlerOutEvent<TBehaviour>: Clone,
1817    {
1818        swarm1
1819            .behaviour()
1820            .num_connections_to_peer(*swarm2.local_peer_id())
1821            == num_connections
1822            && swarm2
1823                .behaviour()
1824                .num_connections_to_peer(*swarm1.local_peer_id())
1825                == num_connections
1826            && swarm1.is_connected(swarm2.local_peer_id())
1827            && swarm2.is_connected(swarm1.local_peer_id())
1828    }
1829
1830    fn swarms_disconnected<TBehaviour>(
1831        swarm1: &Swarm<CallTraceBehaviour<TBehaviour>>,
1832        swarm2: &Swarm<CallTraceBehaviour<TBehaviour>>,
1833    ) -> bool
1834    where
1835        TBehaviour: NetworkBehaviour,
1836        THandlerOutEvent<TBehaviour>: Clone,
1837    {
1838        swarm1
1839            .behaviour()
1840            .num_connections_to_peer(*swarm2.local_peer_id())
1841            == 0
1842            && swarm2
1843                .behaviour()
1844                .num_connections_to_peer(*swarm1.local_peer_id())
1845                == 0
1846            && !swarm1.is_connected(swarm2.local_peer_id())
1847            && !swarm2.is_connected(swarm1.local_peer_id())
1848    }
1849
1850    /// Establishes multiple connections between two peers,
1851    /// after which one peer disconnects the other using [`Swarm::disconnect_peer_id`].
1852    ///
1853    /// The test expects both behaviours to be notified via calls to [`NetworkBehaviour::on_swarm_event`]
1854    /// with pairs of [`FromSwarm::ConnectionEstablished`] / [`FromSwarm::ConnectionClosed`]
1855    #[tokio::test]
1856    async fn test_swarm_disconnect() {
1857        let mut swarm1 = new_test_swarm(Config::with_tokio_executor());
1858        let mut swarm2 = new_test_swarm(Config::with_tokio_executor());
1859
1860        let addr1: Multiaddr = multiaddr::Protocol::Memory(rand::random::<u64>()).into();
1861        let addr2: Multiaddr = multiaddr::Protocol::Memory(rand::random::<u64>()).into();
1862
1863        swarm1.listen_on(addr1.clone()).unwrap();
1864        swarm2.listen_on(addr2.clone()).unwrap();
1865
1866        let swarm1_id = *swarm1.local_peer_id();
1867
1868        let mut reconnected = false;
1869        let num_connections = 10;
1870
1871        for _ in 0..num_connections {
1872            swarm1.dial(addr2.clone()).unwrap();
1873        }
1874        let mut state = State::Connecting;
1875
1876        future::poll_fn(move |cx| loop {
1877            let poll1 = Swarm::poll_next_event(Pin::new(&mut swarm1), cx);
1878            let poll2 = Swarm::poll_next_event(Pin::new(&mut swarm2), cx);
1879            match state {
1880                State::Connecting => {
1881                    if swarms_connected(&swarm1, &swarm2, num_connections) {
1882                        if reconnected {
1883                            return Poll::Ready(());
1884                        }
1885                        swarm2
1886                            .disconnect_peer_id(swarm1_id)
1887                            .expect("Error disconnecting");
1888                        state = State::Disconnecting;
1889                    }
1890                }
1891                State::Disconnecting => {
1892                    if swarms_disconnected(&swarm1, &swarm2) {
1893                        if reconnected {
1894                            return Poll::Ready(());
1895                        }
1896                        reconnected = true;
1897                        for _ in 0..num_connections {
1898                            swarm2.dial(addr1.clone()).unwrap();
1899                        }
1900                        state = State::Connecting;
1901                    }
1902                }
1903            }
1904
1905            if poll1.is_pending() && poll2.is_pending() {
1906                return Poll::Pending;
1907            }
1908        })
1909        .await
1910    }
1911
1912    /// Establishes multiple connections between two peers,
1913    /// after which one peer disconnects the other
1914    /// using [`ToSwarm::CloseConnection`] returned by a [`NetworkBehaviour`].
1915    ///
1916    /// The test expects both behaviours to be notified via calls to [`NetworkBehaviour::on_swarm_event`]
1917    /// with pairs of [`FromSwarm::ConnectionEstablished`] / [`FromSwarm::ConnectionClosed`]
1918    #[tokio::test]
1919    async fn test_behaviour_disconnect_all() {
1920        let mut swarm1 = new_test_swarm(Config::with_tokio_executor());
1921        let mut swarm2 = new_test_swarm(Config::with_tokio_executor());
1922
1923        let addr1: Multiaddr = multiaddr::Protocol::Memory(rand::random::<u64>()).into();
1924        let addr2: Multiaddr = multiaddr::Protocol::Memory(rand::random::<u64>()).into();
1925
1926        swarm1.listen_on(addr1.clone()).unwrap();
1927        swarm2.listen_on(addr2.clone()).unwrap();
1928
1929        let swarm1_id = *swarm1.local_peer_id();
1930
1931        let mut reconnected = false;
1932        let num_connections = 10;
1933
1934        for _ in 0..num_connections {
1935            swarm1.dial(addr2.clone()).unwrap();
1936        }
1937        let mut state = State::Connecting;
1938
1939        future::poll_fn(move |cx| loop {
1940            let poll1 = Swarm::poll_next_event(Pin::new(&mut swarm1), cx);
1941            let poll2 = Swarm::poll_next_event(Pin::new(&mut swarm2), cx);
1942            match state {
1943                State::Connecting => {
1944                    if swarms_connected(&swarm1, &swarm2, num_connections) {
1945                        if reconnected {
1946                            return Poll::Ready(());
1947                        }
1948                        swarm2
1949                            .behaviour
1950                            .inner()
1951                            .next_action
1952                            .replace(ToSwarm::CloseConnection {
1953                                peer_id: swarm1_id,
1954                                connection: CloseConnection::All,
1955                            });
1956                        state = State::Disconnecting;
1957                        continue;
1958                    }
1959                }
1960                State::Disconnecting => {
1961                    if swarms_disconnected(&swarm1, &swarm2) {
1962                        reconnected = true;
1963                        for _ in 0..num_connections {
1964                            swarm2.dial(addr1.clone()).unwrap();
1965                        }
1966                        state = State::Connecting;
1967                        continue;
1968                    }
1969                }
1970            }
1971
1972            if poll1.is_pending() && poll2.is_pending() {
1973                return Poll::Pending;
1974            }
1975        })
1976        .await
1977    }
1978
1979    /// Establishes multiple connections between two peers,
1980    /// after which one peer closes a single connection
1981    /// using [`ToSwarm::CloseConnection`] returned by a [`NetworkBehaviour`].
1982    ///
1983    /// The test expects both behaviours to be notified via calls to [`NetworkBehaviour::on_swarm_event`]
1984    /// with pairs of [`FromSwarm::ConnectionEstablished`] / [`FromSwarm::ConnectionClosed`]
1985    #[tokio::test]
1986    async fn test_behaviour_disconnect_one() {
1987        let mut swarm1 = new_test_swarm(Config::with_tokio_executor());
1988        let mut swarm2 = new_test_swarm(Config::with_tokio_executor());
1989
1990        let addr1: Multiaddr = multiaddr::Protocol::Memory(rand::random::<u64>()).into();
1991        let addr2: Multiaddr = multiaddr::Protocol::Memory(rand::random::<u64>()).into();
1992
1993        swarm1.listen_on(addr1).unwrap();
1994        swarm2.listen_on(addr2.clone()).unwrap();
1995
1996        let swarm1_id = *swarm1.local_peer_id();
1997
1998        let num_connections = 10;
1999
2000        for _ in 0..num_connections {
2001            swarm1.dial(addr2.clone()).unwrap();
2002        }
2003        let mut state = State::Connecting;
2004        let mut disconnected_conn_id = None;
2005
2006        future::poll_fn(move |cx| loop {
2007            let poll1 = Swarm::poll_next_event(Pin::new(&mut swarm1), cx);
2008            let poll2 = Swarm::poll_next_event(Pin::new(&mut swarm2), cx);
2009            match state {
2010                State::Connecting => {
2011                    if swarms_connected(&swarm1, &swarm2, num_connections) {
2012                        disconnected_conn_id = {
2013                            let conn_id =
2014                                swarm2.behaviour.on_connection_established[num_connections / 2].1;
2015                            swarm2.behaviour.inner().next_action.replace(
2016                                ToSwarm::CloseConnection {
2017                                    peer_id: swarm1_id,
2018                                    connection: CloseConnection::One(conn_id),
2019                                },
2020                            );
2021                            Some(conn_id)
2022                        };
2023                        state = State::Disconnecting;
2024                    }
2025                }
2026                State::Disconnecting => {
2027                    for s in &[&swarm1, &swarm2] {
2028                        assert!(s
2029                            .behaviour
2030                            .on_connection_closed
2031                            .iter()
2032                            .all(|(.., remaining_conns)| *remaining_conns > 0));
2033                        assert_eq!(s.behaviour.on_connection_established.len(), num_connections);
2034                        s.behaviour.assert_connected(num_connections, 1);
2035                    }
2036                    if [&swarm1, &swarm2]
2037                        .iter()
2038                        .all(|s| s.behaviour.on_connection_closed.len() == 1)
2039                    {
2040                        let conn_id = swarm2.behaviour.on_connection_closed[0].1;
2041                        assert_eq!(Some(conn_id), disconnected_conn_id);
2042                        return Poll::Ready(());
2043                    }
2044                }
2045            }
2046
2047            if poll1.is_pending() && poll2.is_pending() {
2048                return Poll::Pending;
2049            }
2050        })
2051        .await
2052    }
2053
/// Property test: for a dial concurrency factor between 1 and 10, a single dial to a
/// peer with `factor + 2` addresses must produce an incoming connection on every
/// listener before the swarm emits any event, after which the swarm reports an
/// outgoing connection error.
#[test]
fn concurrent_dialing() {
    #[derive(Clone, Debug)]
    struct DialConcurrencyFactor(NonZeroU8);

    impl Arbitrary for DialConcurrencyFactor {
        fn arbitrary(g: &mut Gen) -> Self {
            let factor: u8 = g.gen_range(1..11);
            DialConcurrencyFactor(NonZeroU8::new(factor).unwrap())
        }
    }

    fn prop(concurrency_factor: DialConcurrencyFactor) {
        let runtime = tokio::runtime::Runtime::new().unwrap();
        runtime.block_on(async {
            let config =
                Config::with_tokio_executor().with_dial_concurrency_factor(concurrency_factor.0);
            let mut swarm = new_test_swarm(config);

            // Listen on `concurrency_factor + 2` addresses.
            //
            // `+ 2` to ensure a subset of addresses is dialed by the swarm.
            let listener_count = concurrency_factor.0.get() + 2;
            let mut listen_addresses = Vec::new();
            let mut listeners = Vec::new();
            for _ in 0..listener_count {
                let mut listener = transport::MemoryTransport::default().boxed();
                listener
                    .listen_on(ListenerId::next(), "/memory/0".parse().unwrap())
                    .unwrap();

                // The first event of a fresh listener must announce its address.
                match listener.select_next_some().await {
                    TransportEvent::NewAddress { listen_addr, .. } => {
                        listen_addresses.push(listen_addr)
                    }
                    _ => panic!("Expected `NewListenAddr` event."),
                }

                listeners.push(listener);
            }

            // Dial all listeners at once for a random peer, then wait for every listener
            // to see the incoming connection.
            let opts = DialOpts::peer_id(PeerId::random())
                .addresses(listen_addresses)
                .build();
            swarm.dial(opts).unwrap();

            for mut listener in listeners {
                match future::select(listener.select_next_some(), swarm.next()).await {
                    future::Either::Left((TransportEvent::Incoming { .. }, _)) => {}
                    future::Either::Left(_) => {
                        panic!("Unexpected transport event.")
                    }
                    future::Either::Right((e, _)) => {
                        panic!("Expect swarm to not emit any event {e:?}")
                    }
                }
            }

            // With every address tried, the swarm reports the dial as failed.
            match swarm.next().await.unwrap() {
                SwarmEvent::OutgoingConnectionError { .. } => {}
                e => panic!("Unexpected swarm event {e:?}"),
            }
        })
    }

    QuickCheck::new().tests(10).quickcheck(prop as fn(_) -> _);
}
2125
2126    #[tokio::test]
2127    async fn invalid_peer_id() {
2128        // Checks whether dialing an address containing the wrong peer id raises an error
2129        // for the expected peer id instead of the obtained peer id.
2130
2131        let mut swarm1 = new_test_swarm(Config::with_tokio_executor());
2132        let mut swarm2 = new_test_swarm(Config::with_tokio_executor());
2133
2134        swarm1.listen_on("/memory/0".parse().unwrap()).unwrap();
2135
2136        let address = future::poll_fn(|cx| match swarm1.poll_next_unpin(cx) {
2137            Poll::Ready(Some(SwarmEvent::NewListenAddr { address, .. })) => Poll::Ready(address),
2138            Poll::Pending => Poll::Pending,
2139            _ => panic!("Was expecting the listen address to be reported"),
2140        })
2141        .await;
2142
2143        let other_id = PeerId::random();
2144        let other_addr = address.with(multiaddr::Protocol::P2p(other_id));
2145
2146        swarm2.dial(other_addr.clone()).unwrap();
2147
2148        let (peer_id, error) = future::poll_fn(|cx| {
2149            if let Poll::Ready(Some(SwarmEvent::IncomingConnection { .. })) =
2150                swarm1.poll_next_unpin(cx)
2151            {}
2152
2153            match swarm2.poll_next_unpin(cx) {
2154                Poll::Ready(Some(SwarmEvent::OutgoingConnectionError {
2155                    peer_id, error, ..
2156                })) => Poll::Ready((peer_id, error)),
2157                Poll::Ready(x) => panic!("unexpected {x:?}"),
2158                Poll::Pending => Poll::Pending,
2159            }
2160        })
2161        .await;
2162        assert_eq!(peer_id.unwrap(), other_id);
2163        match error {
2164            DialError::WrongPeerId { obtained, endpoint } => {
2165                assert_eq!(obtained, *swarm1.local_peer_id());
2166                assert_eq!(
2167                    endpoint,
2168                    ConnectedPoint::Dialer {
2169                        address: other_addr,
2170                        role_override: Endpoint::Dialer,
2171                    }
2172                );
2173            }
2174            x => panic!("wrong error {x:?}"),
2175        }
2176    }
2177
2178    #[tokio::test]
2179    async fn dial_self() {
2180        // Check whether dialing ourselves correctly fails.
2181        //
2182        // Dialing the same address we're listening should result in three events:
2183        //
2184        // - The incoming connection notification (before we know the incoming peer ID).
2185        // - The connection error for the dialing endpoint (once we've determined that it's our own ID).
2186        // - The connection error for the listening endpoint (once we've determined that it's our own ID).
2187        //
2188        // The last two can happen in any order.
2189
2190        let mut swarm = new_test_swarm(Config::with_tokio_executor());
2191        swarm.listen_on("/memory/0".parse().unwrap()).unwrap();
2192
2193        let local_address = future::poll_fn(|cx| match swarm.poll_next_unpin(cx) {
2194            Poll::Ready(Some(SwarmEvent::NewListenAddr { address, .. })) => Poll::Ready(address),
2195            Poll::Pending => Poll::Pending,
2196            _ => panic!("Was expecting the listen address to be reported"),
2197        })
2198        .await;
2199
2200        swarm.listened_addrs.clear(); // This is a hack to actually execute the dial to ourselves which would otherwise be filtered.
2201
2202        swarm.dial(local_address.clone()).unwrap();
2203
2204        let mut got_dial_err = false;
2205        let mut got_inc_err = false;
2206        future::poll_fn(|cx| -> Poll<Result<(), io::Error>> {
2207            loop {
2208                match swarm.poll_next_unpin(cx) {
2209                    Poll::Ready(Some(SwarmEvent::OutgoingConnectionError {
2210                        peer_id,
2211                        error: DialError::LocalPeerId { .. },
2212                        ..
2213                    })) => {
2214                        assert_eq!(&peer_id.unwrap(), swarm.local_peer_id());
2215                        assert!(!got_dial_err);
2216                        got_dial_err = true;
2217                        if got_inc_err {
2218                            return Poll::Ready(Ok(()));
2219                        }
2220                    }
2221                    Poll::Ready(Some(SwarmEvent::IncomingConnectionError {
2222                        local_addr, ..
2223                    })) => {
2224                        assert!(!got_inc_err);
2225                        assert_eq!(local_addr, local_address);
2226                        got_inc_err = true;
2227                        if got_dial_err {
2228                            return Poll::Ready(Ok(()));
2229                        }
2230                    }
2231                    Poll::Ready(Some(SwarmEvent::IncomingConnection { local_addr, .. })) => {
2232                        assert_eq!(local_addr, local_address);
2233                    }
2234                    Poll::Ready(ev) => {
2235                        panic!("Unexpected event: {ev:?}")
2236                    }
2237                    Poll::Pending => break Poll::Pending,
2238                }
2239            }
2240        })
2241        .await
2242        .unwrap();
2243    }
2244
2245    #[tokio::test]
2246    async fn dial_self_by_id() {
2247        // Trying to dial self by passing the same `PeerId` shouldn't even be possible in the first
2248        // place.
2249        let swarm = new_test_swarm(Config::with_tokio_executor());
2250        let peer_id = *swarm.local_peer_id();
2251        assert!(!swarm.is_connected(&peer_id));
2252    }
2253
2254    #[tokio::test]
2255    async fn multiple_addresses_err() {
2256        // Tries dialing multiple addresses, and makes sure there's one dialing error per address.
2257
2258        let target = PeerId::random();
2259
2260        let mut swarm = new_test_swarm(Config::with_tokio_executor());
2261
2262        let addresses = HashSet::from([
2263            multiaddr![Ip4([0, 0, 0, 0]), Tcp(rand::random::<u16>())],
2264            multiaddr![Ip4([0, 0, 0, 0]), Tcp(rand::random::<u16>())],
2265            multiaddr![Ip4([0, 0, 0, 0]), Tcp(rand::random::<u16>())],
2266            multiaddr![Udp(rand::random::<u16>())],
2267            multiaddr![Udp(rand::random::<u16>())],
2268            multiaddr![Udp(rand::random::<u16>())],
2269            multiaddr![Udp(rand::random::<u16>())],
2270            multiaddr![Udp(rand::random::<u16>())],
2271        ]);
2272
2273        swarm
2274            .dial(
2275                DialOpts::peer_id(target)
2276                    .addresses(addresses.iter().cloned().collect())
2277                    .build(),
2278            )
2279            .unwrap();
2280
2281        match swarm.next().await.unwrap() {
2282            SwarmEvent::OutgoingConnectionError {
2283                peer_id,
2284                // multiaddr,
2285                error: DialError::Transport(errors),
2286                ..
2287            } => {
2288                assert_eq!(target, peer_id.unwrap());
2289
2290                let failed_addresses = errors.into_iter().map(|(addr, _)| addr).collect::<Vec<_>>();
2291                let expected_addresses = addresses
2292                    .into_iter()
2293                    .map(|addr| addr.with(multiaddr::Protocol::P2p(target)))
2294                    .collect::<Vec<_>>();
2295
2296                assert_eq!(expected_addresses, failed_addresses);
2297            }
2298            e => panic!("Unexpected event: {e:?}"),
2299        }
2300    }
2301
2302    #[tokio::test]
2303    async fn aborting_pending_connection_surfaces_error() {
2304        let _ = tracing_subscriber::fmt()
2305            .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
2306            .try_init();
2307
2308        let mut dialer = new_test_swarm(Config::with_tokio_executor());
2309        let mut listener = new_test_swarm(Config::with_tokio_executor());
2310
2311        let listener_peer_id = *listener.local_peer_id();
2312        listener.listen_on(multiaddr![Memory(0u64)]).unwrap();
2313        let listener_address = match listener.next().await.unwrap() {
2314            SwarmEvent::NewListenAddr { address, .. } => address,
2315            e => panic!("Unexpected network event: {e:?}"),
2316        };
2317
2318        dialer
2319            .dial(
2320                DialOpts::peer_id(listener_peer_id)
2321                    .addresses(vec![listener_address])
2322                    .build(),
2323            )
2324            .unwrap();
2325
2326        dialer
2327            .disconnect_peer_id(listener_peer_id)
2328            .expect_err("Expect peer to not yet be connected.");
2329
2330        match dialer.next().await.unwrap() {
2331            SwarmEvent::OutgoingConnectionError {
2332                error: DialError::Aborted,
2333                ..
2334            } => {}
2335            e => panic!("Unexpected swarm event {e:?}."),
2336        }
2337    }
2338
2339    #[test]
2340    fn dial_error_prints_sources() {
2341        // This constitutes a fairly typical error for chained transports.
2342        let error = DialError::Transport(vec![(
2343            "/ip4/127.0.0.1/tcp/80".parse().unwrap(),
2344            TransportError::Other(io::Error::new(
2345                io::ErrorKind::Other,
2346                MemoryTransportError::Unreachable,
2347            )),
2348        )]);
2349
2350        let string = format!("{error}");
2351
2352        // Unfortunately, we have some "empty" errors that lead to multiple colons without text but that is the best we can do.
2353        assert_eq!("Failed to negotiate transport protocol(s): [(/ip4/127.0.0.1/tcp/80: : No listener on the given port.)]", string)
2354    }
2355}