pub struct SyncEngine {
pub interval: Duration,
pub discovered_mailboxes: HashSet<PeerId>,
pub mailbox_performance: HashMap<PeerId, MailboxPerformance>,
pub backoff_manager: BackoffManager,
pub pending_dht_queries: HashMap<QueryId, DhtQueryState>,
pub last_discovery_time: Option<Instant>,
pub identity: Arc<Identity>,
pub friends: Arc<dyn FriendsStore + Send + Sync>,
pub outbox: Arc<dyn OutboxStore + Send + Sync>,
pub history: Arc<dyn MessageStore + Send + Sync>,
pub seen: Arc<dyn SeenTracker + Send + Sync>,
pub known_mailboxes: Arc<dyn KnownMailboxesStore + Send + Sync>,
pub network: Option<NetworkHandle>,
pub ui_notify_tx: UnboundedSender<UiNotification>,
pub web_notify_tx: Option<UnboundedSender<UiNotification>>,
}
The core synchronization engine.
This struct manages the discovery of mailbox providers, fetching and processing messages, and retrying message deliveries.
Fields§
interval: Duration
The interval at which the synchronization cycle runs.
discovered_mailboxes: HashSet<PeerId>
A set of discovered mailbox PeerIds.
mailbox_performance: HashMap<PeerId, MailboxPerformance>
Performance metrics for each discovered mailbox.
backoff_manager: BackoffManager
Manages backoff for failing peers.
pending_dht_queries: HashMap<QueryId, DhtQueryState>
Stores the state of pending DHT queries.
last_discovery_time: Option<Instant>
The Instant of the last mailbox discovery.
identity: Arc<Identity>
The local node’s identity.
friends: Arc<dyn FriendsStore + Send + Sync>
The store for managing friends.
outbox: Arc<dyn OutboxStore + Send + Sync>
The store for managing outgoing messages.
history: Arc<dyn MessageStore + Send + Sync>
The store for managing message history.
seen: Arc<dyn SeenTracker + Send + Sync>
The tracker for seen messages.
known_mailboxes: Arc<dyn KnownMailboxesStore + Send + Sync>
The store for known mailbox providers.
network: Option<NetworkHandle>
The network handle for communicating with the NetworkLayer.
ui_notify_tx: UnboundedSender<UiNotification>
Sender for UI notifications.
web_notify_tx: Option<UnboundedSender<UiNotification>>
Sender for web UI notifications.
Implementations§
impl SyncEngine
pub fn get_mailbox_providers(&self) -> &HashSet<PeerId>
Returns a reference to the set of discovered mailbox providers.
pub fn get_available_mailboxes(&self) -> Vec<PeerId>
Returns a ranked list of available mailbox providers.
The ranking is based on performance metrics stored in the SyncEngine.
pub fn rank_mailboxes_subset(&self, providers: &HashSet<PeerId>) -> Vec<PeerId>
Returns a ranked list of a subset of mailbox providers.
§Arguments
providers - The subset of PeerIds to rank.
pub async fn get_emergency_mailboxes(&self) -> Vec<PeerId>
Asynchronously retrieves a list of “emergency” mailboxes.
These are connected peers that are also known mailbox providers.
§Returns
A Vec of PeerIds representing the emergency mailboxes.
pub(crate) fn should_forget_mailbox(&self, peer_id: PeerId) -> bool
Determines if a mailbox should be forgotten due to poor performance.
This is based on consecutive failures or too many failures within a time window.
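The two criteria above can be sketched as a pure function. This is only an illustration: `FailureStats`, its field names, and all three thresholds are assumptions made for the example and are not taken from the crate.

```rust
use std::time::{Duration, Instant};

/// Hypothetical stand-in for the failure data tracked per mailbox.
#[derive(Debug, Default)]
pub struct FailureStats {
    pub consecutive_failures: u32,
    pub recent_failures: Vec<Instant>,
}

// Assumed thresholds, chosen only for illustration.
pub const MAX_CONSECUTIVE_FAILURES: u32 = 5;
pub const MAX_FAILURES_IN_WINDOW: usize = 10;
pub const FAILURE_WINDOW: Duration = Duration::from_secs(600);

/// Returns true if either criterion is met: too many failures in a row,
/// or too many failures inside the sliding time window.
pub fn should_forget(stats: &FailureStats, now: Instant) -> bool {
    if stats.consecutive_failures >= MAX_CONSECUTIVE_FAILURES {
        return true;
    }
    let failures_in_window = stats
        .recent_failures
        .iter()
        .filter(|t| now.saturating_duration_since(**t) <= FAILURE_WINDOW)
        .count();
    failures_in_window >= MAX_FAILURES_IN_WINDOW
}
```

Passing `now` as a parameter keeps the check deterministic, which makes the window logic easy to test.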
impl SyncEngine
pub async fn discover_mailboxes(&mut self) -> Result<()>
Discovers mailbox providers.
This function attempts to discover new mailbox providers in the DHT.
It is a wrapper around discover_mailboxes_if_needed with force set to false.
§Errors
This function will return an error if the discovery process fails.
async fn load_cached_mailboxes(&mut self) -> Result<()>
Loads cached mailboxes from the database into discovered_mailboxes.
§Errors
This function will return an error if loading the cached mailboxes fails.
pub async fn discover_mailboxes_if_needed(&mut self, force: bool) -> Result<()>
Discovers mailbox providers in the DHT if needed.
Discovery is skipped when enough mailboxes are already available or when a discovery was performed recently (rate limiting). Setting force runs a discovery regardless of these conditions.
§Arguments
force - If true, a discovery will be performed even if the conditions for skipping are met.
§Errors
This function will return an error if the discovery process fails.
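The skip conditions can be sketched as a small decision function. The constants `MIN_AVAILABLE_MAILBOXES` and `MIN_DISCOVERY_INTERVAL` and the helper name are hypothetical; the crate's actual thresholds are not documented here.

```rust
use std::time::{Duration, Instant};

// Assumed values, for illustration only.
pub const MIN_AVAILABLE_MAILBOXES: usize = 2;
pub const MIN_DISCOVERY_INTERVAL: Duration = Duration::from_secs(30);

/// Decides whether a discovery should run.
pub fn should_run_discovery(
    force: bool,
    available: usize,
    last_discovery: Option<Instant>,
    now: Instant,
) -> bool {
    // A forced discovery bypasses every skip condition.
    if force {
        return true;
    }
    // Enough mailboxes are already available.
    if available >= MIN_AVAILABLE_MAILBOXES {
        return false;
    }
    // Rate limiting: skip if the previous discovery was too recent.
    match last_discovery {
        Some(t) => now.saturating_duration_since(t) >= MIN_DISCOVERY_INTERVAL,
        None => true,
    }
}
```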
impl SyncEngine
pub async fn handle_dht_query_result(
    &mut self,
    key: RecordKey,
    result: DhtQueryResult,
) -> Result<()>
Handles the result of a DHT query.
This function processes DhtQueryResults, typically updating the list
of discovered mailbox providers and triggering actions like retrying the outbox.
§Arguments
key - The kad::RecordKey that the query was performed for.
result - The DhtQueryResult containing the outcome of the query.
§Errors
This function will return an error if processing the result fails, e.g., if there are issues saving a new mailbox to the database.
impl SyncEngine
pub(crate) fn cleanup_stale_dht_queries(&mut self)
Cleans up stale DHT queries from the pending list.
Queries older than a defined stale_timeout are removed. This also triggers
a cleanup of old entries in the backoff manager.
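A timeout-based cleanup of this kind is commonly written with `HashMap::retain`. The sketch below uses stand-in types: `QueryId` is aliased to `u64` and `DhtQueryState` is reduced to an assumed `started_at` field, neither of which reflects the crate's real definitions.

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

/// Stand-in for the real query identifier type.
pub type QueryId = u64;

/// Stand-in for the real query state; only the start time matters here.
#[derive(Debug)]
pub struct DhtQueryState {
    pub started_at: Instant,
}

/// Removes every query at least `stale_timeout` old and returns how many
/// entries were dropped.
pub fn cleanup_stale(
    pending: &mut HashMap<QueryId, DhtQueryState>,
    stale_timeout: Duration,
    now: Instant,
) -> usize {
    let before = pending.len();
    pending.retain(|_, state| now.saturating_duration_since(state.started_at) < stale_timeout);
    before - pending.len()
}
```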
impl SyncEngine
pub(super) fn rank_mailboxes<I>(&self, candidates: I) -> Vec<PeerId>
where
    I: IntoIterator<Item = PeerId>,
Ranks a given set of candidate mailbox providers.
Providers are filtered by whether they can be attempted (not backed off) and then sorted by their calculated score in descending order.
§Arguments
candidates - An iterator over PeerIds of potential mailbox providers.
§Returns
A Vec of PeerIds, ranked from best to worst.
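The filter-then-sort behaviour described above might look like the following sketch. `PeerId` is aliased to `u64` as a stand-in, and the backoff check and scoring are passed in as closures; both are assumptions made so the example stays self-contained.

```rust
/// Stand-in for the real peer identifier type.
pub type PeerId = u64;

/// Keeps only candidates that may currently be attempted, then orders them
/// by score from highest to lowest.
pub fn rank<I, A, S>(candidates: I, can_attempt: A, score: S) -> Vec<PeerId>
where
    I: IntoIterator<Item = PeerId>,
    A: Fn(&PeerId) -> bool,
    S: Fn(&PeerId) -> f64,
{
    let mut scored: Vec<(PeerId, f64)> = candidates
        .into_iter()
        .filter(|p| can_attempt(p))
        .map(|p| {
            let s = score(&p);
            (p, s)
        })
        .collect();
    // `total_cmp` gives a total order on f64, so NaN scores cannot panic the sort.
    scored.sort_by(|a, b| b.1.total_cmp(&a.1));
    scored.into_iter().map(|(p, _)| p).collect()
}
```

Computing each score once before sorting avoids re-evaluating it on every comparison.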
fn calculate_mailbox_score(&self, peer_id: PeerId) -> f64
Calculates a performance score for a single mailbox provider.
The score takes into account:
- Success rate (70% weight)
- Recency of last success (20% weight, with bonus for recent success)
- Average response time (10% weight, bonus for faster responses)
- Penalty for consecutive failures
- A significant penalty if the peer is currently in backoff.
The score is clamped between 0.0 and 1.0.
§Arguments
peer_id - The PeerId of the mailbox provider.
§Returns
An f64 representing the performance score of the mailbox.
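One way to combine the listed factors is a weighted sum followed by penalties and a clamp. Only the 70/20/10 weights and the final clamp come from the description above. The `ScoreInputs` type, the linear decay curves, the neutral starting rate, and the penalty sizes are all assumptions made for this sketch.

```rust
use std::time::Duration;

/// Hypothetical inputs; the field names are not taken from the crate.
pub struct ScoreInputs {
    pub successes: u32,
    pub failures: u32,
    pub since_last_success: Option<Duration>,
    pub avg_response: Option<Duration>,
    pub consecutive_failures: u32,
    pub in_backoff: bool,
}

pub fn mailbox_score(m: &ScoreInputs) -> f64 {
    let total = f64::from(m.successes) + f64::from(m.failures);
    // Assumption: a mailbox with no history starts at a neutral 0.5.
    let success_rate = if total == 0.0 { 0.5 } else { f64::from(m.successes) / total };
    // Assumption: the recency bonus decays linearly to zero over one hour.
    let recency = m
        .since_last_success
        .map_or(0.0, |d| (1.0 - d.as_secs_f64() / 3600.0).max(0.0));
    // Assumption: the speed bonus decays linearly to zero at five seconds.
    let speed = m
        .avg_response
        .map_or(0.0, |d| (1.0 - d.as_secs_f64() / 5.0).max(0.0));

    // Weights from the documentation: 70% / 20% / 10%.
    let mut score = 0.7 * success_rate + 0.2 * recency + 0.1 * speed;
    // Assumed penalty sizes.
    score -= 0.1 * f64::from(m.consecutive_failures);
    if m.in_backoff {
        score -= 0.5;
    }
    score.clamp(0.0, 1.0)
}
```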
impl SyncEngine
pub async fn acknowledge_mailbox_messages(
    &self,
    msg_ids: Vec<Uuid>,
) -> Result<()>
Acknowledges a list of messages across all known mailbox providers.
This function attempts to send an acknowledgment for each message ID to all discovered mailbox providers, ensuring that the messages are deleted from the mailboxes. It utilizes a retry policy for robustness.
§Arguments
msg_ids - A Vec of Uuids representing the messages to acknowledge.
§Errors
This function will return an error if network communication fails, but it attempts to acknowledge with multiple mailboxes for resilience.
impl SyncEngine
pub async fn fetch_from_mailboxes(&mut self) -> Result<()>
Fetches messages from all discovered and available mailbox providers.
This function iterates through available mailboxes, attempting to fetch messages from each. It skips mailboxes that are currently backed off.
§Errors
This function will return an error if fetching from any mailbox fails, but it continues to try other mailboxes.
pub async fn fetch_from_single_mailbox(
    &mut self,
    peer_id: PeerId,
) -> Result<Vec<Uuid>>
Fetches messages from a single mailbox provider.
This function attempts to fetch messages from a specified mailbox, processes them, and then acknowledges their receipt. It updates the performance metrics for the mailbox based on the outcome.
§Arguments
peer_id - The PeerId of the mailbox provider to fetch from.
§Returns
A Vec of Uuids representing the IDs of processed messages.
§Errors
This function will return an error if fetching or processing messages fails.
impl SyncEngine
pub async fn process_mailbox_messages(
    &self,
    messages: Vec<EncryptedMessage>,
) -> Result<Vec<Uuid>>
Processes a list of encrypted messages fetched from mailboxes.
This function iterates through the messages, decrypts them, marks them as seen, stores them in the history, sends delivery confirmations, and notifies the UI.
§Arguments
messages - A Vec of EncryptedMessages fetched from a mailbox.
§Returns
A Vec of Uuids representing the IDs of messages that were successfully processed.
§Errors
This function will return an error if message decryption, storage, or processing fails.
pub async fn reconstruct_message_from_mailbox(
    &self,
    encrypted_msg: &EncryptedMessage,
) -> Result<Message>
Reconstructs a Message from an EncryptedMessage fetched from a mailbox.
This involves using the local identity’s HPKE context to decrypt the content.
§Arguments
encrypted_msg - The EncryptedMessage to reconstruct.
§Returns
The reconstructed Message.
§Errors
This function will return an error if decryption fails.
impl SyncEngine
pub(crate) async fn update_mailbox_performance(
    &mut self,
    peer_id: PeerId,
    success: bool,
    response_time: Duration,
)
Updates the performance metrics for a specific mailbox.
Records whether an interaction was successful or a failure, updates
success/failure counts, last seen timestamps, and average response time.
It also interacts with the BackoffManager and persistent storage.
§Arguments
peer_id - The PeerId of the mailbox.
success - true if the interaction was successful, false otherwise.
response_time - The duration of the interaction.
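The in-memory part of such an update can be sketched as follows. `PerfSketch` and its field names are hypothetical, and the 4:1 moving average is an assumed smoothing rule; the interaction with the BackoffManager and persistent storage is omitted.

```rust
use std::time::{Duration, Instant};

/// Hypothetical metrics record; not the crate's MailboxPerformance.
#[derive(Debug, Default)]
pub struct PerfSketch {
    pub success_count: u32,
    pub failure_count: u32,
    pub consecutive_failures: u32,
    pub last_success: Option<Instant>,
    pub last_failure: Option<Instant>,
    pub avg_response_time: Option<Duration>,
}

impl PerfSketch {
    /// Records the outcome of one interaction with a mailbox.
    pub fn record(&mut self, success: bool, response_time: Duration, now: Instant) {
        if success {
            self.success_count += 1;
            self.consecutive_failures = 0;
            self.last_success = Some(now);
            // Assumed smoothing: the old average weighs 4/5, the new sample 1/5.
            self.avg_response_time = Some(match self.avg_response_time {
                Some(avg) => (avg * 4 + response_time) / 5,
                None => response_time,
            });
        } else {
            self.failure_count += 1;
            self.consecutive_failures += 1;
            self.last_failure = Some(now);
        }
    }
}
```

In this sketch failures leave the average untouched, so timeouts do not skew the response-time estimate.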
pub(crate) async fn forget_failing_mailbox(&mut self, peer_id: PeerId)
Temporarily forgets a failing mailbox.
If a mailbox is persistently failing, it is removed from the list of
discovered mailboxes and its failure is recorded in the BackoffManager
and persistent storage.
§Arguments
peer_id - The PeerId of the mailbox to forget.
pub(crate) async fn cleanup_failing_mailboxes(&mut self)
Cleans up failing mailboxes.
Iterates through all discovered mailboxes and calls forget_failing_mailbox
for any that meet the criteria for being forgotten.
impl SyncEngine
pub async fn retry_outbox_for_peer(&self, target_peer: &PeerId) -> Result<()>
Retries sending pending messages from the outbox to a specific peer.
This function fetches all pending messages and attempts to send those
destined for target_peer. Successfully sent messages are removed
from the outbox.
§Arguments
target_peer - The PeerId of the peer to retry sending messages to.
§Errors
This function will return an error if there are issues accessing the outbox or sending messages via the network.
impl SyncEngine
pub(super) async fn forward_pending_message(
    &mut self,
    network: &NetworkHandle,
    message: &Message,
) -> Result<bool>
Forwards a pending message to available mailbox providers.
This function attempts to encrypt and store a message in at least two mailbox providers for redundancy. It ranks available mailboxes and updates their performance metrics.
§Arguments
network - The NetworkHandle to use for interacting with the network.
message - The message to encrypt and forward.
§Returns
true if the message was successfully forwarded to at least one mailbox,
false otherwise.
§Errors
This function will return an error if the recipient is not a friend or if there are issues with encryption or storage.
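The redundancy behaviour (aim for two stored copies, report success if at least one was stored) can be sketched synchronously. `PeerId` is a `u64` stand-in, the `store` closure replaces the real network call, and stopping once two copies are stored is an assumption about how the target is applied.

```rust
/// Stand-in for the real peer identifier type.
pub type PeerId = u64;

/// Number of mailbox copies to aim for, per the description above.
pub const TARGET_COPIES: usize = 2;

/// Walks the ranked mailboxes in order and stops once enough copies are
/// stored. Returns whether at least one copy was stored, plus the mailboxes
/// that accepted the message.
pub fn forward_to_mailboxes<F>(ranked: &[PeerId], mut store: F) -> (bool, Vec<PeerId>)
where
    F: FnMut(PeerId) -> bool,
{
    let mut stored = Vec::new();
    for &peer in ranked {
        if stored.len() >= TARGET_COPIES {
            break;
        }
        if store(peer) {
            stored.push(peer);
        }
    }
    (!stored.is_empty(), stored)
}
```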
impl SyncEngine
pub async fn retry_outbox(&mut self) -> Result<()>
Retries sending all pending messages in the outbox.
This function attempts direct delivery to connected peers first. If direct delivery fails or the peer is not connected, it then attempts to forward the message to available mailbox providers.
§Errors
This function will return an error if there are issues accessing the outbox or network. Individual message delivery failures are logged but do not stop the overall retry process.
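The direct-first, mailbox-second order can be sketched as a small decision function. The `Delivery` enum and the two closures are hypothetical stand-ins for the engine's direct-delivery and mailbox-forwarding steps.

```rust
/// Hypothetical outcome of one delivery attempt for a single message.
#[derive(Debug, PartialEq, Eq)]
pub enum Delivery {
    Direct,
    Mailbox,
    StillPending,
}

/// Tries direct delivery when the recipient is connected, then falls back
/// to mailbox forwarding. A message that fails both stays pending.
pub fn deliver_with_fallback<D, F>(recipient_connected: bool, direct: D, forward: F) -> Delivery
where
    D: FnOnce() -> bool,
    F: FnOnce() -> bool,
{
    // Short-circuit: `direct` only runs when the recipient is connected.
    if recipient_connected && direct() {
        return Delivery::Direct;
    }
    if forward() {
        Delivery::Mailbox
    } else {
        Delivery::StillPending
    }
}
```

Returning `StillPending` instead of an error mirrors the documented behaviour that individual delivery failures do not stop the retry pass.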
async fn attempt_direct_delivery(
    &mut self,
    network: &NetworkHandle,
    message: &Message,
) -> Result<bool>
Attempts to directly deliver a message to its recipient.
This function checks the BackoffManager to see if a direct attempt
is allowed. If successful, the message is removed from the outbox.
§Arguments
network - The NetworkHandle to use for sending the message.
message - The message to attempt direct delivery for.
§Returns
true if direct delivery was successful, false otherwise.
§Errors
This function will return an error if there are issues sending the message.
impl SyncEngine
pub fn new_with_network(
    interval: Duration,
    identity: Arc<Identity>,
    stores: SyncStores,
    network: NetworkHandle,
    ui_notify_tx: UnboundedSender<UiNotification>,
    web_notify_tx: Option<UnboundedSender<UiNotification>>,
) -> Result<(Self, UnboundedSender<SyncEvent>, UnboundedReceiver<SyncEvent>)>
Creates a new SyncEngine with a network handle.
§Arguments
interval - The interval for the synchronization cycle.
identity - The local node’s identity.
stores - A collection of storage implementations.
network - The network handle.
ui_notify_tx - Sender for UI notifications.
web_notify_tx - Sender for web UI notifications.
§Returns
A Result containing a tuple of the SyncEngine instance, an event sender,
and an event receiver.
pub async fn initial_discovery(&mut self) -> Result<()>
Performs an initial discovery of mailbox providers on startup.
§Errors
Returns an error if the discovery process fails.
pub async fn sync_cycle(&mut self) -> Result<()>
Runs a single synchronization cycle.
This includes discovering mailboxes, fetching messages, retrying outbox messages, and cleaning up old data.
§Errors
Returns an error if any part of the synchronization cycle fails.
pub async fn handle_event(&mut self, event: SyncEvent) -> Result<()>
Auto Trait Implementations§
impl Freeze for SyncEngine
impl !RefUnwindSafe for SyncEngine
impl Send for SyncEngine
impl Sync for SyncEngine
impl Unpin for SyncEngine
impl !UnwindSafe for SyncEngine
Blanket Implementations§
impl<T> BorrowMut<T> for T
where
    T: ?Sized,
fn borrow_mut(&mut self) -> &mut T
impl<T> Instrument for T
fn instrument(self, span: Span) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
impl<T> IntoEither for T
fn into_either(self, into_left: bool) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self> if into_left is true. Converts self into a Right variant of Either<Self, Self> otherwise.
fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self> if into_left(&self) returns true. Converts self into a Right variant of Either<Self, Self> otherwise.