libp2p_swarm/handler.rs
1// Copyright 2018 Parity Technologies (UK) Ltd.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21//! Once a connection to a remote peer is established, a [`ConnectionHandler`] negotiates
22//! and handles one or more specific protocols on the connection.
23//!
24//! Protocols are negotiated and used on individual substreams of the connection. Thus a
25//! [`ConnectionHandler`] defines the inbound and outbound upgrades to apply when creating a new
26//! inbound or outbound substream, respectively, and is notified by a [`Swarm`](crate::Swarm) when
27//! these upgrades have been successfully applied, including the final output of the upgrade. A
28//! [`ConnectionHandler`] can then continue communicating with the peer over the substream using the
29//! negotiated protocol(s).
30//!
31//! Two [`ConnectionHandler`]s can be composed with [`ConnectionHandler::select()`]
32//! in order to build a new handler supporting the combined set of protocols,
33//! with methods being dispatched to the appropriate handler according to the
34//! used protocol(s) determined by the associated types of the handlers.
35//!
36//! > **Note**: A [`ConnectionHandler`] handles one or more protocols in the context of a single
37//! > connection with a remote. In order to handle a protocol that requires knowledge of
38//! > the network as a whole, see the
39//! > [`NetworkBehaviour`](crate::behaviour::NetworkBehaviour) trait.
40
41pub mod either;
42mod map_in;
43mod map_out;
44pub mod multi;
45mod one_shot;
46mod pending;
47mod select;
48
49pub use crate::upgrade::{InboundUpgradeSend, OutboundUpgradeSend, SendWrapper, UpgradeInfoSend};
50pub use map_in::MapInEvent;
51pub use map_out::MapOutEvent;
52pub use one_shot::{OneShotHandler, OneShotHandlerConfig};
53pub use pending::PendingConnectionHandler;
54pub use select::ConnectionHandlerSelect;
55
56use crate::StreamProtocol;
57use ::either::Either;
58use libp2p_core::Multiaddr;
59use once_cell::sync::Lazy;
60use smallvec::SmallVec;
61use std::collections::hash_map::RandomState;
62use std::collections::hash_set::{Difference, Intersection};
63use std::collections::HashSet;
64use std::iter::Peekable;
65use std::{error, fmt, io, task::Context, task::Poll, time::Duration};
66
67/// A handler for a set of protocols used on a connection with a remote.
68///
69/// This trait should be implemented for a type that maintains the state for
70/// the execution of a specific protocol with a remote.
71///
72/// # Handling a protocol
73///
74/// Communication with a remote over a set of protocols is initiated in one of two ways:
75///
76/// 1. Dialing by initiating a new outbound substream. In order to do so,
77/// [`ConnectionHandler::poll()`] must return an [`ConnectionHandlerEvent::OutboundSubstreamRequest`],
78/// providing an instance of [`libp2p_core::upgrade::OutboundUpgrade`] that is used to negotiate the
79/// protocol(s). Upon success, [`ConnectionHandler::on_connection_event`] is called with
80/// [`ConnectionEvent::FullyNegotiatedOutbound`] translating the final output of the upgrade.
81///
82/// 2. Listening by accepting a new inbound substream. When a new inbound substream
83/// is created on a connection, [`ConnectionHandler::listen_protocol`] is called
84/// to obtain an instance of [`libp2p_core::upgrade::InboundUpgrade`] that is used to
85/// negotiate the protocol(s). Upon success,
86/// [`ConnectionHandler::on_connection_event`] is called with [`ConnectionEvent::FullyNegotiatedInbound`]
87/// translating the final output of the upgrade.
88///
89///
90/// # Connection Keep-Alive
91///
92/// A [`ConnectionHandler`] can influence the lifetime of the underlying connection
93/// through [`ConnectionHandler::connection_keep_alive`]. That is, the protocol
94/// implemented by the handler can include conditions for terminating the connection.
95/// The lifetime of successfully negotiated substreams is fully controlled by the handler.
96///
97/// Implementors of this trait should keep in mind that the connection can be closed at any time.
98/// When a connection is closed gracefully, the substreams used by the handler may still
99/// continue reading data until the remote closes its side of the connection.
pub trait ConnectionHandler: Send + 'static {
    /// A type representing the message(s) a [`NetworkBehaviour`](crate::behaviour::NetworkBehaviour) can send to a [`ConnectionHandler`] via [`ToSwarm::NotifyHandler`](crate::behaviour::ToSwarm::NotifyHandler).
    type FromBehaviour: fmt::Debug + Send + 'static;
    /// A type representing message(s) a [`ConnectionHandler`] can send to a [`NetworkBehaviour`](crate::behaviour::NetworkBehaviour) via [`ConnectionHandlerEvent::NotifyBehaviour`].
    type ToBehaviour: fmt::Debug + Send + 'static;
    /// The inbound upgrade for the protocol(s) used by the handler.
    type InboundProtocol: InboundUpgradeSend;
    /// The outbound upgrade for the protocol(s) used by the handler.
    type OutboundProtocol: OutboundUpgradeSend;
    /// The type of additional information returned from `listen_protocol`.
    type InboundOpenInfo: Send + 'static;
    /// The type of additional information passed to an `OutboundSubstreamRequest`.
    type OutboundOpenInfo: Send + 'static;

    /// The [`InboundUpgrade`](libp2p_core::upgrade::InboundUpgrade) to apply on inbound
    /// substreams to negotiate the desired protocols.
    ///
    /// > **Note**: The returned `InboundUpgrade` should always accept all the generally
    /// > supported protocols, even if in a specific context a particular one is
    /// > not supported (e.g. when only allowing one substream at a time for a protocol).
    /// > This allows a remote to put the list of supported protocols in a cache.
    fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol, Self::InboundOpenInfo>;

    /// Returns whether the connection should be kept alive.
    ///
    /// The default implementation returns `false`.
    ///
    /// ## Keep alive algorithm
    ///
    /// A connection is always kept alive:
    ///
    /// - Whilst a [`ConnectionHandler`] returns [`Poll::Ready`].
    /// - We are negotiating inbound or outbound streams.
    /// - There are active [`Stream`](crate::Stream)s on the connection.
    ///
    /// The combination of the above means that _most_ protocols will not need to override this method.
    /// This method is only invoked when all of the above are `false`, i.e. when the connection is entirely idle.
    ///
    /// ## Exceptions
    ///
    /// - Protocols like [circuit-relay v2](https://github.com/libp2p/specs/blob/master/relay/circuit-v2.md) need to keep a connection alive beyond these circumstances and can thus override this method.
    /// - Protocols like [ping](https://github.com/libp2p/specs/blob/master/ping/ping.md) **don't** want to keep a connection alive despite active streams.
    ///   In that case, protocol authors can use [`Stream::ignore_for_keep_alive`](crate::Stream::ignore_for_keep_alive) to opt-out a particular stream from the keep-alive algorithm.
    fn connection_keep_alive(&self) -> bool {
        false
    }

    /// Should behave like `Stream::poll()`.
    ///
    /// A ready value is a [`ConnectionHandlerEvent`] parameterised over this handler's
    /// `OutboundProtocol`, `OutboundOpenInfo` and `ToBehaviour` types.
    fn poll(
        &mut self,
        cx: &mut Context<'_>,
    ) -> Poll<
        ConnectionHandlerEvent<Self::OutboundProtocol, Self::OutboundOpenInfo, Self::ToBehaviour>,
    >;

    /// Gracefully close the [`ConnectionHandler`].
    ///
    /// The contract for this function is equivalent to a [`Stream`](futures::Stream).
    /// When a connection is being shut down, we will first poll this function to completion.
    /// Following that, the physical connection will be shut down.
    ///
    /// This is also called when the shutdown was initiated due to an error on the connection.
    /// We therefore cannot guarantee that performing IO within here will succeed.
    ///
    /// To signal completion, `Poll::Ready(None)` should be returned.
    /// The default implementation does so immediately.
    ///
    /// Implementations MUST have a [`fuse`](futures::StreamExt::fuse)-like behaviour.
    /// That is, `Poll::Ready(None)` MUST be returned on repeated calls to [`ConnectionHandler::poll_close`].
    fn poll_close(&mut self, _: &mut Context<'_>) -> Poll<Option<Self::ToBehaviour>> {
        Poll::Ready(None)
    }

    /// Adds a closure that turns the input event into something else.
    ///
    /// Wraps `self` in a [`MapInEvent`] built from `map`, a closure that borrows a `TNewIn`
    /// and optionally yields a reference to this handler's `FromBehaviour`.
    fn map_in_event<TNewIn, TMap>(self, map: TMap) -> MapInEvent<Self, TNewIn, TMap>
    where
        Self: Sized,
        TMap: Fn(&TNewIn) -> Option<&Self::FromBehaviour>,
    {
        MapInEvent::new(self, map)
    }

    /// Adds a closure that turns the output event into something else.
    ///
    /// Wraps `self` in a [`MapOutEvent`] built from `map`, a closure that converts this
    /// handler's `ToBehaviour` into a `TNewOut`.
    fn map_out_event<TMap, TNewOut>(self, map: TMap) -> MapOutEvent<Self, TMap>
    where
        Self: Sized,
        TMap: FnMut(Self::ToBehaviour) -> TNewOut,
    {
        MapOutEvent::new(self, map)
    }

    /// Creates a new [`ConnectionHandler`] that selects either this handler or
    /// `other` by delegating method calls appropriately.
    fn select<TProto2>(self, other: TProto2) -> ConnectionHandlerSelect<Self, TProto2>
    where
        Self: Sized,
    {
        ConnectionHandlerSelect::new(self, other)
    }

    /// Informs the handler about an event from the [`NetworkBehaviour`](super::NetworkBehaviour).
    fn on_behaviour_event(&mut self, _event: Self::FromBehaviour);

    /// Informs the handler about an event on the connection.
    ///
    /// See [`ConnectionEvent`] for the possible events: fully negotiated inbound or outbound
    /// streams, failed inbound or outbound upgrades, a change of the remote's address and
    /// changes to the locally or remotely supported protocols.
    fn on_connection_event(
        &mut self,
        event: ConnectionEvent<
            Self::InboundProtocol,
            Self::OutboundProtocol,
            Self::InboundOpenInfo,
            Self::OutboundOpenInfo,
        >,
    );
}
210
/// Enumeration with the list of the possible stream events
/// to pass to [`on_connection_event`](ConnectionHandler::on_connection_event).
///
/// The lifetime `'a` is the lifetime of the data borrowed by the [`AddressChange`] and
/// [`ProtocolsChange`] payloads. `IP`/`OP` are the inbound/outbound upgrades and `IOI`/`OOI`
/// the additional information types that travel with them.
#[non_exhaustive]
pub enum ConnectionEvent<'a, IP: InboundUpgradeSend, OP: OutboundUpgradeSend, IOI, OOI> {
    /// Informs the handler about the output of a successful upgrade on a new inbound substream.
    FullyNegotiatedInbound(FullyNegotiatedInbound<IP, IOI>),
    /// Informs the handler about the output of a successful upgrade on a new outbound stream.
    FullyNegotiatedOutbound(FullyNegotiatedOutbound<OP, OOI>),
    /// Informs the handler about a change in the address of the remote.
    AddressChange(AddressChange<'a>),
    /// Informs the handler that upgrading an outbound substream to the given protocol has failed.
    DialUpgradeError(DialUpgradeError<OOI, OP>),
    /// Informs the handler that upgrading an inbound substream to the given protocol has failed.
    ListenUpgradeError(ListenUpgradeError<IOI, IP>),
    /// The local [`ConnectionHandler`] added or removed support for one or more protocols.
    LocalProtocolsChange(ProtocolsChange<'a>),
    /// The remote [`ConnectionHandler`] now supports a different set of protocols.
    RemoteProtocolsChange(ProtocolsChange<'a>),
}
230
231impl<'a, IP, OP, IOI, OOI> fmt::Debug for ConnectionEvent<'a, IP, OP, IOI, OOI>
232where
233 IP: InboundUpgradeSend + fmt::Debug,
234 IP::Output: fmt::Debug,
235 IP::Error: fmt::Debug,
236 OP: OutboundUpgradeSend + fmt::Debug,
237 OP::Output: fmt::Debug,
238 OP::Error: fmt::Debug,
239 IOI: fmt::Debug,
240 OOI: fmt::Debug,
241{
242 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
243 match self {
244 ConnectionEvent::FullyNegotiatedInbound(v) => {
245 f.debug_tuple("FullyNegotiatedInbound").field(v).finish()
246 }
247 ConnectionEvent::FullyNegotiatedOutbound(v) => {
248 f.debug_tuple("FullyNegotiatedOutbound").field(v).finish()
249 }
250 ConnectionEvent::AddressChange(v) => f.debug_tuple("AddressChange").field(v).finish(),
251 ConnectionEvent::DialUpgradeError(v) => {
252 f.debug_tuple("DialUpgradeError").field(v).finish()
253 }
254 ConnectionEvent::ListenUpgradeError(v) => {
255 f.debug_tuple("ListenUpgradeError").field(v).finish()
256 }
257 ConnectionEvent::LocalProtocolsChange(v) => {
258 f.debug_tuple("LocalProtocolsChange").field(v).finish()
259 }
260 ConnectionEvent::RemoteProtocolsChange(v) => {
261 f.debug_tuple("RemoteProtocolsChange").field(v).finish()
262 }
263 }
264 }
265}
266
267impl<'a, IP: InboundUpgradeSend, OP: OutboundUpgradeSend, IOI, OOI>
268 ConnectionEvent<'a, IP, OP, IOI, OOI>
269{
270 /// Whether the event concerns an outbound stream.
271 pub fn is_outbound(&self) -> bool {
272 match self {
273 ConnectionEvent::DialUpgradeError(_) | ConnectionEvent::FullyNegotiatedOutbound(_) => {
274 true
275 }
276 ConnectionEvent::FullyNegotiatedInbound(_)
277 | ConnectionEvent::AddressChange(_)
278 | ConnectionEvent::LocalProtocolsChange(_)
279 | ConnectionEvent::RemoteProtocolsChange(_)
280 | ConnectionEvent::ListenUpgradeError(_) => false,
281 }
282 }
283
284 /// Whether the event concerns an inbound stream.
285 pub fn is_inbound(&self) -> bool {
286 match self {
287 ConnectionEvent::FullyNegotiatedInbound(_) | ConnectionEvent::ListenUpgradeError(_) => {
288 true
289 }
290 ConnectionEvent::FullyNegotiatedOutbound(_)
291 | ConnectionEvent::AddressChange(_)
292 | ConnectionEvent::LocalProtocolsChange(_)
293 | ConnectionEvent::RemoteProtocolsChange(_)
294 | ConnectionEvent::DialUpgradeError(_) => false,
295 }
296 }
297}
298
/// [`ConnectionEvent`] variant that informs the handler about
/// the output of a successful upgrade on a new inbound substream.
///
/// Note that it is up to the [`ConnectionHandler`] implementation to manage the lifetime of the
/// negotiated inbound substreams. E.g. the implementation has to enforce a limit on the number
/// of simultaneously open negotiated inbound substreams. In other words it is up to the
/// [`ConnectionHandler`] implementation to stop a malicious remote node from opening and keeping
/// alive an excessive amount of inbound substreams.
#[derive(Debug)]
pub struct FullyNegotiatedInbound<IP: InboundUpgradeSend, IOI> {
    /// The output of the inbound upgrade.
    pub protocol: IP::Output,
    /// Additional information of type `IOI`; for a [`ConnectionHandler`] this is its
    /// `InboundOpenInfo`.
    pub info: IOI,
}
312
/// [`ConnectionEvent`] variant that informs the handler about successful upgrade on a new outbound stream.
///
/// The `info` field is the information that was previously passed to
/// [`ConnectionHandlerEvent::OutboundSubstreamRequest`]; the `protocol` field is the output of
/// the outbound upgrade.
#[derive(Debug)]
pub struct FullyNegotiatedOutbound<OP: OutboundUpgradeSend, OOI> {
    /// The output of the outbound upgrade.
    pub protocol: OP::Output,
    /// Additional information of type `OOI`; for a [`ConnectionHandler`] this is its
    /// `OutboundOpenInfo`.
    pub info: OOI,
}
322
/// [`ConnectionEvent`] variant that informs the handler about a change in the address of the remote.
#[derive(Debug)]
pub struct AddressChange<'a> {
    /// The new address of the remote, borrowed for the lifetime `'a`.
    pub new_address: &'a Multiaddr,
}
328
329/// [`ConnectionEvent`] variant that informs the handler about a change in the protocols supported on the connection.
330#[derive(Debug, Clone)]
331pub enum ProtocolsChange<'a> {
332 Added(ProtocolsAdded<'a>),
333 Removed(ProtocolsRemoved<'a>),
334}
335
336impl<'a> ProtocolsChange<'a> {
337 /// Compute the [`ProtocolsChange`] that results from adding `to_add` to `existing_protocols`.
338 ///
339 /// Returns `None` if the change is a no-op, i.e. `to_add` is a subset of `existing_protocols`.
340 pub(crate) fn add(
341 existing_protocols: &'a HashSet<StreamProtocol>,
342 to_add: &'a HashSet<StreamProtocol>,
343 ) -> Option<Self> {
344 let mut actually_added_protocols = to_add.difference(existing_protocols).peekable();
345
346 actually_added_protocols.peek()?;
347
348 Some(ProtocolsChange::Added(ProtocolsAdded {
349 protocols: actually_added_protocols,
350 }))
351 }
352
353 /// Compute the [`ProtocolsChange`] that results from removing `to_remove` from `existing_protocols`.
354 ///
355 /// Returns `None` if the change is a no-op, i.e. none of the protocols in `to_remove` are in `existing_protocols`.
356 pub(crate) fn remove(
357 existing_protocols: &'a HashSet<StreamProtocol>,
358 to_remove: &'a HashSet<StreamProtocol>,
359 ) -> Option<Self> {
360 let mut actually_removed_protocols = existing_protocols.intersection(to_remove).peekable();
361
362 actually_removed_protocols.peek()?;
363
364 Some(ProtocolsChange::Removed(ProtocolsRemoved {
365 protocols: Either::Right(actually_removed_protocols),
366 }))
367 }
368
369 /// Compute the [`ProtocolsChange`]s required to go from `existing_protocols` to `new_protocols`.
370 pub(crate) fn from_full_sets(
371 existing_protocols: &'a HashSet<StreamProtocol>,
372 new_protocols: &'a HashSet<StreamProtocol>,
373 ) -> SmallVec<[Self; 2]> {
374 if existing_protocols == new_protocols {
375 return SmallVec::new();
376 }
377
378 let mut changes = SmallVec::new();
379
380 let mut added_protocols = new_protocols.difference(existing_protocols).peekable();
381 let mut removed_protocols = existing_protocols.difference(new_protocols).peekable();
382
383 if added_protocols.peek().is_some() {
384 changes.push(ProtocolsChange::Added(ProtocolsAdded {
385 protocols: added_protocols,
386 }));
387 }
388
389 if removed_protocols.peek().is_some() {
390 changes.push(ProtocolsChange::Removed(ProtocolsRemoved {
391 protocols: Either::Left(removed_protocols),
392 }));
393 }
394
395 changes
396 }
397}
398
399/// An [`Iterator`] over all protocols that have been added.
400#[derive(Debug, Clone)]
401pub struct ProtocolsAdded<'a> {
402 protocols: Peekable<Difference<'a, StreamProtocol, RandomState>>,
403}
404
405impl<'a> ProtocolsAdded<'a> {
406 pub(crate) fn from_set(protocols: &'a HashSet<StreamProtocol, RandomState>) -> Self {
407 ProtocolsAdded {
408 protocols: protocols.difference(&EMPTY_HASHSET).peekable(),
409 }
410 }
411}
412
413/// An [`Iterator`] over all protocols that have been removed.
414#[derive(Debug, Clone)]
415pub struct ProtocolsRemoved<'a> {
416 protocols: Either<
417 Peekable<Difference<'a, StreamProtocol, RandomState>>,
418 Peekable<Intersection<'a, StreamProtocol, RandomState>>,
419 >,
420}
421
422impl<'a> ProtocolsRemoved<'a> {
423 #[cfg(test)]
424 pub(crate) fn from_set(protocols: &'a HashSet<StreamProtocol, RandomState>) -> Self {
425 ProtocolsRemoved {
426 protocols: Either::Left(protocols.difference(&EMPTY_HASHSET).peekable()),
427 }
428 }
429}
430
431impl<'a> Iterator for ProtocolsAdded<'a> {
432 type Item = &'a StreamProtocol;
433 fn next(&mut self) -> Option<Self::Item> {
434 self.protocols.next()
435 }
436}
437
438impl<'a> Iterator for ProtocolsRemoved<'a> {
439 type Item = &'a StreamProtocol;
440 fn next(&mut self) -> Option<Self::Item> {
441 self.protocols.next()
442 }
443}
444
/// [`ConnectionEvent`] variant that informs the handler
/// that upgrading an outbound substream to the given protocol has failed.
#[derive(Debug)]
pub struct DialUpgradeError<OOI, OP: OutboundUpgradeSend> {
    /// Additional information of type `OOI`; for a [`ConnectionHandler`] this is its
    /// `OutboundOpenInfo`.
    pub info: OOI,
    /// Why the outbound upgrade failed; see [`StreamUpgradeError`] for the possible causes.
    pub error: StreamUpgradeError<OP::Error>,
}
452
/// [`ConnectionEvent`] variant that informs the handler
/// that upgrading an inbound substream to the given protocol has failed.
#[derive(Debug)]
pub struct ListenUpgradeError<IOI, IP: InboundUpgradeSend> {
    /// Additional information of type `IOI`; for a [`ConnectionHandler`] this is its
    /// `InboundOpenInfo`.
    pub info: IOI,
    /// The error produced by the inbound upgrade.
    pub error: IP::Error,
}
460
/// Configuration of inbound or outbound substream protocol(s)
/// for a [`ConnectionHandler`].
///
/// It bundles the upgrade to apply, some additional information that travels with it and the
/// timeout for applying the upgrade.
///
/// The inbound substream protocol(s) are defined by [`ConnectionHandler::listen_protocol`]
/// and the outbound substream protocol(s) by [`ConnectionHandlerEvent::OutboundSubstreamRequest`].
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub struct SubstreamProtocol<TUpgrade, TInfo> {
    upgrade: TUpgrade,
    info: TInfo,
    timeout: Duration,
}

