p2p_chat/storage/outbox.rs
1//! This module defines the storage interface and implementation for managing
2//! outgoing messages that are pending delivery.
3use crate::crypto::StorageEncryption;
4use crate::types::Message;
5use anyhow::Result;
6use async_trait::async_trait;
7use sled::Db;
8use uuid::Uuid;
9
10/// A trait for managing outgoing messages that are pending delivery.
11#[async_trait]
12pub trait OutboxStore {
13 /// Adds a new message to the outbox.
14 ///
15 /// # Arguments
16 ///
17 /// * `msg` - The `Message` to add.
18 ///
19 /// # Errors
20 ///
21 /// This function will return an error if the message cannot be added.
22 async fn add_pending(&self, msg: Message) -> Result<()>;
23
24 /// Retrieves all pending messages from the outbox.
25 ///
26 /// Messages are sorted by timestamp for consistent ordering.
27 ///
28 /// # Returns
29 ///
30 /// A `Vec` of `Message`s that are pending delivery.
31 ///
32 /// # Errors
33 ///
34 /// This function will return an error if the messages cannot be retrieved.
35 async fn get_pending(&self) -> Result<Vec<Message>>;
36
37 /// Removes a pending message from the outbox.
38 ///
39 /// # Arguments
40 ///
41 /// * `msg_id` - The `Uuid` of the message to remove.
42 ///
43 /// # Errors
44 ///
45 /// This function will return an error if the message cannot be removed.
46 async fn remove_pending(&self, msg_id: &Uuid) -> Result<()>;
47
48 /// Returns the number of pending messages in the outbox.
49 ///
50 /// # Errors
51 ///
52 /// This function will return an error if the count cannot be retrieved.
53 async fn count_pending(&self) -> Result<usize>;
54}
55
56/// An `OutboxStore` implementation using `sled` for storage.
57pub struct SledOutboxStore {
58 tree: sled::Tree,
59 encryption: Option<StorageEncryption>,
60}
61
62impl SledOutboxStore {
63 /// Creates a new `SledOutboxStore`.
64 ///
65 /// # Arguments
66 ///
67 /// * `db` - The `sled::Db` instance to use for storage.
68 /// * `encryption` - The optional `StorageEncryption` to use for encrypting messages.
69 ///
70 /// # Errors
71 ///
72 /// This function will return an error if the `outbox` tree cannot be opened.
73 pub fn new(db: Db, encryption: Option<StorageEncryption>) -> Result<Self> {
74 let tree = db.open_tree("outbox")?;
75 Ok(Self { tree, encryption })
76 }
77
78 /// Serializes a `Message` and encrypts it if encryption is enabled.
79 fn serialize_message(&self, msg: &Message) -> Result<Vec<u8>> {
80 let serialized = serde_json::to_vec(msg)?;
81
82 if let Some(ref encryption) = self.encryption {
83 encryption.encrypt_value(&serialized)
84 } else {
85 Ok(serialized)
86 }
87 }
88
89 /// Decrypts and deserializes a `Message`.
90 fn deserialize_message(&self, data: &[u8]) -> Result<Message> {
91 let decrypted = if let Some(ref encryption) = self.encryption {
92 encryption.decrypt_value(data)?
93 } else {
94 data.to_vec()
95 };
96
97 Ok(serde_json::from_slice(&decrypted)?)
98 }
99}
100
101#[async_trait]
102impl OutboxStore for SledOutboxStore {
103 async fn add_pending(&self, msg: Message) -> Result<()> {
104 let key = msg.id.to_string();
105 let value = self.serialize_message(&msg)?;
106
107 self.tree.insert(key.as_bytes(), value)?;
108 self.tree.flush_async().await?;
109 Ok(())
110 }
111
112 async fn get_pending(&self) -> Result<Vec<Message>> {
113 let mut messages = Vec::new();
114
115 for result in self.tree.iter() {
116 let (_key, value) = result?;
117 messages.push(self.deserialize_message(&value)?);
118 }
119
120 // Sort by timestamp for consistent ordering.
121 messages.sort_by_key(|msg| msg.timestamp);
122 Ok(messages)
123 }
124
125 async fn remove_pending(&self, msg_id: &Uuid) -> Result<()> {
126 let key = msg_id.to_string();
127 self.tree.remove(key.as_bytes())?;
128 self.tree.flush_async().await?;
129 Ok(())
130 }
131
132 async fn count_pending(&self) -> Result<usize> {
133 Ok(self.tree.len())
134 }
135}