1#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]
57
58mod connection;
59mod executor;
60mod stream;
61mod stream_protocol;
62#[cfg(test)]
63mod test;
64mod upgrade;
65
66pub mod behaviour;
67pub mod dial_opts;
68pub mod dummy;
69pub mod handler;
70mod listen_opts;
71
72#[doc(hidden)]
74pub mod derive_prelude {
75 pub use crate::behaviour::AddressChange;
76 pub use crate::behaviour::ConnectionClosed;
77 pub use crate::behaviour::ConnectionEstablished;
78 pub use crate::behaviour::DialFailure;
79 pub use crate::behaviour::ExpiredListenAddr;
80 pub use crate::behaviour::ExternalAddrConfirmed;
81 pub use crate::behaviour::ExternalAddrExpired;
82 pub use crate::behaviour::FromSwarm;
83 pub use crate::behaviour::ListenFailure;
84 pub use crate::behaviour::ListenerClosed;
85 pub use crate::behaviour::ListenerError;
86 pub use crate::behaviour::NewExternalAddrCandidate;
87 pub use crate::behaviour::NewExternalAddrOfPeer;
88 pub use crate::behaviour::NewListenAddr;
89 pub use crate::behaviour::NewListener;
90 pub use crate::connection::ConnectionId;
91 pub use crate::ConnectionDenied;
92 pub use crate::ConnectionHandler;
93 pub use crate::ConnectionHandlerSelect;
94 pub use crate::DialError;
95 pub use crate::NetworkBehaviour;
96 pub use crate::THandler;
97 pub use crate::THandlerInEvent;
98 pub use crate::THandlerOutEvent;
99 pub use crate::ToSwarm;
100 pub use either::Either;
101 pub use futures::prelude as futures;
102 pub use libp2p_core::transport::ListenerId;
103 pub use libp2p_core::ConnectedPoint;
104 pub use libp2p_core::Endpoint;
105 pub use libp2p_core::Multiaddr;
106 pub use libp2p_identity::PeerId;
107}
108
109pub use behaviour::{
110 AddressChange, CloseConnection, ConnectionClosed, DialFailure, ExpiredListenAddr,
111 ExternalAddrExpired, ExternalAddresses, FromSwarm, ListenAddresses, ListenFailure,
112 ListenerClosed, ListenerError, NetworkBehaviour, NewExternalAddrCandidate,
113 NewExternalAddrOfPeer, NewListenAddr, NotifyHandler, PeerAddresses, ToSwarm,
114};
115pub use connection::pool::ConnectionCounters;
116pub use connection::{ConnectionError, ConnectionId, SupportedProtocols};
117pub use executor::Executor;
118pub use handler::{
119 ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerSelect, OneShotHandler,
120 OneShotHandlerConfig, StreamUpgradeError, SubstreamProtocol,
121};
122#[cfg(feature = "macros")]
123pub use libp2p_swarm_derive::NetworkBehaviour;
124pub use listen_opts::ListenOpts;
125pub use stream::Stream;
126pub use stream_protocol::{InvalidProtocol, StreamProtocol};
127
128use crate::behaviour::ExternalAddrConfirmed;
129use crate::handler::UpgradeInfoSend;
130use connection::pool::{EstablishedConnection, Pool, PoolConfig, PoolEvent};
131use connection::IncomingInfo;
132use connection::{
133 PendingConnectionError, PendingInboundConnectionError, PendingOutboundConnectionError,
134};
135use dial_opts::{DialOpts, PeerCondition};
136use futures::{prelude::*, stream::FusedStream};
137use libp2p_core::{
138 connection::ConnectedPoint,
139 muxing::StreamMuxerBox,
140 transport::{self, ListenerId, TransportError, TransportEvent},
141 Endpoint, Multiaddr, Transport,
142};
143use libp2p_identity::PeerId;
144use smallvec::SmallVec;
145use std::collections::{HashMap, HashSet, VecDeque};
146use std::num::{NonZeroU32, NonZeroU8, NonZeroUsize};
147use std::time::Duration;
148use std::{
149 error, fmt, io,
150 pin::Pin,
151 task::{Context, Poll},
152};
153use tracing::Instrument;
154
155type TBehaviourOutEvent<TBehaviour> = <TBehaviour as NetworkBehaviour>::ToSwarm;
157
158pub type THandler<TBehaviour> = <TBehaviour as NetworkBehaviour>::ConnectionHandler;
161
162pub type THandlerInEvent<TBehaviour> = <THandler<TBehaviour> as ConnectionHandler>::FromBehaviour;
165
166pub type THandlerOutEvent<TBehaviour> = <THandler<TBehaviour> as ConnectionHandler>::ToBehaviour;
168
169#[derive(Debug)]
171#[non_exhaustive]
172pub enum SwarmEvent<TBehaviourOutEvent> {
173 Behaviour(TBehaviourOutEvent),
175 ConnectionEstablished {
177 peer_id: PeerId,
179 connection_id: ConnectionId,
181 endpoint: ConnectedPoint,
183 num_established: NonZeroU32,
186 concurrent_dial_errors: Option<Vec<(Multiaddr, TransportError<io::Error>)>>,
190 established_in: std::time::Duration,
192 },
193 ConnectionClosed {
196 peer_id: PeerId,
198 connection_id: ConnectionId,
200 endpoint: ConnectedPoint,
202 num_established: u32,
204 cause: Option<ConnectionError>,
207 },
208 IncomingConnection {
214 connection_id: ConnectionId,
216 local_addr: Multiaddr,
220 send_back_addr: Multiaddr,
222 },
223 IncomingConnectionError {
228 connection_id: ConnectionId,
230 local_addr: Multiaddr,
234 send_back_addr: Multiaddr,
236 error: ListenError,
238 },
239 OutgoingConnectionError {
241 connection_id: ConnectionId,
243 peer_id: Option<PeerId>,
245 error: DialError,
247 },
248 NewListenAddr {
250 listener_id: ListenerId,
252 address: Multiaddr,
254 },
255 ExpiredListenAddr {
257 listener_id: ListenerId,
259 address: Multiaddr,
261 },
262 ListenerClosed {
264 listener_id: ListenerId,
266 addresses: Vec<Multiaddr>,
270 reason: Result<(), io::Error>,
273 },
274 ListenerError {
276 listener_id: ListenerId,
278 error: io::Error,
280 },
281 Dialing {
289 peer_id: Option<PeerId>,
291
292 connection_id: ConnectionId,
294 },
295 NewExternalAddrCandidate { address: Multiaddr },
297 ExternalAddrConfirmed { address: Multiaddr },
299 ExternalAddrExpired { address: Multiaddr },
301 NewExternalAddrOfPeer { peer_id: PeerId, address: Multiaddr },
303}
304
305impl<TBehaviourOutEvent> SwarmEvent<TBehaviourOutEvent> {
306 #[allow(clippy::result_large_err)]
308 pub fn try_into_behaviour_event(self) -> Result<TBehaviourOutEvent, Self> {
309 match self {
310 SwarmEvent::Behaviour(inner) => Ok(inner),
311 other => Err(other),
312 }
313 }
314}
315
316pub struct Swarm<TBehaviour>
321where
322 TBehaviour: NetworkBehaviour,
323{
324 transport: transport::Boxed<(PeerId, StreamMuxerBox)>,
326
327 pool: Pool<THandler<TBehaviour>>,
329
330 local_peer_id: PeerId,
332
333 behaviour: TBehaviour,
336
337 supported_protocols: SmallVec<[Vec<u8>; 16]>,
339
340 confirmed_external_addr: HashSet<Multiaddr>,
341
342 listened_addrs: HashMap<ListenerId, SmallVec<[Multiaddr; 1]>>,
344
345 pending_handler_event: Option<(PeerId, PendingNotifyHandler, THandlerInEvent<TBehaviour>)>,
349
350 pending_swarm_events: VecDeque<SwarmEvent<TBehaviour::ToSwarm>>,
351}
352
353impl<TBehaviour> Unpin for Swarm<TBehaviour> where TBehaviour: NetworkBehaviour {}
354
355impl<TBehaviour> Swarm<TBehaviour>
356where
357 TBehaviour: NetworkBehaviour,
358{
359 pub fn new(
362 transport: transport::Boxed<(PeerId, StreamMuxerBox)>,
363 behaviour: TBehaviour,
364 local_peer_id: PeerId,
365 config: Config,
366 ) -> Self {
367 tracing::info!(%local_peer_id);
368
369 Swarm {
370 local_peer_id,
371 transport,
372 pool: Pool::new(local_peer_id, config.pool_config),
373 behaviour,
374 supported_protocols: Default::default(),
375 confirmed_external_addr: Default::default(),
376 listened_addrs: HashMap::new(),
377 pending_handler_event: None,
378 pending_swarm_events: VecDeque::default(),
379 }
380 }
381
382 pub fn network_info(&self) -> NetworkInfo {
384 let num_peers = self.pool.num_peers();
385 let connection_counters = self.pool.counters().clone();
386 NetworkInfo {
387 num_peers,
388 connection_counters,
389 }
390 }
391
392 pub fn listen_on(&mut self, addr: Multiaddr) -> Result<ListenerId, TransportError<io::Error>> {
398 let opts = ListenOpts::new(addr);
399 let id = opts.listener_id();
400 self.add_listener(opts)?;
401 Ok(id)
402 }
403
404 pub fn remove_listener(&mut self, listener_id: ListenerId) -> bool {
409 self.transport.remove_listener(listener_id)
410 }
411
412 pub fn dial(&mut self, opts: impl Into<DialOpts>) -> Result<(), DialError> {
440 let dial_opts = opts.into();
441
442 let peer_id = dial_opts.get_peer_id();
443 let condition = dial_opts.peer_condition();
444 let connection_id = dial_opts.connection_id();
445
446 let should_dial = match (condition, peer_id) {
447 (_, None) => true,
448 (PeerCondition::Always, _) => true,
449 (PeerCondition::Disconnected, Some(peer_id)) => !self.pool.is_connected(peer_id),
450 (PeerCondition::NotDialing, Some(peer_id)) => !self.pool.is_dialing(peer_id),
451 (PeerCondition::DisconnectedAndNotDialing, Some(peer_id)) => {
452 !self.pool.is_dialing(peer_id) && !self.pool.is_connected(peer_id)
453 }
454 };
455
456 if !should_dial {
457 let e = DialError::DialPeerConditionFalse(condition);
458
459 self.behaviour
460 .on_swarm_event(FromSwarm::DialFailure(DialFailure {
461 peer_id,
462 error: &e,
463 connection_id,
464 }));
465
466 return Err(e);
467 }
468
469 let addresses = {
470 let mut addresses_from_opts = dial_opts.get_addresses();
471
472 match self.behaviour.handle_pending_outbound_connection(
473 connection_id,
474 peer_id,
475 addresses_from_opts.as_slice(),
476 dial_opts.role_override(),
477 ) {
478 Ok(addresses) => {
479 if dial_opts.extend_addresses_through_behaviour() {
480 addresses_from_opts.extend(addresses)
481 } else {
482 let num_addresses = addresses.len();
483
484 if num_addresses > 0 {
485 tracing::debug!(
486 connection=%connection_id,
487 discarded_addresses_count=%num_addresses,
488 "discarding addresses from `NetworkBehaviour` because `DialOpts::extend_addresses_through_behaviour is `false` for connection"
489 )
490 }
491 }
492 }
493 Err(cause) => {
494 let error = DialError::Denied { cause };
495
496 self.behaviour
497 .on_swarm_event(FromSwarm::DialFailure(DialFailure {
498 peer_id,
499 error: &error,
500 connection_id,
501 }));
502
503 return Err(error);
504 }
505 }
506
507 let mut unique_addresses = HashSet::new();
508 addresses_from_opts.retain(|addr| {
509 !self.listened_addrs.values().flatten().any(|a| a == addr)
510 && unique_addresses.insert(addr.clone())
511 });
512
513 if addresses_from_opts.is_empty() {
514 let error = DialError::NoAddresses;
515 self.behaviour
516 .on_swarm_event(FromSwarm::DialFailure(DialFailure {
517 peer_id,
518 error: &error,
519 connection_id,
520 }));
521 return Err(error);
522 };
523
524 addresses_from_opts
525 };
526
527 let dials = addresses
528 .into_iter()
529 .map(|a| match peer_id.map_or(Ok(a.clone()), |p| a.with_p2p(p)) {
530 Ok(address) => {
531 let (dial, span) = match dial_opts.role_override() {
532 Endpoint::Dialer => (
533 self.transport.dial(address.clone()),
534 tracing::debug_span!(parent: tracing::Span::none(), "Transport::dial", %address),
535 ),
536 Endpoint::Listener => (
537 self.transport.dial_as_listener(address.clone()),
538 tracing::debug_span!(parent: tracing::Span::none(), "Transport::dial_as_listener", %address),
539 ),
540 };
541 span.follows_from(tracing::Span::current());
542
543 match dial {
544 Ok(fut) => fut
545 .map(|r| (address, r.map_err(TransportError::Other)))
546 .instrument(span)
547 .boxed(),
548 Err(err) => futures::future::ready((address, Err(err))).boxed(),
549 }
550 }
551 Err(address) => futures::future::ready((
552 address.clone(),
553 Err(TransportError::MultiaddrNotSupported(address)),
554 ))
555 .boxed(),
556 })
557 .collect();
558
559 self.pool.add_outgoing(
560 dials,
561 peer_id,
562 dial_opts.role_override(),
563 dial_opts.dial_concurrency_override(),
564 connection_id,
565 );
566
567 Ok(())
568 }
569
570 pub fn listeners(&self) -> impl Iterator<Item = &Multiaddr> {
572 self.listened_addrs.values().flatten()
573 }
574
575 pub fn local_peer_id(&self) -> &PeerId {
577 &self.local_peer_id
578 }
579
580 pub fn external_addresses(&self) -> impl Iterator<Item = &Multiaddr> {
582 self.confirmed_external_addr.iter()
583 }
584
585 fn add_listener(&mut self, opts: ListenOpts) -> Result<(), TransportError<io::Error>> {
586 let addr = opts.address();
587 let listener_id = opts.listener_id();
588
589 if let Err(e) = self.transport.listen_on(listener_id, addr.clone()) {
590 self.behaviour
591 .on_swarm_event(FromSwarm::ListenerError(behaviour::ListenerError {
592 listener_id,
593 err: &e,
594 }));
595
596 return Err(e);
597 }
598
599 self.behaviour
600 .on_swarm_event(FromSwarm::NewListener(behaviour::NewListener {
601 listener_id,
602 }));
603
604 Ok(())
605 }
606
607 pub fn add_external_address(&mut self, a: Multiaddr) {
612 self.behaviour
613 .on_swarm_event(FromSwarm::ExternalAddrConfirmed(ExternalAddrConfirmed {
614 addr: &a,
615 }));
616 self.confirmed_external_addr.insert(a);
617 }
618
619 pub fn remove_external_address(&mut self, addr: &Multiaddr) {
623 self.behaviour
624 .on_swarm_event(FromSwarm::ExternalAddrExpired(ExternalAddrExpired { addr }));
625 self.confirmed_external_addr.remove(addr);
626 }
627
628 pub fn add_peer_address(&mut self, peer_id: PeerId, addr: Multiaddr) {
632 self.behaviour
633 .on_swarm_event(FromSwarm::NewExternalAddrOfPeer(NewExternalAddrOfPeer {
634 peer_id,
635 addr: &addr,
636 }))
637 }
638
639 #[allow(clippy::result_unit_err)]
646 pub fn disconnect_peer_id(&mut self, peer_id: PeerId) -> Result<(), ()> {
647 let was_connected = self.pool.is_connected(peer_id);
648 self.pool.disconnect(peer_id);
649
650 if was_connected {
651 Ok(())
652 } else {
653 Err(())
654 }
655 }
656
657 pub fn close_connection(&mut self, connection_id: ConnectionId) -> bool {
667 if let Some(established) = self.pool.get_established(connection_id) {
668 established.start_close();
669 return true;
670 }
671
672 false
673 }
674
675 pub fn is_connected(&self, peer_id: &PeerId) -> bool {
677 self.pool.is_connected(*peer_id)
678 }
679
680 pub fn connected_peers(&self) -> impl Iterator<Item = &PeerId> {
682 self.pool.iter_connected()
683 }
684
685 pub fn behaviour(&self) -> &TBehaviour {
687 &self.behaviour
688 }
689
690 pub fn behaviour_mut(&mut self) -> &mut TBehaviour {
692 &mut self.behaviour
693 }
694
695 fn handle_pool_event(&mut self, event: PoolEvent<THandlerOutEvent<TBehaviour>>) {
696 match event {
697 PoolEvent::ConnectionEstablished {
698 peer_id,
699 id,
700 endpoint,
701 connection,
702 concurrent_dial_errors,
703 established_in,
704 } => {
705 let handler = match endpoint.clone() {
706 ConnectedPoint::Dialer {
707 address,
708 role_override,
709 } => {
710 match self.behaviour.handle_established_outbound_connection(
711 id,
712 peer_id,
713 &address,
714 role_override,
715 ) {
716 Ok(handler) => handler,
717 Err(cause) => {
718 let dial_error = DialError::Denied { cause };
719 self.behaviour.on_swarm_event(FromSwarm::DialFailure(
720 DialFailure {
721 connection_id: id,
722 error: &dial_error,
723 peer_id: Some(peer_id),
724 },
725 ));
726
727 self.pending_swarm_events.push_back(
728 SwarmEvent::OutgoingConnectionError {
729 peer_id: Some(peer_id),
730 connection_id: id,
731 error: dial_error,
732 },
733 );
734 return;
735 }
736 }
737 }
738 ConnectedPoint::Listener {
739 local_addr,
740 send_back_addr,
741 } => {
742 match self.behaviour.handle_established_inbound_connection(
743 id,
744 peer_id,
745 &local_addr,
746 &send_back_addr,
747 ) {
748 Ok(handler) => handler,
749 Err(cause) => {
750 let listen_error = ListenError::Denied { cause };
751 self.behaviour.on_swarm_event(FromSwarm::ListenFailure(
752 ListenFailure {
753 local_addr: &local_addr,
754 send_back_addr: &send_back_addr,
755 error: &listen_error,
756 connection_id: id,
757 },
758 ));
759
760 self.pending_swarm_events.push_back(
761 SwarmEvent::IncomingConnectionError {
762 connection_id: id,
763 send_back_addr,
764 local_addr,
765 error: listen_error,
766 },
767 );
768 return;
769 }
770 }
771 }
772 };
773
774 let supported_protocols = handler
775 .listen_protocol()
776 .upgrade()
777 .protocol_info()
778 .map(|p| p.as_ref().as_bytes().to_vec())
779 .collect();
780 let other_established_connection_ids = self
781 .pool
782 .iter_established_connections_of_peer(&peer_id)
783 .collect::<Vec<_>>();
784 let num_established = NonZeroU32::new(
785 u32::try_from(other_established_connection_ids.len() + 1).unwrap(),
786 )
787 .expect("n + 1 is always non-zero; qed");
788
789 self.pool
790 .spawn_connection(id, peer_id, &endpoint, connection, handler);
791
792 tracing::debug!(
793 peer=%peer_id,
794 ?endpoint,
795 total_peers=%num_established,
796 "Connection established"
797 );
798 let failed_addresses = concurrent_dial_errors
799 .as_ref()
800 .map(|es| {
801 es.iter()
802 .map(|(a, _)| a)
803 .cloned()
804 .collect::<Vec<Multiaddr>>()
805 })
806 .unwrap_or_default();
807 self.behaviour
808 .on_swarm_event(FromSwarm::ConnectionEstablished(
809 behaviour::ConnectionEstablished {
810 peer_id,
811 connection_id: id,
812 endpoint: &endpoint,
813 failed_addresses: &failed_addresses,
814 other_established: other_established_connection_ids.len(),
815 },
816 ));
817 self.supported_protocols = supported_protocols;
818 self.pending_swarm_events
819 .push_back(SwarmEvent::ConnectionEstablished {
820 peer_id,
821 connection_id: id,
822 num_established,
823 endpoint,
824 concurrent_dial_errors,
825 established_in,
826 });
827 }
828 PoolEvent::PendingOutboundConnectionError {
829 id: connection_id,
830 error,
831 peer,
832 } => {
833 let error = error.into();
834
835 self.behaviour
836 .on_swarm_event(FromSwarm::DialFailure(DialFailure {
837 peer_id: peer,
838 error: &error,
839 connection_id,
840 }));
841
842 if let Some(peer) = peer {
843 tracing::debug!(%peer, "Connection attempt to peer failed with {:?}.", error,);
844 } else {
845 tracing::debug!("Connection attempt to unknown peer failed with {:?}", error);
846 }
847
848 self.pending_swarm_events
849 .push_back(SwarmEvent::OutgoingConnectionError {
850 peer_id: peer,
851 connection_id,
852 error,
853 });
854 }
855 PoolEvent::PendingInboundConnectionError {
856 id,
857 send_back_addr,
858 local_addr,
859 error,
860 } => {
861 let error = error.into();
862
863 tracing::debug!("Incoming connection failed: {:?}", error);
864 self.behaviour
865 .on_swarm_event(FromSwarm::ListenFailure(ListenFailure {
866 local_addr: &local_addr,
867 send_back_addr: &send_back_addr,
868 error: &error,
869 connection_id: id,
870 }));
871 self.pending_swarm_events
872 .push_back(SwarmEvent::IncomingConnectionError {
873 connection_id: id,
874 local_addr,
875 send_back_addr,
876 error,
877 });
878 }
879 PoolEvent::ConnectionClosed {
880 id,
881 connected,
882 error,
883 remaining_established_connection_ids,
884 ..
885 } => {
886 if let Some(error) = error.as_ref() {
887 tracing::debug!(
888 total_peers=%remaining_established_connection_ids.len(),
889 "Connection closed with error {:?}: {:?}",
890 error,
891 connected,
892 );
893 } else {
894 tracing::debug!(
895 total_peers=%remaining_established_connection_ids.len(),
896 "Connection closed: {:?}",
897 connected
898 );
899 }
900 let peer_id = connected.peer_id;
901 let endpoint = connected.endpoint;
902 let num_established =
903 u32::try_from(remaining_established_connection_ids.len()).unwrap();
904
905 self.behaviour
906 .on_swarm_event(FromSwarm::ConnectionClosed(ConnectionClosed {
907 peer_id,
908 connection_id: id,
909 endpoint: &endpoint,
910 remaining_established: num_established as usize,
911 }));
912 self.pending_swarm_events
913 .push_back(SwarmEvent::ConnectionClosed {
914 peer_id,
915 connection_id: id,
916 endpoint,
917 cause: error,
918 num_established,
919 });
920 }
921 PoolEvent::ConnectionEvent { peer_id, id, event } => {
922 self.behaviour
923 .on_connection_handler_event(peer_id, id, event);
924 }
925 PoolEvent::AddressChange {
926 peer_id,
927 id,
928 new_endpoint,
929 old_endpoint,
930 } => {
931 self.behaviour
932 .on_swarm_event(FromSwarm::AddressChange(AddressChange {
933 peer_id,
934 connection_id: id,
935 old: &old_endpoint,
936 new: &new_endpoint,
937 }));
938 }
939 }
940 }
941
942 fn handle_transport_event(
943 &mut self,
944 event: TransportEvent<
945 <transport::Boxed<(PeerId, StreamMuxerBox)> as Transport>::ListenerUpgrade,
946 io::Error,
947 >,
948 ) {
949 match event {
950 TransportEvent::Incoming {
951 listener_id: _,
952 upgrade,
953 local_addr,
954 send_back_addr,
955 } => {
956 let connection_id = ConnectionId::next();
957
958 match self.behaviour.handle_pending_inbound_connection(
959 connection_id,
960 &local_addr,
961 &send_back_addr,
962 ) {
963 Ok(()) => {}
964 Err(cause) => {
965 let listen_error = ListenError::Denied { cause };
966
967 self.behaviour
968 .on_swarm_event(FromSwarm::ListenFailure(ListenFailure {
969 local_addr: &local_addr,
970 send_back_addr: &send_back_addr,
971 error: &listen_error,
972 connection_id,
973 }));
974
975 self.pending_swarm_events
976 .push_back(SwarmEvent::IncomingConnectionError {
977 connection_id,
978 local_addr,
979 send_back_addr,
980 error: listen_error,
981 });
982 return;
983 }
984 }
985
986 self.pool.add_incoming(
987 upgrade,
988 IncomingInfo {
989 local_addr: &local_addr,
990 send_back_addr: &send_back_addr,
991 },
992 connection_id,
993 );
994
995 self.pending_swarm_events
996 .push_back(SwarmEvent::IncomingConnection {
997 connection_id,
998 local_addr,
999 send_back_addr,
1000 })
1001 }
1002 TransportEvent::NewAddress {
1003 listener_id,
1004 listen_addr,
1005 } => {
1006 tracing::debug!(
1007 listener=?listener_id,
1008 address=%listen_addr,
1009 "New listener address"
1010 );
1011 let addrs = self.listened_addrs.entry(listener_id).or_default();
1012 if !addrs.contains(&listen_addr) {
1013 addrs.push(listen_addr.clone())
1014 }
1015 self.behaviour
1016 .on_swarm_event(FromSwarm::NewListenAddr(NewListenAddr {
1017 listener_id,
1018 addr: &listen_addr,
1019 }));
1020 self.pending_swarm_events
1021 .push_back(SwarmEvent::NewListenAddr {
1022 listener_id,
1023 address: listen_addr,
1024 })
1025 }
1026 TransportEvent::AddressExpired {
1027 listener_id,
1028 listen_addr,
1029 } => {
1030 tracing::debug!(
1031 listener=?listener_id,
1032 address=%listen_addr,
1033 "Expired listener address"
1034 );
1035 if let Some(addrs) = self.listened_addrs.get_mut(&listener_id) {
1036 addrs.retain(|a| a != &listen_addr);
1037 }
1038 self.behaviour
1039 .on_swarm_event(FromSwarm::ExpiredListenAddr(ExpiredListenAddr {
1040 listener_id,
1041 addr: &listen_addr,
1042 }));
1043 self.pending_swarm_events
1044 .push_back(SwarmEvent::ExpiredListenAddr {
1045 listener_id,
1046 address: listen_addr,
1047 })
1048 }
1049 TransportEvent::ListenerClosed {
1050 listener_id,
1051 reason,
1052 } => {
1053 tracing::debug!(
1054 listener=?listener_id,
1055 ?reason,
1056 "Listener closed"
1057 );
1058 let addrs = self.listened_addrs.remove(&listener_id).unwrap_or_default();
1059 for addr in addrs.iter() {
1060 self.behaviour.on_swarm_event(FromSwarm::ExpiredListenAddr(
1061 ExpiredListenAddr { listener_id, addr },
1062 ));
1063 }
1064 self.behaviour
1065 .on_swarm_event(FromSwarm::ListenerClosed(ListenerClosed {
1066 listener_id,
1067 reason: reason.as_ref().copied(),
1068 }));
1069 self.pending_swarm_events
1070 .push_back(SwarmEvent::ListenerClosed {
1071 listener_id,
1072 addresses: addrs.to_vec(),
1073 reason,
1074 })
1075 }
1076 TransportEvent::ListenerError { listener_id, error } => {
1077 self.behaviour
1078 .on_swarm_event(FromSwarm::ListenerError(ListenerError {
1079 listener_id,
1080 err: &error,
1081 }));
1082 self.pending_swarm_events
1083 .push_back(SwarmEvent::ListenerError { listener_id, error })
1084 }
1085 }
1086 }
1087
1088 fn handle_behaviour_event(
1089 &mut self,
1090 event: ToSwarm<TBehaviour::ToSwarm, THandlerInEvent<TBehaviour>>,
1091 ) {
1092 match event {
1093 ToSwarm::GenerateEvent(event) => {
1094 self.pending_swarm_events
1095 .push_back(SwarmEvent::Behaviour(event));
1096 }
1097 ToSwarm::Dial { opts } => {
1098 let peer_id = opts.get_peer_id();
1099 let connection_id = opts.connection_id();
1100 if let Ok(()) = self.dial(opts) {
1101 self.pending_swarm_events.push_back(SwarmEvent::Dialing {
1102 peer_id,
1103 connection_id,
1104 });
1105 }
1106 }
1107 ToSwarm::ListenOn { opts } => {
1108 let _ = self.add_listener(opts);
1110 }
1111 ToSwarm::RemoveListener { id } => {
1112 self.remove_listener(id);
1113 }
1114 ToSwarm::NotifyHandler {
1115 peer_id,
1116 handler,
1117 event,
1118 } => {
1119 assert!(self.pending_handler_event.is_none());
1120 let handler = match handler {
1121 NotifyHandler::One(connection) => PendingNotifyHandler::One(connection),
1122 NotifyHandler::Any => {
1123 let ids = self
1124 .pool
1125 .iter_established_connections_of_peer(&peer_id)
1126 .collect();
1127 PendingNotifyHandler::Any(ids)
1128 }
1129 };
1130
1131 self.pending_handler_event = Some((peer_id, handler, event));
1132 }
1133 ToSwarm::NewExternalAddrCandidate(addr) => {
1134 let translated_addresses = {
1137 let mut addrs: Vec<_> = self
1138 .listened_addrs
1139 .values()
1140 .flatten()
1141 .filter_map(|server| self.transport.address_translation(server, &addr))
1142 .collect();
1143
1144 addrs.sort_unstable();
1146 addrs.dedup();
1147 addrs
1148 };
1149
1150 if translated_addresses.is_empty() {
1152 self.behaviour
1153 .on_swarm_event(FromSwarm::NewExternalAddrCandidate(
1154 NewExternalAddrCandidate { addr: &addr },
1155 ));
1156 self.pending_swarm_events
1157 .push_back(SwarmEvent::NewExternalAddrCandidate { address: addr });
1158 } else {
1159 for addr in translated_addresses {
1160 self.behaviour
1161 .on_swarm_event(FromSwarm::NewExternalAddrCandidate(
1162 NewExternalAddrCandidate { addr: &addr },
1163 ));
1164 self.pending_swarm_events
1165 .push_back(SwarmEvent::NewExternalAddrCandidate { address: addr });
1166 }
1167 }
1168 }
1169 ToSwarm::ExternalAddrConfirmed(addr) => {
1170 self.add_external_address(addr.clone());
1171 self.pending_swarm_events
1172 .push_back(SwarmEvent::ExternalAddrConfirmed { address: addr });
1173 }
1174 ToSwarm::ExternalAddrExpired(addr) => {
1175 self.remove_external_address(&addr);
1176 self.pending_swarm_events
1177 .push_back(SwarmEvent::ExternalAddrExpired { address: addr });
1178 }
1179 ToSwarm::CloseConnection {
1180 peer_id,
1181 connection,
1182 } => match connection {
1183 CloseConnection::One(connection_id) => {
1184 if let Some(conn) = self.pool.get_established(connection_id) {
1185 conn.start_close();
1186 }
1187 }
1188 CloseConnection::All => {
1189 self.pool.disconnect(peer_id);
1190 }
1191 },
1192 ToSwarm::NewExternalAddrOfPeer { peer_id, address } => {
1193 self.behaviour
1194 .on_swarm_event(FromSwarm::NewExternalAddrOfPeer(NewExternalAddrOfPeer {
1195 peer_id,
1196 addr: &address,
1197 }));
1198 self.pending_swarm_events
1199 .push_back(SwarmEvent::NewExternalAddrOfPeer { peer_id, address });
1200 }
1201 }
1202 }
1203
1204 #[tracing::instrument(level = "debug", name = "Swarm::poll", skip(self, cx))]
1208 fn poll_next_event(
1209 mut self: Pin<&mut Self>,
1210 cx: &mut Context<'_>,
1211 ) -> Poll<SwarmEvent<TBehaviour::ToSwarm>> {
1212 let this = &mut *self;
1215
1216 loop {
1226 if let Some(swarm_event) = this.pending_swarm_events.pop_front() {
1227 return Poll::Ready(swarm_event);
1228 }
1229
1230 match this.pending_handler_event.take() {
1231 Some((peer_id, handler, event)) => match handler {
1234 PendingNotifyHandler::One(conn_id) => {
1235 match this.pool.get_established(conn_id) {
1236 Some(conn) => match notify_one(conn, event, cx) {
1237 None => continue,
1238 Some(event) => {
1239 this.pending_handler_event = Some((peer_id, handler, event));
1240 }
1241 },
1242 None => continue,
1243 }
1244 }
1245 PendingNotifyHandler::Any(ids) => {
1246 match notify_any::<_, TBehaviour>(ids, &mut this.pool, event, cx) {
1247 None => continue,
1248 Some((event, ids)) => {
1249 let handler = PendingNotifyHandler::Any(ids);
1250 this.pending_handler_event = Some((peer_id, handler, event));
1251 }
1252 }
1253 }
1254 },
1255 None => match this.behaviour.poll(cx) {
1257 Poll::Pending => {}
1258 Poll::Ready(behaviour_event) => {
1259 this.handle_behaviour_event(behaviour_event);
1260
1261 continue;
1262 }
1263 },
1264 }
1265
1266 match this.pool.poll(cx) {
1268 Poll::Pending => {}
1269 Poll::Ready(pool_event) => {
1270 this.handle_pool_event(pool_event);
1271 continue;
1272 }
1273 }
1274
1275 match Pin::new(&mut this.transport).poll(cx) {
1277 Poll::Pending => {}
1278 Poll::Ready(transport_event) => {
1279 this.handle_transport_event(transport_event);
1280 continue;
1281 }
1282 }
1283
1284 return Poll::Pending;
1285 }
1286 }
1287}
1288
1289enum PendingNotifyHandler {
1296 One(ConnectionId),
1297 Any(SmallVec<[ConnectionId; 10]>),
1298}
1299
1300fn notify_one<THandlerInEvent>(
1309 conn: &mut EstablishedConnection<THandlerInEvent>,
1310 event: THandlerInEvent,
1311 cx: &mut Context<'_>,
1312) -> Option<THandlerInEvent> {
1313 match conn.poll_ready_notify_handler(cx) {
1314 Poll::Pending => Some(event),
1315 Poll::Ready(Err(())) => None, Poll::Ready(Ok(())) => {
1317 let _ = conn.notify_handler(event);
1319 None
1320 }
1321 }
1322}
1323
1324fn notify_any<THandler, TBehaviour>(
1335 ids: SmallVec<[ConnectionId; 10]>,
1336 pool: &mut Pool<THandler>,
1337 event: THandlerInEvent<TBehaviour>,
1338 cx: &mut Context<'_>,
1339) -> Option<(THandlerInEvent<TBehaviour>, SmallVec<[ConnectionId; 10]>)>
1340where
1341 TBehaviour: NetworkBehaviour,
1342 THandler: ConnectionHandler<
1343 FromBehaviour = THandlerInEvent<TBehaviour>,
1344 ToBehaviour = THandlerOutEvent<TBehaviour>,
1345 >,
1346{
1347 let mut pending = SmallVec::new();
1348 let mut event = Some(event); for id in ids.into_iter() {
1350 if let Some(conn) = pool.get_established(id) {
1351 match conn.poll_ready_notify_handler(cx) {
1352 Poll::Pending => pending.push(id),
1353 Poll::Ready(Err(())) => {} Poll::Ready(Ok(())) => {
1355 let e = event.take().expect("by (1),(2)");
1356 if let Err(e) = conn.notify_handler(e) {
1357 event = Some(e) } else {
1359 break;
1360 }
1361 }
1362 }
1363 }
1364 }
1365
1366 event.and_then(|e| {
1367 if !pending.is_empty() {
1368 Some((e, pending))
1369 } else {
1370 None
1371 }
1372 })
1373}
1374
1375impl<TBehaviour> futures::Stream for Swarm<TBehaviour>
1383where
1384 TBehaviour: NetworkBehaviour,
1385{
1386 type Item = SwarmEvent<TBehaviourOutEvent<TBehaviour>>;
1387
1388 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
1389 self.as_mut().poll_next_event(cx).map(Some)
1390 }
1391}
1392
1393impl<TBehaviour> FusedStream for Swarm<TBehaviour>
1395where
1396 TBehaviour: NetworkBehaviour,
1397{
1398 fn is_terminated(&self) -> bool {
1399 false
1400 }
1401}
1402
1403pub struct Config {
1404 pool_config: PoolConfig,
1405}
1406
1407impl Config {
1408 pub fn with_executor(executor: impl Executor + Send + 'static) -> Self {
1411 Self {
1412 pool_config: PoolConfig::new(Some(Box::new(executor))),
1413 }
1414 }
1415
1416 #[cfg(feature = "wasm-bindgen")]
1426 pub fn with_wasm_executor() -> Self {
1427 Self::with_executor(crate::executor::WasmBindgenExecutor)
1428 }
1429
1430 #[cfg(all(
1432 feature = "tokio",
1433 not(any(target_os = "emscripten", target_os = "wasi", target_os = "unknown"))
1434 ))]
1435 pub fn with_tokio_executor() -> Self {
1436 Self::with_executor(crate::executor::TokioExecutor)
1437 }
1438
1439 #[cfg(all(
1441 feature = "async-std",
1442 not(any(target_os = "emscripten", target_os = "wasi", target_os = "unknown"))
1443 ))]
1444 pub fn with_async_std_executor() -> Self {
1445 Self::with_executor(crate::executor::AsyncStdExecutor)
1446 }
1447
1448 pub fn with_notify_handler_buffer_size(mut self, n: NonZeroUsize) -> Self {
1458 self.pool_config = self.pool_config.with_notify_handler_buffer_size(n);
1459 self
1460 }
1461
1462 pub fn with_per_connection_event_buffer_size(mut self, n: usize) -> Self {
1474 self.pool_config = self.pool_config.with_per_connection_event_buffer_size(n);
1475 self
1476 }
1477
1478 pub fn with_dial_concurrency_factor(mut self, factor: NonZeroU8) -> Self {
1480 self.pool_config = self.pool_config.with_dial_concurrency_factor(factor);
1481 self
1482 }
1483
1484 pub fn with_substream_upgrade_protocol_override(
1495 mut self,
1496 v: libp2p_core::upgrade::Version,
1497 ) -> Self {
1498 self.pool_config = self.pool_config.with_substream_upgrade_protocol_override(v);
1499 self
1500 }
1501
1502 pub fn with_max_negotiating_inbound_streams(mut self, v: usize) -> Self {
1512 self.pool_config = self.pool_config.with_max_negotiating_inbound_streams(v);
1513 self
1514 }
1515
1516 pub fn with_idle_connection_timeout(mut self, timeout: Duration) -> Self {
1520 self.pool_config.idle_connection_timeout = timeout;
1521 self
1522 }
1523}
1524
1525#[derive(Debug)]
1527pub enum DialError {
1528 LocalPeerId {
1530 endpoint: ConnectedPoint,
1531 },
1532 NoAddresses,
1534 DialPeerConditionFalse(dial_opts::PeerCondition),
1537 Aborted,
1539 WrongPeerId {
1541 obtained: PeerId,
1542 endpoint: ConnectedPoint,
1543 },
1544 Denied {
1545 cause: ConnectionDenied,
1546 },
1547 Transport(Vec<(Multiaddr, TransportError<io::Error>)>),
1549}
1550
1551impl From<PendingOutboundConnectionError> for DialError {
1552 fn from(error: PendingOutboundConnectionError) -> Self {
1553 match error {
1554 PendingConnectionError::Aborted => DialError::Aborted,
1555 PendingConnectionError::WrongPeerId { obtained, endpoint } => {
1556 DialError::WrongPeerId { obtained, endpoint }
1557 }
1558 PendingConnectionError::LocalPeerId { endpoint } => DialError::LocalPeerId { endpoint },
1559 PendingConnectionError::Transport(e) => DialError::Transport(e),
1560 }
1561 }
1562}
1563
1564impl fmt::Display for DialError {
1565 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1566 match self {
1567 DialError::NoAddresses => write!(f, "Dial error: no addresses for peer."),
1568 DialError::LocalPeerId { endpoint } => write!(
1569 f,
1570 "Dial error: tried to dial local peer id at {endpoint:?}."
1571 ),
1572 DialError::DialPeerConditionFalse(PeerCondition::Disconnected) => write!(f, "Dial error: dial condition was configured to only happen when disconnected (`PeerCondition::Disconnected`), but node is already connected, thus cancelling new dial."),
1573 DialError::DialPeerConditionFalse(PeerCondition::NotDialing) => write!(f, "Dial error: dial condition was configured to only happen if there is currently no ongoing dialing attempt (`PeerCondition::NotDialing`), but a dial is in progress, thus cancelling new dial."),
1574 DialError::DialPeerConditionFalse(PeerCondition::DisconnectedAndNotDialing) => write!(f, "Dial error: dial condition was configured to only happen when both disconnected (`PeerCondition::Disconnected`) and there is currently no ongoing dialing attempt (`PeerCondition::NotDialing`), but node is already connected or dial is in progress, thus cancelling new dial."),
1575 DialError::DialPeerConditionFalse(PeerCondition::Always) => unreachable!("Dial peer condition is by definition true."),
1576 DialError::Aborted => write!(
1577 f,
1578 "Dial error: Pending connection attempt has been aborted."
1579 ),
1580 DialError::WrongPeerId { obtained, endpoint } => write!(
1581 f,
1582 "Dial error: Unexpected peer ID {obtained} at {endpoint:?}."
1583 ),
1584 DialError::Transport(errors) => {
1585 write!(f, "Failed to negotiate transport protocol(s): [")?;
1586
1587 for (addr, error) in errors {
1588 write!(f, "({addr}")?;
1589 print_error_chain(f, error)?;
1590 write!(f, ")")?;
1591 }
1592 write!(f, "]")?;
1593
1594 Ok(())
1595 }
1596 DialError::Denied { .. } => {
1597 write!(f, "Dial error")
1598 }
1599 }
1600 }
1601}
1602
1603fn print_error_chain(f: &mut fmt::Formatter<'_>, e: &dyn error::Error) -> fmt::Result {
1604 write!(f, ": {e}")?;
1605
1606 if let Some(source) = e.source() {
1607 print_error_chain(f, source)?;
1608 }
1609
1610 Ok(())
1611}
1612
1613impl error::Error for DialError {
1614 fn source(&self) -> Option<&(dyn error::Error + 'static)> {
1615 match self {
1616 DialError::LocalPeerId { .. } => None,
1617 DialError::NoAddresses => None,
1618 DialError::DialPeerConditionFalse(_) => None,
1619 DialError::Aborted => None,
1620 DialError::WrongPeerId { .. } => None,
1621 DialError::Transport(_) => None,
1622 DialError::Denied { cause } => Some(cause),
1623 }
1624 }
1625}
1626
1627#[derive(Debug)]
1629pub enum ListenError {
1630 Aborted,
1632 WrongPeerId {
1634 obtained: PeerId,
1635 endpoint: ConnectedPoint,
1636 },
1637 LocalPeerId {
1639 endpoint: ConnectedPoint,
1640 },
1641 Denied {
1642 cause: ConnectionDenied,
1643 },
1644 Transport(TransportError<io::Error>),
1646}
1647
1648impl From<PendingInboundConnectionError> for ListenError {
1649 fn from(error: PendingInboundConnectionError) -> Self {
1650 match error {
1651 PendingInboundConnectionError::Transport(inner) => ListenError::Transport(inner),
1652 PendingInboundConnectionError::Aborted => ListenError::Aborted,
1653 PendingInboundConnectionError::WrongPeerId { obtained, endpoint } => {
1654 ListenError::WrongPeerId { obtained, endpoint }
1655 }
1656 PendingInboundConnectionError::LocalPeerId { endpoint } => {
1657 ListenError::LocalPeerId { endpoint }
1658 }
1659 }
1660 }
1661}
1662
1663impl fmt::Display for ListenError {
1664 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1665 match self {
1666 ListenError::Aborted => write!(
1667 f,
1668 "Listen error: Pending connection attempt has been aborted."
1669 ),
1670 ListenError::WrongPeerId { obtained, endpoint } => write!(
1671 f,
1672 "Listen error: Unexpected peer ID {obtained} at {endpoint:?}."
1673 ),
1674 ListenError::Transport(_) => {
1675 write!(f, "Listen error: Failed to negotiate transport protocol(s)")
1676 }
1677 ListenError::Denied { cause } => {
1678 write!(f, "Listen error: Denied: {cause}")
1679 }
1680 ListenError::LocalPeerId { endpoint } => {
1681 write!(f, "Listen error: Local peer ID at {endpoint:?}.")
1682 }
1683 }
1684 }
1685}
1686
1687impl error::Error for ListenError {
1688 fn source(&self) -> Option<&(dyn error::Error + 'static)> {
1689 match self {
1690 ListenError::WrongPeerId { .. } => None,
1691 ListenError::Transport(err) => Some(err),
1692 ListenError::Aborted => None,
1693 ListenError::Denied { cause } => Some(cause),
1694 ListenError::LocalPeerId { .. } => None,
1695 }
1696 }
1697}
1698
1699#[derive(Debug)]
1703pub struct ConnectionDenied {
1704 inner: Box<dyn error::Error + Send + Sync + 'static>,
1705}
1706
1707impl ConnectionDenied {
1708 pub fn new(cause: impl Into<Box<dyn error::Error + Send + Sync + 'static>>) -> Self {
1709 Self {
1710 inner: cause.into(),
1711 }
1712 }
1713
1714 pub fn downcast<E>(self) -> Result<E, Self>
1716 where
1717 E: error::Error + Send + Sync + 'static,
1718 {
1719 let inner = self
1720 .inner
1721 .downcast::<E>()
1722 .map_err(|inner| ConnectionDenied { inner })?;
1723
1724 Ok(*inner)
1725 }
1726
1727 pub fn downcast_ref<E>(&self) -> Option<&E>
1729 where
1730 E: error::Error + Send + Sync + 'static,
1731 {
1732 self.inner.downcast_ref::<E>()
1733 }
1734}
1735
1736impl fmt::Display for ConnectionDenied {
1737 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1738 write!(f, "connection denied")
1739 }
1740}
1741
1742impl error::Error for ConnectionDenied {
1743 fn source(&self) -> Option<&(dyn error::Error + 'static)> {
1744 Some(self.inner.as_ref())
1745 }
1746}
1747
1748#[derive(Clone, Debug)]
1750pub struct NetworkInfo {
1751 num_peers: usize,
1753 connection_counters: ConnectionCounters,
1755}
1756
1757impl NetworkInfo {
1758 pub fn num_peers(&self) -> usize {
1761 self.num_peers
1762 }
1763
1764 pub fn connection_counters(&self) -> &ConnectionCounters {
1766 &self.connection_counters
1767 }
1768}
1769
1770#[cfg(test)]
1771mod tests {
1772 use super::*;
1773 use crate::test::{CallTraceBehaviour, MockBehaviour};
1774 use libp2p_core::multiaddr::multiaddr;
1775 use libp2p_core::transport::memory::MemoryTransportError;
1776 use libp2p_core::{multiaddr, upgrade};
1777 use libp2p_identity as identity;
1778 use libp2p_plaintext as plaintext;
1779 use libp2p_yamux as yamux;
1780 use quickcheck::*;
1781
1782 enum State {
1785 Connecting,
1786 Disconnecting,
1787 }
1788
1789 fn new_test_swarm(
1790 config: Config,
1791 ) -> Swarm<CallTraceBehaviour<MockBehaviour<dummy::ConnectionHandler, ()>>> {
1792 let id_keys = identity::Keypair::generate_ed25519();
1793 let local_public_key = id_keys.public();
1794 let transport = transport::MemoryTransport::default()
1795 .upgrade(upgrade::Version::V1)
1796 .authenticate(plaintext::Config::new(&id_keys))
1797 .multiplex(yamux::Config::default())
1798 .boxed();
1799 let behaviour = CallTraceBehaviour::new(MockBehaviour::new(dummy::ConnectionHandler));
1800
1801 Swarm::new(
1802 transport,
1803 behaviour,
1804 local_public_key.into(),
1805 config.with_idle_connection_timeout(Duration::from_secs(5)),
1806 )
1807 }
1808
1809 fn swarms_connected<TBehaviour>(
1810 swarm1: &Swarm<CallTraceBehaviour<TBehaviour>>,
1811 swarm2: &Swarm<CallTraceBehaviour<TBehaviour>>,
1812 num_connections: usize,
1813 ) -> bool
1814 where
1815 TBehaviour: NetworkBehaviour,
1816 THandlerOutEvent<TBehaviour>: Clone,
1817 {
1818 swarm1
1819 .behaviour()
1820 .num_connections_to_peer(*swarm2.local_peer_id())
1821 == num_connections
1822 && swarm2
1823 .behaviour()
1824 .num_connections_to_peer(*swarm1.local_peer_id())
1825 == num_connections
1826 && swarm1.is_connected(swarm2.local_peer_id())
1827 && swarm2.is_connected(swarm1.local_peer_id())
1828 }
1829
1830 fn swarms_disconnected<TBehaviour>(
1831 swarm1: &Swarm<CallTraceBehaviour<TBehaviour>>,
1832 swarm2: &Swarm<CallTraceBehaviour<TBehaviour>>,
1833 ) -> bool
1834 where
1835 TBehaviour: NetworkBehaviour,
1836 THandlerOutEvent<TBehaviour>: Clone,
1837 {
1838 swarm1
1839 .behaviour()
1840 .num_connections_to_peer(*swarm2.local_peer_id())
1841 == 0
1842 && swarm2
1843 .behaviour()
1844 .num_connections_to_peer(*swarm1.local_peer_id())
1845 == 0
1846 && !swarm1.is_connected(swarm2.local_peer_id())
1847 && !swarm2.is_connected(swarm1.local_peer_id())
1848 }
1849
1850 #[tokio::test]
1856 async fn test_swarm_disconnect() {
1857 let mut swarm1 = new_test_swarm(Config::with_tokio_executor());
1858 let mut swarm2 = new_test_swarm(Config::with_tokio_executor());
1859
1860 let addr1: Multiaddr = multiaddr::Protocol::Memory(rand::random::<u64>()).into();
1861 let addr2: Multiaddr = multiaddr::Protocol::Memory(rand::random::<u64>()).into();
1862
1863 swarm1.listen_on(addr1.clone()).unwrap();
1864 swarm2.listen_on(addr2.clone()).unwrap();
1865
1866 let swarm1_id = *swarm1.local_peer_id();
1867
1868 let mut reconnected = false;
1869 let num_connections = 10;
1870
1871 for _ in 0..num_connections {
1872 swarm1.dial(addr2.clone()).unwrap();
1873 }
1874 let mut state = State::Connecting;
1875
1876 future::poll_fn(move |cx| loop {
1877 let poll1 = Swarm::poll_next_event(Pin::new(&mut swarm1), cx);
1878 let poll2 = Swarm::poll_next_event(Pin::new(&mut swarm2), cx);
1879 match state {
1880 State::Connecting => {
1881 if swarms_connected(&swarm1, &swarm2, num_connections) {
1882 if reconnected {
1883 return Poll::Ready(());
1884 }
1885 swarm2
1886 .disconnect_peer_id(swarm1_id)
1887 .expect("Error disconnecting");
1888 state = State::Disconnecting;
1889 }
1890 }
1891 State::Disconnecting => {
1892 if swarms_disconnected(&swarm1, &swarm2) {
1893 if reconnected {
1894 return Poll::Ready(());
1895 }
1896 reconnected = true;
1897 for _ in 0..num_connections {
1898 swarm2.dial(addr1.clone()).unwrap();
1899 }
1900 state = State::Connecting;
1901 }
1902 }
1903 }
1904
1905 if poll1.is_pending() && poll2.is_pending() {
1906 return Poll::Pending;
1907 }
1908 })
1909 .await
1910 }
1911
1912 #[tokio::test]
1919 async fn test_behaviour_disconnect_all() {
1920 let mut swarm1 = new_test_swarm(Config::with_tokio_executor());
1921 let mut swarm2 = new_test_swarm(Config::with_tokio_executor());
1922
1923 let addr1: Multiaddr = multiaddr::Protocol::Memory(rand::random::<u64>()).into();
1924 let addr2: Multiaddr = multiaddr::Protocol::Memory(rand::random::<u64>()).into();
1925
1926 swarm1.listen_on(addr1.clone()).unwrap();
1927 swarm2.listen_on(addr2.clone()).unwrap();
1928
1929 let swarm1_id = *swarm1.local_peer_id();
1930
1931 let mut reconnected = false;
1932 let num_connections = 10;
1933
1934 for _ in 0..num_connections {
1935 swarm1.dial(addr2.clone()).unwrap();
1936 }
1937 let mut state = State::Connecting;
1938
1939 future::poll_fn(move |cx| loop {
1940 let poll1 = Swarm::poll_next_event(Pin::new(&mut swarm1), cx);
1941 let poll2 = Swarm::poll_next_event(Pin::new(&mut swarm2), cx);
1942 match state {
1943 State::Connecting => {
1944 if swarms_connected(&swarm1, &swarm2, num_connections) {
1945 if reconnected {
1946 return Poll::Ready(());
1947 }
1948 swarm2
1949 .behaviour
1950 .inner()
1951 .next_action
1952 .replace(ToSwarm::CloseConnection {
1953 peer_id: swarm1_id,
1954 connection: CloseConnection::All,
1955 });
1956 state = State::Disconnecting;
1957 continue;
1958 }
1959 }
1960 State::Disconnecting => {
1961 if swarms_disconnected(&swarm1, &swarm2) {
1962 reconnected = true;
1963 for _ in 0..num_connections {
1964 swarm2.dial(addr1.clone()).unwrap();
1965 }
1966 state = State::Connecting;
1967 continue;
1968 }
1969 }
1970 }
1971
1972 if poll1.is_pending() && poll2.is_pending() {
1973 return Poll::Pending;
1974 }
1975 })
1976 .await
1977 }
1978
1979 #[tokio::test]
1986 async fn test_behaviour_disconnect_one() {
1987 let mut swarm1 = new_test_swarm(Config::with_tokio_executor());
1988 let mut swarm2 = new_test_swarm(Config::with_tokio_executor());
1989
1990 let addr1: Multiaddr = multiaddr::Protocol::Memory(rand::random::<u64>()).into();
1991 let addr2: Multiaddr = multiaddr::Protocol::Memory(rand::random::<u64>()).into();
1992
1993 swarm1.listen_on(addr1).unwrap();
1994 swarm2.listen_on(addr2.clone()).unwrap();
1995
1996 let swarm1_id = *swarm1.local_peer_id();
1997
1998 let num_connections = 10;
1999
2000 for _ in 0..num_connections {
2001 swarm1.dial(addr2.clone()).unwrap();
2002 }
2003 let mut state = State::Connecting;
2004 let mut disconnected_conn_id = None;
2005
2006 future::poll_fn(move |cx| loop {
2007 let poll1 = Swarm::poll_next_event(Pin::new(&mut swarm1), cx);
2008 let poll2 = Swarm::poll_next_event(Pin::new(&mut swarm2), cx);
2009 match state {
2010 State::Connecting => {
2011 if swarms_connected(&swarm1, &swarm2, num_connections) {
2012 disconnected_conn_id = {
2013 let conn_id =
2014 swarm2.behaviour.on_connection_established[num_connections / 2].1;
2015 swarm2.behaviour.inner().next_action.replace(
2016 ToSwarm::CloseConnection {
2017 peer_id: swarm1_id,
2018 connection: CloseConnection::One(conn_id),
2019 },
2020 );
2021 Some(conn_id)
2022 };
2023 state = State::Disconnecting;
2024 }
2025 }
2026 State::Disconnecting => {
2027 for s in &[&swarm1, &swarm2] {
2028 assert!(s
2029 .behaviour
2030 .on_connection_closed
2031 .iter()
2032 .all(|(.., remaining_conns)| *remaining_conns > 0));
2033 assert_eq!(s.behaviour.on_connection_established.len(), num_connections);
2034 s.behaviour.assert_connected(num_connections, 1);
2035 }
2036 if [&swarm1, &swarm2]
2037 .iter()
2038 .all(|s| s.behaviour.on_connection_closed.len() == 1)
2039 {
2040 let conn_id = swarm2.behaviour.on_connection_closed[0].1;
2041 assert_eq!(Some(conn_id), disconnected_conn_id);
2042 return Poll::Ready(());
2043 }
2044 }
2045 }
2046
2047 if poll1.is_pending() && poll2.is_pending() {
2048 return Poll::Pending;
2049 }
2050 })
2051 .await
2052 }
2053
2054 #[test]
2055 fn concurrent_dialing() {
2056 #[derive(Clone, Debug)]
2057 struct DialConcurrencyFactor(NonZeroU8);
2058
2059 impl Arbitrary for DialConcurrencyFactor {
2060 fn arbitrary(g: &mut Gen) -> Self {
2061 Self(NonZeroU8::new(g.gen_range(1..11)).unwrap())
2062 }
2063 }
2064
2065 fn prop(concurrency_factor: DialConcurrencyFactor) {
2066 tokio::runtime::Runtime::new().unwrap().block_on(async {
2067 let mut swarm = new_test_swarm(
2068 Config::with_tokio_executor()
2069 .with_dial_concurrency_factor(concurrency_factor.0),
2070 );
2071
2072 let num_listen_addrs = concurrency_factor.0.get() + 2;
2076 let mut listen_addresses = Vec::new();
2077 let mut transports = Vec::new();
2078 for _ in 0..num_listen_addrs {
2079 let mut transport = transport::MemoryTransport::default().boxed();
2080 transport
2081 .listen_on(ListenerId::next(), "/memory/0".parse().unwrap())
2082 .unwrap();
2083
2084 match transport.select_next_some().await {
2085 TransportEvent::NewAddress { listen_addr, .. } => {
2086 listen_addresses.push(listen_addr);
2087 }
2088 _ => panic!("Expected `NewListenAddr` event."),
2089 }
2090
2091 transports.push(transport);
2092 }
2093
2094 swarm
2097 .dial(
2098 DialOpts::peer_id(PeerId::random())
2099 .addresses(listen_addresses)
2100 .build(),
2101 )
2102 .unwrap();
2103 for mut transport in transports.into_iter() {
2104 match futures::future::select(transport.select_next_some(), swarm.next()).await
2105 {
2106 future::Either::Left((TransportEvent::Incoming { .. }, _)) => {}
2107 future::Either::Left(_) => {
2108 panic!("Unexpected transport event.")
2109 }
2110 future::Either::Right((e, _)) => {
2111 panic!("Expect swarm to not emit any event {e:?}")
2112 }
2113 }
2114 }
2115
2116 match swarm.next().await.unwrap() {
2117 SwarmEvent::OutgoingConnectionError { .. } => {}
2118 e => panic!("Unexpected swarm event {e:?}"),
2119 }
2120 })
2121 }
2122
2123 QuickCheck::new().tests(10).quickcheck(prop as fn(_) -> _);
2124 }
2125
2126 #[tokio::test]
2127 async fn invalid_peer_id() {
2128 let mut swarm1 = new_test_swarm(Config::with_tokio_executor());
2132 let mut swarm2 = new_test_swarm(Config::with_tokio_executor());
2133
2134 swarm1.listen_on("/memory/0".parse().unwrap()).unwrap();
2135
2136 let address = future::poll_fn(|cx| match swarm1.poll_next_unpin(cx) {
2137 Poll::Ready(Some(SwarmEvent::NewListenAddr { address, .. })) => Poll::Ready(address),
2138 Poll::Pending => Poll::Pending,
2139 _ => panic!("Was expecting the listen address to be reported"),
2140 })
2141 .await;
2142
2143 let other_id = PeerId::random();
2144 let other_addr = address.with(multiaddr::Protocol::P2p(other_id));
2145
2146 swarm2.dial(other_addr.clone()).unwrap();
2147
2148 let (peer_id, error) = future::poll_fn(|cx| {
2149 if let Poll::Ready(Some(SwarmEvent::IncomingConnection { .. })) =
2150 swarm1.poll_next_unpin(cx)
2151 {}
2152
2153 match swarm2.poll_next_unpin(cx) {
2154 Poll::Ready(Some(SwarmEvent::OutgoingConnectionError {
2155 peer_id, error, ..
2156 })) => Poll::Ready((peer_id, error)),
2157 Poll::Ready(x) => panic!("unexpected {x:?}"),
2158 Poll::Pending => Poll::Pending,
2159 }
2160 })
2161 .await;
2162 assert_eq!(peer_id.unwrap(), other_id);
2163 match error {
2164 DialError::WrongPeerId { obtained, endpoint } => {
2165 assert_eq!(obtained, *swarm1.local_peer_id());
2166 assert_eq!(
2167 endpoint,
2168 ConnectedPoint::Dialer {
2169 address: other_addr,
2170 role_override: Endpoint::Dialer,
2171 }
2172 );
2173 }
2174 x => panic!("wrong error {x:?}"),
2175 }
2176 }
2177
2178 #[tokio::test]
2179 async fn dial_self() {
2180 let mut swarm = new_test_swarm(Config::with_tokio_executor());
2191 swarm.listen_on("/memory/0".parse().unwrap()).unwrap();
2192
2193 let local_address = future::poll_fn(|cx| match swarm.poll_next_unpin(cx) {
2194 Poll::Ready(Some(SwarmEvent::NewListenAddr { address, .. })) => Poll::Ready(address),
2195 Poll::Pending => Poll::Pending,
2196 _ => panic!("Was expecting the listen address to be reported"),
2197 })
2198 .await;
2199
2200 swarm.listened_addrs.clear(); swarm.dial(local_address.clone()).unwrap();
2203
2204 let mut got_dial_err = false;
2205 let mut got_inc_err = false;
2206 future::poll_fn(|cx| -> Poll<Result<(), io::Error>> {
2207 loop {
2208 match swarm.poll_next_unpin(cx) {
2209 Poll::Ready(Some(SwarmEvent::OutgoingConnectionError {
2210 peer_id,
2211 error: DialError::LocalPeerId { .. },
2212 ..
2213 })) => {
2214 assert_eq!(&peer_id.unwrap(), swarm.local_peer_id());
2215 assert!(!got_dial_err);
2216 got_dial_err = true;
2217 if got_inc_err {
2218 return Poll::Ready(Ok(()));
2219 }
2220 }
2221 Poll::Ready(Some(SwarmEvent::IncomingConnectionError {
2222 local_addr, ..
2223 })) => {
2224 assert!(!got_inc_err);
2225 assert_eq!(local_addr, local_address);
2226 got_inc_err = true;
2227 if got_dial_err {
2228 return Poll::Ready(Ok(()));
2229 }
2230 }
2231 Poll::Ready(Some(SwarmEvent::IncomingConnection { local_addr, .. })) => {
2232 assert_eq!(local_addr, local_address);
2233 }
2234 Poll::Ready(ev) => {
2235 panic!("Unexpected event: {ev:?}")
2236 }
2237 Poll::Pending => break Poll::Pending,
2238 }
2239 }
2240 })
2241 .await
2242 .unwrap();
2243 }
2244
2245 #[tokio::test]
2246 async fn dial_self_by_id() {
2247 let swarm = new_test_swarm(Config::with_tokio_executor());
2250 let peer_id = *swarm.local_peer_id();
2251 assert!(!swarm.is_connected(&peer_id));
2252 }
2253
2254 #[tokio::test]
2255 async fn multiple_addresses_err() {
2256 let target = PeerId::random();
2259
2260 let mut swarm = new_test_swarm(Config::with_tokio_executor());
2261
2262 let addresses = HashSet::from([
2263 multiaddr![Ip4([0, 0, 0, 0]), Tcp(rand::random::<u16>())],
2264 multiaddr![Ip4([0, 0, 0, 0]), Tcp(rand::random::<u16>())],
2265 multiaddr![Ip4([0, 0, 0, 0]), Tcp(rand::random::<u16>())],
2266 multiaddr![Udp(rand::random::<u16>())],
2267 multiaddr![Udp(rand::random::<u16>())],
2268 multiaddr![Udp(rand::random::<u16>())],
2269 multiaddr![Udp(rand::random::<u16>())],
2270 multiaddr![Udp(rand::random::<u16>())],
2271 ]);
2272
2273 swarm
2274 .dial(
2275 DialOpts::peer_id(target)
2276 .addresses(addresses.iter().cloned().collect())
2277 .build(),
2278 )
2279 .unwrap();
2280
2281 match swarm.next().await.unwrap() {
2282 SwarmEvent::OutgoingConnectionError {
2283 peer_id,
2284 error: DialError::Transport(errors),
2286 ..
2287 } => {
2288 assert_eq!(target, peer_id.unwrap());
2289
2290 let failed_addresses = errors.into_iter().map(|(addr, _)| addr).collect::<Vec<_>>();
2291 let expected_addresses = addresses
2292 .into_iter()
2293 .map(|addr| addr.with(multiaddr::Protocol::P2p(target)))
2294 .collect::<Vec<_>>();
2295
2296 assert_eq!(expected_addresses, failed_addresses);
2297 }
2298 e => panic!("Unexpected event: {e:?}"),
2299 }
2300 }
2301
2302 #[tokio::test]
2303 async fn aborting_pending_connection_surfaces_error() {
2304 let _ = tracing_subscriber::fmt()
2305 .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
2306 .try_init();
2307
2308 let mut dialer = new_test_swarm(Config::with_tokio_executor());
2309 let mut listener = new_test_swarm(Config::with_tokio_executor());
2310
2311 let listener_peer_id = *listener.local_peer_id();
2312 listener.listen_on(multiaddr![Memory(0u64)]).unwrap();
2313 let listener_address = match listener.next().await.unwrap() {
2314 SwarmEvent::NewListenAddr { address, .. } => address,
2315 e => panic!("Unexpected network event: {e:?}"),
2316 };
2317
2318 dialer
2319 .dial(
2320 DialOpts::peer_id(listener_peer_id)
2321 .addresses(vec![listener_address])
2322 .build(),
2323 )
2324 .unwrap();
2325
2326 dialer
2327 .disconnect_peer_id(listener_peer_id)
2328 .expect_err("Expect peer to not yet be connected.");
2329
2330 match dialer.next().await.unwrap() {
2331 SwarmEvent::OutgoingConnectionError {
2332 error: DialError::Aborted,
2333 ..
2334 } => {}
2335 e => panic!("Unexpected swarm event {e:?}."),
2336 }
2337 }
2338
2339 #[test]
2340 fn dial_error_prints_sources() {
2341 let error = DialError::Transport(vec![(
2343 "/ip4/127.0.0.1/tcp/80".parse().unwrap(),
2344 TransportError::Other(io::Error::new(
2345 io::ErrorKind::Other,
2346 MemoryTransportError::Unreachable,
2347 )),
2348 )]);
2349
2350 let string = format!("{error}");
2351
2352 assert_eq!("Failed to negotiate transport protocol(s): [(/ip4/127.0.0.1/tcp/80: : No listener on the given port.)]", string)
2354 }
2355}