impl<TUpgrade, TInfo> SubstreamProtocol<TUpgrade, TInfo> {
    /// Create a new `SubstreamProtocol` from the given upgrade and info.
    ///
    /// The default timeout for applying the given upgrade on a substream is
    /// 10 seconds; use [`SubstreamProtocol::with_timeout`] to change it.
    pub fn new(upgrade: TUpgrade, info: TInfo) -> Self {
        let timeout = Duration::from_secs(10);

        Self {
            upgrade,
            info,
            timeout,
        }
    }

    /// Applies `f` to the protocol upgrade, leaving info and timeout untouched.
    pub fn map_upgrade<U, F>(self, f: F) -> SubstreamProtocol<U, TInfo>
    where
        F: FnOnce(TUpgrade) -> U,
    {
        let Self {
            upgrade,
            info,
            timeout,
        } = self;

        SubstreamProtocol {
            upgrade: f(upgrade),
            info,
            timeout,
        }
    }

    /// Applies `f` to the protocol info, leaving upgrade and timeout untouched.
    pub fn map_info<U, F>(self, f: F) -> SubstreamProtocol<TUpgrade, U>
    where
        F: FnOnce(TInfo) -> U,
    {
        let Self {
            upgrade,
            info,
            timeout,
        } = self;

        SubstreamProtocol {
            upgrade,
            info: f(info),
            timeout,
        }
    }

