p2p_chat/network/handlers/
chat.rs1use super::super::{NetworkLayer, NetworkResponse};
3use crate::cli::commands::UiNotification;
4use crate::types::{ChatRequest, ChatResponse, DeliveryStatus, Message};
5use anyhow::Result;
6use libp2p::request_response::{self, OutboundRequestId, ResponseChannel};
7use tokio::sync::mpsc;
8use tracing::{error, info, warn};
9
10impl NetworkLayer {
11 pub(super) async fn handle_chat_event(
25 &mut self,
26 event: request_response::Event<ChatRequest, ChatResponse>,
27 incoming_messages: &mpsc::UnboundedSender<Message>,
28 ) -> Result<()> {
29 match event {
30 request_response::Event::Message { message, .. } => match message {
31 request_response::Message::Request {
32 request, channel, ..
33 } => {
34 self.handle_chat_request(request, channel, incoming_messages)
35 .await?;
36 }
37 request_response::Message::Response {
38 request_id,
39 response,
40 } => {
41 self.handle_chat_response(request_id, response).await?;
42 }
43 },
44 request_response::Event::OutboundFailure {
45 request_id, error, ..
46 } => {
47 warn!("Chat request failed: {:?}", error);
48 if let Some(sender) = self.pending_requests.remove(&request_id) {
49 let _ = sender.send(NetworkResponse::Error(format!(
50 "Request failed: {:?}",
51 error
52 )));
53 }
54 }
55 request_response::Event::InboundFailure { error, .. } => {
56 warn!("Chat inbound failure: {:?}", error);
57 }
58 _ => {}
59 }
60
61 Ok(())
62 }
63
64 async fn handle_chat_request(
66 &mut self,
67 request: ChatRequest,
68 channel: ResponseChannel<ChatResponse>,
69 incoming_messages: &mpsc::UnboundedSender<Message>,
70 ) -> Result<()> {
71 match request {
72 ChatRequest::SendMessage { message } => {
73 info!("Received message from {}: {}", message.sender, message.id);
74
75 if let Err(e) = incoming_messages.send(message.clone()) {
76 error!("Failed to forward incoming message: {}", e);
77 let _ = self.swarm.behaviour_mut().chat.send_response(
78 channel,
79 ChatResponse::MessageResult {
80 success: false,
81 message_id: None,
82 },
83 );
84 } else {
85 let _ = self.swarm.behaviour_mut().chat.send_response(
86 channel,
87 ChatResponse::MessageResult {
88 success: true,
89 message_id: Some(message.id),
90 },
91 );
92 }
93 }
94 ChatRequest::DeliveryConfirmation { confirmation } => {
95 info!(
96 "Received delivery confirmation for message: {}",
97 confirmation.original_message_id
98 );
99
100 if let Some(ref ui_tx) = self.ui_notify_tx {
102 let _ = ui_tx.send(UiNotification::DeliveryStatusUpdate {
103 message_id: confirmation.original_message_id,
104 new_status: DeliveryStatus::Delivered,
105 });
106 }
107
108 let _ = self.swarm.behaviour_mut().chat.send_response(
110 channel,
111 ChatResponse::MessageResult {
112 success: true,
113 message_id: Some(confirmation.original_message_id),
114 },
115 );
116 }
117 ChatRequest::ReadReceipt { receipt } => {
118 info!("Received read receipt for message: {}", receipt.message_id);
119
120 if let Some(ref ui_tx) = self.ui_notify_tx {
122 let _ = ui_tx.send(UiNotification::DeliveryStatusUpdate {
123 message_id: receipt.message_id,
124 new_status: DeliveryStatus::Read,
125 });
126 }
127
128 let _ = self.swarm.behaviour_mut().chat.send_response(
130 channel,
131 ChatResponse::MessageResult {
132 success: true,
133 message_id: Some(receipt.message_id),
134 },
135 );
136 }
137 }
138
139 Ok(())
140 }
141
142 async fn handle_chat_response(
144 &mut self,
145 request_id: OutboundRequestId,
146 response: ChatResponse,
147 ) -> Result<()> {
148 if let Some(sender) = self.pending_requests.remove(&request_id) {
149 match response {
150 ChatResponse::MessageResult { success, .. } => {
151 if success {
152 let _ = sender.send(NetworkResponse::MessageSent);
153 } else {
154 let _ = sender.send(NetworkResponse::Error(
155 "Message rejected by peer".to_string(),
156 ));
157 }
158 }
159 }
160 }
161
162 Ok(())
163 }
164}