libp2p_swarm/connection/pool/
task.rs

1// Copyright 2021 Protocol Labs.
2// Copyright 2018 Parity Technologies (UK) Ltd.
3//
4// Permission is hereby granted, free of charge, to any person obtaining a
5// copy of this software and associated documentation files (the "Software"),
6// to deal in the Software without restriction, including without limitation
7// the rights to use, copy, modify, merge, publish, distribute, sublicense,
8// and/or sell copies of the Software, and to permit persons to whom the
9// Software is furnished to do so, subject to the following conditions:
10//
11// The above copyright notice and this permission notice shall be included in
12// all copies or substantial portions of the Software.
13//
14// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
15// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
19// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
20// DEALINGS IN THE SOFTWARE.
21
22//! Async functions driving pending and established connections in the form of a task.
23
24use super::concurrent_dial::ConcurrentDial;
25use crate::{
26    connection::{
27        self, ConnectionError, ConnectionId, PendingInboundConnectionError,
28        PendingOutboundConnectionError,
29    },
30    transport::TransportError,
31    ConnectionHandler, Multiaddr, PeerId,
32};
33use futures::{
34    channel::{mpsc, oneshot},
35    future::{poll_fn, Either, Future},
36    SinkExt, StreamExt,
37};
38use libp2p_core::muxing::StreamMuxerBox;
39use std::pin::Pin;
40use void::Void;
41
/// Commands that can be sent to a task driving an established connection.
///
/// `T` is the type of the payload carried by [`Command::NotifyHandler`].
#[derive(Debug)]
pub(crate) enum Command<T> {
    /// Notify the connection handler of an event.
    NotifyHandler(T),
    /// Gracefully close the connection (active close) before
    /// terminating the task.
    Close,
}
51
/// Events reported by a task driving a pending (not yet established)
/// connection.
pub(crate) enum PendingConnectionEvent {
    /// A pending connection was established successfully.
    ConnectionEstablished {
        /// Identifier of the connection this event refers to.
        id: ConnectionId,
        /// The peer identity and the stream muxer obtained for the connection.
        output: (PeerId, StreamMuxerBox),
        /// [`Some`] when the new connection is an outgoing connection.
        /// Addresses are dialed in parallel. Contains the addresses and errors
        /// of dial attempts that failed before the one successful dial.
        ///
        /// The leading [`Multiaddr`] is the address reported together with the
        /// successful dial output. [`None`] for incoming connections.
        outgoing: Option<(Multiaddr, Vec<(Multiaddr, TransportError<std::io::Error>)>)>,
    },
    /// A pending connection failed.
    PendingFailed {
        /// Identifier of the connection this event refers to.
        id: ConnectionId,
        /// Why the connection failed: [`Either::Left`] for an outgoing
        /// connection, [`Either::Right`] for an incoming one.
        error: Either<PendingOutboundConnectionError, PendingInboundConnectionError>,
    },
}
67
/// Events reported by a task driving an established connection.
///
/// `ToBehaviour` is the type of the event payload carried by
/// [`EstablishedConnectionEvent::Notify`].
#[derive(Debug)]
pub(crate) enum EstablishedConnectionEvent<ToBehaviour> {
    /// A node we are connected to has changed its address.
    AddressChange {
        /// Identifier of the connection this event refers to.
        id: ConnectionId,
        /// Peer at the other end of the connection.
        peer_id: PeerId,
        /// The address the node changed to.
        new_address: Multiaddr,
    },
    /// Notify the manager of an event from the connection.
    Notify {
        /// Identifier of the connection this event refers to.
        id: ConnectionId,
        /// Peer at the other end of the connection.
        peer_id: PeerId,
        /// The event produced by the connection.
        event: ToBehaviour,
    },
    /// A connection closed, possibly due to an error.
    ///
    /// If `error` is `None`, the connection has completed
    /// an active orderly close.
    Closed {
        /// Identifier of the connection this event refers to.
        id: ConnectionId,
        /// Peer at the other end of the connection.
        peer_id: PeerId,
        /// The error that closed the connection, if any.
        error: Option<ConnectionError>,
    },
}
92
93pub(crate) async fn new_for_pending_outgoing_connection(
94    connection_id: ConnectionId,
95    dial: ConcurrentDial,
96    abort_receiver: oneshot::Receiver<Void>,
97    mut events: mpsc::Sender<PendingConnectionEvent>,
98) {
99    match futures::future::select(abort_receiver, Box::pin(dial)).await {
100        Either::Left((Err(oneshot::Canceled), _)) => {
101            let _ = events
102                .send(PendingConnectionEvent::PendingFailed {
103                    id: connection_id,
104                    error: Either::Left(PendingOutboundConnectionError::Aborted),
105                })
106                .await;
107        }
108        Either::Left((Ok(v), _)) => void::unreachable(v),
109        Either::Right((Ok((address, output, errors)), _)) => {
110            let _ = events
111                .send(PendingConnectionEvent::ConnectionEstablished {
112                    id: connection_id,
113                    output,
114                    outgoing: Some((address, errors)),
115                })
116                .await;
117        }
118        Either::Right((Err(e), _)) => {
119            let _ = events
120                .send(PendingConnectionEvent::PendingFailed {
121                    id: connection_id,
122                    error: Either::Left(PendingOutboundConnectionError::Transport(e)),
123                })
124                .await;
125        }
126    }
127}
128
129pub(crate) async fn new_for_pending_incoming_connection<TFut>(
130    connection_id: ConnectionId,
131    future: TFut,
132    abort_receiver: oneshot::Receiver<Void>,
133    mut events: mpsc::Sender<PendingConnectionEvent>,
134) where
135    TFut: Future<Output = Result<(PeerId, StreamMuxerBox), std::io::Error>> + Send + 'static,
136{
137    match futures::future::select(abort_receiver, Box::pin(future)).await {
138        Either::Left((Err(oneshot::Canceled), _)) => {
139            let _ = events
140                .send(PendingConnectionEvent::PendingFailed {
141                    id: connection_id,
142                    error: Either::Right(PendingInboundConnectionError::Aborted),
143                })
144                .await;
145        }
146        Either::Left((Ok(v), _)) => void::unreachable(v),
147        Either::Right((Ok(output), _)) => {
148            let _ = events
149                .send(PendingConnectionEvent::ConnectionEstablished {
150                    id: connection_id,
151                    output,
152                    outgoing: None,
153                })
154                .await;
155        }
156        Either::Right((Err(e), _)) => {
157            let _ = events
158                .send(PendingConnectionEvent::PendingFailed {
159                    id: connection_id,
160                    error: Either::Right(PendingInboundConnectionError::Transport(
161                        TransportError::Other(e),
162                    )),
163                })
164                .await;
165        }
166    }
167}
168
169pub(crate) async fn new_for_established_connection<THandler>(
170    connection_id: ConnectionId,
171    peer_id: PeerId,
172    mut connection: crate::connection::Connection<THandler>,
173    mut command_receiver: mpsc::Receiver<Command<THandler::FromBehaviour>>,
174    mut events: mpsc::Sender<EstablishedConnectionEvent<THandler::ToBehaviour>>,
175) where
176    THandler: ConnectionHandler,
177{
178    loop {
179        match futures::future::select(
180            command_receiver.next(),
181            poll_fn(|cx| Pin::new(&mut connection).poll(cx)),
182        )
183        .await
184        {
185            Either::Left((Some(command), _)) => match command {
186                Command::NotifyHandler(event) => connection.on_behaviour_event(event),
187                Command::Close => {
188                    command_receiver.close();
189                    let (remaining_events, closing_muxer) = connection.close();
190
191                    let _ = events
192                        .send_all(&mut remaining_events.map(|event| {
193                            Ok(EstablishedConnectionEvent::Notify {
194                                id: connection_id,
195                                event,
196                                peer_id,
197                            })
198                        }))
199                        .await;
200
201                    let error = closing_muxer.await.err().map(ConnectionError::IO);
202
203                    let _ = events
204                        .send(EstablishedConnectionEvent::Closed {
205                            id: connection_id,
206                            peer_id,
207                            error,
208                        })
209                        .await;
210                    return;
211                }
212            },
213
214            // The manager has disappeared; abort.
215            Either::Left((None, _)) => return,
216
217            Either::Right((event, _)) => {
218                match event {
219                    Ok(connection::Event::Handler(event)) => {
220                        let _ = events
221                            .send(EstablishedConnectionEvent::Notify {
222                                id: connection_id,
223                                peer_id,
224                                event,
225                            })
226                            .await;
227                    }
228                    Ok(connection::Event::AddressChange(new_address)) => {
229                        let _ = events
230                            .send(EstablishedConnectionEvent::AddressChange {
231                                id: connection_id,
232                                peer_id,
233                                new_address,
234                            })
235                            .await;
236                    }
237                    Err(error) => {
238                        command_receiver.close();
239                        let (remaining_events, _closing_muxer) = connection.close();
240
241                        let _ = events
242                            .send_all(&mut remaining_events.map(|event| {
243                                Ok(EstablishedConnectionEvent::Notify {
244                                    id: connection_id,
245                                    event,
246                                    peer_id,
247                                })
248                            }))
249                            .await;
250
251                        // Terminate the task with the error, dropping the connection.
252                        let _ = events
253                            .send(EstablishedConnectionEvent::Closed {
254                                id: connection_id,
255                                peer_id,
256                                error: Some(error),
257                            })
258                            .await;
259                        return;
260                    }
261                }
262            }
263        }
264    }
265}