    /// Replaces the timeout for the protocol upgrade and returns the updated configuration.
    pub fn with_timeout(self, timeout: Duration) -> Self {
        Self { timeout, ..self }
    }

    /// Borrows the contained protocol upgrade.
    pub fn upgrade(&self) -> &TUpgrade {
        let Self { upgrade, .. } = self;
        upgrade
    }

    /// Borrows the contained protocol info.
    pub fn info(&self) -> &TInfo {
        let Self { info, .. } = self;
        info
    }

    /// Borrows the timeout for the protocol upgrade.
    pub fn timeout(&self) -> &Duration {
        let Self { timeout, .. } = self;
        timeout
    }

    /// Consumes the configuration and returns the contained upgrade together with its info.
    ///
    /// The timeout is dropped.
    pub fn into_upgrade(self) -> (TUpgrade, TInfo) {
        let Self { upgrade, info, .. } = self;
        (upgrade, info)
    }
}
536
/// Event produced by a handler.
///
/// `TConnectionUpgrade` and `TOutboundOpenInfo` are the upgrade and additional information
/// carried by an outbound substream request; `TCustom` is the payload of
/// [`ConnectionHandlerEvent::NotifyBehaviour`].
#[derive(Debug, Clone, PartialEq, Eq)]
#[non_exhaustive]
pub enum ConnectionHandlerEvent<TConnectionUpgrade, TOutboundOpenInfo, TCustom> {
    /// Request a new outbound substream to be opened with the remote.
    OutboundSubstreamRequest {
        /// The protocol(s) to apply on the substream.
        protocol: SubstreamProtocol<TConnectionUpgrade, TOutboundOpenInfo>,
    },
    /// We learned something about the protocols supported by the remote.
    ReportRemoteProtocols(ProtocolSupport),

