p2p_chat/storage/mailbox/operations.rs
1//! This module implements the `MailboxStore` trait for `SledMailboxStore`.
2use super::{MailboxStore, SledMailboxStore};
3use crate::types::EncryptedMessage;
4use anyhow::Result;
5use async_trait::async_trait;
6use chrono::Utc;
7use std::time::Duration;
8use tracing::warn;
9use uuid::Uuid;
10
11#[async_trait]
12impl MailboxStore for SledMailboxStore {
13 /// Stores an encrypted message for a recipient in the mailbox.
14 ///
15 /// This function also enforces storage limits by removing the oldest messages
16 /// if the maximum storage per user is exceeded.
17 ///
18 /// # Arguments
19 ///
20 /// * `recipient_hash` - The hash of the recipient's public key.
21 /// * `msg` - The `EncryptedMessage` to store.
22 ///
23 /// # Errors
24 ///
25 /// This function will return an error if the message cannot be stored or if
26 /// there are issues with the underlying `sled` database.
27 async fn store_message(&self, recipient_hash: [u8; 32], msg: EncryptedMessage) -> Result<()> {
28 let key = self.make_message_key(&recipient_hash, &msg.id);
29 let value = self.serialize_message(&msg)?;
30
31 self.tree.insert(key, value)?;
32
33 // Enforce storage limits by cleaning up old messages if necessary
34 let mut existing: Vec<(Vec<u8>, i64, u64)> = Vec::new();
35 for entry in self.tree.scan_prefix(recipient_hash) {
36 match entry {
37 Ok((key, value)) => match self.deserialize_message(&value) {
38 Ok(existing_msg) => {
39 existing.push((key.to_vec(), existing_msg.timestamp, existing_msg.nonce));
40 }
41 Err(err) => {
42 warn!(
43 "Dropping corrupt mailbox message for recipient {:?}: {}",
44 &recipient_hash[..8],
45 err
46 );
47 self.tree.remove(&key)?;
48 }
49 },
50 Err(err) => {
51 warn!(
52 "Failed to iterate mailbox entries for recipient {:?}: {}",
53 &recipient_hash[..8],
54 err
55 );
56 }
57 }
58 }
59
60 if existing.len() > self.max_storage_per_user {
61 existing.sort_by(|a, b| (a.1, a.2).cmp(&(b.1, b.2)));
62 let excess = existing.len() - self.max_storage_per_user;
63 for (key, _, _) in existing.into_iter().take(excess) {
64 self.tree.remove(key)?;
65 }
66 }
67
68 self.tree.flush_async().await?;
69 Ok(())
70 }
71
72 /// Fetches a limited number of messages for a recipient from the mailbox.
73 ///
74 /// The messages are sorted by timestamp and nonce for deterministic ordering.
75 /// Corrupt messages encountered during fetching are logged and removed.
76 ///
77 /// # Arguments
78 ///
79 /// * `recipient_hash` - The hash of the recipient's public key.
80 /// * `limit` - The maximum number of messages to fetch.
81 ///
82 /// # Returns
83 ///
84 /// A `Vec` of `EncryptedMessage`s.
85 ///
86 /// # Errors
87 ///
88 /// This function will return an error if there are issues with the underlying
89 /// `sled` database.
90 async fn fetch_messages(
91 &self,
92 recipient_hash: [u8; 32],
93 limit: usize,
94 ) -> Result<Vec<EncryptedMessage>> {
95 let mut messages = Vec::new();
96
97 for result in self.tree.scan_prefix(recipient_hash) {
98 match result {
99 Ok((key, value)) => match self.deserialize_message(&value) {
100 Ok(msg) => {
101 messages.push(msg);
102 if messages.len() >= limit {
103 break;
104 }
105 }
106 Err(err) => {
107 warn!(
108 "Removing corrupt mailbox message for recipient {:?}: {}",
109 &recipient_hash[..8],
110 err
111 );
112 self.tree.remove(&key)?;
113 }
114 },
115 Err(err) => {
116 warn!(
117 "Failed to iterate mailbox entries for recipient {:?}: {}",
118 &recipient_hash[..8],
119 err
120 );
121 }
122 }
123 }
124
125 // Sort by timestamp/nonce for deterministic ordering.
126 messages.sort_by_key(|msg| (msg.timestamp, msg.nonce));
127 Ok(messages)
128 }
129
130 /// Deletes specific messages for a recipient from the mailbox.
131 ///
132 /// # Arguments
133 ///
134 /// * `recipient_hash` - The hash of the recipient's public key.
135 /// * `msg_ids` - A `Vec` of message IDs to delete.
136 ///
137 /// # Returns
138 ///
139 /// The number of messages successfully deleted.
140 ///
141 /// # Errors
142 ///
143 /// This function will return an error if there are issues with the underlying
144 /// `sled` database.
145 async fn delete_messages(&self, recipient_hash: [u8; 32], msg_ids: Vec<Uuid>) -> Result<usize> {
146 let mut deleted = 0;
147
148 for msg_id in msg_ids {
149 let key = self.make_message_key(&recipient_hash, &msg_id);
150 if self.tree.remove(key)?.is_some() {
151 deleted += 1;
152 }
153 }
154
155 self.tree.flush_async().await?;
156 Ok(deleted)
157 }
158
159 /// Cleans up messages older than `max_age` from the mailbox.
160 ///
161 /// Corrupt messages encountered during cleanup are logged and removed.
162 ///
163 /// # Arguments
164 ///
165 /// * `max_age` - The maximum age for messages to be retained. Messages older
166 /// than this duration will be deleted.
167 ///
168 /// # Errors
169 ///
170 /// This function will return an error if there are issues with the underlying
171 /// `sled` database.
172 async fn cleanup_expired(&self, max_age: Duration) -> Result<()> {
173 let cutoff = Utc::now().timestamp_millis() - max_age.as_millis() as i64;
174 let mut keys_to_remove = Vec::new();
175
176 for result in self.tree.iter() {
177 match result {
178 Ok((key, value)) => match self.deserialize_message(&value) {
179 Ok(msg) => {
180 if msg.timestamp < cutoff {
181 keys_to_remove.push(key.to_vec());
182 }
183 }
184 Err(err) => {
185 warn!("Removing corrupt mailbox entry during cleanup: {}", err);
186 keys_to_remove.push(key.to_vec());
187 }
188 },
189 Err(err) => {
190 warn!("Failed to iterate mailbox for cleanup: {}", err);
191 }
192 }
193 }
194
195 for key in keys_to_remove {
196 self.tree.remove(key)?;
197 }
198
199 self.tree.flush_async().await?;
200 Ok(())
201 }
202}