p2p_chat/network/layer/
runtime.rs1use std::time::Duration;
3
4use anyhow::Result;
5use futures::StreamExt;
6use tokio::select;
7use tokio::sync::mpsc;
8use tracing::{error, info};
9
10use crate::types::Message;
11
12use super::NetworkLayer;
13
14impl NetworkLayer {
15 fn cleanup_blocked_peers(&mut self) {
17 let block_duration = Duration::from_secs(600);
18 let mut expired_peers = Vec::new();
19
20 for (&peer_id, &blocked_time) in &self.blocked_peers {
21 if blocked_time.elapsed() > block_duration {
22 expired_peers.push(peer_id);
23 }
24 }
25
26 for peer_id in expired_peers {
27 info!("Unblocking peer {} after timeout", peer_id);
28 self.blocked_peers.remove(&peer_id);
29 }
30 }
31
32 pub async fn run(&mut self, incoming_messages: mpsc::UnboundedSender<Message>) -> Result<()> {
46 info!("Starting network event loop");
47
48 let mut cleanup_timer = tokio::time::interval(Duration::from_secs(300));
49
50 loop {
51 select! {
52 event = self.swarm.select_next_some() => {
53 if let Err(e) = self.handle_swarm_event(event, &incoming_messages).await {
54 error!("Error handling swarm event: {}", e);
55 }
56 }
57
58 command = self.command_receiver.recv() => {
59 match command {
60 Some(cmd) => {
61 if let Err(e) = self.handle_command(cmd).await {
62 error!("Error handling command: {}", e);
63 }
64 }
65 None => {
66 info!("Command channel closed, shutting down network layer");
67 break;
68 }
69 }
70 }
71
72 _ = cleanup_timer.tick() => {
73 self.cleanup_blocked_peers();
74 }
75 }
76 }
77
78 Ok(())
79 }
80}