1use crate::{protocol, PROTOCOL_NAME};
22use futures::future::{BoxFuture, Either};
23use futures::prelude::*;
24use futures_timer::Delay;
25use libp2p_core::upgrade::ReadyUpgrade;
26use libp2p_swarm::handler::{
27 ConnectionEvent, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound,
28};
29use libp2p_swarm::{
30 ConnectionHandler, ConnectionHandlerEvent, Stream, StreamProtocol, StreamUpgradeError,
31 SubstreamProtocol,
32};
33use std::collections::VecDeque;
34use std::{
35 error::Error,
36 fmt, io,
37 task::{Context, Poll},
38 time::Duration,
39};
40use void::Void;
41
/// The configuration for outbound pings.
#[derive(Debug, Clone)]
pub struct Config {
    /// The timeout of a single outbound ping.
    timeout: Duration,
    /// The delay between two outbound pings.
    interval: Duration,
}

impl Config {
    /// Creates a new [`Config`] with the following default settings:
    ///
    ///   * [`Config::with_timeout`]: 20 seconds
    ///   * [`Config::with_interval`]: 15 seconds
    pub fn new() -> Self {
        Self {
            timeout: Duration::from_secs(20),
            interval: Duration::from_secs(15),
        }
    }

    /// Sets the ping timeout and returns the updated configuration.
    pub fn with_timeout(mut self, d: Duration) -> Self {
        self.timeout = d;
        self
    }

    /// Sets the ping interval and returns the updated configuration.
    pub fn with_interval(mut self, d: Duration) -> Self {
        self.interval = d;
        self
    }
}

impl Default for Config {
    /// Equivalent to [`Config::new`].
    fn default() -> Self {
        Self::new()
    }
}
87
/// An outbound ping failure.
#[derive(Debug)]
pub enum Failure {
    /// The ping timed out, i.e. no response was received within the
    /// configured ping timeout.
    Timeout,
    /// The peer does not support the ping protocol.
    Unsupported,
    /// The ping failed for reasons other than a timeout.
    Other {
        /// The underlying error.
        error: Box<dyn std::error::Error + Send + Sync + 'static>,
    },
}

impl Failure {
    /// Wraps an arbitrary error into [`Failure::Other`].
    fn other(e: impl std::error::Error + Send + Sync + 'static) -> Self {
        Self::Other { error: Box::new(e) }
    }
}

impl fmt::Display for Failure {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Failure::Timeout => f.write_str("Ping timeout"),
            Failure::Other { error } => write!(f, "Ping error: {error}"),
            Failure::Unsupported => f.write_str("Ping protocol not supported"),
        }
    }
}

