p2p_chat/
mailbox.rs

1//! This module defines the `MailboxNode`, which is responsible for storing and
2//! forwarding messages for other peers in the network.
3use crate::crypto::{Identity, StorageEncryption};
4use crate::network::NetworkLayer;
5use crate::storage::{MailboxStore, SledMailboxStore};
6use anyhow::Result;
7use libp2p::kad;
8use std::sync::Arc;
9use std::time::Duration;
10use tokio::sync::mpsc;
11use tokio::time::interval;
12use tracing::{debug, error, info, trace};
13
14/// Represents a mailbox node in the network.
15///
16/// A mailbox node stores messages for other peers and forwards them upon request.
17pub struct MailboxNode {
18    /// The identity of the mailbox node.
19    pub identity: Arc<Identity>,
20    /// The storage for mailbox messages.
21    pub storage: Arc<SledMailboxStore>,
22    /// The maximum number of messages to store per user.
23    pub max_storage_per_user: usize,
24    /// The duration for which messages are retained.
25    pub retention_period: Duration,
26}
27
28impl MailboxNode {
29    /// Creates a new `MailboxNode`.
30    ///
31    /// # Arguments
32    ///
33    /// * `identity` - The identity of the node.
34    /// * `db` - The database instance for storing mailbox data.
35    /// * `encryption` - The encryption key for the storage.
36    /// * `max_storage_per_user` - The maximum number of messages to store per user.
37    /// * `retention_period` - The duration for which messages are retained.
38    ///
39    /// # Errors
40    ///
41    /// This function will return an error if the mailbox storage cannot be created.
42    pub fn new(
43        identity: Arc<Identity>,
44        db: sled::Db,
45        encryption: Option<StorageEncryption>,
46        max_storage_per_user: usize,
47        retention_period: Duration,
48    ) -> Result<Self> {
49        let storage = Arc::new(SledMailboxStore::new(db, encryption, max_storage_per_user)?);
50
51        Ok(Self {
52            identity,
53            storage,
54            max_storage_per_user,
55            retention_period,
56        })
57    }
58
59    /// Runs the mailbox node with the given network layer.
60    ///
61    /// This function starts the mailbox node and its associated tasks, such as
62    /// the cleanup task and the network event loop.
63    ///
64    /// # Arguments
65    ///
66    /// * `network_layer` - The network layer to use for communication.
67    ///
68    /// # Errors
69    ///
70    /// This function will return an error if the mailbox node fails to run.
71    pub async fn run_with_network(&mut self, network_layer: NetworkLayer) -> Result<()> {
72        info!(
73            "Starting mailbox node with network layer: {}",
74            self.identity.peer_id
75        );
76        info!(
77            "Max storage per user: {} messages",
78            self.max_storage_per_user
79        );
80        info!("Retention period: {:?}", self.retention_period);
81
82        // Start the cleanup task.
83        let storage_clone = self.storage.clone();
84        let retention_period = self.retention_period;
85        tokio::spawn(async move {
86            Self::cleanup_task(storage_clone, retention_period).await;
87        });
88
89        // Channel for incoming messages (mailbox nodes don't need to handle chat messages).
90        let (_incoming_tx, _incoming_rx) = mpsc::unbounded_channel::<crate::types::Message>();
91
92        // Start network layer with mailbox request handling.
93        let storage_for_network = self.storage.clone();
94        tokio::spawn(async move {
95            // Custom network event loop for mailbox node.
96            if let Err(e) = Self::run_mailbox_network_loop(network_layer, storage_for_network).await
97            {
98                error!("Mailbox network loop error: {}", e);
99            }
100        });
101
102        // Keep the main task alive.
103        loop {
104            tokio::time::sleep(Duration::from_secs(60)).await;
105            info!("Mailbox node still running...");
106        }
107    }
108
109    /// Runs the network event loop for the mailbox node.
110    async fn run_mailbox_network_loop(
111        mut network_layer: NetworkLayer,
112        _storage: Arc<SledMailboxStore>,
113    ) -> Result<()> {
114        info!("Starting mailbox network event loop");
115
116        // Register as a general mailbox provider in the DHT.
117        if let Err(e) = network_layer.start_providing_mailbox() {
118            error!("Failed to register as mailbox provider: {}", e);
119        } else {
120            info!("Successfully registered as mailbox provider in DHT");
121        }
122
123        let (incoming_tx, mut incoming_rx) = mpsc::unbounded_channel();
124
125        tokio::spawn(async move {
126            while let Some(_message) = incoming_rx.recv().await {
127                // Mailbox nodes don't typically handle chat messages directly.
128                debug!("Received message in mailbox node (ignoring)");
129            }
130        });
131
132        // Run the network layer.
133        network_layer.run(incoming_tx).await
134    }
135
136    /// Periodically cleans up expired messages from the storage.
137    async fn cleanup_task(storage: Arc<SledMailboxStore>, retention_period: Duration) {
138        let mut cleanup_interval = interval(Duration::from_secs(60 * 60)); // 1 hour
139
140        info!(
141            "Starting cleanup task with retention period: {:?}",
142            retention_period
143        );
144
145        loop {
146            cleanup_interval.tick().await;
147
148            trace!("Running message cleanup");
149
150            if let Err(e) = storage.cleanup_expired(retention_period).await {
151                error!("Cleanup failed: {}", e);
152            } else {
153                trace!("Cleanup completed successfully");
154            }
155        }
156    }
157
158    /// Returns statistics about the mailbox node.
159    pub fn get_stats(&self) -> MailboxStats {
160        MailboxStats {
161            max_storage_per_user: self.max_storage_per_user,
162            retention_period: self.retention_period,
163        }
164    }
165}
166
/// Contains statistics about a mailbox node.
///
/// This is a small plain-data snapshot, so it can be copied freely and
/// compared for equality.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct MailboxStats {
    /// The maximum number of messages to store per user.
    pub max_storage_per_user: usize,
    /// The duration for which messages are retained.
    pub retention_period: Duration,
}
175
176/// Creates a Kademlia record key for discovering mailbox providers.
177pub fn make_mailbox_provider_key() -> kad::RecordKey {
178    kad::RecordKey::new(&b"mailbox-providers".to_vec())
179}
180
181/// Creates a Kademlia record key for discovering the mailboxes for a specific recipient.
182pub fn make_recipient_mailbox_key(recipient_hash: [u8; 32]) -> kad::RecordKey {
183    kad::RecordKey::new(&format!("recipient-mailbox/{}", hex::encode(recipient_hash)).into_bytes())
184}