p2p_chat/sync/engine/
mod.rs

1//! This module contains the core synchronization engine of the application.
2//!
3//! The `SyncEngine` is responsible for discovering mailbox providers, fetching
4//! and processing messages from mailboxes, retrying failed message deliveries,
5//! and maintaining the reliability of mailbox interactions.
6use crate::cli::UiNotification;
7use crate::crypto::Identity;
8use crate::network::NetworkHandle;
9use crate::storage::{FriendsStore, KnownMailboxesStore, MessageStore, OutboxStore, SeenTracker};
10use crate::sync::backoff::BackoffManager;
11use anyhow::Result;
12use libp2p::{kad, PeerId};
13use std::collections::{HashMap, HashSet};
14use std::sync::Arc;
15use std::time::{Duration, Instant};
16use tokio::sync::mpsc;
17use tracing::{debug, error, info, trace, warn};
18
19mod discovery;
20mod events;
21mod mailbox;
22mod outbox;
23mod performance;
24
25pub use events::{DhtQueryResult, SyncEvent};
26use performance::MailboxPerformance;
27
/// The core synchronization engine.
///
/// This struct manages the discovery of mailbox providers, fetching and processing
/// messages, and retrying message deliveries.
///
/// It bundles the in-memory discovery state (discovered mailboxes, their
/// performance records, pending DHT queries), the storage handles, the network
/// handle and the notification senders. Instances are created with
/// `SyncEngine::new_with_network`.
pub struct SyncEngine {
    /// The interval at which the synchronization cycle runs.
    ///
    /// `new_with_network` stores five seconds here when it is given a zero
    /// duration.
    pub interval: Duration,
    /// A set of discovered mailbox `PeerId`s.
    ///
    /// `handle_event` consults this set to decide whether a peer that
    /// connected (or failed to connect) is a known mailbox.
    pub discovered_mailboxes: HashSet<PeerId>,
    /// Performance metrics for each discovered mailbox.
    pub mailbox_performance: HashMap<PeerId, MailboxPerformance>,
    /// Manages backoff for failing peers.
    pub backoff_manager: BackoffManager,
    /// Stores the state of pending DHT queries.
    ///
    /// Keyed by the Kademlia query id. `handle_event` removes an entry once a
    /// final result (finished provider list or failure) arrives for it.
    pub pending_dht_queries: HashMap<kad::QueryId, DhtQueryState>,
    /// The `Instant` of the last mailbox discovery.
    ///
    /// Starts as `None` in `new_with_network`.
    pub last_discovery_time: Option<Instant>,
    /// The local node's identity.
    pub identity: Arc<Identity>,
    /// The store for managing friends.
    pub friends: Arc<dyn FriendsStore + Send + Sync>,
    /// The store for managing outgoing messages.
    pub outbox: Arc<dyn OutboxStore + Send + Sync>,
    /// The store for managing message history.
    pub history: Arc<dyn MessageStore + Send + Sync>,
    /// The tracker for seen messages.
    ///
    /// `sync_cycle` asks it to clean up entries older than seven days.
    pub seen: Arc<dyn SeenTracker + Send + Sync>,
    /// The store for known mailbox providers.
    pub known_mailboxes: Arc<dyn KnownMailboxesStore + Send + Sync>,
    /// The network handle for communicating with the `NetworkLayer`.
    ///
    /// `new_with_network` always stores `Some`; the field is optional so an
    /// engine can exist without a handle.
    pub network: Option<NetworkHandle>,
    /// Sender for UI notifications.
    pub ui_notify_tx: mpsc::UnboundedSender<UiNotification>,
    /// Sender for web UI notifications.
    ///
    /// `None` when no web UI channel was supplied to the constructor.
    pub web_notify_tx: Option<mpsc::UnboundedSender<UiNotification>>,
}
64
/// A collection of storage traits used by the `SyncEngine`.
///
/// Groups the five storage handles so they can be passed to
/// `SyncEngine::new_with_network` as a single argument; the constructor moves
/// each handle into the engine field of the same name. Cloning this struct
/// clones the `Arc` handles, not the underlying stores.
#[derive(Clone)]
pub struct SyncStores {
    /// The friends store.
    pub friends: Arc<dyn FriendsStore + Send + Sync>,
    /// The outbox store.
    pub outbox: Arc<dyn OutboxStore + Send + Sync>,
    /// The message history store.
    pub history: Arc<dyn MessageStore + Send + Sync>,
    /// The seen messages tracker.
    pub seen: Arc<dyn SeenTracker + Send + Sync>,
    /// The known mailboxes store.
    pub known_mailboxes: Arc<dyn KnownMailboxesStore + Send + Sync>,
}
79
impl SyncStores {
    /// Creates a new `SyncStores` instance.
    ///
    /// Each argument is stored unchanged in the field of the same name; no
    /// validation or I/O is performed.
    ///
    /// # Arguments
    ///
    /// * `friends` - The friends store.
    /// * `outbox` - The outbox store.
    /// * `history` - The message history store.
    /// * `seen` - The seen messages tracker.
    /// * `known_mailboxes` - The known mailboxes store.
    pub fn new(
        friends: Arc<dyn FriendsStore + Send + Sync>,
        outbox: Arc<dyn OutboxStore + Send + Sync>,
        history: Arc<dyn MessageStore + Send + Sync>,
        seen: Arc<dyn SeenTracker + Send + Sync>,
        known_mailboxes: Arc<dyn KnownMailboxesStore + Send + Sync>,
    ) -> Self {
        Self {
            friends,
            outbox,
            history,
            seen,
            known_mailboxes,
        }
    }
}
98
/// Represents the state of a pending Kademlia DHT query.
///
/// Stored in `SyncEngine::pending_dht_queries` under the query's id.
#[derive(Debug, Clone)]
pub struct DhtQueryState {
    /// The key being queried.
    pub key: kad::RecordKey,
    /// The `Instant` when the query was started.
    pub started_at: Instant,
    /// Whether any results have been received for this query.
    ///
    /// `SyncEngine::handle_event` sets this to `true` on every
    /// `DhtQueryResult` event for the query, including a reported failure.
    pub received_results: bool,
}
109
110impl SyncEngine {
111    /// Creates a new `SyncEngine` with a network handle.
112    ///
113    /// # Arguments
114    ///
115    /// * `interval` - The interval for the synchronization cycle.
116    /// * `identity` - The local node's identity.
117    /// * `stores` - A collection of storage implementations.
118    /// * `network` - The network handle.
119    /// * `ui_notify_tx` - Sender for UI notifications.
120    /// * `web_notify_tx` - Sender for web UI notifications.
121    ///
122    /// # Returns
123    ///
124    /// A `Result` containing a tuple of the `SyncEngine` instance, an event sender,
125    /// and an event receiver.
126    pub fn new_with_network(
127        interval: Duration,
128        identity: Arc<Identity>,
129        stores: SyncStores,
130        network: NetworkHandle,
131        ui_notify_tx: mpsc::UnboundedSender<UiNotification>,
132        web_notify_tx: Option<mpsc::UnboundedSender<UiNotification>>,
133    ) -> Result<(
134        Self,
135        mpsc::UnboundedSender<SyncEvent>,
136        mpsc::UnboundedReceiver<SyncEvent>,
137    )> {
138        let SyncStores {
139            friends,
140            outbox,
141            history,
142            seen,
143            known_mailboxes,
144        } = stores;
145        let (event_tx, event_rx) = mpsc::unbounded_channel();
146        let engine = Self {
147            interval: if interval.is_zero() {
148                Duration::from_secs(5)
149            } else {
150                interval
151            },
152            discovered_mailboxes: HashSet::new(),
153            mailbox_performance: HashMap::new(),
154            backoff_manager: BackoffManager::new(),
155            pending_dht_queries: HashMap::new(),
156            last_discovery_time: None,
157            identity,
158            friends,
159            outbox,
160            history,
161            seen,
162            known_mailboxes,
163            network: Some(network),
164            ui_notify_tx,
165            web_notify_tx,
166        };
167        Ok((engine, event_tx, event_rx))
168    }
169
170    /// Performs an initial discovery of mailbox providers on startup.
171    ///
172    /// # Errors
173    ///
174    /// Returns an error if the discovery process fails.
175    pub async fn initial_discovery(&mut self) -> Result<()> {
176        debug!("Performing initial mailbox discovery on startup");
177
178        if let Err(e) = self.discover_mailboxes_if_needed(true).await {
179            warn!("Initial mailbox discovery failed: {}", e);
180        }
181
182        Ok(())
183    }
184
185    /// Runs a single synchronization cycle.
186    ///
187    /// This includes discovering mailboxes, fetching messages, retrying outbox
188    /// messages, and cleaning up old data.
189    ///
190    /// # Errors
191    ///
192    /// Returns an error if any part of the synchronization cycle fails.
193    pub async fn sync_cycle(&mut self) -> Result<()> {
194        trace!("Starting sync cycle");
195
196        if let Err(e) = self.discover_mailboxes_if_needed(false).await {
197            error!("Failed to discover mailboxes: {}", e);
198        }
199
200        if let Err(e) = self.fetch_from_mailboxes().await {
201            error!("Failed to fetch from mailboxes: {}", e);
202        }
203
204        if let Err(e) = self.retry_outbox().await {
205            error!("Failed to retry outbox: {}", e);
206        }
207
208        if let Err(e) = self
209            .seen
210            .cleanup_old(Duration::from_secs(7 * 24 * 60 * 60))
211            .await
212        {
213            error!("Failed to cleanup seen entries: {}", e);
214        }
215
216        self.cleanup_failing_mailboxes().await;
217        self.cleanup_stale_dht_queries();
218
219        trace!("Sync cycle completed");
220        Ok(())
221    }
222
223    /// Handles an incoming `SyncEvent`.
224    ///
225    /// This function processes various events related to peer connections and
226    /// DHT query results, triggering appropriate synchronization actions.
227    ///
228    /// # Arguments
229    ///
230    /// * `event` - The `SyncEvent` to handle.
231    ///
232    /// # Errors
233    ///
234    /// Returns an error if handling the event fails.
235    pub async fn handle_event(&mut self, event: SyncEvent) -> Result<()> {
236        match event {
237            SyncEvent::PeerConnected(peer_id) => {
238                debug!(
239                    "Peer {} connected, retrying outbox messages and checking for mailboxes",
240                    peer_id
241                );
242
243                self.discover_mailboxes_if_needed(false).await?;
244                self.retry_outbox_for_peer(&peer_id).await?;
245
246                if self.discovered_mailboxes.contains(&peer_id) {
247                    info!(
248                        "Connected to known mailbox provider {}, triggering instant fetch.",
249                        peer_id
250                    );
251                    if let Err(e) = self.fetch_from_single_mailbox(peer_id).await {
252                        error!("Instant fetch from mailbox {} failed: {}", peer_id, e);
253                    }
254                }
255            }
256            SyncEvent::PeerConnectionFailed(peer_id) => {
257                if self.discovered_mailboxes.contains(&peer_id) {
258                    debug!(
259                        "Connection failed to known mailbox {}, tracking failure",
260                        peer_id
261                    );
262
263                    self.update_mailbox_performance(peer_id, false, Duration::from_millis(2000)).await;
264
265                    if self.should_forget_mailbox(peer_id) {
266                        self.forget_failing_mailbox(peer_id).await;
267                    }
268                } else {
269                    trace!(
270                        "Connection failed to peer {} (not a known mailbox)",
271                        peer_id
272                    );
273                }
274            }
275            SyncEvent::DhtQueryResult { query_id, result } => {
276                if let Some(query_state) = self.pending_dht_queries.get_mut(&query_id) {
277                    query_state.received_results = true;
278                    let key = query_state.key.clone();
279
280                    let should_remove = match &result {
281                        DhtQueryResult::ProvidersFound { finished, .. } => *finished,
282                        DhtQueryResult::QueryFailed { .. } => true,
283                    };
284
285                    if should_remove {
286                        self.pending_dht_queries.remove(&query_id);
287                    }
288
289                    self.handle_dht_query_result(key, result).await?;
290                } else {
291                    debug!(
292                        "Received DHT query result for unknown query: {:?}",
293                        query_id
294                    );
295                }
296            }
297        }
298        Ok(())
299    }
300}