libp2p_swarm/connection/pool/
task.rs1use super::concurrent_dial::ConcurrentDial;
25use crate::{
26 connection::{
27 self, ConnectionError, ConnectionId, PendingInboundConnectionError,
28 PendingOutboundConnectionError,
29 },
30 transport::TransportError,
31 ConnectionHandler, Multiaddr, PeerId,
32};
33use futures::{
34 channel::{mpsc, oneshot},
35 future::{poll_fn, Either, Future},
36 SinkExt, StreamExt,
37};
38use libp2p_core::muxing::StreamMuxerBox;
39use std::pin::Pin;
40use void::Void;
41
42#[derive(Debug)]
44pub(crate) enum Command<T> {
45 NotifyHandler(T),
47 Close,
50}
51
52pub(crate) enum PendingConnectionEvent {
53 ConnectionEstablished {
54 id: ConnectionId,
55 output: (PeerId, StreamMuxerBox),
56 outgoing: Option<(Multiaddr, Vec<(Multiaddr, TransportError<std::io::Error>)>)>,
60 },
61 PendingFailed {
63 id: ConnectionId,
64 error: Either<PendingOutboundConnectionError, PendingInboundConnectionError>,
65 },
66}
67
68#[derive(Debug)]
69pub(crate) enum EstablishedConnectionEvent<ToBehaviour> {
70 AddressChange {
72 id: ConnectionId,
73 peer_id: PeerId,
74 new_address: Multiaddr,
75 },
76 Notify {
78 id: ConnectionId,
79 peer_id: PeerId,
80 event: ToBehaviour,
81 },
82 Closed {
87 id: ConnectionId,
88 peer_id: PeerId,
89 error: Option<ConnectionError>,
90 },
91}
92
93pub(crate) async fn new_for_pending_outgoing_connection(
94 connection_id: ConnectionId,
95 dial: ConcurrentDial,
96 abort_receiver: oneshot::Receiver<Void>,
97 mut events: mpsc::Sender<PendingConnectionEvent>,
98) {
99 match futures::future::select(abort_receiver, Box::pin(dial)).await {
100 Either::Left((Err(oneshot::Canceled), _)) => {
101 let _ = events
102 .send(PendingConnectionEvent::PendingFailed {
103 id: connection_id,
104 error: Either::Left(PendingOutboundConnectionError::Aborted),
105 })
106 .await;
107 }
108 Either::Left((Ok(v), _)) => void::unreachable(v),
109 Either::Right((Ok((address, output, errors)), _)) => {
110 let _ = events
111 .send(PendingConnectionEvent::ConnectionEstablished {
112 id: connection_id,
113 output,
114 outgoing: Some((address, errors)),
115 })
116 .await;
117 }
118 Either::Right((Err(e), _)) => {
119 let _ = events
120 .send(PendingConnectionEvent::PendingFailed {
121 id: connection_id,
122 error: Either::Left(PendingOutboundConnectionError::Transport(e)),
123 })
124 .await;
125 }
126 }
127}
128
129pub(crate) async fn new_for_pending_incoming_connection<TFut>(
130 connection_id: ConnectionId,
131 future: TFut,
132 abort_receiver: oneshot::Receiver<Void>,
133 mut events: mpsc::Sender<PendingConnectionEvent>,
134) where
135 TFut: Future<Output = Result<(PeerId, StreamMuxerBox), std::io::Error>> + Send + 'static,
136{
137 match futures::future::select(abort_receiver, Box::pin(future)).await {
138 Either::Left((Err(oneshot::Canceled), _)) => {
139 let _ = events
140 .send(PendingConnectionEvent::PendingFailed {
141 id: connection_id,
142 error: Either::Right(PendingInboundConnectionError::Aborted),
143 })
144 .await;
145 }
146 Either::Left((Ok(v), _)) => void::unreachable(v),
147 Either::Right((Ok(output), _)) => {
148 let _ = events
149 .send(PendingConnectionEvent::ConnectionEstablished {
150 id: connection_id,
151 output,
152 outgoing: None,
153 })
154 .await;
155 }
156 Either::Right((Err(e), _)) => {
157 let _ = events
158 .send(PendingConnectionEvent::PendingFailed {
159 id: connection_id,
160 error: Either::Right(PendingInboundConnectionError::Transport(
161 TransportError::Other(e),
162 )),
163 })
164 .await;
165 }
166 }
167}
168
169pub(crate) async fn new_for_established_connection<THandler>(
170 connection_id: ConnectionId,
171 peer_id: PeerId,
172 mut connection: crate::connection::Connection<THandler>,
173 mut command_receiver: mpsc::Receiver<Command<THandler::FromBehaviour>>,
174 mut events: mpsc::Sender<EstablishedConnectionEvent<THandler::ToBehaviour>>,
175) where
176 THandler: ConnectionHandler,
177{
178 loop {
179 match futures::future::select(
180 command_receiver.next(),
181 poll_fn(|cx| Pin::new(&mut connection).poll(cx)),
182 )
183 .await
184 {
185 Either::Left((Some(command), _)) => match command {
186 Command::NotifyHandler(event) => connection.on_behaviour_event(event),
187 Command::Close => {
188 command_receiver.close();
189 let (remaining_events, closing_muxer) = connection.close();
190
191 let _ = events
192 .send_all(&mut remaining_events.map(|event| {
193 Ok(EstablishedConnectionEvent::Notify {
194 id: connection_id,
195 event,
196 peer_id,
197 })
198 }))
199 .await;
200
201 let error = closing_muxer.await.err().map(ConnectionError::IO);
202
203 let _ = events
204 .send(EstablishedConnectionEvent::Closed {
205 id: connection_id,
206 peer_id,
207 error,
208 })
209 .await;
210 return;
211 }
212 },
213
214 Either::Left((None, _)) => return,
216
217 Either::Right((event, _)) => {
218 match event {
219 Ok(connection::Event::Handler(event)) => {
220 let _ = events
221 .send(EstablishedConnectionEvent::Notify {
222 id: connection_id,
223 peer_id,
224 event,
225 })
226 .await;
227 }
228 Ok(connection::Event::AddressChange(new_address)) => {
229 let _ = events
230 .send(EstablishedConnectionEvent::AddressChange {
231 id: connection_id,
232 peer_id,
233 new_address,
234 })
235 .await;
236 }
237 Err(error) => {
238 command_receiver.close();
239 let (remaining_events, _closing_muxer) = connection.close();
240
241 let _ = events
242 .send_all(&mut remaining_events.map(|event| {
243 Ok(EstablishedConnectionEvent::Notify {
244 id: connection_id,
245 event,
246 peer_id,
247 })
248 }))
249 .await;
250
251 let _ = events
253 .send(EstablishedConnectionEvent::Closed {
254 id: connection_id,
255 peer_id,
256 error: Some(error),
257 })
258 .await;
259 return;
260 }
261 }
262 }
263 }
264 }
265}