p2p_chat/cli/
commands.rs

1//! This module defines the core data structures and commands used throughout the
2//! application, particularly for the CLI and TUI.
3use crate::crypto::Identity;
4use crate::network::NetworkHandle;
5use crate::storage::{FriendsStore, MessageStore, OutboxStore};
6use crate::sync::SyncEngine;
7use crate::types::{EncryptedMessage, Friend, Message};
8use anyhow::Result;
9use libp2p::PeerId;
10use std::collections::HashSet;
11use std::sync::Arc;
12use tokio::sync::{mpsc, Mutex};
13use tracing::{debug, info};
14
15/// Represents the result of attempting to deliver a message to a mailbox.
16pub enum MailboxDeliveryResult {
17    /// The message was successfully delivered to the specified number of mailboxes.
18    Success(usize),
19    /// The message could not be delivered to any mailboxes.
20    Failure,
21}
22
23/// Represents the central state and functionality of the application node.
24///
25/// This struct holds all the necessary components for the application to run,
26/// such as the user's identity, storage, network handle, and synchronization engine.
27pub struct Node {
28    /// The user's identity.
29    pub identity: Arc<Identity>,
30    /// The store for managing friends.
31    pub friends: Arc<dyn FriendsStore + Send + Sync>,
32    /// The store for managing message history.
33    pub history: Arc<dyn MessageStore + Send + Sync>,
34    /// The store for managing outgoing messages.
35    pub outbox: Arc<dyn OutboxStore + Send + Sync>,
36    /// The handle for interacting with the network layer.
37    pub network: NetworkHandle,
38    /// The sender for sending notifications to the TUI.
39    pub ui_notify_tx: mpsc::UnboundedSender<UiNotification>,
40    /// The synchronization engine.
41    pub sync_engine: Arc<Mutex<SyncEngine>>,
42}
43
44/// Represents a notification to be sent to the UI.
45#[derive(Clone, Debug)]
46pub enum UiNotification {
47    /// A new message has been received.
48    NewMessage(Message),
49    /// A peer has connected.
50    PeerConnected(PeerId),
51    /// A peer has disconnected.
52    PeerDisconnected(PeerId),
53    /// The delivery status of a message has been updated.
54    DeliveryStatusUpdate {
55        /// The ID of the message.
56        message_id: uuid::Uuid,
57        /// The new delivery status.
58        new_status: crate::types::DeliveryStatus,
59    },
60}
61
62impl Node {
63    /// Forwards a message to a set of mailboxes.
64    ///
65    /// This function attempts to deliver a message to a set of mailboxes for a
66    /// given friend. It will try to deliver the message to at least two mailboxes
67    /// for redundancy.
68    ///
69    /// # Arguments
70    ///
71    /// * `message` - The message to forward.
72    /// * `friend` - The friend to whom the message is being sent.
73    /// * `providers` - The set of mailboxes to try.
74    ///
75    /// # Returns
76    ///
77    /// A `MailboxDeliveryResult` indicating whether the delivery was successful.
78    pub async fn forward_to_mailboxes(
79        &self,
80        message: &Message,
81        friend: &Friend,
82        providers: &HashSet<PeerId>,
83    ) -> Result<MailboxDeliveryResult> {
84        if providers.is_empty() {
85            return Ok(MailboxDeliveryResult::Failure);
86        }
87
88        info!(
89            "Attempting to forward message to {} known mailbox providers",
90            providers.len()
91        );
92
93        let recipient_hash =
94            crate::crypto::StorageEncryption::derive_recipient_hash(&friend.e2e_public_key);
95
96        // Re-encrypt the already-encrypted content for mailbox storage
97        // This matches the sync engine's behavior and ensures proper decryption
98        let encrypted_content = self
99            .identity
100            .encrypt_for(&friend.e2e_public_key, &message.content)?;
101
102        let encrypted_msg = EncryptedMessage {
103            id: message.id,
104            sender: self.identity.peer_id,
105            recipient_hash,
106            encrypted_content,
107            timestamp: message.timestamp,
108            nonce: message.nonce,
109            sender_pub_key: self.identity.hpke_public_key(),
110        };
111
112        // Try to send to at least 2 mailboxes for redundancy
113        let min_replicas = 2;
114        let max_attempts = providers.len().min(4); // Don't spam too many mailboxes
115        let mut forwarded_count = 0;
116        let mut failed_attempts = 0;
117
118        // Get mailboxes sorted by performance instead of random shuffle
119        let candidate_mailboxes = {
120            let sync_engine = self.sync_engine.lock().await;
121            sync_engine.rank_mailboxes_subset(providers)
122        };
123
124        for peer_id in candidate_mailboxes.iter().take(max_attempts) {
125            let start_time = std::time::Instant::now();
126            match self
127                .network
128                .mailbox_put(*peer_id, recipient_hash, encrypted_msg.clone())
129                .await
130            {
131                Ok(true) => {
132                    let response_time = start_time.elapsed();
133                    info!(
134                        "Successfully forwarded message {} to mailbox {} ({}/{})",
135                        message.id,
136                        peer_id,
137                        forwarded_count + 1,
138                        min_replicas
139                    );
140                    forwarded_count += 1;
141
142                    // Update performance tracking (fire and forget to avoid blocking)
143                    let sync_engine_clone = self.sync_engine.clone();
144                    let peer_id_copy = *peer_id;
145                    tokio::spawn(async move {
146                        if let Ok(mut sync_engine) = sync_engine_clone.try_lock() {
147                            sync_engine.update_mailbox_performance(
148                                peer_id_copy,
149                                true,
150                                response_time,
151                            ).await;
152                        }
153                    });
154
155                    // Continue until we reach minimum replicas
156                    if forwarded_count >= min_replicas {
157                        break;
158                    }
159                }
160                Ok(false) => {
161                    let response_time = start_time.elapsed();
162                    debug!("Mailbox {} rejected message {}", peer_id, message.id);
163                    failed_attempts += 1;
164
165                    // Update performance tracking (fire and forget to avoid blocking)
166                    let sync_engine_clone = self.sync_engine.clone();
167                    let peer_id_copy = *peer_id;
168                    tokio::spawn(async move {
169                        if let Ok(mut sync_engine) = sync_engine_clone.try_lock() {
170                            sync_engine.update_mailbox_performance(
171                                peer_id_copy,
172                                false,
173                                response_time,
174                            ).await;
175                        }
176                    });
177                }
178                Err(e) => {
179                    let response_time = start_time.elapsed();
180                    debug!("Failed to forward message to mailbox {}: {}", peer_id, e);
181                    failed_attempts += 1;
182
183                    // Update performance tracking (fire and forget to avoid blocking)
184                    let sync_engine_clone = self.sync_engine.clone();
185                    let peer_id_copy = *peer_id;
186                    tokio::spawn(async move {
187                        if let Ok(mut sync_engine) = sync_engine_clone.try_lock() {
188                            sync_engine.update_mailbox_performance(
189                                peer_id_copy,
190                                false,
191                                response_time,
192                            ).await;
193                        }
194                    });
195                }
196            }
197        }
198
199        if forwarded_count > 0 {
200            info!(
201                "Message {} successfully stored in {} mailboxes",
202                message.id, forwarded_count
203            );
204            Ok(MailboxDeliveryResult::Success(forwarded_count))
205        } else {
206            debug!(
207                "Failed to store message {} in any mailboxes after {} attempts",
208                message.id, failed_attempts
209            );
210            Ok(MailboxDeliveryResult::Failure)
211        }
212    }
213}