    /// Event that is sent to a [`NetworkBehaviour`](crate::behaviour::NetworkBehaviour).
    NotifyBehaviour(TCustom),
}
552
/// Payload of [`ConnectionHandlerEvent::ReportRemoteProtocols`]: a change in the set of
/// protocols supported by the remote.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ProtocolSupport {
    /// The remote now supports these additional protocols.
    Added(HashSet<StreamProtocol>),
    /// The remote no longer supports these protocols.
    Removed(HashSet<StreamProtocol>),
}
560
561/// Event produced by a handler.
562impl<TConnectionUpgrade, TOutboundOpenInfo, TCustom>
563 ConnectionHandlerEvent<TConnectionUpgrade, TOutboundOpenInfo, TCustom>
564{
565 /// If this is an `OutboundSubstreamRequest`, maps the `info` member from a
566 /// `TOutboundOpenInfo` to something else.
567 pub fn map_outbound_open_info<F, I>(
568 self,
569 map: F,
570 ) -> ConnectionHandlerEvent<TConnectionUpgrade, I, TCustom>
571 where
572 F: FnOnce(TOutboundOpenInfo) -> I,
573 {
574 match self {
575 ConnectionHandlerEvent::OutboundSubstreamRequest { protocol } => {
576 ConnectionHandlerEvent::OutboundSubstreamRequest {
577 protocol: protocol.map_info(map),
578 }
579 }
580 ConnectionHandlerEvent::NotifyBehaviour(val) => {
581 ConnectionHandlerEvent::NotifyBehaviour(val)
582 }
583 ConnectionHandlerEvent::ReportRemoteProtocols(support) => {
584 ConnectionHandlerEvent::ReportRemoteProtocols(support)
585 }
586 }
587 }
588
589 /// If this is an `OutboundSubstreamRequest`, maps the protocol (`TConnectionUpgrade`)
590 /// to something else.
591 pub fn map_protocol<F, I>(self, map: F) -> ConnectionHandlerEvent<I, TOutboundOpenInfo, TCustom>
592 where
593 F: FnOnce(TConnectionUpgrade) -> I,
594 {
595 match self {
596 ConnectionHandlerEvent::OutboundSubstreamRequest { protocol } => {
597 ConnectionHandlerEvent::OutboundSubstreamRequest {
598 protocol: protocol.map_upgrade(map),
599 }
600 }
601 ConnectionHandlerEvent::NotifyBehaviour(val) => {
602 ConnectionHandlerEvent::NotifyBehaviour(val)
603 }
604 ConnectionHandlerEvent::ReportRemoteProtocols(support) => {
605 ConnectionHandlerEvent::ReportRemoteProtocols(support)
606 }
607 }
608 }
609
610 /// If this is a `Custom` event, maps the content to something else.
611 pub fn map_custom<F, I>(
612 self,
613 map: F,
614 ) -> ConnectionHandlerEvent<TConnectionUpgrade, TOutboundOpenInfo, I>
615 where
616 F: FnOnce(TCustom) -> I,
617 {
618 match self {
619 ConnectionHandlerEvent::OutboundSubstreamRequest { protocol } => {
620 ConnectionHandlerEvent::OutboundSubstreamRequest { protocol }
621 }
622 ConnectionHandlerEvent::NotifyBehaviour(val) => {
623 ConnectionHandlerEvent::NotifyBehaviour(map(val))
624 }
625 ConnectionHandlerEvent::ReportRemoteProtocols(support) => {
626 ConnectionHandlerEvent::ReportRemoteProtocols(support)
627 }
628 }
629 }
630}
631
/// Error that can happen on an outbound substream opening attempt.
///
/// `TUpgrErr` is the error type of the upgrade that was being applied.
#[derive(Debug)]
pub enum StreamUpgradeError<TUpgrErr> {
    /// The opening attempt timed out before the negotiation was fully completed.
    Timeout,
    /// The upgrade produced an error.
    Apply(TUpgrErr),
    /// No protocol could be agreed upon.
    NegotiationFailed,
    /// An IO or otherwise unrecoverable error happened.
    Io(io::Error),
}

