1use super::*;
2use crate::SwarmBuilder;
3#[cfg(all(
4 not(target_arch = "wasm32"),
5 any(feature = "tcp", feature = "websocket")
6))]
7use libp2p_core::muxing::{StreamMuxer, StreamMuxerBox};
8#[cfg(all(feature = "websocket", not(target_arch = "wasm32")))]
9use libp2p_core::Transport;
10#[cfg(all(
11 not(target_arch = "wasm32"),
12 any(feature = "tcp", feature = "websocket")
13))]
14use libp2p_core::{
15 upgrade::InboundConnectionUpgrade, upgrade::OutboundConnectionUpgrade, Negotiated, UpgradeInfo,
16};
17use std::marker::PhantomData;
18
/// Marker type for the builder phase in which a TCP transport can be added.
///
/// It is used as the phase type parameter of `SwarmBuilder` (for example
/// `SwarmBuilder<Provider, TcpPhase>`) and carries no data of its own.
#[derive(Debug)]
pub struct TcpPhase {}
20
// Generates the `with_tcp` builder step for a single runtime provider.
//
// Arguments, in order:
//   1. the Cargo feature name that enables the provider (used in the `cfg` gate),
//   2. the provider marker type,
//   3. the `libp2p_tcp` submodule that holds the provider's `Transport` and `TcpStream`.
macro_rules! impl_tcp_builder {
    ($feature_name:literal, $provider:ty, $tcp_module:ident) => {
        #[cfg(all(
            not(target_arch = "wasm32"),
            feature = "tcp",
            feature = $feature_name,
        ))]
        impl SwarmBuilder<$provider, TcpPhase> {
            /// Adds a TCP transport, authenticated with `security_upgrade` and multiplexed
            /// with `multiplexer_upgrade`, and moves the builder on to the QUIC phase.
            ///
            /// The transport is created from `tcp_config` and upgraded with
            /// `libp2p_core::upgrade::Version::V1Lazy`. The resulting muxer is wrapped in a
            /// `StreamMuxerBox`.
            ///
            /// # Errors
            ///
            /// Returns `SecUpgrade::Error` when turning `security_upgrade` into a security
            /// upgrade for the builder's keypair fails.
            pub fn with_tcp<SecUpgrade, SecStream, SecError, MuxUpgrade, MuxStream, MuxError>(
                self,
                tcp_config: libp2p_tcp::Config,
                security_upgrade: SecUpgrade,
                multiplexer_upgrade: MuxUpgrade,
            ) -> Result<
                SwarmBuilder<$provider, QuicPhase<impl AuthenticatedMultiplexedTransport>>,
                SecUpgrade::Error,
            >
            where
                // Security layer.
                SecUpgrade: IntoSecurityUpgrade<libp2p_tcp::$tcp_module::TcpStream>,
                SecUpgrade::Upgrade: InboundConnectionUpgrade<
                        Negotiated<libp2p_tcp::$tcp_module::TcpStream>,
                        Output = (libp2p_identity::PeerId, SecStream),
                        Error = SecError,
                    > + OutboundConnectionUpgrade<
                        Negotiated<libp2p_tcp::$tcp_module::TcpStream>,
                        Output = (libp2p_identity::PeerId, SecStream),
                        Error = SecError,
                    > + Clone
                    + Send
                    + 'static,
                <SecUpgrade::Upgrade as InboundConnectionUpgrade<
                    Negotiated<libp2p_tcp::$tcp_module::TcpStream>,
                >>::Future: Send,
                <SecUpgrade::Upgrade as OutboundConnectionUpgrade<
                    Negotiated<libp2p_tcp::$tcp_module::TcpStream>,
                >>::Future: Send,
                <<<SecUpgrade as IntoSecurityUpgrade<libp2p_tcp::$tcp_module::TcpStream>>::Upgrade as UpgradeInfo>::InfoIter as IntoIterator>::IntoIter: Send,
                <<SecUpgrade as IntoSecurityUpgrade<libp2p_tcp::$tcp_module::TcpStream>>::Upgrade as UpgradeInfo>::Info: Send,
                SecStream: futures::AsyncRead + futures::AsyncWrite + Unpin + Send + 'static,
                SecError: std::error::Error + Send + Sync + 'static,

                // Multiplexing layer.
                MuxUpgrade: IntoMultiplexerUpgrade<SecStream>,
                MuxUpgrade::Upgrade: InboundConnectionUpgrade<
                        Negotiated<SecStream>,
                        Output = MuxStream,
                        Error = MuxError,
                    > + OutboundConnectionUpgrade<
                        Negotiated<SecStream>,
                        Output = MuxStream,
                        Error = MuxError,
                    > + Clone
                    + Send
                    + 'static,
                <MuxUpgrade::Upgrade as InboundConnectionUpgrade<Negotiated<SecStream>>>::Future: Send,
                <MuxUpgrade::Upgrade as OutboundConnectionUpgrade<Negotiated<SecStream>>>::Future: Send,
                <<<MuxUpgrade as IntoMultiplexerUpgrade<SecStream>>::Upgrade as UpgradeInfo>::InfoIter as IntoIterator>::IntoIter: Send,
                <<MuxUpgrade as IntoMultiplexerUpgrade<SecStream>>::Upgrade as UpgradeInfo>::Info: Send,
                MuxStream: StreamMuxer + Send + 'static,
                MuxStream::Substream: Send + 'static,
                MuxStream::Error: Send + Sync + 'static,
                MuxError: std::error::Error + Send + Sync + 'static,
            {
                // Each step is named separately, but the calls happen in the same order as
                // a single chained expression would evaluate them.
                let upgrading = libp2p_tcp::$tcp_module::Transport::new(tcp_config)
                    .upgrade(libp2p_core::upgrade::Version::V1Lazy);
                let authenticated = upgrading
                    .authenticate(security_upgrade.into_security_upgrade(&self.keypair)?);
                let multiplexed =
                    authenticated.multiplex(multiplexer_upgrade.into_multiplexer_upgrade());
                let transport =
                    multiplexed.map(|(peer_id, muxer), _| (peer_id, StreamMuxerBox::new(muxer)));

                Ok(SwarmBuilder {
                    keypair: self.keypair,
                    phantom: PhantomData,
                    phase: QuicPhase { transport },
                })
            }
        }
    };
}

