1use crate::proto;
30use crate::record::{self, Record};
31use asynchronous_codec::{Decoder, Encoder, Framed};
32use bytes::BytesMut;
33use futures::prelude::*;
34use instant::Instant;
35use libp2p_core::upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo};
36use libp2p_core::Multiaddr;
37use libp2p_identity::PeerId;
38use libp2p_swarm::StreamProtocol;
39use std::marker::PhantomData;
40use std::{convert::TryFrom, time::Duration};
41use std::{io, iter};
42use tracing::debug;
43
44pub(crate) const DEFAULT_PROTO_NAME: StreamProtocol = StreamProtocol::new("/ipfs/kad/1.0.0");
46pub(crate) const DEFAULT_MAX_PACKET_SIZE: usize = 16 * 1024;
48#[derive(Copy, Clone, PartialEq, Eq, Debug, Hash)]
50pub enum ConnectionType {
51 NotConnected = 0,
53 Connected = 1,
55 CanConnect = 2,
57 CannotConnect = 3,
59}
60
61impl From<proto::ConnectionType> for ConnectionType {
62 fn from(raw: proto::ConnectionType) -> ConnectionType {
63 use proto::ConnectionType::*;
64 match raw {
65 NOT_CONNECTED => ConnectionType::NotConnected,
66 CONNECTED => ConnectionType::Connected,
67 CAN_CONNECT => ConnectionType::CanConnect,
68 CANNOT_CONNECT => ConnectionType::CannotConnect,
69 }
70 }
71}
72
73impl From<ConnectionType> for proto::ConnectionType {
74 fn from(val: ConnectionType) -> Self {
75 use proto::ConnectionType::*;
76 match val {
77 ConnectionType::NotConnected => NOT_CONNECTED,
78 ConnectionType::Connected => CONNECTED,
79 ConnectionType::CanConnect => CAN_CONNECT,
80 ConnectionType::CannotConnect => CANNOT_CONNECT,
81 }
82 }
83}
84
85#[derive(Debug, Clone, PartialEq, Eq)]
87pub struct KadPeer {
88 pub node_id: PeerId,
90 pub multiaddrs: Vec<Multiaddr>,
92 pub connection_ty: ConnectionType,
94}
95
96impl TryFrom<proto::Peer> for KadPeer {
98 type Error = io::Error;
99
100 fn try_from(peer: proto::Peer) -> Result<KadPeer, Self::Error> {
101 let node_id = PeerId::from_bytes(&peer.id).map_err(|_| invalid_data("invalid peer id"))?;
104
105 let mut addrs = Vec::with_capacity(peer.addrs.len());
106 for addr in peer.addrs.into_iter() {
107 match Multiaddr::try_from(addr).map(|addr| addr.with_p2p(node_id)) {
108 Ok(Ok(a)) => addrs.push(a),
109 Ok(Err(a)) => {
110 debug!("Unable to parse multiaddr: {a} is not compatible with {node_id}")
111 }
112 Err(e) => debug!("Unable to parse multiaddr: {e}"),
113 };
114 }
115
116 Ok(KadPeer {
117 node_id,
118 multiaddrs: addrs,
119 connection_ty: peer.connection.into(),
120 })
121 }
122}
123
124impl From<KadPeer> for proto::Peer {
125 fn from(peer: KadPeer) -> Self {
126 proto::Peer {
127 id: peer.node_id.to_bytes(),
128 addrs: peer.multiaddrs.into_iter().map(|a| a.to_vec()).collect(),
129 connection: peer.connection_ty.into(),
130 }
131 }
132}
133
134#[derive(Debug, Clone)]
140pub struct ProtocolConfig {
141 protocol_names: Vec<StreamProtocol>,
142 max_packet_size: usize,
144}
145
146impl ProtocolConfig {
147 pub fn protocol_names(&self) -> &[StreamProtocol] {
149 &self.protocol_names
150 }
151
152 pub fn set_protocol_names(&mut self, names: Vec<StreamProtocol>) {
155 self.protocol_names = names;
156 }
157
158 pub fn set_max_packet_size(&mut self, size: usize) {
160 self.max_packet_size = size;
161 }
162}
163
164impl Default for ProtocolConfig {
165 fn default() -> Self {
166 ProtocolConfig {
167 protocol_names: iter::once(DEFAULT_PROTO_NAME).collect(),
168 max_packet_size: DEFAULT_MAX_PACKET_SIZE,
169 }
170 }
171}
172
173impl UpgradeInfo for ProtocolConfig {
174 type Info = StreamProtocol;
175 type InfoIter = std::vec::IntoIter<Self::Info>;
176
177 fn protocol_info(&self) -> Self::InfoIter {
178 self.protocol_names.clone().into_iter()
179 }
180}
181
182pub struct Codec<A, B> {
184 codec: quick_protobuf_codec::Codec<proto::Message>,
185 __phantom: PhantomData<(A, B)>,
186}
187impl<A, B> Codec<A, B> {
188 fn new(max_packet_size: usize) -> Self {
189 Codec {
190 codec: quick_protobuf_codec::Codec::new(max_packet_size),
191 __phantom: PhantomData,
192 }
193 }
194}
195
196impl<A: Into<proto::Message>, B> Encoder for Codec<A, B> {
197 type Error = io::Error;
198 type Item<'a> = A;
199
200 fn encode(&mut self, item: Self::Item<'_>, dst: &mut BytesMut) -> Result<(), Self::Error> {
201 Ok(self.codec.encode(item.into(), dst)?)
202 }
203}
204impl<A, B: TryFrom<proto::Message, Error = io::Error>> Decoder for Codec<A, B> {
205 type Error = io::Error;
206 type Item = B;
207
208 fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
209 self.codec.decode(src)?.map(B::try_from).transpose()
210 }
211}
212
213pub(crate) type KadInStreamSink<S> = Framed<S, Codec<KadResponseMsg, KadRequestMsg>>;
215pub(crate) type KadOutStreamSink<S> = Framed<S, Codec<KadRequestMsg, KadResponseMsg>>;
217
218impl<C> InboundUpgrade<C> for ProtocolConfig
219where
220 C: AsyncRead + AsyncWrite + Unpin,
221{
222 type Output = KadInStreamSink<C>;
223 type Future = future::Ready<Result<Self::Output, io::Error>>;
224 type Error = io::Error;
225
226 fn upgrade_inbound(self, incoming: C, _: Self::Info) -> Self::Future {
227 let codec = Codec::new(self.max_packet_size);
228
229 future::ok(Framed::new(incoming, codec))
230 }
231}
232
233impl<C> OutboundUpgrade<C> for ProtocolConfig
234where
235 C: AsyncRead + AsyncWrite + Unpin,
236{
237 type Output = KadOutStreamSink<C>;
238 type Future = future::Ready<Result<Self::Output, io::Error>>;
239 type Error = io::Error;
240
241 fn upgrade_outbound(self, incoming: C, _: Self::Info) -> Self::Future {
242 let codec = Codec::new(self.max_packet_size);
243
244 future::ok(Framed::new(incoming, codec))
245 }
246}
247
248#[derive(Debug, Clone, PartialEq, Eq)]
250pub enum KadRequestMsg {
251 Ping,
253
254 FindNode {
257 key: Vec<u8>,
259 },
260
261 GetProviders {
264 key: record::Key,
266 },
267
268 AddProvider {
270 key: record::Key,
272 provider: KadPeer,
274 },
275
276 GetValue {
278 key: record::Key,
280 },
281
282 PutValue { record: Record },
284}
285
286#[derive(Debug, Clone, PartialEq, Eq)]
288pub enum KadResponseMsg {
289 Pong,
291
292 FindNode {
294 closer_peers: Vec<KadPeer>,
296 },
297
298 GetProviders {
300 closer_peers: Vec<KadPeer>,
302 provider_peers: Vec<KadPeer>,
304 },
305
306 GetValue {
308 record: Option<Record>,
310 closer_peers: Vec<KadPeer>,
312 },
313
314 PutValue {
316 key: record::Key,
318 value: Vec<u8>,
320 },
321}
322
323impl From<KadRequestMsg> for proto::Message {
324 fn from(kad_msg: KadRequestMsg) -> Self {
325 req_msg_to_proto(kad_msg)
326 }
327}
328impl From<KadResponseMsg> for proto::Message {
329 fn from(kad_msg: KadResponseMsg) -> Self {
330 resp_msg_to_proto(kad_msg)
331 }
332}
333impl TryFrom<proto::Message> for KadRequestMsg {
334 type Error = io::Error;
335
336 fn try_from(message: proto::Message) -> Result<Self, Self::Error> {
337 proto_to_req_msg(message)
338 }
339}
340impl TryFrom<proto::Message> for KadResponseMsg {
341 type Error = io::Error;
342
343 fn try_from(message: proto::Message) -> Result<Self, Self::Error> {
344 proto_to_resp_msg(message)
345 }
346}
347
348fn req_msg_to_proto(kad_msg: KadRequestMsg) -> proto::Message {
350 match kad_msg {
351 KadRequestMsg::Ping => proto::Message {
352 type_pb: proto::MessageType::PING,
353 ..proto::Message::default()
354 },
355 KadRequestMsg::FindNode { key } => proto::Message {
356 type_pb: proto::MessageType::FIND_NODE,
357 key,
358 clusterLevelRaw: 10,
359 ..proto::Message::default()
360 },
361 KadRequestMsg::GetProviders { key } => proto::Message {
362 type_pb: proto::MessageType::GET_PROVIDERS,
363 key: key.to_vec(),
364 clusterLevelRaw: 10,
365 ..proto::Message::default()
366 },
367 KadRequestMsg::AddProvider { key, provider } => proto::Message {
368 type_pb: proto::MessageType::ADD_PROVIDER,
369 clusterLevelRaw: 10,
370 key: key.to_vec(),
371 providerPeers: vec![provider.into()],
372 ..proto::Message::default()
373 },
374 KadRequestMsg::GetValue { key } => proto::Message {
375 type_pb: proto::MessageType::GET_VALUE,
376 clusterLevelRaw: 10,
377 key: key.to_vec(),
378 ..proto::Message::default()
379 },
380 KadRequestMsg::PutValue { record } => proto::Message {
381 type_pb: proto::MessageType::PUT_VALUE,
382 key: record.key.to_vec(),
383 record: Some(record_to_proto(record)),
384 ..proto::Message::default()
385 },
386 }
387}
388
389fn resp_msg_to_proto(kad_msg: KadResponseMsg) -> proto::Message {
391 match kad_msg {
392 KadResponseMsg::Pong => proto::Message {
393 type_pb: proto::MessageType::PING,
394 ..proto::Message::default()
395 },
396 KadResponseMsg::FindNode { closer_peers } => proto::Message {
397 type_pb: proto::MessageType::FIND_NODE,
398 clusterLevelRaw: 9,
399 closerPeers: closer_peers.into_iter().map(KadPeer::into).collect(),
400 ..proto::Message::default()
401 },
402 KadResponseMsg::GetProviders {
403 closer_peers,
404 provider_peers,
405 } => proto::Message {
406 type_pb: proto::MessageType::GET_PROVIDERS,
407 clusterLevelRaw: 9,
408 closerPeers: closer_peers.into_iter().map(KadPeer::into).collect(),
409 providerPeers: provider_peers.into_iter().map(KadPeer::into).collect(),
410 ..proto::Message::default()
411 },
412 KadResponseMsg::GetValue {
413 record,
414 closer_peers,
415 } => proto::Message {
416 type_pb: proto::MessageType::GET_VALUE,
417 clusterLevelRaw: 9,
418 closerPeers: closer_peers.into_iter().map(KadPeer::into).collect(),
419 record: record.map(record_to_proto),
420 ..proto::Message::default()
421 },
422 KadResponseMsg::PutValue { key, value } => proto::Message {
423 type_pb: proto::MessageType::PUT_VALUE,
424 key: key.to_vec(),
425 record: Some(proto::Record {
426 key: key.to_vec(),
427 value,
428 ..proto::Record::default()
429 }),
430 ..proto::Message::default()
431 },
432 }
433}
434
435fn proto_to_req_msg(message: proto::Message) -> Result<KadRequestMsg, io::Error> {
439 match message.type_pb {
440 proto::MessageType::PING => Ok(KadRequestMsg::Ping),
441 proto::MessageType::PUT_VALUE => {
442 let record = record_from_proto(message.record.unwrap_or_default())?;
443 Ok(KadRequestMsg::PutValue { record })
444 }
445 proto::MessageType::GET_VALUE => Ok(KadRequestMsg::GetValue {
446 key: record::Key::from(message.key),
447 }),
448 proto::MessageType::FIND_NODE => Ok(KadRequestMsg::FindNode { key: message.key }),
449 proto::MessageType::GET_PROVIDERS => Ok(KadRequestMsg::GetProviders {
450 key: record::Key::from(message.key),
451 }),
452 proto::MessageType::ADD_PROVIDER => {
453 let provider = message
457 .providerPeers
458 .into_iter()
459 .find_map(|peer| KadPeer::try_from(peer).ok());
460
461 if let Some(provider) = provider {
462 let key = record::Key::from(message.key);
463 Ok(KadRequestMsg::AddProvider { key, provider })
464 } else {
465 Err(invalid_data("AddProvider message with no valid peer."))
466 }
467 }
468 }
469}
470
471fn proto_to_resp_msg(message: proto::Message) -> Result<KadResponseMsg, io::Error> {
475 match message.type_pb {
476 proto::MessageType::PING => Ok(KadResponseMsg::Pong),
477 proto::MessageType::GET_VALUE => {
478 let record = if let Some(r) = message.record {
479 Some(record_from_proto(r)?)
480 } else {
481 None
482 };
483
484 let closer_peers = message
485 .closerPeers
486 .into_iter()
487 .filter_map(|peer| KadPeer::try_from(peer).ok())
488 .collect();
489
490 Ok(KadResponseMsg::GetValue {
491 record,
492 closer_peers,
493 })
494 }
495
496 proto::MessageType::FIND_NODE => {
497 let closer_peers = message
498 .closerPeers
499 .into_iter()
500 .filter_map(|peer| KadPeer::try_from(peer).ok())
501 .collect();
502
503 Ok(KadResponseMsg::FindNode { closer_peers })
504 }
505
506 proto::MessageType::GET_PROVIDERS => {
507 let closer_peers = message
508 .closerPeers
509 .into_iter()
510 .filter_map(|peer| KadPeer::try_from(peer).ok())
511 .collect();
512
513 let provider_peers = message
514 .providerPeers
515 .into_iter()
516 .filter_map(|peer| KadPeer::try_from(peer).ok())
517 .collect();
518
519 Ok(KadResponseMsg::GetProviders {
520 closer_peers,
521 provider_peers,
522 })
523 }
524
525 proto::MessageType::PUT_VALUE => {
526 let key = record::Key::from(message.key);
527 let rec = message
528 .record
529 .ok_or_else(|| invalid_data("received PutValue message with no record"))?;
530
531 Ok(KadResponseMsg::PutValue {
532 key,
533 value: rec.value,
534 })
535 }
536
537 proto::MessageType::ADD_PROVIDER => {
538 Err(invalid_data("received an unexpected AddProvider message"))
539 }
540 }
541}
542
543fn record_from_proto(record: proto::Record) -> Result<Record, io::Error> {
544 let key = record::Key::from(record.key);
545 let value = record.value;
546
547 let publisher = if !record.publisher.is_empty() {
548 PeerId::from_bytes(&record.publisher)
549 .map(Some)
550 .map_err(|_| invalid_data("Invalid publisher peer ID."))?
551 } else {
552 None
553 };
554
555 let expires = if record.ttl > 0 {
556 Some(Instant::now() + Duration::from_secs(record.ttl as u64))
557 } else {
558 None
559 };
560
561 Ok(Record {
562 key,
563 value,
564 publisher,
565 expires,
566 })
567}
568
569fn record_to_proto(record: Record) -> proto::Record {
570 proto::Record {
571 key: record.key.to_vec(),
572 value: record.value,
573 publisher: record.publisher.map(|id| id.to_bytes()).unwrap_or_default(),
574 ttl: record
575 .expires
576 .map(|t| {
577 let now = Instant::now();
578 if t > now {
579 (t - now).as_secs() as u32
580 } else {
581 1 }
583 })
584 .unwrap_or(0),
585 timeReceived: String::new(),
586 }
587}
588
589fn invalid_data<E>(e: E) -> io::Error
591where
592 E: Into<Box<dyn std::error::Error + Send + Sync>>,
593{
594 io::Error::new(io::ErrorKind::InvalidData, e)
595}
596
597#[cfg(test)]
598mod tests {
599 use super::*;
600
601 #[test]
602 fn append_p2p() {
603 let peer_id = PeerId::random();
604 let multiaddr = "/ip6/2001:db8::/tcp/1234".parse::<Multiaddr>().unwrap();
605
606 let payload = proto::Peer {
607 id: peer_id.to_bytes(),
608 addrs: vec![multiaddr.to_vec()],
609 connection: proto::ConnectionType::CAN_CONNECT,
610 };
611
612 let peer = KadPeer::try_from(payload).unwrap();
613
614 assert_eq!(peer.multiaddrs, vec![multiaddr.with_p2p(peer_id).unwrap()])
615 }
616
617 #[test]
618 fn skip_invalid_multiaddr() {
619 let peer_id = PeerId::random();
620 let multiaddr = "/ip6/2001:db8::/tcp/1234".parse::<Multiaddr>().unwrap();
621
622 let valid_multiaddr = multiaddr.clone().with_p2p(peer_id).unwrap();
623
624 let multiaddr_with_incorrect_peer_id = {
625 let other_peer_id = PeerId::random();
626 assert_ne!(peer_id, other_peer_id);
627 multiaddr.with_p2p(other_peer_id).unwrap()
628 };
629
630 let invalid_multiaddr = {
631 let a = vec![255; 8];
632 assert!(Multiaddr::try_from(a.clone()).is_err());
633 a
634 };
635
636 let payload = proto::Peer {
637 id: peer_id.to_bytes(),
638 addrs: vec![
639 valid_multiaddr.to_vec(),
640 multiaddr_with_incorrect_peer_id.to_vec(),
641 invalid_multiaddr,
642 ],
643 connection: proto::ConnectionType::CAN_CONNECT,
644 };
645
646 let peer = KadPeer::try_from(payload).unwrap();
647
648 assert_eq!(peer.multiaddrs, vec![valid_multiaddr])
649 }
650
651 }