libp2p_swarm/
handler.rs

1// Copyright 2018 Parity Technologies (UK) Ltd.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21//! Once a connection to a remote peer is established, a [`ConnectionHandler`] negotiates
22//! and handles one or more specific protocols on the connection.
23//!
24//! Protocols are negotiated and used on individual substreams of the connection. Thus a
25//! [`ConnectionHandler`] defines the inbound and outbound upgrades to apply when creating a new
26//! inbound or outbound substream, respectively, and is notified by a [`Swarm`](crate::Swarm) when
27//! these upgrades have been successfully applied, including the final output of the upgrade. A
28//! [`ConnectionHandler`] can then continue communicating with the peer over the substream using the
29//! negotiated protocol(s).
30//!
31//! Two [`ConnectionHandler`]s can be composed with [`ConnectionHandler::select()`]
32//! in order to build a new handler supporting the combined set of protocols,
33//! with methods being dispatched to the appropriate handler according to the
34//! used protocol(s) determined by the associated types of the handlers.
35//!
36//! > **Note**: A [`ConnectionHandler`] handles one or more protocols in the context of a single
37//! >           connection with a remote. In order to handle a protocol that requires knowledge of
38//! >           the network as a whole, see the
39//! >           [`NetworkBehaviour`](crate::behaviour::NetworkBehaviour) trait.
40
41pub mod either;
42mod map_in;
43mod map_out;
44pub mod multi;
45mod one_shot;
46mod pending;
47mod select;
48
49pub use crate::upgrade::{InboundUpgradeSend, OutboundUpgradeSend, SendWrapper, UpgradeInfoSend};
50pub use map_in::MapInEvent;
51pub use map_out::MapOutEvent;
52pub use one_shot::{OneShotHandler, OneShotHandlerConfig};
53pub use pending::PendingConnectionHandler;
54pub use select::ConnectionHandlerSelect;
55
56use crate::StreamProtocol;
57use ::either::Either;
58use libp2p_core::Multiaddr;
59use once_cell::sync::Lazy;
60use smallvec::SmallVec;
61use std::collections::hash_map::RandomState;
62use std::collections::hash_set::{Difference, Intersection};
63use std::collections::HashSet;
64use std::iter::Peekable;
65use std::{error, fmt, io, task::Context, task::Poll, time::Duration};
66
/// A handler for a set of protocols used on a connection with a remote.
///
/// This trait should be implemented for a type that maintains the state for
/// the execution of a specific protocol with a remote.
///
/// # Handling a protocol
///
/// Communication with a remote over a set of protocols is initiated in one of two ways:
///
///   1. Dialing by initiating a new outbound substream. In order to do so,
///      [`ConnectionHandler::poll()`] must return a [`ConnectionHandlerEvent::OutboundSubstreamRequest`],
///      providing an instance of [`libp2p_core::upgrade::OutboundUpgrade`] that is used to negotiate the
///      protocol(s). Upon success, [`ConnectionHandler::on_connection_event`] is called with
///      [`ConnectionEvent::FullyNegotiatedOutbound`] translating the final output of the upgrade.
///
///   2. Listening by accepting a new inbound substream. When a new inbound substream
///      is created on a connection, [`ConnectionHandler::listen_protocol`] is called
///      to obtain an instance of [`libp2p_core::upgrade::InboundUpgrade`] that is used to
///      negotiate the protocol(s). Upon success,
///      [`ConnectionHandler::on_connection_event`] is called with [`ConnectionEvent::FullyNegotiatedInbound`]
///      translating the final output of the upgrade.
///
///
/// # Connection Keep-Alive
///
/// A [`ConnectionHandler`] can influence the lifetime of the underlying connection
/// through [`ConnectionHandler::connection_keep_alive`]. That is, the protocol
/// implemented by the handler can include conditions for terminating the connection.
/// The lifetime of successfully negotiated substreams is fully controlled by the handler.
///
/// Implementors of this trait should keep in mind that the connection can be closed at any time.
/// When a connection is closed gracefully, the substreams used by the handler may still
/// continue reading data until the remote closes its side of the connection.
pub trait ConnectionHandler: Send + 'static {
    /// A type representing the message(s) a [`NetworkBehaviour`](crate::behaviour::NetworkBehaviour) can send to a [`ConnectionHandler`] via [`ToSwarm::NotifyHandler`](crate::behaviour::ToSwarm::NotifyHandler).
    type FromBehaviour: fmt::Debug + Send + 'static;
    /// A type representing message(s) a [`ConnectionHandler`] can send to a [`NetworkBehaviour`](crate::behaviour::NetworkBehaviour) via [`ConnectionHandlerEvent::NotifyBehaviour`].
    type ToBehaviour: fmt::Debug + Send + 'static;
    /// The inbound upgrade for the protocol(s) used by the handler.
    type InboundProtocol: InboundUpgradeSend;
    /// The outbound upgrade for the protocol(s) used by the handler.
    type OutboundProtocol: OutboundUpgradeSend;
    /// The type of additional information returned from `listen_protocol`.
    type InboundOpenInfo: Send + 'static;
    /// The type of additional information passed to an `OutboundSubstreamRequest`.
    type OutboundOpenInfo: Send + 'static;

    /// The [`InboundUpgrade`](libp2p_core::upgrade::InboundUpgrade) to apply on inbound
    /// substreams to negotiate the desired protocols.
    ///
    /// > **Note**: The returned `InboundUpgrade` should always accept all the generally
    /// >           supported protocols, even if in a specific context a particular one is
    /// >           not supported, (e.g. when only allowing one substream at a time for a protocol).
    /// >           This allows a remote to put the list of supported protocols in a cache.
    fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol, Self::InboundOpenInfo>;

    /// Returns whether the connection should be kept alive.
    ///
    /// The default implementation returns `false`.
    ///
    /// ## Keep alive algorithm
    ///
    /// A connection is always kept alive:
    ///
    /// - Whilst a [`ConnectionHandler`] returns [`Poll::Ready`].
    /// - We are negotiating inbound or outbound streams.
    /// - There are active [`Stream`](crate::Stream)s on the connection.
    ///
    /// The combination of the above means that _most_ protocols will not need to override this method.
    /// This method is only invoked when all of the above are `false`, i.e. when the connection is entirely idle.
    ///
    /// ## Exceptions
    ///
    /// - Protocols like [circuit-relay v2](https://github.com/libp2p/specs/blob/master/relay/circuit-v2.md) need to keep a connection alive beyond these circumstances and can thus override this method.
    /// - Protocols like [ping](https://github.com/libp2p/specs/blob/master/ping/ping.md) **don't** want to keep a connection alive despite active streams.
    ///   In that case, protocol authors can use [`Stream::ignore_for_keep_alive`](crate::Stream::ignore_for_keep_alive) to opt-out a particular stream from the keep-alive algorithm.
    fn connection_keep_alive(&self) -> bool {
        false
    }

    /// Polls the handler for its next [`ConnectionHandlerEvent`].
    ///
    /// Should behave like `Stream::poll()`.
    fn poll(
        &mut self,
        cx: &mut Context<'_>,
    ) -> Poll<
        ConnectionHandlerEvent<Self::OutboundProtocol, Self::OutboundOpenInfo, Self::ToBehaviour>,
    >;

    /// Gracefully close the [`ConnectionHandler`].
    ///
    /// The contract for this function is equivalent to a [`Stream`](futures::Stream).
    /// When a connection is being shut down, we will first poll this function to completion.
    /// Following that, the physical connection will be shut down.
    ///
    /// This is also called when the shutdown was initiated due to an error on the connection.
    /// We therefore cannot guarantee that performing IO within here will succeed.
    ///
    /// To signal completion, [`Poll::Ready(None)`] should be returned.
    ///
    /// Implementations MUST have a [`fuse`](futures::StreamExt::fuse)-like behaviour.
    /// That is, [`Poll::Ready(None)`] MUST be returned on repeated calls to [`ConnectionHandler::poll_close`].
    ///
    /// The default implementation signals completion immediately.
    fn poll_close(&mut self, _: &mut Context<'_>) -> Poll<Option<Self::ToBehaviour>> {
        Poll::Ready(None)
    }

    /// Adds a closure that turns the input event into something else.
    fn map_in_event<TNewIn, TMap>(self, map: TMap) -> MapInEvent<Self, TNewIn, TMap>
    where
        Self: Sized,
        TMap: Fn(&TNewIn) -> Option<&Self::FromBehaviour>,
    {
        MapInEvent::new(self, map)
    }

    /// Adds a closure that turns the output event into something else.
    fn map_out_event<TMap, TNewOut>(self, map: TMap) -> MapOutEvent<Self, TMap>
    where
        Self: Sized,
        TMap: FnMut(Self::ToBehaviour) -> TNewOut,
    {
        MapOutEvent::new(self, map)
    }

    /// Creates a new [`ConnectionHandler`] that selects either this handler or
    /// `other` by delegating methods calls appropriately.
    fn select<TProto2>(self, other: TProto2) -> ConnectionHandlerSelect<Self, TProto2>
    where
        Self: Sized,
    {
        ConnectionHandlerSelect::new(self, other)
    }

    /// Informs the handler about an event from the [`NetworkBehaviour`](super::NetworkBehaviour).
    fn on_behaviour_event(&mut self, _event: Self::FromBehaviour);

    /// Informs the handler about a [`ConnectionEvent`], e.g. the outcome of an inbound or
    /// outbound substream upgrade, a change of the remote's address or a change in the
    /// supported protocols.
    fn on_connection_event(
        &mut self,
        event: ConnectionEvent<
            Self::InboundProtocol,
            Self::OutboundProtocol,
            Self::InboundOpenInfo,
            Self::OutboundOpenInfo,
        >,
    );
}
210
/// Enumeration with the list of the possible stream events
/// to pass to [`on_connection_event`](ConnectionHandler::on_connection_event).
///
/// [`ConnectionHandler::on_connection_event`] instantiates the type parameters with the
/// handler's associated types: `IP` / `OP` are the inbound / outbound upgrades and
/// `IOI` / `OOI` the inbound / outbound open info.
#[non_exhaustive]
pub enum ConnectionEvent<'a, IP: InboundUpgradeSend, OP: OutboundUpgradeSend, IOI, OOI> {
    /// Informs the handler about the output of a successful upgrade on a new inbound substream.
    FullyNegotiatedInbound(FullyNegotiatedInbound<IP, IOI>),
    /// Informs the handler about the output of a successful upgrade on a new outbound stream.
    FullyNegotiatedOutbound(FullyNegotiatedOutbound<OP, OOI>),
    /// Informs the handler about a change in the address of the remote.
    AddressChange(AddressChange<'a>),
    /// Informs the handler that upgrading an outbound substream to the given protocol has failed.
    DialUpgradeError(DialUpgradeError<OOI, OP>),
    /// Informs the handler that upgrading an inbound substream to the given protocol has failed.
    ListenUpgradeError(ListenUpgradeError<IOI, IP>),
    /// The local [`ConnectionHandler`] added or removed support for one or more protocols.
    LocalProtocolsChange(ProtocolsChange<'a>),
    /// The remote [`ConnectionHandler`] now supports a different set of protocols.
    RemoteProtocolsChange(ProtocolsChange<'a>),
}
230
231impl<'a, IP, OP, IOI, OOI> fmt::Debug for ConnectionEvent<'a, IP, OP, IOI, OOI>
232where
233    IP: InboundUpgradeSend + fmt::Debug,
234    IP::Output: fmt::Debug,
235    IP::Error: fmt::Debug,
236    OP: OutboundUpgradeSend + fmt::Debug,
237    OP::Output: fmt::Debug,
238    OP::Error: fmt::Debug,
239    IOI: fmt::Debug,
240    OOI: fmt::Debug,
241{
242    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
243        match self {
244            ConnectionEvent::FullyNegotiatedInbound(v) => {
245                f.debug_tuple("FullyNegotiatedInbound").field(v).finish()
246            }
247            ConnectionEvent::FullyNegotiatedOutbound(v) => {
248                f.debug_tuple("FullyNegotiatedOutbound").field(v).finish()
249            }
250            ConnectionEvent::AddressChange(v) => f.debug_tuple("AddressChange").field(v).finish(),
251            ConnectionEvent::DialUpgradeError(v) => {
252                f.debug_tuple("DialUpgradeError").field(v).finish()
253            }
254            ConnectionEvent::ListenUpgradeError(v) => {
255                f.debug_tuple("ListenUpgradeError").field(v).finish()
256            }
257            ConnectionEvent::LocalProtocolsChange(v) => {
258                f.debug_tuple("LocalProtocolsChange").field(v).finish()
259            }
260            ConnectionEvent::RemoteProtocolsChange(v) => {
261                f.debug_tuple("RemoteProtocolsChange").field(v).finish()
262            }
263        }
264    }
265}
266
267impl<'a, IP: InboundUpgradeSend, OP: OutboundUpgradeSend, IOI, OOI>
268    ConnectionEvent<'a, IP, OP, IOI, OOI>
269{
270    /// Whether the event concerns an outbound stream.
271    pub fn is_outbound(&self) -> bool {
272        match self {
273            ConnectionEvent::DialUpgradeError(_) | ConnectionEvent::FullyNegotiatedOutbound(_) => {
274                true
275            }
276            ConnectionEvent::FullyNegotiatedInbound(_)
277            | ConnectionEvent::AddressChange(_)
278            | ConnectionEvent::LocalProtocolsChange(_)
279            | ConnectionEvent::RemoteProtocolsChange(_)
280            | ConnectionEvent::ListenUpgradeError(_) => false,
281        }
282    }
283
284    /// Whether the event concerns an inbound stream.
285    pub fn is_inbound(&self) -> bool {
286        match self {
287            ConnectionEvent::FullyNegotiatedInbound(_) | ConnectionEvent::ListenUpgradeError(_) => {
288                true
289            }
290            ConnectionEvent::FullyNegotiatedOutbound(_)
291            | ConnectionEvent::AddressChange(_)
292            | ConnectionEvent::LocalProtocolsChange(_)
293            | ConnectionEvent::RemoteProtocolsChange(_)
294            | ConnectionEvent::DialUpgradeError(_) => false,
295        }
296    }
297}
298
/// [`ConnectionEvent`] variant that informs the handler about
/// the output of a successful upgrade on a new inbound substream.
///
/// Note that it is up to the [`ConnectionHandler`] implementation to manage the lifetime of the
/// negotiated inbound substreams. E.g. the implementation has to enforce a limit on the number
/// of simultaneously open negotiated inbound substreams. In other words it is up to the
/// [`ConnectionHandler`] implementation to stop a malicious remote node from opening and keeping
/// alive an excessive amount of inbound substreams.
#[derive(Debug)]
pub struct FullyNegotiatedInbound<IP: InboundUpgradeSend, IOI> {
    /// The output of the inbound upgrade.
    pub protocol: IP::Output,
    /// The inbound open info, i.e. the additional information returned from
    /// [`ConnectionHandler::listen_protocol`].
    pub info: IOI,
}
312
/// [`ConnectionEvent`] variant that informs the handler about successful upgrade on a new outbound stream.
///
/// The `info` field is the information that was previously passed to
/// [`ConnectionHandlerEvent::OutboundSubstreamRequest`]; the `protocol` field is the output
/// of the outbound upgrade.
#[derive(Debug)]
pub struct FullyNegotiatedOutbound<OP: OutboundUpgradeSend, OOI> {
    /// The output of the outbound upgrade.
    pub protocol: OP::Output,
    /// The outbound open info that accompanied the substream request.
    pub info: OOI,
}
322
/// [`ConnectionEvent`] variant that informs the handler about a change in the address of the remote.
#[derive(Debug)]
pub struct AddressChange<'a> {
    /// The new address of the remote, borrowed for the duration of the event.
    pub new_address: &'a Multiaddr,
}
328
/// [`ConnectionEvent`] variant that informs the handler about a change in the protocols supported on the connection.
#[derive(Debug, Clone)]
pub enum ProtocolsChange<'a> {
    /// One or more protocols have been added; the payload iterates over them.
    Added(ProtocolsAdded<'a>),
    /// One or more protocols have been removed; the payload iterates over them.
    Removed(ProtocolsRemoved<'a>),
}
335
336impl<'a> ProtocolsChange<'a> {
337    /// Compute the [`ProtocolsChange`] that results from adding `to_add` to `existing_protocols`.
338    ///
339    /// Returns `None` if the change is a no-op, i.e. `to_add` is a subset of `existing_protocols`.
340    pub(crate) fn add(
341        existing_protocols: &'a HashSet<StreamProtocol>,
342        to_add: &'a HashSet<StreamProtocol>,
343    ) -> Option<Self> {
344        let mut actually_added_protocols = to_add.difference(existing_protocols).peekable();
345
346        actually_added_protocols.peek()?;
347
348        Some(ProtocolsChange::Added(ProtocolsAdded {
349            protocols: actually_added_protocols,
350        }))
351    }
352
353    /// Compute the [`ProtocolsChange`] that results from removing `to_remove` from `existing_protocols`.
354    ///
355    /// Returns `None` if the change is a no-op, i.e. none of the protocols in `to_remove` are in `existing_protocols`.
356    pub(crate) fn remove(
357        existing_protocols: &'a HashSet<StreamProtocol>,
358        to_remove: &'a HashSet<StreamProtocol>,
359    ) -> Option<Self> {
360        let mut actually_removed_protocols = existing_protocols.intersection(to_remove).peekable();
361
362        actually_removed_protocols.peek()?;
363
364        Some(ProtocolsChange::Removed(ProtocolsRemoved {
365            protocols: Either::Right(actually_removed_protocols),
366        }))
367    }
368
369    /// Compute the [`ProtocolsChange`]s required to go from `existing_protocols` to `new_protocols`.
370    pub(crate) fn from_full_sets(
371        existing_protocols: &'a HashSet<StreamProtocol>,
372        new_protocols: &'a HashSet<StreamProtocol>,
373    ) -> SmallVec<[Self; 2]> {
374        if existing_protocols == new_protocols {
375            return SmallVec::new();
376        }
377
378        let mut changes = SmallVec::new();
379
380        let mut added_protocols = new_protocols.difference(existing_protocols).peekable();
381        let mut removed_protocols = existing_protocols.difference(new_protocols).peekable();
382
383        if added_protocols.peek().is_some() {
384            changes.push(ProtocolsChange::Added(ProtocolsAdded {
385                protocols: added_protocols,
386            }));
387        }
388
389        if removed_protocols.peek().is_some() {
390            changes.push(ProtocolsChange::Removed(ProtocolsRemoved {
391                protocols: Either::Left(removed_protocols),
392            }));
393        }
394
395        changes
396    }
397}
398
399/// An [`Iterator`] over all protocols that have been added.
400#[derive(Debug, Clone)]
401pub struct ProtocolsAdded<'a> {
402    protocols: Peekable<Difference<'a, StreamProtocol, RandomState>>,
403}
404
405impl<'a> ProtocolsAdded<'a> {
406    pub(crate) fn from_set(protocols: &'a HashSet<StreamProtocol, RandomState>) -> Self {
407        ProtocolsAdded {
408            protocols: protocols.difference(&EMPTY_HASHSET).peekable(),
409        }
410    }
411}
412
413/// An [`Iterator`] over all protocols that have been removed.
414#[derive(Debug, Clone)]
415pub struct ProtocolsRemoved<'a> {
416    protocols: Either<
417        Peekable<Difference<'a, StreamProtocol, RandomState>>,
418        Peekable<Intersection<'a, StreamProtocol, RandomState>>,
419    >,
420}
421
422impl<'a> ProtocolsRemoved<'a> {
423    #[cfg(test)]
424    pub(crate) fn from_set(protocols: &'a HashSet<StreamProtocol, RandomState>) -> Self {
425        ProtocolsRemoved {
426            protocols: Either::Left(protocols.difference(&EMPTY_HASHSET).peekable()),
427        }
428    }
429}
430
431impl<'a> Iterator for ProtocolsAdded<'a> {
432    type Item = &'a StreamProtocol;
433    fn next(&mut self) -> Option<Self::Item> {
434        self.protocols.next()
435    }
436}
437
438impl<'a> Iterator for ProtocolsRemoved<'a> {
439    type Item = &'a StreamProtocol;
440    fn next(&mut self) -> Option<Self::Item> {
441        self.protocols.next()
442    }
443}
444
/// [`ConnectionEvent`] variant that informs the handler
/// that upgrading an outbound substream to the given protocol has failed.
#[derive(Debug)]
pub struct DialUpgradeError<OOI, OP: OutboundUpgradeSend> {
    /// The outbound open info associated with the failed substream.
    pub info: OOI,
    /// Why the outbound upgrade failed.
    pub error: StreamUpgradeError<OP::Error>,
}
452
/// [`ConnectionEvent`] variant that informs the handler
/// that upgrading an inbound substream to the given protocol has failed.
#[derive(Debug)]
pub struct ListenUpgradeError<IOI, IP: InboundUpgradeSend> {
    /// The inbound open info associated with the failed substream.
    pub info: IOI,
    /// The error produced by the inbound upgrade.
    pub error: IP::Error,
}
460
/// Describes how a substream of a [`ConnectionHandler`] is negotiated: the upgrade to
/// apply, the info attached to it and the timeout for the upgrade.
///
/// Inbound substreams take their configuration from [`ConnectionHandler::listen_protocol`],
/// outbound substreams from [`ConnectionHandlerEvent::OutboundSubstreamRequest`].
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub struct SubstreamProtocol<TUpgrade, TInfo> {
    upgrade: TUpgrade,
    info: TInfo,
    timeout: Duration,
}

