p2p_chat/storage/
outbox.rs

1//! This module defines the storage interface and implementation for managing
2//! outgoing messages that are pending delivery.
3use crate::crypto::StorageEncryption;
4use crate::types::Message;
5use anyhow::Result;
6use async_trait::async_trait;
7use sled::Db;
8use uuid::Uuid;
9
10/// A trait for managing outgoing messages that are pending delivery.
11#[async_trait]
12pub trait OutboxStore {
13    /// Adds a new message to the outbox.
14    ///
15    /// # Arguments
16    ///
17    /// * `msg` - The `Message` to add.
18    ///
19    /// # Errors
20    ///
21    /// This function will return an error if the message cannot be added.
22    async fn add_pending(&self, msg: Message) -> Result<()>;
23
24    /// Retrieves all pending messages from the outbox.
25    ///
26    /// Messages are sorted by timestamp for consistent ordering.
27    ///
28    /// # Returns
29    ///
30    /// A `Vec` of `Message`s that are pending delivery.
31    ///
32    /// # Errors
33    ///
34    /// This function will return an error if the messages cannot be retrieved.
35    async fn get_pending(&self) -> Result<Vec<Message>>;
36
37    /// Removes a pending message from the outbox.
38    ///
39    /// # Arguments
40    ///
41    /// * `msg_id` - The `Uuid` of the message to remove.
42    ///
43    /// # Errors
44    ///
45    /// This function will return an error if the message cannot be removed.
46    async fn remove_pending(&self, msg_id: &Uuid) -> Result<()>;
47
48    /// Returns the number of pending messages in the outbox.
49    ///
50    /// # Errors
51    ///
52    /// This function will return an error if the count cannot be retrieved.
53    async fn count_pending(&self) -> Result<usize>;
54}
55
56/// An `OutboxStore` implementation using `sled` for storage.
57pub struct SledOutboxStore {
58    tree: sled::Tree,
59    encryption: Option<StorageEncryption>,
60}
61
62impl SledOutboxStore {
63    /// Creates a new `SledOutboxStore`.
64    ///
65    /// # Arguments
66    ///
67    /// * `db` - The `sled::Db` instance to use for storage.
68    /// * `encryption` - The optional `StorageEncryption` to use for encrypting messages.
69    ///
70    /// # Errors
71    ///
72    /// This function will return an error if the `outbox` tree cannot be opened.
73    pub fn new(db: Db, encryption: Option<StorageEncryption>) -> Result<Self> {
74        let tree = db.open_tree("outbox")?;
75        Ok(Self { tree, encryption })
76    }
77
78    /// Serializes a `Message` and encrypts it if encryption is enabled.
79    fn serialize_message(&self, msg: &Message) -> Result<Vec<u8>> {
80        let serialized = serde_json::to_vec(msg)?;
81
82        if let Some(ref encryption) = self.encryption {
83            encryption.encrypt_value(&serialized)
84        } else {
85            Ok(serialized)
86        }
87    }
88
89    /// Decrypts and deserializes a `Message`.
90    fn deserialize_message(&self, data: &[u8]) -> Result<Message> {
91        let decrypted = if let Some(ref encryption) = self.encryption {
92            encryption.decrypt_value(data)?
93        } else {
94            data.to_vec()
95        };
96
97        Ok(serde_json::from_slice(&decrypted)?)
98    }
99}
100
101#[async_trait]
102impl OutboxStore for SledOutboxStore {
103    async fn add_pending(&self, msg: Message) -> Result<()> {
104        let key = msg.id.to_string();
105        let value = self.serialize_message(&msg)?;
106
107        self.tree.insert(key.as_bytes(), value)?;
108        self.tree.flush_async().await?;
109        Ok(())
110    }
111
112    async fn get_pending(&self) -> Result<Vec<Message>> {
113        let mut messages = Vec::new();
114
115        for result in self.tree.iter() {
116            let (_key, value) = result?;
117            messages.push(self.deserialize_message(&value)?);
118        }
119
120        // Sort by timestamp for consistent ordering.
121        messages.sort_by_key(|msg| msg.timestamp);
122        Ok(messages)
123    }
124
125    async fn remove_pending(&self, msg_id: &Uuid) -> Result<()> {
126        let key = msg_id.to_string();
127        self.tree.remove(key.as_bytes())?;
128        self.tree.flush_async().await?;
129        Ok(())
130    }
131
132    async fn count_pending(&self) -> Result<usize> {
133        Ok(self.tree.len())
134    }
135}