p2p_chat/app/
client.rs

1//! This module contains the primary entry point for running the application in client mode.
2use crate::cli::commands::{Node, UiNotification};
3use crate::crypto::{Identity, StorageEncryption};
4use crate::network::NetworkLayer;
5use crate::storage::{
6    MessageHistory, SeenTracker, SledFriendsStore, SledKnownMailboxesStore, SledOutboxStore,
7    SledSeenTracker,
8};
9use crate::sync::{SyncEngine, SyncStores};
10use crate::types::Message;
11use crate::ui::run_tui;
12use anyhow::Result;
13use libp2p::Multiaddr;
14use std::str::FromStr;
15use std::sync::Arc;
16use std::time::Duration;
17use tokio::sync::{mpsc, Mutex};
18use tracing::{debug, error, info};
19
20/// Runs the application in client mode.
21///
22/// This function initializes and runs all the components required for the client
23/// to operate, including storage, networking, the synchronization engine,
24/// the terminal UI, and the web server.
25///
26/// # Arguments
27///
28/// * `identity` - The user's identity.
29/// * `db` - The database instance.
30/// * `encryption` - The encryption key for the storage, if enabled.
31/// * `port` - The port to listen on for P2P connections.
32/// * `web_port` - The port for the Web UI.
33///
34/// # Errors
35///
36/// This function will return an error if any of the components fail to initialize
37/// or run.
38pub async fn run(
39    identity: Arc<Identity>,
40    db: sled::Db,
41    encryption: Option<StorageEncryption>,
42    port: u16,
43    web_port: u16,
44) -> Result<()> {
45    println!("💬 Starting client mode");
46
47    // Initialize storage components.
48    let friends = Arc::new(SledFriendsStore::new(db.clone(), encryption.clone())?);
49    let history = Arc::new(MessageHistory::new(db.clone(), encryption.clone())?);
50    let outbox = Arc::new(SledOutboxStore::new(db.clone(), encryption.clone())?);
51    let seen = Arc::new(SledSeenTracker::new(db.clone())?);
52    let known_mailboxes = Arc::new(SledKnownMailboxesStore::new(
53        db.clone(),
54        encryption.clone(),
55    )?);
56
57    let listen_addr = Multiaddr::from_str(&format!("/ip4/0.0.0.0/tcp/{}", port))?;
58
59    let bootstrap_nodes = vec![];
60
61    // Initialize the network layer.
62    let (mut network_layer, network_handle) =
63        NetworkLayer::new(identity.clone(), listen_addr, false, bootstrap_nodes)?;
64
65    // Create channels for communication between components.
66    let (incoming_tx, mut incoming_rx) = mpsc::unbounded_channel::<Message>();
67    let (ui_notify_tx, ui_notify_rx) = mpsc::unbounded_channel::<UiNotification>();
68    let (web_notify_tx, web_notify_rx) = mpsc::unbounded_channel::<UiNotification>();
69    let (network_notify_tx, mut network_notify_rx) = mpsc::unbounded_channel::<UiNotification>();
70
71    let sync_stores = SyncStores::new(
72        friends.clone(),
73        outbox.clone(),
74        history.clone(),
75        seen.clone(),
76        known_mailboxes.clone(),
77    );
78
79    // Initialize the synchronization engine.
80    let (sync_engine_instance, sync_event_tx, mut sync_event_rx) = SyncEngine::new_with_network(
81        Duration::from_secs(30),
82        identity.clone(),
83        sync_stores,
84        network_handle.clone(),
85        ui_notify_tx.clone(),
86        Some(web_notify_tx.clone()),
87    )?;
88    let sync_engine = Arc::new(Mutex::new(sync_engine_instance));
89
90    network_layer.set_sync_event_sender(sync_event_tx.clone());
91
92    // Create the main application node context.
93    let node = Arc::new(Node {
94        identity,
95        friends: friends.clone(),
96        history: history.clone(),
97        outbox: outbox.clone(),
98        network: network_handle,
99        ui_notify_tx,
100        sync_engine: sync_engine.clone(),
101    });
102
103    println!("Client initialized. Starting network and TUI...\n");
104
105    // Spawn the synchronization engine task.
106    let sync_engine_clone = sync_engine.clone();
107    tokio::spawn(async move {
108        let interval_duration = {
109            let engine = sync_engine_clone.lock().await;
110            engine.interval
111        };
112        let mut interval_timer = tokio::time::interval(interval_duration);
113
114        info!("Starting sync engine with interval {:?}", interval_duration);
115
116        // Perform an initial discovery and sync cycle.
117        {
118            let mut engine = sync_engine_clone.lock().await;
119            if let Err(e) = engine.initial_discovery().await {
120                error!("Initial mailbox discovery failed: {}", e);
121            }
122            if let Err(e) = engine.sync_cycle().await {
123                error!("Initial sync cycle failed: {}", e);
124            }
125        }
126
127        // Main sync loop.
128        loop {
129            tokio::select! {
130                _ = interval_timer.tick() => {
131                    let mut engine = sync_engine_clone.lock().await;
132                    if let Err(e) = engine.sync_cycle().await {
133                        error!("Sync cycle failed: {}", e);
134                    }
135                }
136                event = sync_event_rx.recv() => {
137                    if let Some(event) = event {
138                        let mut engine = sync_engine_clone.lock().await;
139                        if let Err(e) = engine.handle_event(event).await {
140                            error!("Failed to handle sync event: {}", e);
141                        }
142                    } else {
143                        info!("Sync event channel closed, stopping engine.");
144                        break;
145                    }
146                }
147            }
148        }
149    });
150
151    network_layer.set_sync_event_sender(sync_event_tx);
152    network_layer.set_ui_notify_sender(network_notify_tx.clone());
153
154    // Spawn the network layer task.
155    tokio::spawn(async move {
156        if let Err(e) = network_layer.run(incoming_tx).await {
157            error!("Network layer error: {}", e);
158        }
159    });
160
161    // Handle network notifications (e.g., delivery confirmations).
162    let node_for_network = node.clone();
163    let web_notify_tx_for_network = web_notify_tx.clone();
164    tokio::spawn(async move {
165        while let Some(notification) = network_notify_rx.recv().await {
166            match notification {
167                UiNotification::DeliveryStatusUpdate { message_id, new_status } => {
168                    // Update the delivery status in the database.
169                    if let Err(e) = node_for_network.history.update_delivery_status(&message_id, new_status).await {
170                        error!("Failed to update delivery status in database: {}", e);
171                    }
172
173                    // Forward the notification to the web UI.
174                    let _ = web_notify_tx_for_network.send(UiNotification::DeliveryStatusUpdate {
175                        message_id,
176                        new_status,
177                    });
178                }
179                // Forward other notifications as-is.
180                other => {
181                    let _ = web_notify_tx_for_network.send(other);
182                }
183            }
184        }
185    });
186
187    // Handle incoming messages from the network.
188    let node_clone = node.clone();
189    let seen_clone = seen.clone();
190    let web_notify_tx_clone = web_notify_tx.clone();
191    tokio::spawn(async move {
192        while let Some(message) = incoming_rx.recv().await {
193            let already_seen = match seen_clone.is_seen(&message.id).await {
194                Ok(flag) => flag,
195                Err(e) => {
196                    error!("Failed to check if message {} was seen: {}", message.id, e);
197                    false
198                }
199            };
200
201            if already_seen {
202                debug!("Received duplicate message {}, ignoring", message.id);
203                continue;
204            }
205
206            if let Err(e) = node_clone.history.store_message(message.clone()).await {
207                error!("Failed to store incoming message {}: {}", message.id, e);
208                continue;
209            }
210
211            if let Err(e) = seen_clone.mark_seen(message.id).await {
212                error!("Failed to mark message {} as seen: {}", message.id, e);
213            }
214
215            // Send a delivery confirmation back to the sender.
216            let confirmation = crate::types::DeliveryConfirmation {
217                original_message_id: message.id,
218                timestamp: chrono::Utc::now().timestamp_millis(),
219            };
220
221            let confirmation_request = crate::types::ChatRequest::DeliveryConfirmation {
222                confirmation,
223            };
224
225            let network_clone = node_clone.network.clone();
226            let sender = message.sender;
227            tokio::spawn(async move {
228                if let Err(e) = network_clone.send_chat_request(sender, confirmation_request).await
229                {
230                    debug!("Failed to send delivery confirmation: {}", e);
231                }
232            });
233
234            // Notify the UI and web server of the new message.
235            let _ = node_clone
236                .ui_notify_tx
237                .send(UiNotification::NewMessage(message.clone()));
238            let _ = web_notify_tx_clone.send(UiNotification::NewMessage(message));
239        }
240    });
241
242    // Start the web server.
243    let node_for_web = node.clone();
244    tokio::spawn(async move {
245        if let Err(e) = crate::web::start_server(node_for_web, web_port, web_notify_rx).await {
246            error!("Web server error: {}", e);
247        }
248    });
249
250    // Run the terminal UI.
251    run_tui(node, ui_notify_rx, web_port).await
252}