impl<TUpgrErr> StreamUpgradeError<TUpgrErr> {
    /// Converts the upgrade error type by applying `f` to the payload of
    /// [`StreamUpgradeError::Apply`].
    ///
    /// All other variants are carried over unchanged and `f` is not called for them.
    pub fn map_upgrade_err<F, E>(self, f: F) -> StreamUpgradeError<E>
    where
        F: FnOnce(TUpgrErr) -> E,
    {
        match self {
            Self::Apply(upgrade_err) => StreamUpgradeError::Apply(f(upgrade_err)),
            Self::Io(io_err) => StreamUpgradeError::Io(io_err),
            Self::Timeout => StreamUpgradeError::Timeout,
            Self::NegotiationFailed => StreamUpgradeError::NegotiationFailed,
        }
    }
}
659
660impl<TUpgrErr> fmt::Display for StreamUpgradeError<TUpgrErr>
661where
662 TUpgrErr: error::Error + 'static,
663{
664 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
665 match self {
666 StreamUpgradeError::Timeout => {
667 write!(f, "Timeout error while opening a substream")
668 }
669 StreamUpgradeError::Apply(err) => {
670 write!(f, "Apply: ")?;
671 crate::print_error_chain(f, err)
672 }
673 StreamUpgradeError::NegotiationFailed => {
674 write!(f, "no protocols could be agreed upon")
675 }
676 StreamUpgradeError::Io(e) => {
677 write!(f, "IO error: ")?;
678 crate::print_error_chain(f, e)
679 }
680 }
681 }
682}
683
impl<TUpgrErr> error::Error for StreamUpgradeError<TUpgrErr>
where
    TUpgrErr: error::Error + 'static,
{
    /// Always returns `None`, even for the variants that wrap an inner error.
    ///
    /// NOTE(review): the `Display` impl of this type already passes the inner error to
    /// `crate::print_error_chain`; presumably no source is exposed here so that the same
    /// chain is not reported twice — confirm before changing.
    fn source(&self) -> Option<&(dyn error::Error + 'static)> {
        None
    }
}
692
/// A statically declared, empty [`HashSet`] allows us to work around borrow-checker rules for
/// [`ProtocolsAdded::from_set`] and [`ProtocolsRemoved::from_set`]. The lifetimes don't work
/// unless we have a [`HashSet`] with a `'static` lifetime.
static EMPTY_HASHSET: Lazy<HashSet<StreamProtocol>> = Lazy::new(HashSet::new);