p2p_chat/storage/mailbox/
operations.rs

1//! This module implements the `MailboxStore` trait for `SledMailboxStore`.
2use super::{MailboxStore, SledMailboxStore};
3use crate::types::EncryptedMessage;
4use anyhow::Result;
5use async_trait::async_trait;
6use chrono::Utc;
7use std::time::Duration;
8use tracing::warn;
9use uuid::Uuid;
10
11#[async_trait]
12impl MailboxStore for SledMailboxStore {
13    /// Stores an encrypted message for a recipient in the mailbox.
14    ///
15    /// This function also enforces storage limits by removing the oldest messages
16    /// if the maximum storage per user is exceeded.
17    ///
18    /// # Arguments
19    ///
20    /// * `recipient_hash` - The hash of the recipient's public key.
21    /// * `msg` - The `EncryptedMessage` to store.
22    ///
23    /// # Errors
24    ///
25    /// This function will return an error if the message cannot be stored or if
26    /// there are issues with the underlying `sled` database.
27    async fn store_message(&self, recipient_hash: [u8; 32], msg: EncryptedMessage) -> Result<()> {
28        let key = self.make_message_key(&recipient_hash, &msg.id);
29        let value = self.serialize_message(&msg)?;
30
31        self.tree.insert(key, value)?;
32
33        // Enforce storage limits by cleaning up old messages if necessary
34        let mut existing: Vec<(Vec<u8>, i64, u64)> = Vec::new();
35        for entry in self.tree.scan_prefix(recipient_hash) {
36            match entry {
37                Ok((key, value)) => match self.deserialize_message(&value) {
38                    Ok(existing_msg) => {
39                        existing.push((key.to_vec(), existing_msg.timestamp, existing_msg.nonce));
40                    }
41                    Err(err) => {
42                        warn!(
43                            "Dropping corrupt mailbox message for recipient {:?}: {}",
44                            &recipient_hash[..8],
45                            err
46                        );
47                        self.tree.remove(&key)?;
48                    }
49                },
50                Err(err) => {
51                    warn!(
52                        "Failed to iterate mailbox entries for recipient {:?}: {}",
53                        &recipient_hash[..8],
54                        err
55                    );
56                }
57            }
58        }
59
60        if existing.len() > self.max_storage_per_user {
61            existing.sort_by(|a, b| (a.1, a.2).cmp(&(b.1, b.2)));
62            let excess = existing.len() - self.max_storage_per_user;
63            for (key, _, _) in existing.into_iter().take(excess) {
64                self.tree.remove(key)?;
65            }
66        }
67
68        self.tree.flush_async().await?;
69        Ok(())
70    }
71
72    /// Fetches a limited number of messages for a recipient from the mailbox.
73    ///
74    /// The messages are sorted by timestamp and nonce for deterministic ordering.
75    /// Corrupt messages encountered during fetching are logged and removed.
76    ///
77    /// # Arguments
78    ///
79    /// * `recipient_hash` - The hash of the recipient's public key.
80    /// * `limit` - The maximum number of messages to fetch.
81    ///
82    /// # Returns
83    ///
84    /// A `Vec` of `EncryptedMessage`s.
85    ///
86    /// # Errors
87    ///
88    /// This function will return an error if there are issues with the underlying
89    /// `sled` database.
90    async fn fetch_messages(
91        &self,
92        recipient_hash: [u8; 32],
93        limit: usize,
94    ) -> Result<Vec<EncryptedMessage>> {
95        let mut messages = Vec::new();
96
97        for result in self.tree.scan_prefix(recipient_hash) {
98            match result {
99                Ok((key, value)) => match self.deserialize_message(&value) {
100                    Ok(msg) => {
101                        messages.push(msg);
102                        if messages.len() >= limit {
103                            break;
104                        }
105                    }
106                    Err(err) => {
107                        warn!(
108                            "Removing corrupt mailbox message for recipient {:?}: {}",
109                            &recipient_hash[..8],
110                            err
111                        );
112                        self.tree.remove(&key)?;
113                    }
114                },
115                Err(err) => {
116                    warn!(
117                        "Failed to iterate mailbox entries for recipient {:?}: {}",
118                        &recipient_hash[..8],
119                        err
120                    );
121                }
122            }
123        }
124
125        // Sort by timestamp/nonce for deterministic ordering.
126        messages.sort_by_key(|msg| (msg.timestamp, msg.nonce));
127        Ok(messages)
128    }
129
130    /// Deletes specific messages for a recipient from the mailbox.
131    ///
132    /// # Arguments
133    ///
134    /// * `recipient_hash` - The hash of the recipient's public key.
135    /// * `msg_ids` - A `Vec` of message IDs to delete.
136    ///
137    /// # Returns
138    ///
139    /// The number of messages successfully deleted.
140    ///
141    /// # Errors
142    ///
143    /// This function will return an error if there are issues with the underlying
144    /// `sled` database.
145    async fn delete_messages(&self, recipient_hash: [u8; 32], msg_ids: Vec<Uuid>) -> Result<usize> {
146        let mut deleted = 0;
147
148        for msg_id in msg_ids {
149            let key = self.make_message_key(&recipient_hash, &msg_id);
150            if self.tree.remove(key)?.is_some() {
151                deleted += 1;
152            }
153        }
154
155        self.tree.flush_async().await?;
156        Ok(deleted)
157    }
158
159    /// Cleans up messages older than `max_age` from the mailbox.
160    ///
161    /// Corrupt messages encountered during cleanup are logged and removed.
162    ///
163    /// # Arguments
164    ///
165    /// * `max_age` - The maximum age for messages to be retained. Messages older
166    ///               than this duration will be deleted.
167    ///
168    /// # Errors
169    ///
170    /// This function will return an error if there are issues with the underlying
171    /// `sled` database.
172    async fn cleanup_expired(&self, max_age: Duration) -> Result<()> {
173        let cutoff = Utc::now().timestamp_millis() - max_age.as_millis() as i64;
174        let mut keys_to_remove = Vec::new();
175
176        for result in self.tree.iter() {
177            match result {
178                Ok((key, value)) => match self.deserialize_message(&value) {
179                    Ok(msg) => {
180                        if msg.timestamp < cutoff {
181                            keys_to_remove.push(key.to_vec());
182                        }
183                    }
184                    Err(err) => {
185                        warn!("Removing corrupt mailbox entry during cleanup: {}", err);
186                        keys_to_remove.push(key.to_vec());
187                    }
188                },
189                Err(err) => {
190                    warn!("Failed to iterate mailbox for cleanup: {}", err);
191                }
192            }
193        }
194
195        for key in keys_to_remove {
196            self.tree.remove(key)?;
197        }
198
199        self.tree.flush_async().await?;
200        Ok(())
201    }
202}