libp2p_swarm/behaviour/
toggle.rs

1// Copyright 2019 Parity Technologies (UK) Ltd.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21use crate::behaviour::FromSwarm;
22use crate::connection::ConnectionId;
23use crate::handler::{
24    AddressChange, ConnectionEvent, ConnectionHandler, ConnectionHandlerEvent, DialUpgradeError,
25    FullyNegotiatedInbound, FullyNegotiatedOutbound, ListenUpgradeError, SubstreamProtocol,
26};
27use crate::upgrade::SendWrapper;
28use crate::{
29    ConnectionDenied, NetworkBehaviour, THandler, THandlerInEvent, THandlerOutEvent, ToSwarm,
30};
31use either::Either;
32use futures::future;
33use libp2p_core::{upgrade::DeniedUpgrade, Endpoint, Multiaddr};
34use libp2p_identity::PeerId;
35use std::{task::Context, task::Poll};
36
37/// Implementation of `NetworkBehaviour` that can be either in the disabled or enabled state.
38///
39/// The state can only be chosen at initialization.
40pub struct Toggle<TBehaviour> {
41    inner: Option<TBehaviour>,
42}
43
44impl<TBehaviour> Toggle<TBehaviour> {
45    /// Returns `true` if `Toggle` is enabled and `false` if it's disabled.
46    pub fn is_enabled(&self) -> bool {
47        self.inner.is_some()
48    }
49
50    /// Returns a reference to the inner `NetworkBehaviour`.
51    pub fn as_ref(&self) -> Option<&TBehaviour> {
52        self.inner.as_ref()
53    }
54
55    /// Returns a mutable reference to the inner `NetworkBehaviour`.
56    pub fn as_mut(&mut self) -> Option<&mut TBehaviour> {
57        self.inner.as_mut()
58    }
59}
60
61impl<TBehaviour> From<Option<TBehaviour>> for Toggle<TBehaviour> {
62    fn from(inner: Option<TBehaviour>) -> Self {
63        Toggle { inner }
64    }
65}
66
67impl<TBehaviour> NetworkBehaviour for Toggle<TBehaviour>
68where
69    TBehaviour: NetworkBehaviour,
70{
71    type ConnectionHandler = ToggleConnectionHandler<THandler<TBehaviour>>;
72    type ToSwarm = TBehaviour::ToSwarm;
73
74    fn handle_pending_inbound_connection(
75        &mut self,
76        connection_id: ConnectionId,
77        local_addr: &Multiaddr,
78        remote_addr: &Multiaddr,
79    ) -> Result<(), ConnectionDenied> {
80        let inner = match self.inner.as_mut() {
81            None => return Ok(()),
82            Some(inner) => inner,
83        };
84
85        inner.handle_pending_inbound_connection(connection_id, local_addr, remote_addr)?;
86
87        Ok(())
88    }
89
90    fn handle_established_inbound_connection(
91        &mut self,
92        connection_id: ConnectionId,
93        peer: PeerId,
94        local_addr: &Multiaddr,
95        remote_addr: &Multiaddr,
96    ) -> Result<THandler<Self>, ConnectionDenied> {
97        let inner = match self.inner.as_mut() {
98            None => return Ok(ToggleConnectionHandler { inner: None }),
99            Some(inner) => inner,
100        };
101
102        let handler = inner.handle_established_inbound_connection(
103            connection_id,
104            peer,
105            local_addr,
106            remote_addr,
107        )?;
108
109        Ok(ToggleConnectionHandler {
110            inner: Some(handler),
111        })
112    }
113
114    fn handle_pending_outbound_connection(
115        &mut self,
116        connection_id: ConnectionId,
117        maybe_peer: Option<PeerId>,
118        addresses: &[Multiaddr],
119        effective_role: Endpoint,
120    ) -> Result<Vec<Multiaddr>, ConnectionDenied> {
121        let inner = match self.inner.as_mut() {
122            None => return Ok(vec![]),
123            Some(inner) => inner,
124        };
125
126        let addresses = inner.handle_pending_outbound_connection(
127            connection_id,
128            maybe_peer,
129            addresses,
130            effective_role,
131        )?;
132
133        Ok(addresses)
134    }
135
136    fn handle_established_outbound_connection(
137        &mut self,
138        connection_id: ConnectionId,
139        peer: PeerId,
140        addr: &Multiaddr,
141        role_override: Endpoint,
142    ) -> Result<THandler<Self>, ConnectionDenied> {
143        let inner = match self.inner.as_mut() {
144            None => return Ok(ToggleConnectionHandler { inner: None }),
145            Some(inner) => inner,
146        };
147
148        let handler = inner.handle_established_outbound_connection(
149            connection_id,
150            peer,
151            addr,
152            role_override,
153        )?;
154
155        Ok(ToggleConnectionHandler {
156            inner: Some(handler),
157        })
158    }
159
160    fn on_swarm_event(&mut self, event: FromSwarm) {
161        if let Some(behaviour) = &mut self.inner {
162            behaviour.on_swarm_event(event);
163        }
164    }
165
166    fn on_connection_handler_event(
167        &mut self,
168        peer_id: PeerId,
169        connection_id: ConnectionId,
170        event: THandlerOutEvent<Self>,
171    ) {
172        if let Some(behaviour) = &mut self.inner {
173            behaviour.on_connection_handler_event(peer_id, connection_id, event)
174        }
175    }
176
177    fn poll(
178        &mut self,
179        cx: &mut Context<'_>,
180    ) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
181        if let Some(inner) = self.inner.as_mut() {
182            inner.poll(cx)
183        } else {
184            Poll::Pending
185        }
186    }
187}
188
189/// Implementation of [`ConnectionHandler`] that can be in the disabled state.
190pub struct ToggleConnectionHandler<TInner> {
191    inner: Option<TInner>,
192}
193
194impl<TInner> ToggleConnectionHandler<TInner>
195where
196    TInner: ConnectionHandler,
197{
198    fn on_fully_negotiated_inbound(
199        &mut self,
200        FullyNegotiatedInbound {
201            protocol: out,
202            info,
203        }: FullyNegotiatedInbound<
204            <Self as ConnectionHandler>::InboundProtocol,
205            <Self as ConnectionHandler>::InboundOpenInfo,
206        >,
207    ) {
208        let out = match out {
209            future::Either::Left(out) => out,
210            future::Either::Right(v) => void::unreachable(v),
211        };
212
213        if let Either::Left(info) = info {
214            self.inner
215                .as_mut()
216                .expect("Can't receive an inbound substream if disabled; QED")
217                .on_connection_event(ConnectionEvent::FullyNegotiatedInbound(
218                    FullyNegotiatedInbound {
219                        protocol: out,
220                        info,
221                    },
222                ));
223        } else {
224            panic!("Unexpected Either::Right in enabled `on_fully_negotiated_inbound`.")
225        }
226    }
227
228    fn on_listen_upgrade_error(
229        &mut self,
230        ListenUpgradeError { info, error: err }: ListenUpgradeError<
231            <Self as ConnectionHandler>::InboundOpenInfo,
232            <Self as ConnectionHandler>::InboundProtocol,
233        >,
234    ) {
235        let (inner, info) = match (self.inner.as_mut(), info) {
236            (Some(inner), Either::Left(info)) => (inner, info),
237            // Ignore listen upgrade errors in disabled state.
238            (None, Either::Right(())) => return,
239            (Some(_), Either::Right(())) => panic!(
240                "Unexpected `Either::Right` inbound info through \
241                 `on_listen_upgrade_error` in enabled state.",
242            ),
243            (None, Either::Left(_)) => panic!(
244                "Unexpected `Either::Left` inbound info through \
245                 `on_listen_upgrade_error` in disabled state.",
246            ),
247        };
248
249        let err = match err {
250            Either::Left(e) => e,
251            Either::Right(v) => void::unreachable(v),
252        };
253
254        inner.on_connection_event(ConnectionEvent::ListenUpgradeError(ListenUpgradeError {
255            info,
256            error: err,
257        }));
258    }
259}
260
261impl<TInner> ConnectionHandler for ToggleConnectionHandler<TInner>
262where
263    TInner: ConnectionHandler,
264{
265    type FromBehaviour = TInner::FromBehaviour;
266    type ToBehaviour = TInner::ToBehaviour;
267    type InboundProtocol = Either<SendWrapper<TInner::InboundProtocol>, SendWrapper<DeniedUpgrade>>;
268    type OutboundProtocol = TInner::OutboundProtocol;
269    type OutboundOpenInfo = TInner::OutboundOpenInfo;
270    type InboundOpenInfo = Either<TInner::InboundOpenInfo, ()>;
271
272    fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol, Self::InboundOpenInfo> {
273        if let Some(inner) = self.inner.as_ref() {
274            inner
275                .listen_protocol()
276                .map_upgrade(|u| Either::Left(SendWrapper(u)))
277                .map_info(Either::Left)
278        } else {
279            SubstreamProtocol::new(Either::Right(SendWrapper(DeniedUpgrade)), Either::Right(()))
280        }
281    }
282
283    fn on_behaviour_event(&mut self, event: Self::FromBehaviour) {
284        self.inner
285            .as_mut()
286            .expect("Can't receive events if disabled; QED")
287            .on_behaviour_event(event)
288    }
289
290    fn connection_keep_alive(&self) -> bool {
291        self.inner
292            .as_ref()
293            .map(|h| h.connection_keep_alive())
294            .unwrap_or(false)
295    }
296
297    fn poll(
298        &mut self,
299        cx: &mut Context<'_>,
300    ) -> Poll<
301        ConnectionHandlerEvent<Self::OutboundProtocol, Self::OutboundOpenInfo, Self::ToBehaviour>,
302    > {
303        if let Some(inner) = self.inner.as_mut() {
304            inner.poll(cx)
305        } else {
306            Poll::Pending
307        }
308    }
309
310    fn on_connection_event(
311        &mut self,
312        event: ConnectionEvent<
313            Self::InboundProtocol,
314            Self::OutboundProtocol,
315            Self::InboundOpenInfo,
316            Self::OutboundOpenInfo,
317        >,
318    ) {
319        match event {
320            ConnectionEvent::FullyNegotiatedInbound(fully_negotiated_inbound) => {
321                self.on_fully_negotiated_inbound(fully_negotiated_inbound)
322            }
323            ConnectionEvent::FullyNegotiatedOutbound(FullyNegotiatedOutbound {
324                protocol: out,
325                info,
326            }) => self
327                .inner
328                .as_mut()
329                .expect("Can't receive an outbound substream if disabled; QED")
330                .on_connection_event(ConnectionEvent::FullyNegotiatedOutbound(
331                    FullyNegotiatedOutbound {
332                        protocol: out,
333                        info,
334                    },
335                )),
336            ConnectionEvent::AddressChange(address_change) => {
337                if let Some(inner) = self.inner.as_mut() {
338                    inner.on_connection_event(ConnectionEvent::AddressChange(AddressChange {
339                        new_address: address_change.new_address,
340                    }));
341                }
342            }
343            ConnectionEvent::DialUpgradeError(DialUpgradeError { info, error: err }) => self
344                .inner
345                .as_mut()
346                .expect("Can't receive an outbound substream if disabled; QED")
347                .on_connection_event(ConnectionEvent::DialUpgradeError(DialUpgradeError {
348                    info,
349                    error: err,
350                })),
351            ConnectionEvent::ListenUpgradeError(listen_upgrade_error) => {
352                self.on_listen_upgrade_error(listen_upgrade_error)
353            }
354            ConnectionEvent::LocalProtocolsChange(change) => {
355                if let Some(inner) = self.inner.as_mut() {
356                    inner.on_connection_event(ConnectionEvent::LocalProtocolsChange(change));
357                }
358            }
359            ConnectionEvent::RemoteProtocolsChange(change) => {
360                if let Some(inner) = self.inner.as_mut() {
361                    inner.on_connection_event(ConnectionEvent::RemoteProtocolsChange(change));
362                }
363            }
364        }
365    }
366
367    fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<Option<Self::ToBehaviour>> {
368        let Some(inner) = self.inner.as_mut() else {
369            return Poll::Ready(None);
370        };
371
372        inner.poll_close(cx)
373    }
374}