1use crate::cli::commands::{Node, UiNotification};
3use crate::crypto::{Identity, StorageEncryption};
4use crate::network::NetworkLayer;
5use crate::storage::{
6 MessageHistory, SeenTracker, SledFriendsStore, SledKnownMailboxesStore, SledOutboxStore,
7 SledSeenTracker,
8};
9use crate::sync::{SyncEngine, SyncStores};
10use crate::types::Message;
11use crate::ui::run_tui;
12use anyhow::Result;
13use libp2p::Multiaddr;
14use std::str::FromStr;
15use std::sync::Arc;
16use std::time::Duration;
17use tokio::sync::{mpsc, Mutex};
18use tracing::{debug, error, info};
19
20pub async fn run(
39 identity: Arc<Identity>,
40 db: sled::Db,
41 encryption: Option<StorageEncryption>,
42 port: u16,
43 web_port: u16,
44) -> Result<()> {
45 println!("💬 Starting client mode");
46
47 let friends = Arc::new(SledFriendsStore::new(db.clone(), encryption.clone())?);
49 let history = Arc::new(MessageHistory::new(db.clone(), encryption.clone())?);
50 let outbox = Arc::new(SledOutboxStore::new(db.clone(), encryption.clone())?);
51 let seen = Arc::new(SledSeenTracker::new(db.clone())?);
52 let known_mailboxes = Arc::new(SledKnownMailboxesStore::new(
53 db.clone(),
54 encryption.clone(),
55 )?);
56
57 let listen_addr = Multiaddr::from_str(&format!("/ip4/0.0.0.0/tcp/{}", port))?;
58
59 let bootstrap_nodes = vec![];
60
61 let (mut network_layer, network_handle) =
63 NetworkLayer::new(identity.clone(), listen_addr, false, bootstrap_nodes)?;
64
65 let (incoming_tx, mut incoming_rx) = mpsc::unbounded_channel::<Message>();
67 let (ui_notify_tx, ui_notify_rx) = mpsc::unbounded_channel::<UiNotification>();
68 let (web_notify_tx, web_notify_rx) = mpsc::unbounded_channel::<UiNotification>();
69 let (network_notify_tx, mut network_notify_rx) = mpsc::unbounded_channel::<UiNotification>();
70
71 let sync_stores = SyncStores::new(
72 friends.clone(),
73 outbox.clone(),
74 history.clone(),
75 seen.clone(),
76 known_mailboxes.clone(),
77 );
78
79 let (sync_engine_instance, sync_event_tx, mut sync_event_rx) = SyncEngine::new_with_network(
81 Duration::from_secs(30),
82 identity.clone(),
83 sync_stores,
84 network_handle.clone(),
85 ui_notify_tx.clone(),
86 Some(web_notify_tx.clone()),
87 )?;
88 let sync_engine = Arc::new(Mutex::new(sync_engine_instance));
89
90 network_layer.set_sync_event_sender(sync_event_tx.clone());
91
92 let node = Arc::new(Node {
94 identity,
95 friends: friends.clone(),
96 history: history.clone(),
97 outbox: outbox.clone(),
98 network: network_handle,
99 ui_notify_tx,
100 sync_engine: sync_engine.clone(),
101 });
102
103 println!("Client initialized. Starting network and TUI...\n");
104
105 let sync_engine_clone = sync_engine.clone();
107 tokio::spawn(async move {
108 let interval_duration = {
109 let engine = sync_engine_clone.lock().await;
110 engine.interval
111 };
112 let mut interval_timer = tokio::time::interval(interval_duration);
113
114 info!("Starting sync engine with interval {:?}", interval_duration);
115
116 {
118 let mut engine = sync_engine_clone.lock().await;
119 if let Err(e) = engine.initial_discovery().await {
120 error!("Initial mailbox discovery failed: {}", e);
121 }
122 if let Err(e) = engine.sync_cycle().await {
123 error!("Initial sync cycle failed: {}", e);
124 }
125 }
126
127 loop {
129 tokio::select! {
130 _ = interval_timer.tick() => {
131 let mut engine = sync_engine_clone.lock().await;
132 if let Err(e) = engine.sync_cycle().await {
133 error!("Sync cycle failed: {}", e);
134 }
135 }
136 event = sync_event_rx.recv() => {
137 if let Some(event) = event {
138 let mut engine = sync_engine_clone.lock().await;
139 if let Err(e) = engine.handle_event(event).await {
140 error!("Failed to handle sync event: {}", e);
141 }
142 } else {
143 info!("Sync event channel closed, stopping engine.");
144 break;
145 }
146 }
147 }
148 }
149 });
150
151 network_layer.set_sync_event_sender(sync_event_tx);
152 network_layer.set_ui_notify_sender(network_notify_tx.clone());
153
154 tokio::spawn(async move {
156 if let Err(e) = network_layer.run(incoming_tx).await {
157 error!("Network layer error: {}", e);
158 }
159 });
160
161 let node_for_network = node.clone();
163 let web_notify_tx_for_network = web_notify_tx.clone();
164 tokio::spawn(async move {
165 while let Some(notification) = network_notify_rx.recv().await {
166 match notification {
167 UiNotification::DeliveryStatusUpdate { message_id, new_status } => {
168 if let Err(e) = node_for_network.history.update_delivery_status(&message_id, new_status).await {
170 error!("Failed to update delivery status in database: {}", e);
171 }
172
173 let _ = web_notify_tx_for_network.send(UiNotification::DeliveryStatusUpdate {
175 message_id,
176 new_status,
177 });
178 }
179 other => {
181 let _ = web_notify_tx_for_network.send(other);
182 }
183 }
184 }
185 });
186
187 let node_clone = node.clone();
189 let seen_clone = seen.clone();
190 let web_notify_tx_clone = web_notify_tx.clone();
191 tokio::spawn(async move {
192 while let Some(message) = incoming_rx.recv().await {
193 let already_seen = match seen_clone.is_seen(&message.id).await {
194 Ok(flag) => flag,
195 Err(e) => {
196 error!("Failed to check if message {} was seen: {}", message.id, e);
197 false
198 }
199 };
200
201 if already_seen {
202 debug!("Received duplicate message {}, ignoring", message.id);
203 continue;
204 }
205
206 if let Err(e) = node_clone.history.store_message(message.clone()).await {
207 error!("Failed to store incoming message {}: {}", message.id, e);
208 continue;
209 }
210
211 if let Err(e) = seen_clone.mark_seen(message.id).await {
212 error!("Failed to mark message {} as seen: {}", message.id, e);
213 }
214
215 let confirmation = crate::types::DeliveryConfirmation {
217 original_message_id: message.id,
218 timestamp: chrono::Utc::now().timestamp_millis(),
219 };
220
221 let confirmation_request = crate::types::ChatRequest::DeliveryConfirmation {
222 confirmation,
223 };
224
225 let network_clone = node_clone.network.clone();
226 let sender = message.sender;
227 tokio::spawn(async move {
228 if let Err(e) = network_clone.send_chat_request(sender, confirmation_request).await
229 {
230 debug!("Failed to send delivery confirmation: {}", e);
231 }
232 });
233
234 let _ = node_clone
236 .ui_notify_tx
237 .send(UiNotification::NewMessage(message.clone()));
238 let _ = web_notify_tx_clone.send(UiNotification::NewMessage(message));
239 }
240 });
241
242 let node_for_web = node.clone();
244 tokio::spawn(async move {
245 if let Err(e) = crate::web::start_server(node_for_web, web_port, web_notify_rx).await {
246 error!("Web server error: {}", e);
247 }
248 });
249
250 run_tui(node, ui_notify_rx, web_port).await
252}