1use crate::crypto::{Identity, StorageEncryption};
4use crate::network::NetworkLayer;
5use crate::storage::{MailboxStore, SledMailboxStore};
6use anyhow::Result;
7use libp2p::kad;
8use std::sync::Arc;
9use std::time::Duration;
10use tokio::sync::mpsc;
11use tokio::time::interval;
12use tracing::{debug, error, info, trace};
13
14pub struct MailboxNode {
18 pub identity: Arc<Identity>,
20 pub storage: Arc<SledMailboxStore>,
22 pub max_storage_per_user: usize,
24 pub retention_period: Duration,
26}
27
28impl MailboxNode {
29 pub fn new(
43 identity: Arc<Identity>,
44 db: sled::Db,
45 encryption: Option<StorageEncryption>,
46 max_storage_per_user: usize,
47 retention_period: Duration,
48 ) -> Result<Self> {
49 let storage = Arc::new(SledMailboxStore::new(db, encryption, max_storage_per_user)?);
50
51 Ok(Self {
52 identity,
53 storage,
54 max_storage_per_user,
55 retention_period,
56 })
57 }
58
59 pub async fn run_with_network(&mut self, network_layer: NetworkLayer) -> Result<()> {
72 info!(
73 "Starting mailbox node with network layer: {}",
74 self.identity.peer_id
75 );
76 info!(
77 "Max storage per user: {} messages",
78 self.max_storage_per_user
79 );
80 info!("Retention period: {:?}", self.retention_period);
81
82 let storage_clone = self.storage.clone();
84 let retention_period = self.retention_period;
85 tokio::spawn(async move {
86 Self::cleanup_task(storage_clone, retention_period).await;
87 });
88
89 let (_incoming_tx, _incoming_rx) = mpsc::unbounded_channel::<crate::types::Message>();
91
92 let storage_for_network = self.storage.clone();
94 tokio::spawn(async move {
95 if let Err(e) = Self::run_mailbox_network_loop(network_layer, storage_for_network).await
97 {
98 error!("Mailbox network loop error: {}", e);
99 }
100 });
101
102 loop {
104 tokio::time::sleep(Duration::from_secs(60)).await;
105 info!("Mailbox node still running...");
106 }
107 }
108
109 async fn run_mailbox_network_loop(
111 mut network_layer: NetworkLayer,
112 _storage: Arc<SledMailboxStore>,
113 ) -> Result<()> {
114 info!("Starting mailbox network event loop");
115
116 if let Err(e) = network_layer.start_providing_mailbox() {
118 error!("Failed to register as mailbox provider: {}", e);
119 } else {
120 info!("Successfully registered as mailbox provider in DHT");
121 }
122
123 let (incoming_tx, mut incoming_rx) = mpsc::unbounded_channel();
124
125 tokio::spawn(async move {
126 while let Some(_message) = incoming_rx.recv().await {
127 debug!("Received message in mailbox node (ignoring)");
129 }
130 });
131
132 network_layer.run(incoming_tx).await
134 }
135
136 async fn cleanup_task(storage: Arc<SledMailboxStore>, retention_period: Duration) {
138 let mut cleanup_interval = interval(Duration::from_secs(60 * 60)); info!(
141 "Starting cleanup task with retention period: {:?}",
142 retention_period
143 );
144
145 loop {
146 cleanup_interval.tick().await;
147
148 trace!("Running message cleanup");
149
150 if let Err(e) = storage.cleanup_expired(retention_period).await {
151 error!("Cleanup failed: {}", e);
152 } else {
153 trace!("Cleanup completed successfully");
154 }
155 }
156 }
157
158 pub fn get_stats(&self) -> MailboxStats {
160 MailboxStats {
161 max_storage_per_user: self.max_storage_per_user,
162 retention_period: self.retention_period,
163 }
164 }
165}
166
/// Snapshot of a mailbox node's configured limits.
///
/// A plain value type: it can be cloned and compared, which makes it easy to
/// pass around and to assert on.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct MailboxStats {
    /// Per-user storage limit, counted in messages.
    pub max_storage_per_user: usize,
    /// Retention period the node uses for expiry cleanup.
    pub retention_period: Duration,
}
175
176pub fn make_mailbox_provider_key() -> kad::RecordKey {
178 kad::RecordKey::new(&b"mailbox-providers".to_vec())
179}
180
181pub fn make_recipient_mailbox_key(recipient_hash: [u8; 32]) -> kad::RecordKey {
183 kad::RecordKey::new(&format!("recipient-mailbox/{}", hex::encode(recipient_hash)).into_bytes())
184}