impl Error for Failure {
    /// Only [`Failure::Other`] carries an underlying error.
    fn source(&self) -> Option<&(dyn Error + 'static)> {
        match self {
            Failure::Timeout => None,
            Failure::Other { error } => Some(&**error),
            Failure::Unsupported => None,
        }
    }
}
127
128pub struct Handler {
131 config: Config,
133 interval: Delay,
135 pending_errors: VecDeque<Failure>,
137 failures: u32,
141 outbound: Option<OutboundState>,
143 inbound: Option<PongFuture>,
147 state: State,
149}
150
/// Whether the handler is still pinging on its connection.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum State {
    /// Pinging has stopped because negotiating the ping protocol with the
    /// remote failed.
    Inactive {
        /// Whether the missing support has already been reported.
        ///
        /// `poll` flips this to `true` when it emits the event, so the event
        /// is emitted only once.
        reported: bool,
    },
    /// The handler is actively pinging the remote.
    Active,
}
163
164impl Handler {
165 pub fn new(config: Config) -> Self {
167 Handler {
168 config,
169 interval: Delay::new(Duration::new(0, 0)),
170 pending_errors: VecDeque::with_capacity(2),
171 failures: 0,
172 outbound: None,
173 inbound: None,
174 state: State::Active,
175 }
176 }
177
178 fn on_dial_upgrade_error(
179 &mut self,
180 DialUpgradeError { error, .. }: DialUpgradeError<
181 <Self as ConnectionHandler>::OutboundOpenInfo,
182 <Self as ConnectionHandler>::OutboundProtocol,
183 >,
184 ) {
185 self.outbound = None; let error = match error {
188 StreamUpgradeError::NegotiationFailed => {
189 debug_assert_eq!(self.state, State::Active);
190
191 self.state = State::Inactive { reported: false };
192 return;
193 }
194 StreamUpgradeError::Timeout => Failure::Other {
196 error: Box::new(std::io::Error::new(
197 std::io::ErrorKind::TimedOut,
198 "ping protocol negotiation timed out",
199 )),
200 },
201 StreamUpgradeError::Apply(e) => void::unreachable(e),
202 StreamUpgradeError::Io(e) => Failure::Other { error: Box::new(e) },
203 };
204
205 self.pending_errors.push_front(error);
206 }
207}
208
209impl ConnectionHandler for Handler {
210 type FromBehaviour = Void;
211 type ToBehaviour = Result<Duration, Failure>;
212 type InboundProtocol = ReadyUpgrade<StreamProtocol>;
213 type OutboundProtocol = ReadyUpgrade<StreamProtocol>;
214 type OutboundOpenInfo = ();
215 type InboundOpenInfo = ();
216
217 fn listen_protocol(&self) -> SubstreamProtocol<ReadyUpgrade<StreamProtocol>, ()> {
218 SubstreamProtocol::new(ReadyUpgrade::new(PROTOCOL_NAME), ())
219 }
220
221 fn on_behaviour_event(&mut self, _: Void) {}
222
223 #[tracing::instrument(level = "trace", name = "ConnectionHandler::poll", skip(self, cx))]
224 fn poll(
225 &mut self,
226 cx: &mut Context<'_>,
227 ) -> Poll<ConnectionHandlerEvent<ReadyUpgrade<StreamProtocol>, (), Result<Duration, Failure>>>
228 {
229 match self.state {
230 State::Inactive { reported: true } => {
231 return Poll::Pending; }
233 State::Inactive { reported: false } => {
234 self.state = State::Inactive { reported: true };
235 return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(Err(
236 Failure::Unsupported,
237 )));
238 }
239 State::Active => {}
240 }
241
242 if let Some(fut) = self.inbound.as_mut() {
244 match fut.poll_unpin(cx) {
245 Poll::Pending => {}
246 Poll::Ready(Err(e)) => {
247 tracing::debug!("Inbound ping error: {:?}", e);
248 self.inbound = None;
249 }
250 Poll::Ready(Ok(stream)) => {
251 tracing::trace!("answered inbound ping from peer");
252
253 self.inbound = Some(protocol::recv_ping(stream).boxed());
255 }
256 }
257 }
258
259 loop {
260 if let Some(error) = self.pending_errors.pop_back() {
262 tracing::debug!("Ping failure: {:?}", error);
263
264 self.failures += 1;
265
266 if self.failures > 1 {
272 return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(Err(error)));
273 }
274 }
275
276 match self.outbound.take() {
278 Some(OutboundState::Ping(mut ping)) => match ping.poll_unpin(cx) {
279 Poll::Pending => {
280 self.outbound = Some(OutboundState::Ping(ping));
281 break;
282 }
283 Poll::Ready(Ok((stream, rtt))) => {
284 tracing::debug!(?rtt, "ping succeeded");
285 self.failures = 0;
286 self.interval.reset(self.config.interval);
287 self.outbound = Some(OutboundState::Idle(stream));
288 return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(Ok(rtt)));
289 }
290 Poll::Ready(Err(e)) => {
291 self.interval.reset(self.config.interval);
292 self.pending_errors.push_front(e);
293 }
294 },
295 Some(OutboundState::Idle(stream)) => match self.interval.poll_unpin(cx) {
296 Poll::Pending => {
297 self.outbound = Some(OutboundState::Idle(stream));
298 break;
299 }
300 Poll::Ready(()) => {
301 self.outbound = Some(OutboundState::Ping(
302 send_ping(stream, self.config.timeout).boxed(),
303 ));
304 }
305 },
306 Some(OutboundState::OpenStream) => {
307 self.outbound = Some(OutboundState::OpenStream);
308 break;
309 }
310 None => match self.interval.poll_unpin(cx) {
311 Poll::Pending => break,
312 Poll::Ready(()) => {
313 self.outbound = Some(OutboundState::OpenStream);
314 let protocol = SubstreamProtocol::new(ReadyUpgrade::new(PROTOCOL_NAME), ());
315 return Poll::Ready(ConnectionHandlerEvent::OutboundSubstreamRequest {
316 protocol,
317 });
318 }
319 },
320 }
321 }
322
323 Poll::Pending
324 }
325
326 fn on_connection_event(
327 &mut self,
328 event: ConnectionEvent<
329 Self::InboundProtocol,
330 Self::OutboundProtocol,
331 Self::InboundOpenInfo,
332 Self::OutboundOpenInfo,
333 >,
334 ) {
335 match event {
336 ConnectionEvent::FullyNegotiatedInbound(FullyNegotiatedInbound {
337 protocol: mut stream,
338 ..
339 }) => {
340 stream.ignore_for_keep_alive();
341 self.inbound = Some(protocol::recv_ping(stream).boxed());
342 }
343 ConnectionEvent::FullyNegotiatedOutbound(FullyNegotiatedOutbound {
344 protocol: mut stream,
345 ..
346 }) => {
347 stream.ignore_for_keep_alive();
348 self.outbound = Some(OutboundState::Ping(
349 send_ping(stream, self.config.timeout).boxed(),
350 ));
351 }
352 ConnectionEvent::DialUpgradeError(dial_upgrade_error) => {
353 self.on_dial_upgrade_error(dial_upgrade_error)
354 }
355 _ => {}
356 }
357 }
358}
359
360type PingFuture = BoxFuture<'static, Result<(Stream, Duration), Failure>>;
361type PongFuture = BoxFuture<'static, Result<Stream, io::Error>>;
362
363enum OutboundState {
365 OpenStream,
367 Idle(Stream),
369 Ping(PingFuture),
371}
372
373async fn send_ping(stream: Stream, timeout: Duration) -> Result<(Stream, Duration), Failure> {
375 let ping = protocol::send_ping(stream);
376 futures::pin_mut!(ping);
377
378 match future::select(ping, Delay::new(timeout)).await {
379 Either::Left((Ok((stream, rtt)), _)) => Ok((stream, rtt)),
380 Either::Left((Err(e), _)) => Err(Failure::other(e)),
381 Either::Right(((), _)) => Err(Failure::Timeout),
382 }
383}