p2p_chat/sync/engine/outbox/
forward.rs1use std::time::Instant;
2use std::ops::Deref;
3
4use anyhow::{anyhow, Result};
5use tracing::{debug, info};
6
7use crate::crypto::StorageEncryption;
8use crate::network::NetworkHandle;
9use crate::types::EncryptedMessage;
10
11use super::super::SyncEngine;
12
13impl SyncEngine {
14 pub(super) async fn forward_pending_message(
35 &mut self,
36 network: &NetworkHandle,
37 message: &crate::types::Message,
38 ) -> Result<bool> {
39 let Some(friend) = self.friends.get_friend(&message.recipient).await? else {
40 return Err(anyhow!(
41 "Cannot forward message {}: recipient {} not in friends list.",
42 message.id,
43 message.recipient
44 ));
45 };
46
47 let recipient_hash = StorageEncryption::derive_recipient_hash(&friend.e2e_public_key);
48 let encrypted_content = self
50 .identity
51 .deref()
52 .encrypt_for(&friend.e2e_public_key, &message.content)?;
53
54 let encrypted_msg = EncryptedMessage {
55 id: message.id,
56 sender: self.identity.peer_id,
57 recipient_hash,
58 encrypted_content,
59 timestamp: message.timestamp,
60 nonce: message.nonce,
61 sender_pub_key: self.identity.hpke_public_key(),
62 };
63
64 let candidate_mailboxes = self.rank_mailboxes_subset(&self.discovered_mailboxes);
65 if candidate_mailboxes.is_empty() {
66 debug!(
67 "No available (non-backed-off) mailboxes to forward message {}.",
68 message.id
69 );
70 return Ok(false);
71 }
72
73 let min_replicas = 2;
74 let max_attempts = candidate_mailboxes.len().min(4);
75 let mut forwarded_count = 0;
76 let mut mailboxes_to_forget = Vec::new();
77
78 for peer_id in candidate_mailboxes.iter().take(max_attempts) {
79 if !self.discovered_mailboxes.contains(peer_id) {
80 debug!(
81 "Skipping mailbox forwarding to {} - was removed during iteration",
82 peer_id
83 );
84 continue;
85 }
86
87 let start_time = Instant::now();
88 match network
89 .mailbox_put(*peer_id, recipient_hash, encrypted_msg.clone())
90 .await
91 {
92 Ok(true) => {
93 self.update_mailbox_performance(*peer_id, true, start_time.elapsed()).await;
94 info!(
95 "Successfully forwarded pending message {} to mailbox {} ({}/{})",
96 message.id,
97 peer_id,
98 forwarded_count + 1,
99 min_replicas
100 );
101 forwarded_count += 1;
102
103 if forwarded_count >= min_replicas {
104 break;
105 }
106 }
107 Ok(false) => {
108 self.update_mailbox_performance(*peer_id, false, start_time.elapsed()).await;
109 debug!(
110 "Mailbox {} rejected pending message {}",
111 peer_id, message.id
112 );
113
114 if self.should_forget_mailbox(*peer_id) {
115 mailboxes_to_forget.push(*peer_id);
116 }
117 }
118 Err(err) => {
119 self.update_mailbox_performance(*peer_id, false, start_time.elapsed()).await;
120 debug!(
121 "Failed to forward pending message {} to mailbox {}: {}",
122 message.id, peer_id, err
123 );
124
125 if self.should_forget_mailbox(*peer_id) {
126 mailboxes_to_forget.push(*peer_id);
127 }
128 }
129 }
130 }
131
132 for mailbox_id in mailboxes_to_forget {
133 self.forget_failing_mailbox(mailbox_id).await;
134 }
135
136 Ok(forwarded_count > 0)
137 }
138}