p2p_chat/sync/engine/outbox/
retry.rs1use anyhow::{anyhow, Result};
3use tracing::{debug, info, warn};
4
5use crate::network::NetworkHandle;
6
7use super::super::SyncEngine;
8
9impl SyncEngine {
10 pub async fn retry_outbox(&mut self) -> Result<()> {
22 let pending_messages = self.outbox.get_pending().await?;
23 if pending_messages.is_empty() {
24 return Ok(());
25 }
26
27 let Some(network) = self.network.clone() else {
28 debug!("No network handle available for outbox retry");
29 return Ok(());
30 };
31
32 let available_mailboxes = self.get_available_mailboxes();
33
34 if self.discovered_mailboxes.is_empty() {
35 debug!(
36 "Have {} pending messages but no discovered mailboxes, triggering forced discovery",
37 pending_messages.len()
38 );
39 if let Err(e) = self.discover_mailboxes_if_needed(true).await {
40 warn!(
41 "Failed to trigger mailbox discovery for pending messages: {}",
42 e
43 );
44 }
45 } else if available_mailboxes.is_empty() {
46 debug!(
47 "Have {} pending messages but all {} mailboxes are backed off",
48 pending_messages.len(),
49 self.discovered_mailboxes.len()
50 );
51 }
52
53 debug!("Retrying {} pending messages", pending_messages.len());
54
55 for message in pending_messages {
56 let delivered_direct = self.attempt_direct_delivery(&network, &message).await?;
57
58 if delivered_direct {
59 continue;
60 }
61
62 if self.discovered_mailboxes.is_empty() {
63 debug!("No mailboxes discovered to forward message {}.", message.id);
64 continue;
65 }
66
67 match self.forward_pending_message(&network, &message).await {
68 Ok(true) => {
69 self.outbox.remove_pending(&message.id).await?;
72 info!(
73 "Removed message {} from outbox after successful mailbox forward.",
74 message.id
75 );
76 }
77 Ok(false) => {
78 debug!(
79 "Failed to forward message {} to any mailboxes, will retry later.",
80 message.id
81 );
82 }
83 Err(e) => {
84 warn!(
85 "Unable to forward message {} via mailbox: {}",
86 message.id, e
87 );
88 }
89 }
90 }
91
92 Ok(())
93 }
94
95 async fn attempt_direct_delivery(
113 &mut self,
114 network: &NetworkHandle,
115 message: &crate::types::Message,
116 ) -> Result<bool> {
117 let should_try_direct = self.backoff_manager.can_attempt(&message.recipient);
118
119 let direct_result = if should_try_direct {
120 debug!("Attempting direct delivery to peer {}", message.recipient);
121 self.backoff_manager.record_attempt(message.recipient);
122 network
123 .send_message(message.recipient, message.clone())
124 .await
125 } else {
126 debug!(
127 "Skipping direct delivery attempt to backed-off peer {}",
128 message.recipient
129 );
130 Err(anyhow!("Peer is backed off"))
131 };
132
133 match direct_result {
134 Ok(()) => {
135 if should_try_direct {
136 self.backoff_manager.record_success(&message.recipient);
137 }
138
139 self.outbox.remove_pending(&message.id).await?;
142 info!(
143 "Successfully delivered message {} directly to {}",
144 message.id, message.recipient
145 );
146 Ok(true)
147 }
148 Err(e) => {
149 if should_try_direct {
150 self.backoff_manager.record_failure(message.recipient);
151 }
152 debug!(
153 "Direct retry for message {} to {} failed: {}. Attempting mailbox forward.",
154 message.id, message.recipient, e
155 );
156 Ok(false)
157 }
158 }
159 }
160}