yamux/connection/stream/
flow_control.rs1use std::{cmp, sync::Arc};
2
3use parking_lot::Mutex;
4use web_time::Instant;
5
6use crate::{connection::rtt::Rtt, Config, DEFAULT_CREDIT};
7
8#[derive(Debug)]
9pub(crate) struct FlowController {
10 config: Arc<Config>,
11 last_window_update: Instant,
12 rtt: Rtt,
14 accumulated_max_stream_windows: Arc<Mutex<usize>>,
16 receive_window: u32,
17 max_receive_window: u32,
18 send_window: u32,
19}
20
21impl FlowController {
22 pub(crate) fn new(
23 receive_window: u32,
24 send_window: u32,
25 accumulated_max_stream_windows: Arc<Mutex<usize>>,
26 rtt: Rtt,
27 config: Arc<Config>,
28 ) -> Self {
29 Self {
30 receive_window,
31 send_window,
32 config,
33 rtt,
34 accumulated_max_stream_windows,
35 max_receive_window: DEFAULT_CREDIT,
36 last_window_update: Instant::now(),
37 }
38 }
39
40 pub(crate) fn next_window_update(&mut self, buffer_len: usize) -> Option<u32> {
45 self.assert_invariants(buffer_len);
46
47 let bytes_received = self.max_receive_window - self.receive_window;
48 let mut next_window_update =
49 bytes_received.saturating_sub(buffer_len.try_into().unwrap_or(u32::MAX));
50
51 if next_window_update < self.max_receive_window / 2 {
53 return None;
54 }
55
56 log::trace!(
57 "received {} mb in {} seconds ({} mbit/s)",
58 next_window_update as f64 / crate::MIB as f64,
59 self.last_window_update.elapsed().as_secs_f64(),
60 next_window_update as f64 / crate::MIB as f64 * 8.0
61 / self.last_window_update.elapsed().as_secs_f64()
62 );
63
64 if self
82 .rtt
83 .get()
84 .map(|rtt| self.last_window_update.elapsed() < rtt * 2)
85 .unwrap_or(false)
86 {
87 let mut accumulated_max_stream_windows = self.accumulated_max_stream_windows.lock();
88
89 let new_max = self.max_receive_window.saturating_mul(2);
91
92 let new_max = {
94 let connection_limit: usize = self.max_receive_window as usize +
95 (self.config.max_connection_receive_window.unwrap_or(usize::MAX)
97 - self.config.max_num_streams * DEFAULT_CREDIT as usize
99 - *accumulated_max_stream_windows);
102
103 cmp::min(new_max, connection_limit.try_into().unwrap_or(u32::MAX))
104 };
105
106 *accumulated_max_stream_windows += (new_max - self.max_receive_window) as usize;
108 drop(accumulated_max_stream_windows);
109
110 log::debug!(
111 "old window_max: {} mb, new window_max: {} mb",
112 self.max_receive_window as f64 / crate::MIB as f64,
113 new_max as f64 / crate::MIB as f64
114 );
115
116 self.max_receive_window = new_max;
117
118 let bytes_received = self.max_receive_window - self.receive_window;
120 next_window_update =
121 bytes_received.saturating_sub(buffer_len.try_into().unwrap_or(u32::MAX));
122 }
123
124 self.last_window_update = Instant::now();
125 self.receive_window += next_window_update;
126
127 self.assert_invariants(buffer_len);
128
129 Some(next_window_update)
130 }
131
132 fn assert_invariants(&self, buffer_len: usize) {
133 if !cfg!(debug_assertions) {
134 return;
135 }
136
137 let config = &self.config;
138 let rtt = self.rtt.get();
139 let accumulated_max_stream_windows = *self.accumulated_max_stream_windows.lock();
140
141 assert!(
142 buffer_len <= self.max_receive_window as usize,
143 "The current buffer size never exceeds the maximum stream receive window."
144 );
145 assert!(
146 self.receive_window <= self.max_receive_window,
147 "The current window never exceeds the maximum."
148 );
149 assert!(
150 (self.max_receive_window - DEFAULT_CREDIT) as usize
151 <= config.max_connection_receive_window.unwrap_or(usize::MAX)
152 - config.max_num_streams * DEFAULT_CREDIT as usize,
153 "The maximum never exceeds its maximum portion of the configured connection limit."
154 );
155 assert!(
156 (self.max_receive_window - DEFAULT_CREDIT) as usize
157 <= accumulated_max_stream_windows,
158 "The amount by which the stream maximum exceeds DEFAULT_CREDIT is tracked in accumulated_max_stream_windows."
159 );
160 if rtt.is_none() {
161 assert_eq!(
162 self.max_receive_window, DEFAULT_CREDIT,
163 "The maximum is only increased iff an rtt measurement is available."
164 );
165 }
166 }
167
168 pub(crate) fn send_window(&self) -> u32 {
169 self.send_window
170 }
171
172 pub(crate) fn consume_send_window(&mut self, i: u32) {
173 self.send_window = self
174 .send_window
175 .checked_sub(i)
176 .expect("not exceed send window");
177 }
178
179 pub(crate) fn increase_send_window_by(&mut self, i: u32) {
180 self.send_window = self
181 .send_window
182 .checked_add(i)
183 .expect("send window not to exceed u32");
184 }
185
186 pub(crate) fn receive_window(&self) -> u32 {
187 self.receive_window
188 }
189
190 pub(crate) fn consume_receive_window(&mut self, i: u32) {
191 self.receive_window = self
192 .receive_window
193 .checked_sub(i)
194 .expect("not exceed receive window");
195 }
196}
197
198impl Drop for FlowController {
199 fn drop(&mut self) {
200 let mut accumulated_max_stream_windows = self.accumulated_max_stream_windows.lock();
201
202 debug_assert!(
203 *accumulated_max_stream_windows >= (self.max_receive_window - DEFAULT_CREDIT) as usize,
204 "{accumulated_max_stream_windows} {}",
205 self.max_receive_window
206 );
207
208 *accumulated_max_stream_windows -= (self.max_receive_window - DEFAULT_CREDIT) as usize;
209 }
210}
211
212#[cfg(test)]
213mod tests {
214 use super::*;
215 use quickcheck::{GenRange, QuickCheck};
216 use web_time::Duration;
217
218 #[derive(Debug)]
219 struct Input {
220 controller: FlowController,
221 buffer_len: usize,
222 }
223
224 #[cfg(test)]
225 impl Clone for Input {
226 fn clone(&self) -> Self {
227 Self {
228 controller: FlowController {
229 config: self.controller.config.clone(),
230 accumulated_max_stream_windows: Arc::new(Mutex::new(
231 *self.controller.accumulated_max_stream_windows.lock(),
232 )),
233 rtt: self.controller.rtt.clone(),
234 last_window_update: self.controller.last_window_update,
235 receive_window: self.controller.receive_window,
236 max_receive_window: self.controller.max_receive_window,
237 send_window: self.controller.send_window,
238 },
239 buffer_len: self.buffer_len,
240 }
241 }
242 }
243
244 impl quickcheck::Arbitrary for Input {
245 fn arbitrary(g: &mut quickcheck::Gen) -> Self {
246 let config = Arc::new(Config::arbitrary(g));
247 let rtt = Rtt::arbitrary(g);
248
249 let max_connection_minus_default =
250 config.max_connection_receive_window.unwrap_or(usize::MAX)
251 - (config.max_num_streams * (DEFAULT_CREDIT as usize));
252
253 let max_receive_window = if rtt.get().is_none() {
254 DEFAULT_CREDIT
255 } else {
256 g.gen_range(
257 DEFAULT_CREDIT
258 ..(DEFAULT_CREDIT as usize)
259 .saturating_add(max_connection_minus_default)
260 .try_into()
261 .unwrap_or(u32::MAX)
262 .saturating_add(1),
263 )
264 };
265 let receive_window = g.gen_range(0..max_receive_window);
266 let buffer_len = g.gen_range(0..max_receive_window as usize);
267 let accumulated_max_stream_windows = Arc::new(Mutex::new(g.gen_range(
268 (max_receive_window - DEFAULT_CREDIT) as usize
269 ..max_connection_minus_default.saturating_add(1),
270 )));
271 let last_window_update =
272 Instant::now() - Duration::from_secs(g.gen_range(0..(60 * 60 * 24)));
273 let send_window = g.gen_range(0..u32::MAX);
274
275 Self {
276 controller: FlowController {
277 accumulated_max_stream_windows,
278 rtt,
279 last_window_update,
280 config,
281 receive_window,
282 max_receive_window,
283 send_window,
284 },
285 buffer_len,
286 }
287 }
288 }
289
290 #[test]
291 fn next_window_update() {
292 fn property(
293 Input {
294 mut controller,
295 buffer_len,
296 }: Input,
297 ) {
298 controller.next_window_update(buffer_len);
299 }
300
301 QuickCheck::new().quickcheck(property as fn(_))
302 }
303}