libp2p/builder/phase/
tcp.rs

1use super::*;
2use crate::SwarmBuilder;
3#[cfg(all(
4    not(target_arch = "wasm32"),
5    any(feature = "tcp", feature = "websocket")
6))]
7use libp2p_core::muxing::{StreamMuxer, StreamMuxerBox};
8#[cfg(all(feature = "websocket", not(target_arch = "wasm32")))]
9use libp2p_core::Transport;
10#[cfg(all(
11    not(target_arch = "wasm32"),
12    any(feature = "tcp", feature = "websocket")
13))]
14use libp2p_core::{
15    upgrade::InboundConnectionUpgrade, upgrade::OutboundConnectionUpgrade, Negotiated, UpgradeInfo,
16};
17use std::marker::PhantomData;
18
19pub struct TcpPhase {}
20
21macro_rules! impl_tcp_builder {
22    ($providerKebabCase:literal, $providerPascalCase:ty, $path:ident) => {
23        #[cfg(all(
24            not(target_arch = "wasm32"),
25            feature = "tcp",
26            feature = $providerKebabCase,
27        ))]
28        impl SwarmBuilder<$providerPascalCase, TcpPhase> {
29            /// Adds a TCP based transport.
30            ///
31            /// Note that both `security_upgrade` and `multiplexer_upgrade` take function pointers,
32            /// i.e. they take the function themselves (without the invocation via `()`), not the
33            /// result of the function invocation. See example below.
34            ///
35            /// ``` rust
36            /// # use libp2p::SwarmBuilder;
37            /// # use std::error::Error;
38            /// # async fn build_swarm() -> Result<(), Box<dyn Error>> {
39            /// let swarm = SwarmBuilder::with_new_identity()
40            ///     .with_tokio()
41            ///     .with_tcp(
42            ///         Default::default(),
43            ///         (libp2p_tls::Config::new, libp2p_noise::Config::new),
44            ///         libp2p_yamux::Config::default,
45            ///     )?
46            /// # ;
47            /// # Ok(())
48            /// # }
49            /// ```
50            pub fn with_tcp<SecUpgrade, SecStream, SecError, MuxUpgrade, MuxStream, MuxError>(
51                self,
52                tcp_config: libp2p_tcp::Config,
53                security_upgrade: SecUpgrade,
54                multiplexer_upgrade: MuxUpgrade,
55            ) -> Result<
56                SwarmBuilder<$providerPascalCase, QuicPhase<impl AuthenticatedMultiplexedTransport>>,
57            SecUpgrade::Error,
58            >
59            where
60                SecStream: futures::AsyncRead + futures::AsyncWrite + Unpin + Send + 'static,
61                SecError: std::error::Error + Send + Sync + 'static,
62                SecUpgrade: IntoSecurityUpgrade<libp2p_tcp::$path::TcpStream>,
63                SecUpgrade::Upgrade: InboundConnectionUpgrade<Negotiated<libp2p_tcp::$path::TcpStream>, Output = (libp2p_identity::PeerId, SecStream), Error = SecError> + OutboundConnectionUpgrade<Negotiated<libp2p_tcp::$path::TcpStream>, Output = (libp2p_identity::PeerId, SecStream), Error = SecError> + Clone + Send + 'static,
64                <SecUpgrade::Upgrade as InboundConnectionUpgrade<Negotiated<libp2p_tcp::$path::TcpStream>>>::Future: Send,
65                <SecUpgrade::Upgrade as OutboundConnectionUpgrade<Negotiated<libp2p_tcp::$path::TcpStream>>>::Future: Send,
66                <<<SecUpgrade as IntoSecurityUpgrade<libp2p_tcp::$path::TcpStream>>::Upgrade as UpgradeInfo>::InfoIter as IntoIterator>::IntoIter: Send,
67                <<SecUpgrade as IntoSecurityUpgrade<libp2p_tcp::$path::TcpStream>>::Upgrade as UpgradeInfo>::Info: Send,
68
69                MuxStream: StreamMuxer + Send + 'static,
70                MuxStream::Substream: Send + 'static,
71                MuxStream::Error: Send + Sync + 'static,
72                MuxUpgrade: IntoMultiplexerUpgrade<SecStream>,
73                MuxUpgrade::Upgrade: InboundConnectionUpgrade<Negotiated<SecStream>, Output = MuxStream, Error = MuxError> + OutboundConnectionUpgrade<Negotiated<SecStream>, Output = MuxStream, Error = MuxError> + Clone + Send + 'static,
74                <MuxUpgrade::Upgrade as InboundConnectionUpgrade<Negotiated<SecStream>>>::Future: Send,
75                <MuxUpgrade::Upgrade as OutboundConnectionUpgrade<Negotiated<SecStream>>>::Future: Send,
76                MuxError: std::error::Error + Send + Sync + 'static,
77                <<<MuxUpgrade as IntoMultiplexerUpgrade<SecStream>>::Upgrade as UpgradeInfo>::InfoIter as IntoIterator>::IntoIter: Send,
78                <<MuxUpgrade as IntoMultiplexerUpgrade<SecStream>>::Upgrade as UpgradeInfo>::Info: Send,
79            {
80                Ok(SwarmBuilder {
81                    phase: QuicPhase {
82                        transport: libp2p_tcp::$path::Transport::new(tcp_config)
83                            .upgrade(libp2p_core::upgrade::Version::V1Lazy)
84                            .authenticate(
85                                security_upgrade.into_security_upgrade(&self.keypair)?,
86                            )
87                            .multiplex(multiplexer_upgrade.into_multiplexer_upgrade())
88                            .map(|(p, c), _| (p, StreamMuxerBox::new(c))),
89                    },
90                    keypair: self.keypair,
91                    phantom: PhantomData,
92                })
93            }
94        }
95    };
96}
97
98impl_tcp_builder!("async-std", super::provider::AsyncStd, async_io);
99impl_tcp_builder!("tokio", super::provider::Tokio, tokio);
100
101impl<Provider> SwarmBuilder<Provider, TcpPhase> {
102    pub(crate) fn without_tcp(
103        self,
104    ) -> SwarmBuilder<Provider, QuicPhase<impl AuthenticatedMultiplexedTransport>> {
105        SwarmBuilder {
106            keypair: self.keypair,
107            phantom: PhantomData,
108            phase: QuicPhase {
109                transport: libp2p_core::transport::dummy::DummyTransport::new(),
110            },
111        }
112    }
113}
114
115// Shortcuts
116#[cfg(all(not(target_arch = "wasm32"), feature = "quic", feature = "async-std"))]
117impl SwarmBuilder<super::provider::AsyncStd, TcpPhase> {
118    pub fn with_quic(
119        self,
120    ) -> SwarmBuilder<
121        super::provider::AsyncStd,
122        OtherTransportPhase<impl AuthenticatedMultiplexedTransport>,
123    > {
124        self.without_tcp().with_quic()
125    }
126}
127#[cfg(all(not(target_arch = "wasm32"), feature = "quic", feature = "tokio"))]
128impl SwarmBuilder<super::provider::Tokio, TcpPhase> {
129    pub fn with_quic(
130        self,
131    ) -> SwarmBuilder<
132        super::provider::Tokio,
133        OtherTransportPhase<impl AuthenticatedMultiplexedTransport>,
134    > {
135        self.without_tcp().with_quic()
136    }
137}
138#[cfg(all(not(target_arch = "wasm32"), feature = "quic", feature = "async-std"))]
139impl SwarmBuilder<super::provider::AsyncStd, TcpPhase> {
140    pub fn with_quic_config(
141        self,
142        constructor: impl FnOnce(libp2p_quic::Config) -> libp2p_quic::Config,
143    ) -> SwarmBuilder<
144        super::provider::AsyncStd,
145        OtherTransportPhase<impl AuthenticatedMultiplexedTransport>,
146    > {
147        self.without_tcp().with_quic_config(constructor)
148    }
149}
150#[cfg(all(not(target_arch = "wasm32"), feature = "quic", feature = "tokio"))]
151impl SwarmBuilder<super::provider::Tokio, TcpPhase> {
152    pub fn with_quic_config(
153        self,
154        constructor: impl FnOnce(libp2p_quic::Config) -> libp2p_quic::Config,
155    ) -> SwarmBuilder<
156        super::provider::Tokio,
157        OtherTransportPhase<impl AuthenticatedMultiplexedTransport>,
158    > {
159        self.without_tcp().with_quic_config(constructor)
160    }
161}
162impl<Provider> SwarmBuilder<Provider, TcpPhase> {
163    pub fn with_other_transport<
164        Muxer: libp2p_core::muxing::StreamMuxer + Send + 'static,
165        OtherTransport: Transport<Output = (libp2p_identity::PeerId, Muxer)> + Send + Unpin + 'static,
166        R: TryIntoTransport<OtherTransport>,
167    >(
168        self,
169        constructor: impl FnOnce(&libp2p_identity::Keypair) -> R,
170    ) -> Result<
171        SwarmBuilder<Provider, OtherTransportPhase<impl AuthenticatedMultiplexedTransport>>,
172        R::Error,
173    >
174    where
175        <OtherTransport as Transport>::Error: Send + Sync + 'static,
176        <OtherTransport as Transport>::Dial: Send,
177        <OtherTransport as Transport>::ListenerUpgrade: Send,
178        <Muxer as libp2p_core::muxing::StreamMuxer>::Substream: Send,
179        <Muxer as libp2p_core::muxing::StreamMuxer>::Error: Send + Sync,
180    {
181        self.without_tcp()
182            .without_quic()
183            .with_other_transport(constructor)
184    }
185}
186macro_rules! impl_tcp_phase_with_websocket {
187    ($providerKebabCase:literal, $providerPascalCase:ty, $websocketStream:ty) => {
188        #[cfg(all(feature = $providerKebabCase, not(target_arch = "wasm32"), feature = "websocket"))]
189        impl SwarmBuilder<$providerPascalCase, TcpPhase> {
190            /// See [`SwarmBuilder::with_websocket`].
191            pub async fn with_websocket <
192                SecUpgrade,
193                SecStream,
194                SecError,
195                MuxUpgrade,
196                MuxStream,
197                MuxError,
198            > (
199                self,
200                security_upgrade: SecUpgrade,
201                multiplexer_upgrade: MuxUpgrade,
202            ) -> Result<
203                    SwarmBuilder<
204                        $providerPascalCase,
205                        RelayPhase<impl AuthenticatedMultiplexedTransport>,
206                    >,
207                    WebsocketError<SecUpgrade::Error>,
208                >
209            where
210                SecStream: futures::AsyncRead + futures::AsyncWrite + Unpin + Send + 'static,
211                SecError: std::error::Error + Send + Sync + 'static,
212                SecUpgrade: IntoSecurityUpgrade<$websocketStream>,
213                SecUpgrade::Upgrade: InboundConnectionUpgrade<Negotiated<$websocketStream>, Output = (libp2p_identity::PeerId, SecStream), Error = SecError> + OutboundConnectionUpgrade<Negotiated<$websocketStream>, Output = (libp2p_identity::PeerId, SecStream), Error = SecError> + Clone + Send + 'static,
214            <SecUpgrade::Upgrade as InboundConnectionUpgrade<Negotiated<$websocketStream>>>::Future: Send,
215            <SecUpgrade::Upgrade as OutboundConnectionUpgrade<Negotiated<$websocketStream>>>::Future: Send,
216            <<<SecUpgrade as IntoSecurityUpgrade<$websocketStream>>::Upgrade as UpgradeInfo>::InfoIter as IntoIterator>::IntoIter: Send,
217            <<SecUpgrade as IntoSecurityUpgrade<$websocketStream>>::Upgrade as UpgradeInfo>::Info: Send,
218
219                MuxStream: StreamMuxer + Send + 'static,
220                MuxStream::Substream: Send + 'static,
221                MuxStream::Error: Send + Sync + 'static,
222                MuxUpgrade: IntoMultiplexerUpgrade<SecStream>,
223                MuxUpgrade::Upgrade: InboundConnectionUpgrade<Negotiated<SecStream>, Output = MuxStream, Error = MuxError> + OutboundConnectionUpgrade<Negotiated<SecStream>, Output = MuxStream, Error = MuxError> + Clone + Send + 'static,
224                <MuxUpgrade::Upgrade as InboundConnectionUpgrade<Negotiated<SecStream>>>::Future: Send,
225                <MuxUpgrade::Upgrade as OutboundConnectionUpgrade<Negotiated<SecStream>>>::Future: Send,
226                    MuxError: std::error::Error + Send + Sync + 'static,
227                <<<MuxUpgrade as IntoMultiplexerUpgrade<SecStream>>::Upgrade as UpgradeInfo>::InfoIter as IntoIterator>::IntoIter: Send,
228                <<MuxUpgrade as IntoMultiplexerUpgrade<SecStream>>::Upgrade as UpgradeInfo>::Info: Send,
229            {
230                self.without_tcp()
231                    .without_quic()
232                    .without_any_other_transports()
233                    .without_dns()
234                    .with_websocket(security_upgrade, multiplexer_upgrade)
235                    .await
236            }
237        }
238    }
239}
240impl_tcp_phase_with_websocket!(
241    "async-std",
242    super::provider::AsyncStd,
243    rw_stream_sink::RwStreamSink<
244        libp2p_websocket::BytesConnection<libp2p_tcp::async_io::TcpStream>,
245    >
246);
247impl_tcp_phase_with_websocket!(
248    "tokio",
249    super::provider::Tokio,
250    rw_stream_sink::RwStreamSink<libp2p_websocket::BytesConnection<libp2p_tcp::tokio::TcpStream>>
251);