// libp2p_mdns/behaviour/iface.rs

// Copyright 2018 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
20
21mod dns;
22mod query;
23
24use self::dns::{build_query, build_query_response, build_service_discovery_response};
25use self::query::MdnsPacket;
26use crate::behaviour::{socket::AsyncSocket, timer::Builder};
27use crate::Config;
28use futures::channel::mpsc;
29use futures::{SinkExt, StreamExt};
30use libp2p_core::Multiaddr;
31use libp2p_identity::PeerId;
32use libp2p_swarm::ListenAddresses;
33use socket2::{Domain, Socket, Type};
34use std::future::Future;
35use std::sync::{Arc, RwLock};
36use std::{
37    collections::VecDeque,
38    io,
39    net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, UdpSocket},
40    pin::Pin,
41    task::{Context, Poll},
42    time::{Duration, Instant},
43};
44
/// Interval the probing phase starts with.
///
/// A freshly created `ProbeState` is `Probing` with this interval, and the
/// discovery timer of a new `InterfaceState` is first armed with it.
const INITIAL_TIMEOUT_INTERVAL: Duration = Duration::from_millis(500);
47
/// Phase of an interface's query schedule, carrying the interval the
/// discovery timer currently runs at.
#[derive(Debug, Clone)]
enum ProbeState {
    /// Start-up phase: the carried interval is doubled on every timer tick
    /// until it reaches the configured query interval.
    Probing(Duration),
    /// Steady state: the carried interval is the configured query interval.
    Finished(Duration),
}
53
54impl Default for ProbeState {
55    fn default() -> Self {
56        ProbeState::Probing(INITIAL_TIMEOUT_INTERVAL)
57    }
58}
59
60impl ProbeState {
61    fn interval(&self) -> &Duration {
62        match self {
63            ProbeState::Probing(query_interval) => query_interval,
64            ProbeState::Finished(query_interval) => query_interval,
65        }
66    }
67}
68
69/// An mDNS instance for a networking interface. To discover all peers when having multiple
70/// interfaces an [`InterfaceState`] is required for each interface.
71#[derive(Debug)]
72pub(crate) struct InterfaceState<U, T> {
73    /// Address this instance is bound to.
74    addr: IpAddr,
75    /// Receive socket.
76    recv_socket: U,
77    /// Send socket.
78    send_socket: U,
79
80    listen_addresses: Arc<RwLock<ListenAddresses>>,
81
82    query_response_sender: mpsc::Sender<(PeerId, Multiaddr, Instant)>,
83
84    /// Buffer used for receiving data from the main socket.
85    /// RFC6762 discourages packets larger than the interface MTU, but allows sizes of up to 9000
86    /// bytes, if it can be ensured that all participating devices can handle such large packets.
87    /// For computers with several interfaces and IP addresses responses can easily reach sizes in
88    /// the range of 3000 bytes, so 4096 seems sensible for now. For more information see
89    /// [rfc6762](https://tools.ietf.org/html/rfc6762#page-46).
90    recv_buffer: [u8; 4096],
91    /// Buffers pending to send on the main socket.
92    send_buffer: VecDeque<Vec<u8>>,
93    /// Discovery interval.
94    query_interval: Duration,
95    /// Discovery timer.
96    timeout: T,
97    /// Multicast address.
98    multicast_addr: IpAddr,
99    /// Discovered addresses.
100    discovered: VecDeque<(PeerId, Multiaddr, Instant)>,
101    /// TTL
102    ttl: Duration,
103    probe_state: ProbeState,
104    local_peer_id: PeerId,
105}
106
107impl<U, T> InterfaceState<U, T>
108where
109    U: AsyncSocket,
110    T: Builder + futures::Stream,
111{
112    /// Builds a new [`InterfaceState`].
113    pub(crate) fn new(
114        addr: IpAddr,
115        config: Config,
116        local_peer_id: PeerId,
117        listen_addresses: Arc<RwLock<ListenAddresses>>,
118        query_response_sender: mpsc::Sender<(PeerId, Multiaddr, Instant)>,
119    ) -> io::Result<Self> {
120        tracing::info!(address=%addr, "creating instance on iface address");
121        let recv_socket = match addr {
122            IpAddr::V4(addr) => {
123                let socket = Socket::new(Domain::IPV4, Type::DGRAM, Some(socket2::Protocol::UDP))?;
124                socket.set_reuse_address(true)?;
125                #[cfg(unix)]
126                socket.set_reuse_port(true)?;
127                socket.bind(&SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 5353).into())?;
128                socket.set_multicast_loop_v4(true)?;
129                socket.set_multicast_ttl_v4(255)?;
130                socket.join_multicast_v4(&crate::IPV4_MDNS_MULTICAST_ADDRESS, &addr)?;
131                U::from_std(UdpSocket::from(socket))?
132            }
133            IpAddr::V6(_) => {
134                let socket = Socket::new(Domain::IPV6, Type::DGRAM, Some(socket2::Protocol::UDP))?;
135                socket.set_reuse_address(true)?;
136                #[cfg(unix)]
137                socket.set_reuse_port(true)?;
138                socket.bind(&SocketAddr::new(IpAddr::V6(Ipv6Addr::UNSPECIFIED), 5353).into())?;
139                socket.set_multicast_loop_v6(true)?;
140                // TODO: find interface matching addr.
141                socket.join_multicast_v6(&crate::IPV6_MDNS_MULTICAST_ADDRESS, 0)?;
142                U::from_std(UdpSocket::from(socket))?
143            }
144        };
145        let bind_addr = match addr {
146            IpAddr::V4(_) => SocketAddr::new(addr, 0),
147            IpAddr::V6(_addr) => {
148                // TODO: if-watch should return the scope_id of an address
149                // as a workaround we bind to unspecified, which means that
150                // this probably won't work when using multiple interfaces.
151                // SocketAddr::V6(SocketAddrV6::new(addr, 0, 0, scope_id))
152                SocketAddr::new(IpAddr::V6(Ipv6Addr::UNSPECIFIED), 0)
153            }
154        };
155        let send_socket = U::from_std(UdpSocket::bind(bind_addr)?)?;
156
157        // randomize timer to prevent all converging and firing at the same time.
158        let query_interval = {
159            use rand::Rng;
160            let mut rng = rand::thread_rng();
161            let jitter = rng.gen_range(0..100);
162            config.query_interval + Duration::from_millis(jitter)
163        };
164        let multicast_addr = match addr {
165            IpAddr::V4(_) => IpAddr::V4(crate::IPV4_MDNS_MULTICAST_ADDRESS),
166            IpAddr::V6(_) => IpAddr::V6(crate::IPV6_MDNS_MULTICAST_ADDRESS),
167        };
168        Ok(Self {
169            addr,
170            recv_socket,
171            send_socket,
172            listen_addresses,
173            query_response_sender,
174            recv_buffer: [0; 4096],
175            send_buffer: Default::default(),
176            discovered: Default::default(),
177            query_interval,
178            timeout: T::interval_at(Instant::now(), INITIAL_TIMEOUT_INTERVAL),
179            multicast_addr,
180            ttl: config.ttl,
181            probe_state: Default::default(),
182            local_peer_id,
183        })
184    }
185
186    pub(crate) fn reset_timer(&mut self) {
187        tracing::trace!(address=%self.addr, probe_state=?self.probe_state, "reset timer");
188        let interval = *self.probe_state.interval();
189        self.timeout = T::interval(interval);
190    }
191
192    fn mdns_socket(&self) -> SocketAddr {
193        SocketAddr::new(self.multicast_addr, 5353)
194    }
195}
196
197impl<U, T> Future for InterfaceState<U, T>
198where
199    U: AsyncSocket,
200    T: Builder + futures::Stream,
201{
202    type Output = ();
203
204    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
205        let this = self.get_mut();
206
207        loop {
208            // 1st priority: Low latency: Create packet ASAP after timeout.
209            if this.timeout.poll_next_unpin(cx).is_ready() {
210                tracing::trace!(address=%this.addr, "sending query on iface");
211                this.send_buffer.push_back(build_query());
212                tracing::trace!(address=%this.addr, probe_state=?this.probe_state, "tick");
213
214                // Stop to probe when the initial interval reach the query interval
215                if let ProbeState::Probing(interval) = this.probe_state {
216                    let interval = interval * 2;
217                    this.probe_state = if interval >= this.query_interval {
218                        ProbeState::Finished(this.query_interval)
219                    } else {
220                        ProbeState::Probing(interval)
221                    };
222                }
223
224                this.reset_timer();
225            }
226
227            // 2nd priority: Keep local buffers small: Send packets to remote.
228            if let Some(packet) = this.send_buffer.pop_front() {
229                match this.send_socket.poll_write(cx, &packet, this.mdns_socket()) {
230                    Poll::Ready(Ok(_)) => {
231                        tracing::trace!(address=%this.addr, "sent packet on iface address");
232                        continue;
233                    }
234                    Poll::Ready(Err(err)) => {
235                        tracing::error!(address=%this.addr, "error sending packet on iface address {}", err);
236                        continue;
237                    }
238                    Poll::Pending => {
239                        this.send_buffer.push_front(packet);
240                    }
241                }
242            }
243
244            // 3rd priority: Keep local buffers small: Return discovered addresses.
245            if this.query_response_sender.poll_ready_unpin(cx).is_ready() {
246                if let Some(discovered) = this.discovered.pop_front() {
247                    match this.query_response_sender.try_send(discovered) {
248                        Ok(()) => {}
249                        Err(e) if e.is_disconnected() => {
250                            return Poll::Ready(());
251                        }
252                        Err(e) => {
253                            this.discovered.push_front(e.into_inner());
254                        }
255                    }
256
257                    continue;
258                }
259            }
260
261            // 4th priority: Remote work: Answer incoming requests.
262            match this
263                .recv_socket
264                .poll_read(cx, &mut this.recv_buffer)
265                .map_ok(|(len, from)| MdnsPacket::new_from_bytes(&this.recv_buffer[..len], from))
266            {
267                Poll::Ready(Ok(Ok(Some(MdnsPacket::Query(query))))) => {
268                    tracing::trace!(
269                        address=%this.addr,
270                        remote_address=%query.remote_addr(),
271                        "received query from remote address on address"
272                    );
273
274                    this.send_buffer.extend(build_query_response(
275                        query.query_id(),
276                        this.local_peer_id,
277                        this.listen_addresses
278                            .read()
279                            .unwrap_or_else(|e| e.into_inner())
280                            .iter(),
281                        this.ttl,
282                    ));
283                    continue;
284                }
285                Poll::Ready(Ok(Ok(Some(MdnsPacket::Response(response))))) => {
286                    tracing::trace!(
287                        address=%this.addr,
288                        remote_address=%response.remote_addr(),
289                        "received response from remote address on address"
290                    );
291
292                    this.discovered
293                        .extend(response.extract_discovered(Instant::now(), this.local_peer_id));
294
295                    // Stop probing when we have a valid response
296                    if !this.discovered.is_empty() {
297                        this.probe_state = ProbeState::Finished(this.query_interval);
298                        this.reset_timer();
299                    }
300                    continue;
301                }
302                Poll::Ready(Ok(Ok(Some(MdnsPacket::ServiceDiscovery(disc))))) => {
303                    tracing::trace!(
304                        address=%this.addr,
305                        remote_address=%disc.remote_addr(),
306                        "received service discovery from remote address on address"
307                    );
308
309                    this.send_buffer
310                        .push_back(build_service_discovery_response(disc.query_id(), this.ttl));
311                    continue;
312                }
313                Poll::Ready(Err(err)) if err.kind() == std::io::ErrorKind::WouldBlock => {
314                    // No more bytes available on the socket to read
315                    continue;
316                }
317                Poll::Ready(Err(err)) => {
318                    tracing::error!("failed reading datagram: {}", err);
319                    return Poll::Ready(());
320                }
321                Poll::Ready(Ok(Err(err))) => {
322                    tracing::debug!("Parsing mdns packet failed: {:?}", err);
323                    continue;
324                }
325                Poll::Ready(Ok(Ok(None))) => continue,
326                Poll::Pending => {}
327            }
328
329            return Poll::Pending;
330        }
331    }
332}