libp2p_kad/
protocol.rs

1// Copyright 2018 Parity Technologies (UK) Ltd.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21//! The Kademlia connection protocol upgrade and associated message types.
22//!
23//! The connection protocol upgrade is provided by [`ProtocolConfig`], with the
24//! request and response types [`KadRequestMsg`] and [`KadResponseMsg`], respectively.
25//! The upgrade's output is a `Sink + Stream` of messages. The `Stream` component is used
26//! to poll the underlying transport for incoming messages, and the `Sink` component
27//! is used to send messages to remote peers.
28
29use crate::proto;
30use crate::record::{self, Record};
31use asynchronous_codec::{Decoder, Encoder, Framed};
32use bytes::BytesMut;
33use futures::prelude::*;
34use instant::Instant;
35use libp2p_core::upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo};
36use libp2p_core::Multiaddr;
37use libp2p_identity::PeerId;
38use libp2p_swarm::StreamProtocol;
39use std::marker::PhantomData;
40use std::{convert::TryFrom, time::Duration};
41use std::{io, iter};
42use tracing::debug;
43
44/// The protocol name used for negotiating with multistream-select.
45pub(crate) const DEFAULT_PROTO_NAME: StreamProtocol = StreamProtocol::new("/ipfs/kad/1.0.0");
46/// The default maximum size for a varint length-delimited packet.
47pub(crate) const DEFAULT_MAX_PACKET_SIZE: usize = 16 * 1024;
48/// Status of our connection to a node reported by the Kademlia protocol.
49#[derive(Copy, Clone, PartialEq, Eq, Debug, Hash)]
50pub enum ConnectionType {
51    /// Sender hasn't tried to connect to peer.
52    NotConnected = 0,
53    /// Sender is currently connected to peer.
54    Connected = 1,
55    /// Sender was recently connected to peer.
56    CanConnect = 2,
57    /// Sender tried to connect to peer but failed.
58    CannotConnect = 3,
59}
60
61impl From<proto::ConnectionType> for ConnectionType {
62    fn from(raw: proto::ConnectionType) -> ConnectionType {
63        use proto::ConnectionType::*;
64        match raw {
65            NOT_CONNECTED => ConnectionType::NotConnected,
66            CONNECTED => ConnectionType::Connected,
67            CAN_CONNECT => ConnectionType::CanConnect,
68            CANNOT_CONNECT => ConnectionType::CannotConnect,
69        }
70    }
71}
72
73impl From<ConnectionType> for proto::ConnectionType {
74    fn from(val: ConnectionType) -> Self {
75        use proto::ConnectionType::*;
76        match val {
77            ConnectionType::NotConnected => NOT_CONNECTED,
78            ConnectionType::Connected => CONNECTED,
79            ConnectionType::CanConnect => CAN_CONNECT,
80            ConnectionType::CannotConnect => CANNOT_CONNECT,
81        }
82    }
83}
84
85/// Information about a peer, as known by the sender.
86#[derive(Debug, Clone, PartialEq, Eq)]
87pub struct KadPeer {
88    /// Identifier of the peer.
89    pub node_id: PeerId,
90    /// The multiaddresses that the sender think can be used in order to reach the peer.
91    pub multiaddrs: Vec<Multiaddr>,
92    /// How the sender is connected to that remote.
93    pub connection_ty: ConnectionType,
94}
95
96// Builds a `KadPeer` from a corresponding protobuf message.
97impl TryFrom<proto::Peer> for KadPeer {
98    type Error = io::Error;
99
100    fn try_from(peer: proto::Peer) -> Result<KadPeer, Self::Error> {
101        // TODO: this is in fact a CID; not sure if this should be handled in `from_bytes` or
102        //       as a special case here
103        let node_id = PeerId::from_bytes(&peer.id).map_err(|_| invalid_data("invalid peer id"))?;
104
105        let mut addrs = Vec::with_capacity(peer.addrs.len());
106        for addr in peer.addrs.into_iter() {
107            match Multiaddr::try_from(addr).map(|addr| addr.with_p2p(node_id)) {
108                Ok(Ok(a)) => addrs.push(a),
109                Ok(Err(a)) => {
110                    debug!("Unable to parse multiaddr: {a} is not compatible with {node_id}")
111                }
112                Err(e) => debug!("Unable to parse multiaddr: {e}"),
113            };
114        }
115
116        Ok(KadPeer {
117            node_id,
118            multiaddrs: addrs,
119            connection_ty: peer.connection.into(),
120        })
121    }
122}
123
124impl From<KadPeer> for proto::Peer {
125    fn from(peer: KadPeer) -> Self {
126        proto::Peer {
127            id: peer.node_id.to_bytes(),
128            addrs: peer.multiaddrs.into_iter().map(|a| a.to_vec()).collect(),
129            connection: peer.connection_ty.into(),
130        }
131    }
132}
133
134/// Configuration for a Kademlia connection upgrade. When applied to a connection, turns this
135/// connection into a `Stream + Sink` whose items are of type `KadRequestMsg` and `KadResponseMsg`.
136// TODO: if, as suspected, we can confirm with Protocol Labs that each open Kademlia substream does
137//       only one request, then we can change the output of the `InboundUpgrade` and
138//       `OutboundUpgrade` to be just a single message
139#[derive(Debug, Clone)]
140pub struct ProtocolConfig {
141    protocol_names: Vec<StreamProtocol>,
142    /// Maximum allowed size of a packet.
143    max_packet_size: usize,
144}
145
146impl ProtocolConfig {
147    /// Returns the configured protocol name.
148    pub fn protocol_names(&self) -> &[StreamProtocol] {
149        &self.protocol_names
150    }
151
152    /// Modifies the protocol names used on the wire. Can be used to create incompatibilities
153    /// between networks on purpose.
154    pub fn set_protocol_names(&mut self, names: Vec<StreamProtocol>) {
155        self.protocol_names = names;
156    }
157
158    /// Modifies the maximum allowed size of a single Kademlia packet.
159    pub fn set_max_packet_size(&mut self, size: usize) {
160        self.max_packet_size = size;
161    }
162}
163
164impl Default for ProtocolConfig {
165    fn default() -> Self {
166        ProtocolConfig {
167            protocol_names: iter::once(DEFAULT_PROTO_NAME).collect(),
168            max_packet_size: DEFAULT_MAX_PACKET_SIZE,
169        }
170    }
171}
172
173impl UpgradeInfo for ProtocolConfig {
174    type Info = StreamProtocol;
175    type InfoIter = std::vec::IntoIter<Self::Info>;
176
177    fn protocol_info(&self) -> Self::InfoIter {
178        self.protocol_names.clone().into_iter()
179    }
180}
181
182/// Codec for Kademlia inbound and outbound message framing.
183pub struct Codec<A, B> {
184    codec: quick_protobuf_codec::Codec<proto::Message>,
185    __phantom: PhantomData<(A, B)>,
186}
187impl<A, B> Codec<A, B> {
188    fn new(max_packet_size: usize) -> Self {
189        Codec {
190            codec: quick_protobuf_codec::Codec::new(max_packet_size),
191            __phantom: PhantomData,
192        }
193    }
194}
195
196impl<A: Into<proto::Message>, B> Encoder for Codec<A, B> {
197    type Error = io::Error;
198    type Item<'a> = A;
199
200    fn encode(&mut self, item: Self::Item<'_>, dst: &mut BytesMut) -> Result<(), Self::Error> {
201        Ok(self.codec.encode(item.into(), dst)?)
202    }
203}
204impl<A, B: TryFrom<proto::Message, Error = io::Error>> Decoder for Codec<A, B> {
205    type Error = io::Error;
206    type Item = B;
207
208    fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
209        self.codec.decode(src)?.map(B::try_from).transpose()
210    }
211}
212
213/// Sink of responses and stream of requests.
214pub(crate) type KadInStreamSink<S> = Framed<S, Codec<KadResponseMsg, KadRequestMsg>>;
215/// Sink of requests and stream of responses.
216pub(crate) type KadOutStreamSink<S> = Framed<S, Codec<KadRequestMsg, KadResponseMsg>>;
217
218impl<C> InboundUpgrade<C> for ProtocolConfig
219where
220    C: AsyncRead + AsyncWrite + Unpin,
221{
222    type Output = KadInStreamSink<C>;
223    type Future = future::Ready<Result<Self::Output, io::Error>>;
224    type Error = io::Error;
225
226    fn upgrade_inbound(self, incoming: C, _: Self::Info) -> Self::Future {
227        let codec = Codec::new(self.max_packet_size);
228
229        future::ok(Framed::new(incoming, codec))
230    }
231}
232
233impl<C> OutboundUpgrade<C> for ProtocolConfig
234where
235    C: AsyncRead + AsyncWrite + Unpin,
236{
237    type Output = KadOutStreamSink<C>;
238    type Future = future::Ready<Result<Self::Output, io::Error>>;
239    type Error = io::Error;
240
241    fn upgrade_outbound(self, incoming: C, _: Self::Info) -> Self::Future {
242        let codec = Codec::new(self.max_packet_size);
243
244        future::ok(Framed::new(incoming, codec))
245    }
246}
247
248/// Request that we can send to a peer or that we received from a peer.
249#[derive(Debug, Clone, PartialEq, Eq)]
250pub enum KadRequestMsg {
251    /// Ping request.
252    Ping,
253
254    /// Request for the list of nodes whose IDs are the closest to `key`. The number of nodes
255    /// returned is not specified, but should be around 20.
256    FindNode {
257        /// The key for which to locate the closest nodes.
258        key: Vec<u8>,
259    },
260
261    /// Same as `FindNode`, but should also return the entries of the local providers list for
262    /// this key.
263    GetProviders {
264        /// Identifier being searched.
265        key: record::Key,
266    },
267
268    /// Indicates that this list of providers is known for this key.
269    AddProvider {
270        /// Key for which we should add providers.
271        key: record::Key,
272        /// Known provider for this key.
273        provider: KadPeer,
274    },
275
276    /// Request to get a value from the dht records.
277    GetValue {
278        /// The key we are searching for.
279        key: record::Key,
280    },
281
282    /// Request to put a value into the dht records.
283    PutValue { record: Record },
284}
285
286/// Response that we can send to a peer or that we received from a peer.
287#[derive(Debug, Clone, PartialEq, Eq)]
288pub enum KadResponseMsg {
289    /// Ping response.
290    Pong,
291
292    /// Response to a `FindNode`.
293    FindNode {
294        /// Results of the request.
295        closer_peers: Vec<KadPeer>,
296    },
297
298    /// Response to a `GetProviders`.
299    GetProviders {
300        /// Nodes closest to the key.
301        closer_peers: Vec<KadPeer>,
302        /// Known providers for this key.
303        provider_peers: Vec<KadPeer>,
304    },
305
306    /// Response to a `GetValue`.
307    GetValue {
308        /// Result that might have been found
309        record: Option<Record>,
310        /// Nodes closest to the key
311        closer_peers: Vec<KadPeer>,
312    },
313
314    /// Response to a `PutValue`.
315    PutValue {
316        /// The key of the record.
317        key: record::Key,
318        /// Value of the record.
319        value: Vec<u8>,
320    },
321}
322
323impl From<KadRequestMsg> for proto::Message {
324    fn from(kad_msg: KadRequestMsg) -> Self {
325        req_msg_to_proto(kad_msg)
326    }
327}
328impl From<KadResponseMsg> for proto::Message {
329    fn from(kad_msg: KadResponseMsg) -> Self {
330        resp_msg_to_proto(kad_msg)
331    }
332}
333impl TryFrom<proto::Message> for KadRequestMsg {
334    type Error = io::Error;
335
336    fn try_from(message: proto::Message) -> Result<Self, Self::Error> {
337        proto_to_req_msg(message)
338    }
339}
340impl TryFrom<proto::Message> for KadResponseMsg {
341    type Error = io::Error;
342
343    fn try_from(message: proto::Message) -> Result<Self, Self::Error> {
344        proto_to_resp_msg(message)
345    }
346}
347
348/// Converts a `KadRequestMsg` into the corresponding protobuf message for sending.
349fn req_msg_to_proto(kad_msg: KadRequestMsg) -> proto::Message {
350    match kad_msg {
351        KadRequestMsg::Ping => proto::Message {
352            type_pb: proto::MessageType::PING,
353            ..proto::Message::default()
354        },
355        KadRequestMsg::FindNode { key } => proto::Message {
356            type_pb: proto::MessageType::FIND_NODE,
357            key,
358            clusterLevelRaw: 10,
359            ..proto::Message::default()
360        },
361        KadRequestMsg::GetProviders { key } => proto::Message {
362            type_pb: proto::MessageType::GET_PROVIDERS,
363            key: key.to_vec(),
364            clusterLevelRaw: 10,
365            ..proto::Message::default()
366        },
367        KadRequestMsg::AddProvider { key, provider } => proto::Message {
368            type_pb: proto::MessageType::ADD_PROVIDER,
369            clusterLevelRaw: 10,
370            key: key.to_vec(),
371            providerPeers: vec![provider.into()],
372            ..proto::Message::default()
373        },
374        KadRequestMsg::GetValue { key } => proto::Message {
375            type_pb: proto::MessageType::GET_VALUE,
376            clusterLevelRaw: 10,
377            key: key.to_vec(),
378            ..proto::Message::default()
379        },
380        KadRequestMsg::PutValue { record } => proto::Message {
381            type_pb: proto::MessageType::PUT_VALUE,
382            key: record.key.to_vec(),
383            record: Some(record_to_proto(record)),
384            ..proto::Message::default()
385        },
386    }
387}
388
389/// Converts a `KadResponseMsg` into the corresponding protobuf message for sending.
390fn resp_msg_to_proto(kad_msg: KadResponseMsg) -> proto::Message {
391    match kad_msg {
392        KadResponseMsg::Pong => proto::Message {
393            type_pb: proto::MessageType::PING,
394            ..proto::Message::default()
395        },
396        KadResponseMsg::FindNode { closer_peers } => proto::Message {
397            type_pb: proto::MessageType::FIND_NODE,
398            clusterLevelRaw: 9,
399            closerPeers: closer_peers.into_iter().map(KadPeer::into).collect(),
400            ..proto::Message::default()
401        },
402        KadResponseMsg::GetProviders {
403            closer_peers,
404            provider_peers,
405        } => proto::Message {
406            type_pb: proto::MessageType::GET_PROVIDERS,
407            clusterLevelRaw: 9,
408            closerPeers: closer_peers.into_iter().map(KadPeer::into).collect(),
409            providerPeers: provider_peers.into_iter().map(KadPeer::into).collect(),
410            ..proto::Message::default()
411        },
412        KadResponseMsg::GetValue {
413            record,
414            closer_peers,
415        } => proto::Message {
416            type_pb: proto::MessageType::GET_VALUE,
417            clusterLevelRaw: 9,
418            closerPeers: closer_peers.into_iter().map(KadPeer::into).collect(),
419            record: record.map(record_to_proto),
420            ..proto::Message::default()
421        },
422        KadResponseMsg::PutValue { key, value } => proto::Message {
423            type_pb: proto::MessageType::PUT_VALUE,
424            key: key.to_vec(),
425            record: Some(proto::Record {
426                key: key.to_vec(),
427                value,
428                ..proto::Record::default()
429            }),
430            ..proto::Message::default()
431        },
432    }
433}
434
435/// Converts a received protobuf message into a corresponding `KadRequestMsg`.
436///
437/// Fails if the protobuf message is not a valid and supported Kademlia request message.
438fn proto_to_req_msg(message: proto::Message) -> Result<KadRequestMsg, io::Error> {
439    match message.type_pb {
440        proto::MessageType::PING => Ok(KadRequestMsg::Ping),
441        proto::MessageType::PUT_VALUE => {
442            let record = record_from_proto(message.record.unwrap_or_default())?;
443            Ok(KadRequestMsg::PutValue { record })
444        }
445        proto::MessageType::GET_VALUE => Ok(KadRequestMsg::GetValue {
446            key: record::Key::from(message.key),
447        }),
448        proto::MessageType::FIND_NODE => Ok(KadRequestMsg::FindNode { key: message.key }),
449        proto::MessageType::GET_PROVIDERS => Ok(KadRequestMsg::GetProviders {
450            key: record::Key::from(message.key),
451        }),
452        proto::MessageType::ADD_PROVIDER => {
453            // TODO: for now we don't parse the peer properly, so it is possible that we get
454            //       parsing errors for peers even when they are valid; we ignore these
455            //       errors for now, but ultimately we should just error altogether
456            let provider = message
457                .providerPeers
458                .into_iter()
459                .find_map(|peer| KadPeer::try_from(peer).ok());
460
461            if let Some(provider) = provider {
462                let key = record::Key::from(message.key);
463                Ok(KadRequestMsg::AddProvider { key, provider })
464            } else {
465                Err(invalid_data("AddProvider message with no valid peer."))
466            }
467        }
468    }
469}
470
471/// Converts a received protobuf message into a corresponding `KadResponseMessage`.
472///
473/// Fails if the protobuf message is not a valid and supported Kademlia response message.
474fn proto_to_resp_msg(message: proto::Message) -> Result<KadResponseMsg, io::Error> {
475    match message.type_pb {
476        proto::MessageType::PING => Ok(KadResponseMsg::Pong),
477        proto::MessageType::GET_VALUE => {
478            let record = if let Some(r) = message.record {
479                Some(record_from_proto(r)?)
480            } else {
481                None
482            };
483
484            let closer_peers = message
485                .closerPeers
486                .into_iter()
487                .filter_map(|peer| KadPeer::try_from(peer).ok())
488                .collect();
489
490            Ok(KadResponseMsg::GetValue {
491                record,
492                closer_peers,
493            })
494        }
495
496        proto::MessageType::FIND_NODE => {
497            let closer_peers = message
498                .closerPeers
499                .into_iter()
500                .filter_map(|peer| KadPeer::try_from(peer).ok())
501                .collect();
502
503            Ok(KadResponseMsg::FindNode { closer_peers })
504        }
505
506        proto::MessageType::GET_PROVIDERS => {
507            let closer_peers = message
508                .closerPeers
509                .into_iter()
510                .filter_map(|peer| KadPeer::try_from(peer).ok())
511                .collect();
512
513            let provider_peers = message
514                .providerPeers
515                .into_iter()
516                .filter_map(|peer| KadPeer::try_from(peer).ok())
517                .collect();
518
519            Ok(KadResponseMsg::GetProviders {
520                closer_peers,
521                provider_peers,
522            })
523        }
524
525        proto::MessageType::PUT_VALUE => {
526            let key = record::Key::from(message.key);
527            let rec = message
528                .record
529                .ok_or_else(|| invalid_data("received PutValue message with no record"))?;
530
531            Ok(KadResponseMsg::PutValue {
532                key,
533                value: rec.value,
534            })
535        }
536
537        proto::MessageType::ADD_PROVIDER => {
538            Err(invalid_data("received an unexpected AddProvider message"))
539        }
540    }
541}
542
543fn record_from_proto(record: proto::Record) -> Result<Record, io::Error> {
544    let key = record::Key::from(record.key);
545    let value = record.value;
546
547    let publisher = if !record.publisher.is_empty() {
548        PeerId::from_bytes(&record.publisher)
549            .map(Some)
550            .map_err(|_| invalid_data("Invalid publisher peer ID."))?
551    } else {
552        None
553    };
554
555    let expires = if record.ttl > 0 {
556        Some(Instant::now() + Duration::from_secs(record.ttl as u64))
557    } else {
558        None
559    };
560
561    Ok(Record {
562        key,
563        value,
564        publisher,
565        expires,
566    })
567}
568
569fn record_to_proto(record: Record) -> proto::Record {
570    proto::Record {
571        key: record.key.to_vec(),
572        value: record.value,
573        publisher: record.publisher.map(|id| id.to_bytes()).unwrap_or_default(),
574        ttl: record
575            .expires
576            .map(|t| {
577                let now = Instant::now();
578                if t > now {
579                    (t - now).as_secs() as u32
580                } else {
581                    1 // because 0 means "does not expire"
582                }
583            })
584            .unwrap_or(0),
585        timeReceived: String::new(),
586    }
587}
588
589/// Creates an `io::Error` with `io::ErrorKind::InvalidData`.
590fn invalid_data<E>(e: E) -> io::Error
591where
592    E: Into<Box<dyn std::error::Error + Send + Sync>>,
593{
594    io::Error::new(io::ErrorKind::InvalidData, e)
595}
596
597#[cfg(test)]
598mod tests {
599    use super::*;
600
601    #[test]
602    fn append_p2p() {
603        let peer_id = PeerId::random();
604        let multiaddr = "/ip6/2001:db8::/tcp/1234".parse::<Multiaddr>().unwrap();
605
606        let payload = proto::Peer {
607            id: peer_id.to_bytes(),
608            addrs: vec![multiaddr.to_vec()],
609            connection: proto::ConnectionType::CAN_CONNECT,
610        };
611
612        let peer = KadPeer::try_from(payload).unwrap();
613
614        assert_eq!(peer.multiaddrs, vec![multiaddr.with_p2p(peer_id).unwrap()])
615    }
616
617    #[test]
618    fn skip_invalid_multiaddr() {
619        let peer_id = PeerId::random();
620        let multiaddr = "/ip6/2001:db8::/tcp/1234".parse::<Multiaddr>().unwrap();
621
622        let valid_multiaddr = multiaddr.clone().with_p2p(peer_id).unwrap();
623
624        let multiaddr_with_incorrect_peer_id = {
625            let other_peer_id = PeerId::random();
626            assert_ne!(peer_id, other_peer_id);
627            multiaddr.with_p2p(other_peer_id).unwrap()
628        };
629
630        let invalid_multiaddr = {
631            let a = vec![255; 8];
632            assert!(Multiaddr::try_from(a.clone()).is_err());
633            a
634        };
635
636        let payload = proto::Peer {
637            id: peer_id.to_bytes(),
638            addrs: vec![
639                valid_multiaddr.to_vec(),
640                multiaddr_with_incorrect_peer_id.to_vec(),
641                invalid_multiaddr,
642            ],
643            connection: proto::ConnectionType::CAN_CONNECT,
644        };
645
646        let peer = KadPeer::try_from(payload).unwrap();
647
648        assert_eq!(peer.multiaddrs, vec![valid_multiaddr])
649    }
650
651    /*// TODO: restore
652    use self::libp2p_tcp::TcpTransport;
653    use self::tokio::runtime::current_thread::Runtime;
654    use futures::{Future, Sink, Stream};
655    use libp2p_core::{PeerId, PublicKey, Transport};
656    use multihash::{encode, Hash};
657    use protocol::{ConnectionType, KadPeer, ProtocolConfig};
658    use std::sync::mpsc;
659    use std::thread;
660
661    #[test]
662    fn correct_transfer() {
663        // We open a server and a client, send a message between the two, and check that they were
664        // successfully received.
665
666        test_one(KadMsg::Ping);
667        test_one(KadMsg::FindNodeReq {
668            key: PeerId::random(),
669        });
670        test_one(KadMsg::FindNodeRes {
671            closer_peers: vec![KadPeer {
672                node_id: PeerId::random(),
673                multiaddrs: vec!["/ip4/100.101.102.103/tcp/20105".parse().unwrap()],
674                connection_ty: ConnectionType::Connected,
675            }],
676        });
677        test_one(KadMsg::GetProvidersReq {
678            key: encode(Hash::SHA2256, &[9, 12, 0, 245, 245, 201, 28, 95]).unwrap(),
679        });
680        test_one(KadMsg::GetProvidersRes {
681            closer_peers: vec![KadPeer {
682                node_id: PeerId::random(),
683                multiaddrs: vec!["/ip4/100.101.102.103/tcp/20105".parse().unwrap()],
684                connection_ty: ConnectionType::Connected,
685            }],
686            provider_peers: vec![KadPeer {
687                node_id: PeerId::random(),
688                multiaddrs: vec!["/ip4/200.201.202.203/tcp/1999".parse().unwrap()],
689                connection_ty: ConnectionType::NotConnected,
690            }],
691        });
692        test_one(KadMsg::AddProvider {
693            key: encode(Hash::SHA2256, &[9, 12, 0, 245, 245, 201, 28, 95]).unwrap(),
694            provider_peer: KadPeer {
695                node_id: PeerId::random(),
696                multiaddrs: vec!["/ip4/9.1.2.3/udp/23".parse().unwrap()],
697                connection_ty: ConnectionType::Connected,
698            },
699        });
700        // TODO: all messages
701
702        fn test_one(msg_server: KadMsg) {
703            let msg_client = msg_server.clone();
704            let (tx, rx) = mpsc::channel();
705
706            let bg_thread = thread::spawn(move || {
707                let transport = TcpTransport::default().with_upgrade(ProtocolConfig);
708
709                let (listener, addr) = transport
710                    .listen_on( "/ip4/127.0.0.1/tcp/0".parse().unwrap())
711                    .unwrap();
712                tx.send(addr).unwrap();
713
714                let future = listener
715                    .into_future()
716                    .map_err(|(err, _)| err)
717                    .and_then(|(client, _)| client.unwrap().0)
718                    .and_then(|proto| proto.into_future().map_err(|(err, _)| err).map(|(v, _)| v))
719                    .map(|recv_msg| {
720                        assert_eq!(recv_msg.unwrap(), msg_server);
721                        ()
722                    });
723                let mut rt = Runtime::new().unwrap();
724                let _ = rt.block_on(future).unwrap();
725            });
726
727            let transport = TcpTransport::default().with_upgrade(ProtocolConfig);
728
729            let future = transport
730                .dial(rx.recv().unwrap())
731                .unwrap()
732                .and_then(|proto| proto.send(msg_client))
733                .map(|_| ());
734            let mut rt = Runtime::new().unwrap();
735            let _ = rt.block_on(future).unwrap();
736            bg_thread.join().unwrap();
737        }
738    }*/
739}