impl<TUpgrade, TInfo> SubstreamProtocol<TUpgrade, TInfo> {
    /// Wraps `upgrade` and `info` into a new `SubstreamProtocol`.
    ///
    /// The upgrade timeout starts out at 10 seconds; use [`Self::with_timeout`] to change it.
    pub fn new(upgrade: TUpgrade, info: TInfo) -> Self {
        let timeout = Duration::from_secs(10);

        Self {
            upgrade,
            info,
            timeout,
        }
    }

    /// Transforms the protocol upgrade with `f`, keeping info and timeout.
    pub fn map_upgrade<U, F>(self, f: F) -> SubstreamProtocol<U, TInfo>
    where
        F: FnOnce(TUpgrade) -> U,
    {
        let Self {
            upgrade,
            info,
            timeout,
        } = self;

        SubstreamProtocol {
            upgrade: f(upgrade),
            info,
            timeout,
        }
    }

    /// Transforms the protocol info with `f`, keeping upgrade and timeout.
    pub fn map_info<U, F>(self, f: F) -> SubstreamProtocol<TUpgrade, U>
    where
        F: FnOnce(TInfo) -> U,
    {
        let Self {
            upgrade,
            info,
            timeout,
        } = self;

        SubstreamProtocol {
            upgrade,
            info: f(info),
            timeout,
        }
    }

