p2p_chat/network/handlers/
chat.rs

1//! This module contains the handlers for chat-related network events.
2use super::super::{NetworkLayer, NetworkResponse};
3use crate::cli::commands::UiNotification;
4use crate::types::{ChatRequest, ChatResponse, DeliveryStatus, Message};
5use anyhow::Result;
6use libp2p::request_response::{self, OutboundRequestId, ResponseChannel};
7use tokio::sync::mpsc;
8use tracing::{error, info, warn};
9
10impl NetworkLayer {
11    /// Handles an event from the `ChatBehaviour`.
12    ///
13    /// This function is called when an event is received from the `ChatBehaviour`.
14    /// It dispatches the event to the appropriate handler.
15    ///
16    /// # Arguments
17    ///
18    /// * `event` - The `request_response::Event<ChatRequest, ChatResponse>` to handle.
19    /// * `incoming_messages` - The sender for incoming chat messages.
20    ///
21    /// # Errors
22    ///
23    /// This function will return an error if handling the event fails.
24    pub(super) async fn handle_chat_event(
25        &mut self,
26        event: request_response::Event<ChatRequest, ChatResponse>,
27        incoming_messages: &mpsc::UnboundedSender<Message>,
28    ) -> Result<()> {
29        match event {
30            request_response::Event::Message { message, .. } => match message {
31                request_response::Message::Request {
32                    request, channel, ..
33                } => {
34                    self.handle_chat_request(request, channel, incoming_messages)
35                        .await?;
36                }
37                request_response::Message::Response {
38                    request_id,
39                    response,
40                } => {
41                    self.handle_chat_response(request_id, response).await?;
42                }
43            },
44            request_response::Event::OutboundFailure {
45                request_id, error, ..
46            } => {
47                warn!("Chat request failed: {:?}", error);
48                if let Some(sender) = self.pending_requests.remove(&request_id) {
49                    let _ = sender.send(NetworkResponse::Error(format!(
50                        "Request failed: {:?}",
51                        error
52                    )));
53                }
54            }
55            request_response::Event::InboundFailure { error, .. } => {
56                warn!("Chat inbound failure: {:?}", error);
57            }
58            _ => {}
59        }
60
61        Ok(())
62    }
63
64    /// Handles an inbound chat request.
65    async fn handle_chat_request(
66        &mut self,
67        request: ChatRequest,
68        channel: ResponseChannel<ChatResponse>,
69        incoming_messages: &mpsc::UnboundedSender<Message>,
70    ) -> Result<()> {
71        match request {
72            ChatRequest::SendMessage { message } => {
73                info!("Received message from {}: {}", message.sender, message.id);
74
75                if let Err(e) = incoming_messages.send(message.clone()) {
76                    error!("Failed to forward incoming message: {}", e);
77                    let _ = self.swarm.behaviour_mut().chat.send_response(
78                        channel,
79                        ChatResponse::MessageResult {
80                            success: false,
81                            message_id: None,
82                        },
83                    );
84                } else {
85                    let _ = self.swarm.behaviour_mut().chat.send_response(
86                        channel,
87                        ChatResponse::MessageResult {
88                            success: true,
89                            message_id: Some(message.id),
90                        },
91                    );
92                }
93            }
94            ChatRequest::DeliveryConfirmation { confirmation } => {
95                info!(
96                    "Received delivery confirmation for message: {}",
97                    confirmation.original_message_id
98                );
99
100                // Notify the UI about the delivery status update.
101                if let Some(ref ui_tx) = self.ui_notify_tx {
102                    let _ = ui_tx.send(UiNotification::DeliveryStatusUpdate {
103                        message_id: confirmation.original_message_id,
104                        new_status: DeliveryStatus::Delivered,
105                    });
106                }
107
108                // Send a success response.
109                let _ = self.swarm.behaviour_mut().chat.send_response(
110                    channel,
111                    ChatResponse::MessageResult {
112                        success: true,
113                        message_id: Some(confirmation.original_message_id),
114                    },
115                );
116            }
117            ChatRequest::ReadReceipt { receipt } => {
118                info!("Received read receipt for message: {}", receipt.message_id);
119
120                // Notify the UI about the read status update.
121                if let Some(ref ui_tx) = self.ui_notify_tx {
122                    let _ = ui_tx.send(UiNotification::DeliveryStatusUpdate {
123                        message_id: receipt.message_id,
124                        new_status: DeliveryStatus::Read,
125                    });
126                }
127
128                // Send a success response.
129                let _ = self.swarm.behaviour_mut().chat.send_response(
130                    channel,
131                    ChatResponse::MessageResult {
132                        success: true,
133                        message_id: Some(receipt.message_id),
134                    },
135                );
136            }
137        }
138
139        Ok(())
140    }
141
142    /// Handles an outbound chat response.
143    async fn handle_chat_response(
144        &mut self,
145        request_id: OutboundRequestId,
146        response: ChatResponse,
147    ) -> Result<()> {
148        if let Some(sender) = self.pending_requests.remove(&request_id) {
149            match response {
150                ChatResponse::MessageResult { success, .. } => {
151                    if success {
152                        let _ = sender.send(NetworkResponse::MessageSent);
153                    } else {
154                        let _ = sender.send(NetworkResponse::Error(
155                            "Message rejected by peer".to_string(),
156                        ));
157                    }
158                }
159            }
160        }
161
162        Ok(())
163    }
164}