1use crate::behaviour::FromSwarm;
22use crate::connection::ConnectionId;
23use crate::handler::{
24 AddressChange, ConnectionEvent, ConnectionHandler, ConnectionHandlerEvent, DialUpgradeError,
25 FullyNegotiatedInbound, FullyNegotiatedOutbound, ListenUpgradeError, SubstreamProtocol,
26};
27use crate::upgrade::SendWrapper;
28use crate::{
29 ConnectionDenied, NetworkBehaviour, THandler, THandlerInEvent, THandlerOutEvent, ToSwarm,
30};
31use either::Either;
32use futures::future;
33use libp2p_core::{upgrade::DeniedUpgrade, Endpoint, Multiaddr};
34use libp2p_identity::PeerId;
35use std::{task::Context, task::Poll};
36
/// Wrapper around an optional behaviour of type `TBehaviour`.
///
/// The wrapper is considered *enabled* when it holds a behaviour and *disabled* when it holds
/// nothing.
pub struct Toggle<TBehaviour> {
    /// The wrapped behaviour; `None` while the toggle is disabled.
    inner: Option<TBehaviour>,
}
43
44impl<TBehaviour> Toggle<TBehaviour> {
45 pub fn is_enabled(&self) -> bool {
47 self.inner.is_some()
48 }
49
50 pub fn as_ref(&self) -> Option<&TBehaviour> {
52 self.inner.as_ref()
53 }
54
55 pub fn as_mut(&mut self) -> Option<&mut TBehaviour> {
57 self.inner.as_mut()
58 }
59}
60
61impl<TBehaviour> From<Option<TBehaviour>> for Toggle<TBehaviour> {
62 fn from(inner: Option<TBehaviour>) -> Self {
63 Toggle { inner }
64 }
65}
66
67impl<TBehaviour> NetworkBehaviour for Toggle<TBehaviour>
68where
69 TBehaviour: NetworkBehaviour,
70{
71 type ConnectionHandler = ToggleConnectionHandler<THandler<TBehaviour>>;
72 type ToSwarm = TBehaviour::ToSwarm;
73
74 fn handle_pending_inbound_connection(
75 &mut self,
76 connection_id: ConnectionId,
77 local_addr: &Multiaddr,
78 remote_addr: &Multiaddr,
79 ) -> Result<(), ConnectionDenied> {
80 let inner = match self.inner.as_mut() {
81 None => return Ok(()),
82 Some(inner) => inner,
83 };
84
85 inner.handle_pending_inbound_connection(connection_id, local_addr, remote_addr)?;
86
87 Ok(())
88 }
89
90 fn handle_established_inbound_connection(
91 &mut self,
92 connection_id: ConnectionId,
93 peer: PeerId,
94 local_addr: &Multiaddr,
95 remote_addr: &Multiaddr,
96 ) -> Result<THandler<Self>, ConnectionDenied> {
97 let inner = match self.inner.as_mut() {
98 None => return Ok(ToggleConnectionHandler { inner: None }),
99 Some(inner) => inner,
100 };
101
102 let handler = inner.handle_established_inbound_connection(
103 connection_id,
104 peer,
105 local_addr,
106 remote_addr,
107 )?;
108
109 Ok(ToggleConnectionHandler {
110 inner: Some(handler),
111 })
112 }
113
114 fn handle_pending_outbound_connection(
115 &mut self,
116 connection_id: ConnectionId,
117 maybe_peer: Option<PeerId>,
118 addresses: &[Multiaddr],
119 effective_role: Endpoint,
120 ) -> Result<Vec<Multiaddr>, ConnectionDenied> {
121 let inner = match self.inner.as_mut() {
122 None => return Ok(vec![]),
123 Some(inner) => inner,
124 };
125
126 let addresses = inner.handle_pending_outbound_connection(
127 connection_id,
128 maybe_peer,
129 addresses,
130 effective_role,
131 )?;
132
133 Ok(addresses)
134 }
135
136 fn handle_established_outbound_connection(
137 &mut self,
138 connection_id: ConnectionId,
139 peer: PeerId,
140 addr: &Multiaddr,
141 role_override: Endpoint,
142 ) -> Result<THandler<Self>, ConnectionDenied> {
143 let inner = match self.inner.as_mut() {
144 None => return Ok(ToggleConnectionHandler { inner: None }),
145 Some(inner) => inner,
146 };
147
148 let handler = inner.handle_established_outbound_connection(
149 connection_id,
150 peer,
151 addr,
152 role_override,
153 )?;
154
155 Ok(ToggleConnectionHandler {
156 inner: Some(handler),
157 })
158 }
159
160 fn on_swarm_event(&mut self, event: FromSwarm) {
161 if let Some(behaviour) = &mut self.inner {
162 behaviour.on_swarm_event(event);
163 }
164 }
165
166 fn on_connection_handler_event(
167 &mut self,
168 peer_id: PeerId,
169 connection_id: ConnectionId,
170 event: THandlerOutEvent<Self>,
171 ) {
172 if let Some(behaviour) = &mut self.inner {
173 behaviour.on_connection_handler_event(peer_id, connection_id, event)
174 }
175 }
176
177 fn poll(
178 &mut self,
179 cx: &mut Context<'_>,
180 ) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
181 if let Some(inner) = self.inner.as_mut() {
182 inner.poll(cx)
183 } else {
184 Poll::Pending
185 }
186 }
187}
188
/// Connection handler wrapper around an optional inner handler of type `TInner`.
pub struct ToggleConnectionHandler<TInner> {
    /// The wrapped handler; `None` when the handler was created in the disabled state.
    inner: Option<TInner>,
}
193
194impl<TInner> ToggleConnectionHandler<TInner>
195where
196 TInner: ConnectionHandler,
197{
198 fn on_fully_negotiated_inbound(
199 &mut self,
200 FullyNegotiatedInbound {
201 protocol: out,
202 info,
203 }: FullyNegotiatedInbound<
204 <Self as ConnectionHandler>::InboundProtocol,
205 <Self as ConnectionHandler>::InboundOpenInfo,
206 >,
207 ) {
208 let out = match out {
209 future::Either::Left(out) => out,
210 future::Either::Right(v) => void::unreachable(v),
211 };
212
213 if let Either::Left(info) = info {
214 self.inner
215 .as_mut()
216 .expect("Can't receive an inbound substream if disabled; QED")
217 .on_connection_event(ConnectionEvent::FullyNegotiatedInbound(
218 FullyNegotiatedInbound {
219 protocol: out,
220 info,
221 },
222 ));
223 } else {
224 panic!("Unexpected Either::Right in enabled `on_fully_negotiated_inbound`.")
225 }
226 }
227
228 fn on_listen_upgrade_error(
229 &mut self,
230 ListenUpgradeError { info, error: err }: ListenUpgradeError<
231 <Self as ConnectionHandler>::InboundOpenInfo,
232 <Self as ConnectionHandler>::InboundProtocol,
233 >,
234 ) {
235 let (inner, info) = match (self.inner.as_mut(), info) {
236 (Some(inner), Either::Left(info)) => (inner, info),
237 (None, Either::Right(())) => return,
239 (Some(_), Either::Right(())) => panic!(
240 "Unexpected `Either::Right` inbound info through \
241 `on_listen_upgrade_error` in enabled state.",
242 ),
243 (None, Either::Left(_)) => panic!(
244 "Unexpected `Either::Left` inbound info through \
245 `on_listen_upgrade_error` in disabled state.",
246 ),
247 };
248
249 let err = match err {
250 Either::Left(e) => e,
251 Either::Right(v) => void::unreachable(v),
252 };
253
254 inner.on_connection_event(ConnectionEvent::ListenUpgradeError(ListenUpgradeError {
255 info,
256 error: err,
257 }));
258 }
259}
260
261impl<TInner> ConnectionHandler for ToggleConnectionHandler<TInner>
262where
263 TInner: ConnectionHandler,
264{
265 type FromBehaviour = TInner::FromBehaviour;
266 type ToBehaviour = TInner::ToBehaviour;
267 type InboundProtocol = Either<SendWrapper<TInner::InboundProtocol>, SendWrapper<DeniedUpgrade>>;
268 type OutboundProtocol = TInner::OutboundProtocol;
269 type OutboundOpenInfo = TInner::OutboundOpenInfo;
270 type InboundOpenInfo = Either<TInner::InboundOpenInfo, ()>;
271
272 fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol, Self::InboundOpenInfo> {
273 if let Some(inner) = self.inner.as_ref() {
274 inner
275 .listen_protocol()
276 .map_upgrade(|u| Either::Left(SendWrapper(u)))
277 .map_info(Either::Left)
278 } else {
279 SubstreamProtocol::new(Either::Right(SendWrapper(DeniedUpgrade)), Either::Right(()))
280 }
281 }
282
283 fn on_behaviour_event(&mut self, event: Self::FromBehaviour) {
284 self.inner
285 .as_mut()
286 .expect("Can't receive events if disabled; QED")
287 .on_behaviour_event(event)
288 }
289
290 fn connection_keep_alive(&self) -> bool {
291 self.inner
292 .as_ref()
293 .map(|h| h.connection_keep_alive())
294 .unwrap_or(false)
295 }
296
297 fn poll(
298 &mut self,
299 cx: &mut Context<'_>,
300 ) -> Poll<
301 ConnectionHandlerEvent<Self::OutboundProtocol, Self::OutboundOpenInfo, Self::ToBehaviour>,
302 > {
303 if let Some(inner) = self.inner.as_mut() {
304 inner.poll(cx)
305 } else {
306 Poll::Pending
307 }
308 }
309
310 fn on_connection_event(
311 &mut self,
312 event: ConnectionEvent<
313 Self::InboundProtocol,
314 Self::OutboundProtocol,
315 Self::InboundOpenInfo,
316 Self::OutboundOpenInfo,
317 >,
318 ) {
319 match event {
320 ConnectionEvent::FullyNegotiatedInbound(fully_negotiated_inbound) => {
321 self.on_fully_negotiated_inbound(fully_negotiated_inbound)
322 }
323 ConnectionEvent::FullyNegotiatedOutbound(FullyNegotiatedOutbound {
324 protocol: out,
325 info,
326 }) => self
327 .inner
328 .as_mut()
329 .expect("Can't receive an outbound substream if disabled; QED")
330 .on_connection_event(ConnectionEvent::FullyNegotiatedOutbound(
331 FullyNegotiatedOutbound {
332 protocol: out,
333 info,
334 },
335 )),
336 ConnectionEvent::AddressChange(address_change) => {
337 if let Some(inner) = self.inner.as_mut() {
338 inner.on_connection_event(ConnectionEvent::AddressChange(AddressChange {
339 new_address: address_change.new_address,
340 }));
341 }
342 }
343 ConnectionEvent::DialUpgradeError(DialUpgradeError { info, error: err }) => self
344 .inner
345 .as_mut()
346 .expect("Can't receive an outbound substream if disabled; QED")
347 .on_connection_event(ConnectionEvent::DialUpgradeError(DialUpgradeError {
348 info,
349 error: err,
350 })),
351 ConnectionEvent::ListenUpgradeError(listen_upgrade_error) => {
352 self.on_listen_upgrade_error(listen_upgrade_error)
353 }
354 ConnectionEvent::LocalProtocolsChange(change) => {
355 if let Some(inner) = self.inner.as_mut() {
356 inner.on_connection_event(ConnectionEvent::LocalProtocolsChange(change));
357 }
358 }
359 ConnectionEvent::RemoteProtocolsChange(change) => {
360 if let Some(inner) = self.inner.as_mut() {
361 inner.on_connection_event(ConnectionEvent::RemoteProtocolsChange(change));
362 }
363 }
364 }
365 }
366
367 fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<Option<Self::ToBehaviour>> {
368 let Some(inner) = self.inner.as_mut() else {
369 return Poll::Ready(None);
370 };
371
372 inner.poll_close(cx)
373 }
374}