    /// Replaces the timeout for the protocol upgrade.
    pub fn with_timeout(self, timeout: Duration) -> Self {
        Self { timeout, ..self }
    }

    /// Returns a reference to the contained protocol upgrade.
    pub fn upgrade(&self) -> &TUpgrade {
        let Self { upgrade, .. } = self;
        upgrade
    }

    /// Returns a reference to the contained protocol info.
    pub fn info(&self) -> &TInfo {
        let Self { info, .. } = self;
        info
    }

    /// Returns a reference to the timeout for the protocol upgrade.
    pub fn timeout(&self) -> &Duration {
        let Self { timeout, .. } = self;
        timeout
    }

    /// Consumes the configuration and hands back the upgrade together with its info; the
    /// timeout is dropped.
    pub fn into_upgrade(self) -> (TUpgrade, TInfo) {
        let Self { upgrade, info, .. } = self;
        (upgrade, info)
    }
}
536
/// Event produced by a handler.
///
/// `TConnectionUpgrade` is the upgrade to apply on a requested outbound substream,
/// `TOutboundOpenInfo` the info attached to that request and `TCustom` the payload of
/// [`ConnectionHandlerEvent::NotifyBehaviour`].
#[derive(Debug, Clone, PartialEq, Eq)]
#[non_exhaustive]
pub enum ConnectionHandlerEvent<TConnectionUpgrade, TOutboundOpenInfo, TCustom> {
    /// Request a new outbound substream to be opened with the remote.
    OutboundSubstreamRequest {
        /// The protocol(s) to apply on the substream.
        protocol: SubstreamProtocol<TConnectionUpgrade, TOutboundOpenInfo>,
    },
    /// We learned something about the protocols supported by the remote.
    ReportRemoteProtocols(ProtocolSupport),

