1mod dns;
22mod query;
23
24use self::dns::{build_query, build_query_response, build_service_discovery_response};
25use self::query::MdnsPacket;
26use crate::behaviour::{socket::AsyncSocket, timer::Builder};
27use crate::Config;
28use futures::channel::mpsc;
29use futures::{SinkExt, StreamExt};
30use libp2p_core::Multiaddr;
31use libp2p_identity::PeerId;
32use libp2p_swarm::ListenAddresses;
33use socket2::{Domain, Socket, Type};
34use std::future::Future;
35use std::sync::{Arc, RwLock};
36use std::{
37 collections::VecDeque,
38 io,
39 net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, UdpSocket},
40 pin::Pin,
41 task::{Context, Poll},
42 time::{Duration, Instant},
43};
44
/// Query interval a freshly created [`ProbeState`] starts with; it is also the
/// period of the very first timer armed for an interface.
const INITIAL_TIMEOUT_INTERVAL: Duration = Duration::from_millis(500);

/// Phase of the query schedule together with the interval that applies to it.
#[derive(Debug, Clone)]
enum ProbeState {
    /// Still probing; carries the interval currently in use.
    Probing(Duration),
    /// Probing is over; carries the interval used from now on.
    Finished(Duration),
}

impl Default for ProbeState {
    /// Starts out in the probing phase with [`INITIAL_TIMEOUT_INTERVAL`].
    fn default() -> Self {
        Self::Probing(INITIAL_TIMEOUT_INTERVAL)
    }
}

impl ProbeState {
    /// Returns the interval stored in the current phase, whichever phase it is.
    fn interval(&self) -> &Duration {
        match self {
            Self::Probing(current) | Self::Finished(current) => current,
        }
    }
}
68
/// mDNS state for a single interface address.
///
/// Owns the sockets, timer and queues needed to send queries, answer queries
/// from others and forward discovered peers. It is driven by polling it as a
/// [`Future`].
#[derive(Debug)]
pub(crate) struct InterfaceState<U, T> {
    /// Interface address this instance was created for.
    addr: IpAddr,
    /// Socket bound to port 5353 and joined to the mDNS multicast group;
    /// incoming datagrams are read from it.
    recv_socket: U,
    /// Socket bound to an OS-assigned port (port 0); all outgoing packets are
    /// written through it.
    send_socket: U,

    /// Shared listen addresses, read when building the answer to a query.
    listen_addresses: Arc<RwLock<ListenAddresses>>,

    /// Channel on which discovered `(peer, address, instant)` entries are
    /// handed to the receiver. A disconnected channel ends the future.
    query_response_sender: mpsc::Sender<(PeerId, Multiaddr, Instant)>,

