p2p_chat/network/layer/
runtime.rs

1//! This module contains the runtime logic for the `NetworkLayer`.
2use std::time::Duration;
3
4use anyhow::Result;
5use futures::StreamExt;
6use tokio::select;
7use tokio::sync::mpsc;
8use tracing::{error, info};
9
10use crate::types::Message;
11
12use super::NetworkLayer;
13
14impl NetworkLayer {
15    /// Periodically cleans up the list of blocked peers.
16    fn cleanup_blocked_peers(&mut self) {
17        let block_duration = Duration::from_secs(600);
18        let mut expired_peers = Vec::new();
19
20        for (&peer_id, &blocked_time) in &self.blocked_peers {
21            if blocked_time.elapsed() > block_duration {
22                expired_peers.push(peer_id);
23            }
24        }
25
26        for peer_id in expired_peers {
27            info!("Unblocking peer {} after timeout", peer_id);
28            self.blocked_peers.remove(&peer_id);
29        }
30    }
31
32    /// Runs the main event loop for the `NetworkLayer`.
33    ///
34    /// This function listens for events from the `libp2p` `Swarm` and for
35    /// commands from other parts of the application. It also periodically
36    /// cleans up the list of blocked peers.
37    ///
38    /// # Arguments
39    ///
40    /// * `incoming_messages` - The sender for incoming chat messages.
41    ///
42    /// # Errors
43    ///
44    /// This function will return an error if the event loop fails.
45    pub async fn run(&mut self, incoming_messages: mpsc::UnboundedSender<Message>) -> Result<()> {
46        info!("Starting network event loop");
47
48        let mut cleanup_timer = tokio::time::interval(Duration::from_secs(300));
49
50        loop {
51            select! {
52                event = self.swarm.select_next_some() => {
53                    if let Err(e) = self.handle_swarm_event(event, &incoming_messages).await {
54                        error!("Error handling swarm event: {}", e);
55                    }
56                }
57
58                command = self.command_receiver.recv() => {
59                    match command {
60                        Some(cmd) => {
61                            if let Err(e) = self.handle_command(cmd).await {
62                                error!("Error handling command: {}", e);
63                            }
64                        }
65                        None => {
66                            info!("Command channel closed, shutting down network layer");
67                            break;
68                        }
69                    }
70                }
71
72                _ = cleanup_timer.tick() => {
73                    self.cleanup_blocked_peers();
74                }
75            }
76        }
77
78        Ok(())
79    }
80}