// One `with_tcp` implementation per supported runtime provider.
impl_tcp_builder!(
    "async-std",
    super::provider::AsyncStd,
    async_io
);
impl_tcp_builder!(
    "tokio",
    super::provider::Tokio,
    tokio
);
100
101impl<Provider> SwarmBuilder<Provider, TcpPhase> {
102 pub(crate) fn without_tcp(
103 self,
104 ) -> SwarmBuilder<Provider, QuicPhase<impl AuthenticatedMultiplexedTransport>> {
105 SwarmBuilder {
106 keypair: self.keypair,
107 phantom: PhantomData,
108 phase: QuicPhase {
109 transport: libp2p_core::transport::dummy::DummyTransport::new(),
110 },
111 }
112 }
113}
114
#[cfg(all(
    not(target_arch = "wasm32"),
    feature = "quic",
    feature = "async-std"
))]
impl SwarmBuilder<super::provider::AsyncStd, TcpPhase> {
    /// Shortcut for the `AsyncStd` provider: skips the TCP step via `without_tcp` and then
    /// calls `with_quic` on the resulting builder.
    pub fn with_quic(
        self,
    ) -> SwarmBuilder<
        super::provider::AsyncStd,
        OtherTransportPhase<impl AuthenticatedMultiplexedTransport>,
    > {
        let tcp_skipped = self.without_tcp();
        tcp_skipped.with_quic()
    }
}
#[cfg(all(
    not(target_arch = "wasm32"),
    feature = "quic",
    feature = "tokio"
))]
impl SwarmBuilder<super::provider::Tokio, TcpPhase> {
    /// Shortcut for the `Tokio` provider: skips the TCP step via `without_tcp` and then
    /// calls `with_quic` on the resulting builder.
    pub fn with_quic(
        self,
    ) -> SwarmBuilder<
        super::provider::Tokio,
        OtherTransportPhase<impl AuthenticatedMultiplexedTransport>,
    > {
        let tcp_skipped = self.without_tcp();
        tcp_skipped.with_quic()
    }
}
#[cfg(all(
    not(target_arch = "wasm32"),
    feature = "quic",
    feature = "async-std"
))]
impl SwarmBuilder<super::provider::AsyncStd, TcpPhase> {
    /// Shortcut for the `AsyncStd` provider: skips the TCP step via `without_tcp` and then
    /// calls `with_quic_config` on the resulting builder.
    ///
    /// `constructor` is passed through untouched; it receives a `libp2p_quic::Config` and
    /// returns the `libp2p_quic::Config` to use.
    pub fn with_quic_config(
        self,
        constructor: impl FnOnce(libp2p_quic::Config) -> libp2p_quic::Config,
    ) -> SwarmBuilder<
        super::provider::AsyncStd,
        OtherTransportPhase<impl AuthenticatedMultiplexedTransport>,
    > {
        let tcp_skipped = self.without_tcp();
        tcp_skipped.with_quic_config(constructor)
    }
}
#[cfg(all(
    not(target_arch = "wasm32"),
    feature = "quic",
    feature = "tokio"
))]
impl SwarmBuilder<super::provider::Tokio, TcpPhase> {
    /// Shortcut for the `Tokio` provider: skips the TCP step via `without_tcp` and then
    /// calls `with_quic_config` on the resulting builder.
    ///
    /// `constructor` is passed through untouched; it receives a `libp2p_quic::Config` and
    /// returns the `libp2p_quic::Config` to use.
    pub fn with_quic_config(
        self,
        constructor: impl FnOnce(libp2p_quic::Config) -> libp2p_quic::Config,
    ) -> SwarmBuilder<
        super::provider::Tokio,
        OtherTransportPhase<impl AuthenticatedMultiplexedTransport>,
    > {
        let tcp_skipped = self.without_tcp();
        tcp_skipped.with_quic_config(constructor)
    }
}
162impl<Provider> SwarmBuilder<Provider, TcpPhase> {
163 pub fn with_other_transport<
164 Muxer: libp2p_core::muxing::StreamMuxer + Send + 'static,
165 OtherTransport: Transport<Output = (libp2p_identity::PeerId, Muxer)> + Send + Unpin + 'static,
166 R: TryIntoTransport<OtherTransport>,
167 >(
168 self,
169 constructor: impl FnOnce(&libp2p_identity::Keypair) -> R,
170 ) -> Result<
171 SwarmBuilder<Provider, OtherTransportPhase<impl AuthenticatedMultiplexedTransport>>,
172 R::Error,
173 >
174 where
175 <OtherTransport as Transport>::Error: Send + Sync + 'static,
176 <OtherTransport as Transport>::Dial: Send,
177 <OtherTransport as Transport>::ListenerUpgrade: Send,
178 <Muxer as libp2p_core::muxing::StreamMuxer>::Substream: Send,
179 <Muxer as libp2p_core::muxing::StreamMuxer>::Error: Send + Sync,
180 {
181 self.without_tcp()
182 .without_quic()
183 .with_other_transport(constructor)
184 }
185}
// Generates the `with_websocket` shortcut on the TCP phase for a single runtime provider.
//
// Arguments, in order:
//   1. the Cargo feature name that enables the provider (used in the `cfg` gate),
//   2. the provider marker type,
//   3. the stream type that the security upgrade is applied to.
macro_rules! impl_tcp_phase_with_websocket {
    ($feature_name:literal, $provider:ty, $ws_stream:ty) => {
        #[cfg(all(
            feature = $feature_name,
            not(target_arch = "wasm32"),
            feature = "websocket",
        ))]
        impl SwarmBuilder<$provider, TcpPhase> {
            /// Shortcut that skips the TCP, QUIC, other-transport and DNS steps (via
            /// `without_tcp`, `without_quic`, `without_any_other_transports` and
            /// `without_dns`) and then awaits `with_websocket` on the resulting builder.
            ///
            /// `security_upgrade` and `multiplexer_upgrade` are passed through untouched.
            ///
            /// # Errors
            ///
            /// The error type is `WebsocketError<SecUpgrade::Error>`; the result is whatever
            /// the forwarded `with_websocket` call returns.
            pub async fn with_websocket<
                SecUpgrade,
                SecStream,
                SecError,
                MuxUpgrade,
                MuxStream,
                MuxError,
            >(
                self,
                security_upgrade: SecUpgrade,
                multiplexer_upgrade: MuxUpgrade,
            ) -> Result<
                SwarmBuilder<$provider, RelayPhase<impl AuthenticatedMultiplexedTransport>>,
                WebsocketError<SecUpgrade::Error>,
            >
            where
                // Security layer.
                SecUpgrade: IntoSecurityUpgrade<$ws_stream>,
                SecUpgrade::Upgrade: InboundConnectionUpgrade<
                        Negotiated<$ws_stream>,
                        Output = (libp2p_identity::PeerId, SecStream),
                        Error = SecError,
                    > + OutboundConnectionUpgrade<
                        Negotiated<$ws_stream>,
                        Output = (libp2p_identity::PeerId, SecStream),
                        Error = SecError,
                    > + Clone
                    + Send
                    + 'static,
                <SecUpgrade::Upgrade as InboundConnectionUpgrade<Negotiated<$ws_stream>>>::Future: Send,
                <SecUpgrade::Upgrade as OutboundConnectionUpgrade<Negotiated<$ws_stream>>>::Future: Send,
                <<<SecUpgrade as IntoSecurityUpgrade<$ws_stream>>::Upgrade as UpgradeInfo>::InfoIter as IntoIterator>::IntoIter: Send,
                <<SecUpgrade as IntoSecurityUpgrade<$ws_stream>>::Upgrade as UpgradeInfo>::Info: Send,
                SecStream: futures::AsyncRead + futures::AsyncWrite + Unpin + Send + 'static,
                SecError: std::error::Error + Send + Sync + 'static,

                // Multiplexing layer.
                MuxUpgrade: IntoMultiplexerUpgrade<SecStream>,
                MuxUpgrade::Upgrade: InboundConnectionUpgrade<
                        Negotiated<SecStream>,
                        Output = MuxStream,
                        Error = MuxError,
                    > + OutboundConnectionUpgrade<
                        Negotiated<SecStream>,
                        Output = MuxStream,
                        Error = MuxError,
                    > + Clone
                    + Send
                    + 'static,
                <MuxUpgrade::Upgrade as InboundConnectionUpgrade<Negotiated<SecStream>>>::Future: Send,
                <MuxUpgrade::Upgrade as OutboundConnectionUpgrade<Negotiated<SecStream>>>::Future: Send,
                <<<MuxUpgrade as IntoMultiplexerUpgrade<SecStream>>::Upgrade as UpgradeInfo>::InfoIter as IntoIterator>::IntoIter: Send,
                <<MuxUpgrade as IntoMultiplexerUpgrade<SecStream>>::Upgrade as UpgradeInfo>::Info: Send,
                MuxStream: StreamMuxer + Send + 'static,
                MuxStream::Substream: Send + 'static,
                MuxStream::Error: Send + Sync + 'static,
                MuxError: std::error::Error + Send + Sync + 'static,
            {
                // Walk past every step that precedes the websocket step.
                let earlier_steps_skipped = self
                    .without_tcp()
                    .without_quic()
                    .without_any_other_transports()
                    .without_dns();

                earlier_steps_skipped
                    .with_websocket(security_upgrade, multiplexer_upgrade)
                    .await
            }
        }
    };
}

// One `with_websocket` shortcut per supported runtime provider.
impl_tcp_phase_with_websocket!(
    "async-std",
    super::provider::AsyncStd,
    rw_stream_sink::RwStreamSink<libp2p_websocket::BytesConnection<libp2p_tcp::async_io::TcpStream>>
);
impl_tcp_phase_with_websocket!(
    "tokio",
    super::provider::Tokio,
    rw_stream_sink::RwStreamSink<
        libp2p_websocket::BytesConnection<libp2p_tcp::tokio::TcpStream>,
    >
);