p2p_chat/sync/engine/outbox/
forward.rs

1use std::time::Instant;
2use std::ops::Deref;
3
4use anyhow::{anyhow, Result};
5use tracing::{debug, info};
6
7use crate::crypto::StorageEncryption;
8use crate::network::NetworkHandle;
9use crate::types::EncryptedMessage;
10
11use super::super::SyncEngine;
12
13impl SyncEngine {
14    /// Forwards a pending message to available mailbox providers.
15    ///
16    /// This function attempts to encrypt and store a message in at least two
17    /// mailbox providers for redundancy. It ranks available mailboxes and updates
18    /// their performance metrics.
19    ///
20    /// # Arguments
21    ///
22    /// * `network` - The `NetworkHandle` to use for interacting with the network.
23    /// * `message` - The message to encrypt and forward.
24    ///
25    /// # Returns
26    ///
27    /// `true` if the message was successfully forwarded to at least one mailbox,
28    /// `false` otherwise.
29    ///
30    /// # Errors
31    ///
32    /// This function will return an error if the recipient is not a friend or if
33    /// there are issues with encryption or storage.
34    pub(super) async fn forward_pending_message(
35        &mut self,
36        network: &NetworkHandle,
37        message: &crate::types::Message,
38    ) -> Result<bool> {
39        let Some(friend) = self.friends.get_friend(&message.recipient).await? else {
40            return Err(anyhow!(
41                "Cannot forward message {}: recipient {} not in friends list.",
42                message.id,
43                message.recipient
44            ));
45        };
46
47        let recipient_hash = StorageEncryption::derive_recipient_hash(&friend.e2e_public_key);
48        // Encrypt the message content.
49        let encrypted_content = self
50            .identity
51            .deref()
52            .encrypt_for(&friend.e2e_public_key, &message.content)?;
53
54        let encrypted_msg = EncryptedMessage {
55            id: message.id,
56            sender: self.identity.peer_id,
57            recipient_hash,
58            encrypted_content,
59            timestamp: message.timestamp,
60            nonce: message.nonce,
61            sender_pub_key: self.identity.hpke_public_key(),
62        };
63
64        let candidate_mailboxes = self.rank_mailboxes_subset(&self.discovered_mailboxes);
65        if candidate_mailboxes.is_empty() {
66            debug!(
67                "No available (non-backed-off) mailboxes to forward message {}.",
68                message.id
69            );
70            return Ok(false);
71        }
72
73        let min_replicas = 2;
74        let max_attempts = candidate_mailboxes.len().min(4);
75        let mut forwarded_count = 0;
76        let mut mailboxes_to_forget = Vec::new();
77
78        for peer_id in candidate_mailboxes.iter().take(max_attempts) {
79            if !self.discovered_mailboxes.contains(peer_id) {
80                debug!(
81                    "Skipping mailbox forwarding to {} - was removed during iteration",
82                    peer_id
83                );
84                continue;
85            }
86
87            let start_time = Instant::now();
88            match network
89                .mailbox_put(*peer_id, recipient_hash, encrypted_msg.clone())
90                .await
91            {
92                Ok(true) => {
93                    self.update_mailbox_performance(*peer_id, true, start_time.elapsed()).await;
94                    info!(
95                        "Successfully forwarded pending message {} to mailbox {} ({}/{})",
96                        message.id,
97                        peer_id,
98                        forwarded_count + 1,
99                        min_replicas
100                    );
101                    forwarded_count += 1;
102
103                    if forwarded_count >= min_replicas {
104                        break;
105                    }
106                }
107                Ok(false) => {
108                    self.update_mailbox_performance(*peer_id, false, start_time.elapsed()).await;
109                    debug!(
110                        "Mailbox {} rejected pending message {}",
111                        peer_id, message.id
112                    );
113
114                    if self.should_forget_mailbox(*peer_id) {
115                        mailboxes_to_forget.push(*peer_id);
116                    }
117                }
118                Err(err) => {
119                    self.update_mailbox_performance(*peer_id, false, start_time.elapsed()).await;
120                    debug!(
121                        "Failed to forward pending message {} to mailbox {}: {}",
122                        message.id, peer_id, err
123                    );
124
125                    if self.should_forget_mailbox(*peer_id) {
126                        mailboxes_to_forget.push(*peer_id);
127                    }
128                }
129            }
130        }
131
132        for mailbox_id in mailboxes_to_forget {
133            self.forget_failing_mailbox(mailbox_id).await;
134        }
135
136        Ok(forwarded_count > 0)
137    }
138}