libp2p_ping/
handler.rs

1// Copyright 2019 Parity Technologies (UK) Ltd.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21use crate::{protocol, PROTOCOL_NAME};
22use futures::future::{BoxFuture, Either};
23use futures::prelude::*;
24use futures_timer::Delay;
25use libp2p_core::upgrade::ReadyUpgrade;
26use libp2p_swarm::handler::{
27    ConnectionEvent, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound,
28};
29use libp2p_swarm::{
30    ConnectionHandler, ConnectionHandlerEvent, Stream, StreamProtocol, StreamUpgradeError,
31    SubstreamProtocol,
32};
33use std::collections::VecDeque;
34use std::{
35    error::Error,
36    fmt, io,
37    task::{Context, Poll},
38    time::Duration,
39};
40use void::Void;
41
42/// The configuration for outbound pings.
43#[derive(Debug, Clone)]
44pub struct Config {
45    /// The timeout of an outbound ping.
46    timeout: Duration,
47    /// The duration between outbound pings.
48    interval: Duration,
49}
50
51impl Config {
52    /// Creates a new [`Config`] with the following default settings:
53    ///
54    ///   * [`Config::with_interval`] 15s
55    ///   * [`Config::with_timeout`] 20s
56    ///
57    /// These settings have the following effect:
58    ///
59    ///   * A ping is sent every 15 seconds on a healthy connection.
60    ///   * Every ping sent must yield a response within 20 seconds in order to
61    ///     be successful.
62    pub fn new() -> Self {
63        Self {
64            timeout: Duration::from_secs(20),
65            interval: Duration::from_secs(15),
66        }
67    }
68
69    /// Sets the ping timeout.
70    pub fn with_timeout(mut self, d: Duration) -> Self {
71        self.timeout = d;
72        self
73    }
74
75    /// Sets the ping interval.
76    pub fn with_interval(mut self, d: Duration) -> Self {
77        self.interval = d;
78        self
79    }
80}
81
82impl Default for Config {
83    fn default() -> Self {
84        Self::new()
85    }
86}
87
88/// An outbound ping failure.
89#[derive(Debug)]
90pub enum Failure {
91    /// The ping timed out, i.e. no response was received within the
92    /// configured ping timeout.
93    Timeout,
94    /// The peer does not support the ping protocol.
95    Unsupported,
96    /// The ping failed for reasons other than a timeout.
97    Other {
98        error: Box<dyn std::error::Error + Send + Sync + 'static>,
99    },
100}
101
102impl Failure {
103    fn other(e: impl std::error::Error + Send + Sync + 'static) -> Self {
104        Self::Other { error: Box::new(e) }
105    }
106}
107
108impl fmt::Display for Failure {
109    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
110        match self {
111            Failure::Timeout => f.write_str("Ping timeout"),
112            Failure::Other { error } => write!(f, "Ping error: {error}"),
113            Failure::Unsupported => write!(f, "Ping protocol not supported"),
114        }
115    }
116}
117
118impl Error for Failure {
119    fn source(&self) -> Option<&(dyn Error + 'static)> {
120        match self {
121            Failure::Timeout => None,
122            Failure::Other { error } => Some(&**error),
123            Failure::Unsupported => None,
124        }
125    }
126}
127
128/// Protocol handler that handles pinging the remote at a regular period
129/// and answering ping queries.
130pub struct Handler {
131    /// Configuration options.
132    config: Config,
133    /// The timer used for the delay to the next ping.
134    interval: Delay,
135    /// Outbound ping failures that are pending to be processed by `poll()`.
136    pending_errors: VecDeque<Failure>,
137    /// The number of consecutive ping failures that occurred.
138    ///
139    /// Each successful ping resets this counter to 0.
140    failures: u32,
141    /// The outbound ping state.
142    outbound: Option<OutboundState>,
143    /// The inbound pong handler, i.e. if there is an inbound
144    /// substream, this is always a future that waits for the
145    /// next inbound ping to be answered.
146    inbound: Option<PongFuture>,
147    /// Tracks the state of our handler.
148    state: State,
149}
150
151#[derive(Debug, Clone, Copy, PartialEq, Eq)]
152enum State {
153    /// We are inactive because the other peer doesn't support ping.
154    Inactive {
155        /// Whether or not we've reported the missing support yet.
156        ///
157        /// This is used to avoid repeated events being emitted for a specific connection.
158        reported: bool,
159    },
160    /// We are actively pinging the other peer.
161    Active,
162}
163
164impl Handler {
165    /// Builds a new [`Handler`] with the given configuration.
166    pub fn new(config: Config) -> Self {
167        Handler {
168            config,
169            interval: Delay::new(Duration::new(0, 0)),
170            pending_errors: VecDeque::with_capacity(2),
171            failures: 0,
172            outbound: None,
173            inbound: None,
174            state: State::Active,
175        }
176    }
177
178    fn on_dial_upgrade_error(
179        &mut self,
180        DialUpgradeError { error, .. }: DialUpgradeError<
181            <Self as ConnectionHandler>::OutboundOpenInfo,
182            <Self as ConnectionHandler>::OutboundProtocol,
183        >,
184    ) {
185        self.outbound = None; // Request a new substream on the next `poll`.
186
187        let error = match error {
188            StreamUpgradeError::NegotiationFailed => {
189                debug_assert_eq!(self.state, State::Active);
190
191                self.state = State::Inactive { reported: false };
192                return;
193            }
194            // Note: This timeout only covers protocol negotiation.
195            StreamUpgradeError::Timeout => Failure::Other {
196                error: Box::new(std::io::Error::new(
197                    std::io::ErrorKind::TimedOut,
198                    "ping protocol negotiation timed out",
199                )),
200            },
201            StreamUpgradeError::Apply(e) => void::unreachable(e),
202            StreamUpgradeError::Io(e) => Failure::Other { error: Box::new(e) },
203        };
204
205        self.pending_errors.push_front(error);
206    }
207}
208
209impl ConnectionHandler for Handler {
210    type FromBehaviour = Void;
211    type ToBehaviour = Result<Duration, Failure>;
212    type InboundProtocol = ReadyUpgrade<StreamProtocol>;
213    type OutboundProtocol = ReadyUpgrade<StreamProtocol>;
214    type OutboundOpenInfo = ();
215    type InboundOpenInfo = ();
216
217    fn listen_protocol(&self) -> SubstreamProtocol<ReadyUpgrade<StreamProtocol>, ()> {
218        SubstreamProtocol::new(ReadyUpgrade::new(PROTOCOL_NAME), ())
219    }
220
221    fn on_behaviour_event(&mut self, _: Void) {}
222
223    #[tracing::instrument(level = "trace", name = "ConnectionHandler::poll", skip(self, cx))]
224    fn poll(
225        &mut self,
226        cx: &mut Context<'_>,
227    ) -> Poll<ConnectionHandlerEvent<ReadyUpgrade<StreamProtocol>, (), Result<Duration, Failure>>>
228    {
229        match self.state {
230            State::Inactive { reported: true } => {
231                return Poll::Pending; // nothing to do on this connection
232            }
233            State::Inactive { reported: false } => {
234                self.state = State::Inactive { reported: true };
235                return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(Err(
236                    Failure::Unsupported,
237                )));
238            }
239            State::Active => {}
240        }
241
242        // Respond to inbound pings.
243        if let Some(fut) = self.inbound.as_mut() {
244            match fut.poll_unpin(cx) {
245                Poll::Pending => {}
246                Poll::Ready(Err(e)) => {
247                    tracing::debug!("Inbound ping error: {:?}", e);
248                    self.inbound = None;
249                }
250                Poll::Ready(Ok(stream)) => {
251                    tracing::trace!("answered inbound ping from peer");
252
253                    // A ping from a remote peer has been answered, wait for the next.
254                    self.inbound = Some(protocol::recv_ping(stream).boxed());
255                }
256            }
257        }
258
259        loop {
260            // Check for outbound ping failures.
261            if let Some(error) = self.pending_errors.pop_back() {
262                tracing::debug!("Ping failure: {:?}", error);
263
264                self.failures += 1;
265
266                // Note: For backward-compatibility the first failure is always "free"
267                // and silent. This allows peers who use a new substream
268                // for each ping to have successful ping exchanges with peers
269                // that use a single substream, since every successful ping
270                // resets `failures` to `0`.
271                if self.failures > 1 {
272                    return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(Err(error)));
273                }
274            }
275
276            // Continue outbound pings.
277            match self.outbound.take() {
278                Some(OutboundState::Ping(mut ping)) => match ping.poll_unpin(cx) {
279                    Poll::Pending => {
280                        self.outbound = Some(OutboundState::Ping(ping));
281                        break;
282                    }
283                    Poll::Ready(Ok((stream, rtt))) => {
284                        tracing::debug!(?rtt, "ping succeeded");
285                        self.failures = 0;
286                        self.interval.reset(self.config.interval);
287                        self.outbound = Some(OutboundState::Idle(stream));
288                        return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(Ok(rtt)));
289                    }
290                    Poll::Ready(Err(e)) => {
291                        self.interval.reset(self.config.interval);
292                        self.pending_errors.push_front(e);
293                    }
294                },
295                Some(OutboundState::Idle(stream)) => match self.interval.poll_unpin(cx) {
296                    Poll::Pending => {
297                        self.outbound = Some(OutboundState::Idle(stream));
298                        break;
299                    }
300                    Poll::Ready(()) => {
301                        self.outbound = Some(OutboundState::Ping(
302                            send_ping(stream, self.config.timeout).boxed(),
303                        ));
304                    }
305                },
306                Some(OutboundState::OpenStream) => {
307                    self.outbound = Some(OutboundState::OpenStream);
308                    break;
309                }
310                None => match self.interval.poll_unpin(cx) {
311                    Poll::Pending => break,
312                    Poll::Ready(()) => {
313                        self.outbound = Some(OutboundState::OpenStream);
314                        let protocol = SubstreamProtocol::new(ReadyUpgrade::new(PROTOCOL_NAME), ());
315                        return Poll::Ready(ConnectionHandlerEvent::OutboundSubstreamRequest {
316                            protocol,
317                        });
318                    }
319                },
320            }
321        }
322
323        Poll::Pending
324    }
325
326    fn on_connection_event(
327        &mut self,
328        event: ConnectionEvent<
329            Self::InboundProtocol,
330            Self::OutboundProtocol,
331            Self::InboundOpenInfo,
332            Self::OutboundOpenInfo,
333        >,
334    ) {
335        match event {
336            ConnectionEvent::FullyNegotiatedInbound(FullyNegotiatedInbound {
337                protocol: mut stream,
338                ..
339            }) => {
340                stream.ignore_for_keep_alive();
341                self.inbound = Some(protocol::recv_ping(stream).boxed());
342            }
343            ConnectionEvent::FullyNegotiatedOutbound(FullyNegotiatedOutbound {
344                protocol: mut stream,
345                ..
346            }) => {
347                stream.ignore_for_keep_alive();
348                self.outbound = Some(OutboundState::Ping(
349                    send_ping(stream, self.config.timeout).boxed(),
350                ));
351            }
352            ConnectionEvent::DialUpgradeError(dial_upgrade_error) => {
353                self.on_dial_upgrade_error(dial_upgrade_error)
354            }
355            _ => {}
356        }
357    }
358}
359
360type PingFuture = BoxFuture<'static, Result<(Stream, Duration), Failure>>;
361type PongFuture = BoxFuture<'static, Result<Stream, io::Error>>;
362
363/// The current state w.r.t. outbound pings.
364enum OutboundState {
365    /// A new substream is being negotiated for the ping protocol.
366    OpenStream,
367    /// The substream is idle, waiting to send the next ping.
368    Idle(Stream),
369    /// A ping is being sent and the response awaited.
370    Ping(PingFuture),
371}
372
373/// A wrapper around [`protocol::send_ping`] that enforces a time out.
374async fn send_ping(stream: Stream, timeout: Duration) -> Result<(Stream, Duration), Failure> {
375    let ping = protocol::send_ping(stream);
376    futures::pin_mut!(ping);
377
378    match future::select(ping, Delay::new(timeout)).await {
379        Either::Left((Ok((stream, rtt)), _)) => Ok((stream, rtt)),
380        Either::Left((Err(e), _)) => Err(Failure::other(e)),
381        Either::Right(((), _)) => Err(Failure::Timeout),
382    }
383}