    /// Event that is sent to a [`NetworkBehaviour`](crate::behaviour::NetworkBehaviour).
    NotifyBehaviour(TCustom),
}
552
/// A change in the set of protocols supported by the remote, as reported through
/// [`ConnectionHandlerEvent::ReportRemoteProtocols`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ProtocolSupport {
    /// The remote now supports these additional protocols.
    Added(HashSet<StreamProtocol>),
    /// The remote no longer supports these protocols.
    Removed(HashSet<StreamProtocol>),
}
560
561/// Event produced by a handler.
562impl<TConnectionUpgrade, TOutboundOpenInfo, TCustom>
563    ConnectionHandlerEvent<TConnectionUpgrade, TOutboundOpenInfo, TCustom>
564{
565    /// If this is an `OutboundSubstreamRequest`, maps the `info` member from a
566    /// `TOutboundOpenInfo` to something else.
567    pub fn map_outbound_open_info<F, I>(
568        self,
569        map: F,
570    ) -> ConnectionHandlerEvent<TConnectionUpgrade, I, TCustom>
571    where
572        F: FnOnce(TOutboundOpenInfo) -> I,
573    {
574        match self {
575            ConnectionHandlerEvent::OutboundSubstreamRequest { protocol } => {
576                ConnectionHandlerEvent::OutboundSubstreamRequest {
577                    protocol: protocol.map_info(map),
578                }
579            }
580            ConnectionHandlerEvent::NotifyBehaviour(val) => {
581                ConnectionHandlerEvent::NotifyBehaviour(val)
582            }
583            ConnectionHandlerEvent::ReportRemoteProtocols(support) => {
584                ConnectionHandlerEvent::ReportRemoteProtocols(support)
585            }
586        }
587    }
588
589    /// If this is an `OutboundSubstreamRequest`, maps the protocol (`TConnectionUpgrade`)
590    /// to something else.
591    pub fn map_protocol<F, I>(self, map: F) -> ConnectionHandlerEvent<I, TOutboundOpenInfo, TCustom>
592    where
593        F: FnOnce(TConnectionUpgrade) -> I,
594    {
595        match self {
596            ConnectionHandlerEvent::OutboundSubstreamRequest { protocol } => {
597                ConnectionHandlerEvent::OutboundSubstreamRequest {
598                    protocol: protocol.map_upgrade(map),
599                }
600            }
601            ConnectionHandlerEvent::NotifyBehaviour(val) => {
602                ConnectionHandlerEvent::NotifyBehaviour(val)
603            }
604            ConnectionHandlerEvent::ReportRemoteProtocols(support) => {
605                ConnectionHandlerEvent::ReportRemoteProtocols(support)
606            }
607        }
608    }
609
610    /// If this is a `Custom` event, maps the content to something else.
611    pub fn map_custom<F, I>(
612        self,
613        map: F,
614    ) -> ConnectionHandlerEvent<TConnectionUpgrade, TOutboundOpenInfo, I>
615    where
616        F: FnOnce(TCustom) -> I,
617    {
618        match self {
619            ConnectionHandlerEvent::OutboundSubstreamRequest { protocol } => {
620                ConnectionHandlerEvent::OutboundSubstreamRequest { protocol }
621            }
622            ConnectionHandlerEvent::NotifyBehaviour(val) => {
623                ConnectionHandlerEvent::NotifyBehaviour(map(val))
624            }
625            ConnectionHandlerEvent::ReportRemoteProtocols(support) => {
626                ConnectionHandlerEvent::ReportRemoteProtocols(support)
627            }
628        }
629    }
630}
631
/// The ways in which opening an outbound substream can fail.
#[derive(Debug)]
pub enum StreamUpgradeError<TUpgrErr> {
    /// Negotiation did not complete before the opening attempt timed out.
    Timeout,
    /// The upgrade itself reported an error.
    Apply(TUpgrErr),
    /// Both sides could not agree on any protocol.
    NegotiationFailed,
    /// An IO or otherwise unrecoverable error occurred.
    Io(io::Error),
}

