p2p_chat/cli/commands.rs
1//! This module defines the core data structures and commands used throughout the
2//! application, particularly for the CLI and TUI.
3use crate::crypto::Identity;
4use crate::network::NetworkHandle;
5use crate::storage::{FriendsStore, MessageStore, OutboxStore};
6use crate::sync::SyncEngine;
7use crate::types::{EncryptedMessage, Friend, Message};
8use anyhow::Result;
9use libp2p::PeerId;
10use std::collections::HashSet;
11use std::sync::Arc;
12use tokio::sync::{mpsc, Mutex};
13use tracing::{debug, info};
14
/// Outcome of an attempt to store a message in one or more mailboxes.
///
/// The type is a small plain value, so it derives the common traits that let
/// callers log it (`Debug`), compare it (`PartialEq`/`Eq`) and pass it around
/// freely (`Clone`/`Copy`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MailboxDeliveryResult {
    /// The message was accepted by the given number of mailboxes.
    Success(usize),
    /// No mailbox accepted the message.
    Failure,
}
22
23/// Represents the central state and functionality of the application node.
24///
25/// This struct holds all the necessary components for the application to run,
26/// such as the user's identity, storage, network handle, and synchronization engine.
27pub struct Node {
28 /// The user's identity.
29 pub identity: Arc<Identity>,
30 /// The store for managing friends.
31 pub friends: Arc<dyn FriendsStore + Send + Sync>,
32 /// The store for managing message history.
33 pub history: Arc<dyn MessageStore + Send + Sync>,
34 /// The store for managing outgoing messages.
35 pub outbox: Arc<dyn OutboxStore + Send + Sync>,
36 /// The handle for interacting with the network layer.
37 pub network: NetworkHandle,
38 /// The sender for sending notifications to the TUI.
39 pub ui_notify_tx: mpsc::UnboundedSender<UiNotification>,
40 /// The synchronization engine.
41 pub sync_engine: Arc<Mutex<SyncEngine>>,
42}
43
44/// Represents a notification to be sent to the UI.
45#[derive(Clone, Debug)]
46pub enum UiNotification {
47 /// A new message has been received.
48 NewMessage(Message),
49 /// A peer has connected.
50 PeerConnected(PeerId),
51 /// A peer has disconnected.
52 PeerDisconnected(PeerId),
53 /// The delivery status of a message has been updated.
54 DeliveryStatusUpdate {
55 /// The ID of the message.
56 message_id: uuid::Uuid,
57 /// The new delivery status.
58 new_status: crate::types::DeliveryStatus,
59 },
60}
61
62impl Node {
63 /// Forwards a message to a set of mailboxes.
64 ///
65 /// This function attempts to deliver a message to a set of mailboxes for a
66 /// given friend. It will try to deliver the message to at least two mailboxes
67 /// for redundancy.
68 ///
69 /// # Arguments
70 ///
71 /// * `message` - The message to forward.
72 /// * `friend` - The friend to whom the message is being sent.
73 /// * `providers` - The set of mailboxes to try.
74 ///
75 /// # Returns
76 ///
77 /// A `MailboxDeliveryResult` indicating whether the delivery was successful.
78 pub async fn forward_to_mailboxes(
79 &self,
80 message: &Message,
81 friend: &Friend,
82 providers: &HashSet<PeerId>,
83 ) -> Result<MailboxDeliveryResult> {
84 if providers.is_empty() {
85 return Ok(MailboxDeliveryResult::Failure);
86 }
87
88 info!(
89 "Attempting to forward message to {} known mailbox providers",
90 providers.len()
91 );
92
93 let recipient_hash =
94 crate::crypto::StorageEncryption::derive_recipient_hash(&friend.e2e_public_key);
95
96 // Re-encrypt the already-encrypted content for mailbox storage
97 // This matches the sync engine's behavior and ensures proper decryption
98 let encrypted_content = self
99 .identity
100 .encrypt_for(&friend.e2e_public_key, &message.content)?;
101
102 let encrypted_msg = EncryptedMessage {
103 id: message.id,
104 sender: self.identity.peer_id,
105 recipient_hash,
106 encrypted_content,
107 timestamp: message.timestamp,
108 nonce: message.nonce,
109 sender_pub_key: self.identity.hpke_public_key(),
110 };
111
112 // Try to send to at least 2 mailboxes for redundancy
113 let min_replicas = 2;
114 let max_attempts = providers.len().min(4); // Don't spam too many mailboxes
115 let mut forwarded_count = 0;
116 let mut failed_attempts = 0;
117
118 // Get mailboxes sorted by performance instead of random shuffle
119 let candidate_mailboxes = {
120 let sync_engine = self.sync_engine.lock().await;
121 sync_engine.rank_mailboxes_subset(providers)
122 };
123
124 for peer_id in candidate_mailboxes.iter().take(max_attempts) {
125 let start_time = std::time::Instant::now();
126 match self
127 .network
128 .mailbox_put(*peer_id, recipient_hash, encrypted_msg.clone())
129 .await
130 {
131 Ok(true) => {
132 let response_time = start_time.elapsed();
133 info!(
134 "Successfully forwarded message {} to mailbox {} ({}/{})",
135 message.id,
136 peer_id,
137 forwarded_count + 1,
138 min_replicas
139 );
140 forwarded_count += 1;
141
142 // Update performance tracking (fire and forget to avoid blocking)
143 let sync_engine_clone = self.sync_engine.clone();
144 let peer_id_copy = *peer_id;
145 tokio::spawn(async move {
146 if let Ok(mut sync_engine) = sync_engine_clone.try_lock() {
147 sync_engine.update_mailbox_performance(
148 peer_id_copy,
149 true,
150 response_time,
151 ).await;
152 }
153 });
154
155 // Continue until we reach minimum replicas
156 if forwarded_count >= min_replicas {
157 break;
158 }
159 }
160 Ok(false) => {
161 let response_time = start_time.elapsed();
162 debug!("Mailbox {} rejected message {}", peer_id, message.id);
163 failed_attempts += 1;
164
165 // Update performance tracking (fire and forget to avoid blocking)
166 let sync_engine_clone = self.sync_engine.clone();
167 let peer_id_copy = *peer_id;
168 tokio::spawn(async move {
169 if let Ok(mut sync_engine) = sync_engine_clone.try_lock() {
170 sync_engine.update_mailbox_performance(
171 peer_id_copy,
172 false,
173 response_time,
174 ).await;
175 }
176 });
177 }
178 Err(e) => {
179 let response_time = start_time.elapsed();
180 debug!("Failed to forward message to mailbox {}: {}", peer_id, e);
181 failed_attempts += 1;
182
183 // Update performance tracking (fire and forget to avoid blocking)
184 let sync_engine_clone = self.sync_engine.clone();
185 let peer_id_copy = *peer_id;
186 tokio::spawn(async move {
187 if let Ok(mut sync_engine) = sync_engine_clone.try_lock() {
188 sync_engine.update_mailbox_performance(
189 peer_id_copy,
190 false,
191 response_time,
192 ).await;
193 }
194 });
195 }
196 }
197 }
198
199 if forwarded_count > 0 {
200 info!(
201 "Message {} successfully stored in {} mailboxes",
202 message.id, forwarded_count
203 );
204 Ok(MailboxDeliveryResult::Success(forwarded_count))
205 } else {
206 debug!(
207 "Failed to store message {} in any mailboxes after {} attempts",
208 message.id, failed_attempts
209 );
210 Ok(MailboxDeliveryResult::Failure)
211 }
212 }
213}