    /// Scratch buffer one received datagram is read into before it is parsed.
    recv_buffer: [u8; 4096],
    /// Encoded packets waiting to be written to `send_socket`, oldest first.
    send_buffer: VecDeque<Vec<u8>>,
    /// Steady-state query interval: the configured interval plus a random
    /// jitter of less than 100 ms chosen at construction.
    query_interval: Duration,
    /// Timer stream; every item it yields triggers a new query.
    timeout: T,
    /// mDNS multicast group address matching the address family of `addr`.
    multicast_addr: IpAddr,
    /// Discovered entries not yet pushed into `query_response_sender`.
    discovered: VecDeque<(PeerId, Multiaddr, Instant)>,
    /// TTL taken from the configuration and passed to the response builders.
    ttl: Duration,
    /// Current phase of the query schedule; determines the timer interval.
    probe_state: ProbeState,
    /// Our own peer id, used when answering queries and when extracting
    /// discovered peers from responses.
    local_peer_id: PeerId,
}
106
107impl<U, T> InterfaceState<U, T>
108where
109 U: AsyncSocket,
110 T: Builder + futures::Stream,
111{
112 pub(crate) fn new(
114 addr: IpAddr,
115 config: Config,
116 local_peer_id: PeerId,
117 listen_addresses: Arc<RwLock<ListenAddresses>>,
118 query_response_sender: mpsc::Sender<(PeerId, Multiaddr, Instant)>,
119 ) -> io::Result<Self> {
120 tracing::info!(address=%addr, "creating instance on iface address");
121 let recv_socket = match addr {
122 IpAddr::V4(addr) => {
123 let socket = Socket::new(Domain::IPV4, Type::DGRAM, Some(socket2::Protocol::UDP))?;
124 socket.set_reuse_address(true)?;
125 #[cfg(unix)]
126 socket.set_reuse_port(true)?;
127 socket.bind(&SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 5353).into())?;
128 socket.set_multicast_loop_v4(true)?;
129 socket.set_multicast_ttl_v4(255)?;
130 socket.join_multicast_v4(&crate::IPV4_MDNS_MULTICAST_ADDRESS, &addr)?;
131 U::from_std(UdpSocket::from(socket))?
132 }
133 IpAddr::V6(_) => {
134 let socket = Socket::new(Domain::IPV6, Type::DGRAM, Some(socket2::Protocol::UDP))?;
135 socket.set_reuse_address(true)?;
136 #[cfg(unix)]
137 socket.set_reuse_port(true)?;
138 socket.bind(&SocketAddr::new(IpAddr::V6(Ipv6Addr::UNSPECIFIED), 5353).into())?;
139 socket.set_multicast_loop_v6(true)?;
140 socket.join_multicast_v6(&crate::IPV6_MDNS_MULTICAST_ADDRESS, 0)?;
142 U::from_std(UdpSocket::from(socket))?
143 }
144 };
145 let bind_addr = match addr {
146 IpAddr::V4(_) => SocketAddr::new(addr, 0),
147 IpAddr::V6(_addr) => {
148 SocketAddr::new(IpAddr::V6(Ipv6Addr::UNSPECIFIED), 0)
153 }
154 };
155 let send_socket = U::from_std(UdpSocket::bind(bind_addr)?)?;
156
157 let query_interval = {
159 use rand::Rng;
160 let mut rng = rand::thread_rng();
161 let jitter = rng.gen_range(0..100);
162 config.query_interval + Duration::from_millis(jitter)
163 };
164 let multicast_addr = match addr {
165 IpAddr::V4(_) => IpAddr::V4(crate::IPV4_MDNS_MULTICAST_ADDRESS),
166 IpAddr::V6(_) => IpAddr::V6(crate::IPV6_MDNS_MULTICAST_ADDRESS),
167 };
168 Ok(Self {
169 addr,
170 recv_socket,
171 send_socket,
172 listen_addresses,
173 query_response_sender,
174 recv_buffer: [0; 4096],
175 send_buffer: Default::default(),
176 discovered: Default::default(),
177 query_interval,
178 timeout: T::interval_at(Instant::now(), INITIAL_TIMEOUT_INTERVAL),
179 multicast_addr,
180 ttl: config.ttl,
181 probe_state: Default::default(),
182 local_peer_id,
183 })
184 }
185
186 pub(crate) fn reset_timer(&mut self) {
187 tracing::trace!(address=%self.addr, probe_state=?self.probe_state, "reset timer");
188 let interval = *self.probe_state.interval();
189 self.timeout = T::interval(interval);
190 }
191
192 fn mdns_socket(&self) -> SocketAddr {
193 SocketAddr::new(self.multicast_addr, 5353)
194 }
195}
196
/// Drives the interface: sends periodic queries, answers incoming queries and
/// forwards discovered peers.
///
/// The future completes (with `()`) only when the response channel is
/// disconnected or when reading from the receive socket fails with an error
/// other than `WouldBlock`. Otherwise it returns `Poll::Pending` once none of
/// the stages below made progress.
impl<U, T> Future for InterfaceState<U, T>
where
    U: AsyncSocket,
    T: Builder + futures::Stream,
{
    type Output = ();

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.get_mut();

        loop {
            // 1st priority: the timer. Each tick queues one query packet.
            if this.timeout.poll_next_unpin(cx).is_ready() {
                tracing::trace!(address=%this.addr, "sending query on iface");
                this.send_buffer.push_back(build_query());
                tracing::trace!(address=%this.addr, probe_state=?this.probe_state, "tick");

                // While probing, double the interval on every tick; once the
                // doubled value reaches the regular query interval, probing is
                // finished and the regular interval is used from then on.
                if let ProbeState::Probing(interval) = this.probe_state {
                    let interval = interval * 2;
                    this.probe_state = if interval >= this.query_interval {
                        ProbeState::Finished(this.query_interval)
                    } else {
                        ProbeState::Probing(interval)
                    };
                }

                // Re-arm the timer with the interval of the (possibly updated) state.
                this.reset_timer();
            }

            // 2nd priority: flush one queued packet to the multicast address.
            if let Some(packet) = this.send_buffer.pop_front() {
                match this.send_socket.poll_write(cx, &packet, this.mdns_socket()) {
                    Poll::Ready(Ok(_)) => {
                        tracing::trace!(address=%this.addr, "sent packet on iface address");
                        continue;
                    }
                    Poll::Ready(Err(err)) => {
                        // The packet is dropped; a send error does not end the future.
                        tracing::error!(address=%this.addr, "error sending packet on iface address {}", err);
                        continue;
                    }
                    Poll::Pending => {
                        // Socket not writable yet: keep the packet at the head
                        // of the queue and carry on with the other stages.
                        this.send_buffer.push_front(packet);
                    }
                }
            }

            // 3rd priority: hand one discovered entry to the response channel.
            if this.query_response_sender.poll_ready_unpin(cx).is_ready() {
                if let Some(discovered) = this.discovered.pop_front() {
                    match this.query_response_sender.try_send(discovered) {
                        Ok(()) => {}
                        Err(e) if e.is_disconnected() => {
                            // Nobody is listening any more: complete the future.
                            return Poll::Ready(());
                        }
                        Err(e) => {
                            // Could not send for another reason: put the entry
                            // back at the front so it is retried first.
                            this.discovered.push_front(e.into_inner());
                        }
                    }

                    continue;
                }
            }

            // 4th priority: read one datagram and parse it as an mDNS packet.
            match this
                .recv_socket
                .poll_read(cx, &mut this.recv_buffer)
                .map_ok(|(len, from)| MdnsPacket::new_from_bytes(&this.recv_buffer[..len], from))
            {
                Poll::Ready(Ok(Ok(Some(MdnsPacket::Query(query))))) => {
                    tracing::trace!(
                        address=%this.addr,
                        remote_address=%query.remote_addr(),
                        "received query from remote address on address"
                    );

                    // Queue the answer packet(s) built from our current listen
                    // addresses. A poisoned lock is recovered by taking the
                    // guard out of the poison error instead of panicking.
                    this.send_buffer.extend(build_query_response(
                        query.query_id(),
                        this.local_peer_id,
                        this.listen_addresses
                            .read()
                            .unwrap_or_else(|e| e.into_inner())
                            .iter(),
                        this.ttl,
                    ));
                    continue;
                }
                Poll::Ready(Ok(Ok(Some(MdnsPacket::Response(response))))) => {
                    tracing::trace!(
                        address=%this.addr,
                        remote_address=%response.remote_addr(),
                        "received response from remote address on address"
                    );

                    // NOTE(review): presumably `extract_discovered` skips
                    // records for our own peer id — confirm in `query`.
                    this.discovered
                        .extend(response.extract_discovered(Instant::now(), this.local_peer_id));

                    // As soon as any discovered entry is pending, stop probing
                    // and restart the timer with the regular query interval.
                    if !this.discovered.is_empty() {
                        this.probe_state = ProbeState::Finished(this.query_interval);
                        this.reset_timer();
                    }
                    continue;
                }
                Poll::Ready(Ok(Ok(Some(MdnsPacket::ServiceDiscovery(disc))))) => {
                    tracing::trace!(
                        address=%this.addr,
                        remote_address=%disc.remote_addr(),
                        "received service discovery from remote address on address"
                    );

                    this.send_buffer
                        .push_back(build_service_discovery_response(disc.query_id(), this.ttl));
                    continue;
                }
                Poll::Ready(Err(err)) if err.kind() == std::io::ErrorKind::WouldBlock => {
                    // Not treated as a failure: go around the loop again.
                    continue;
                }
                Poll::Ready(Err(err)) => {
                    // Any other read error is fatal for this interface.
                    tracing::error!("failed reading datagram: {}", err);
                    return Poll::Ready(());
                }
                Poll::Ready(Ok(Err(err))) => {
                    // Malformed packet: log it and move on to the next datagram.
                    tracing::debug!("Parsing mdns packet failed: {:?}", err);
                    continue;
                }
                // Parsed fine but nothing to act on.
                Poll::Ready(Ok(Ok(None))) => continue,
                Poll::Pending => {}
            }

            // No stage made progress in this iteration.
            return Poll::Pending;
        }
    }
}