impl<TUpgrErr> StreamUpgradeError<TUpgrErr> {
    /// Converts the upgrade error carried by [`StreamUpgradeError::Apply`] with `f`.
    ///
    /// All other variants are carried over as they are.
    pub fn map_upgrade_err<F, E>(self, f: F) -> StreamUpgradeError<E>
    where
        F: FnOnce(TUpgrErr) -> E,
    {
        match self {
            Self::Apply(upgrade_err) => StreamUpgradeError::Apply(f(upgrade_err)),
            Self::Io(io_err) => StreamUpgradeError::Io(io_err),
            Self::Timeout => StreamUpgradeError::Timeout,
            Self::NegotiationFailed => StreamUpgradeError::NegotiationFailed,
        }
    }
}
659
660impl<TUpgrErr> fmt::Display for StreamUpgradeError<TUpgrErr>
661where
662    TUpgrErr: error::Error + 'static,
663{
664    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
665        match self {
666            StreamUpgradeError::Timeout => {
667                write!(f, "Timeout error while opening a substream")
668            }
669            StreamUpgradeError::Apply(err) => {
670                write!(f, "Apply: ")?;
671                crate::print_error_chain(f, err)
672            }
673            StreamUpgradeError::NegotiationFailed => {
674                write!(f, "no protocols could be agreed upon")
675            }
676            StreamUpgradeError::Io(e) => {
677                write!(f, "IO error: ")?;
678                crate::print_error_chain(f, e)
679            }
680        }
681    }
682}
683
impl<TUpgrErr> error::Error for StreamUpgradeError<TUpgrErr>
where
    TUpgrErr: error::Error + 'static,
{
    /// Always returns `None`, even for the variants that wrap an inner error.
    fn source(&self) -> Option<&(dyn error::Error + 'static)> {
        // NOTE(review): presumably deliberate, since the `Display` impl already prints the
        // wrapped error via `crate::print_error_chain` — confirm before exposing a source here.
        None
    }
}
692
/// A statically declared, empty [`HashSet`] allows us to work around borrow-checker rules for
/// [`ProtocolsAdded::from_set`] (and the test-only `ProtocolsRemoved::from_set`). The lifetimes
/// don't work unless we have a [`HashSet`] with a `'static` lifetime.
static EMPTY_HASHSET: Lazy<HashSet<StreamProtocol>> = Lazy::new(HashSet::new);