yamux/connection/stream/
flow_control.rs

1use std::{cmp, sync::Arc};
2
3use parking_lot::Mutex;
4use web_time::Instant;
5
6use crate::{connection::rtt::Rtt, Config, DEFAULT_CREDIT};
7
8#[derive(Debug)]
9pub(crate) struct FlowController {
10    config: Arc<Config>,
11    last_window_update: Instant,
12    /// See [`Connection::rtt`].
13    rtt: Rtt,
14    /// See [`Connection::accumulated_max_stream_windows`].
15    accumulated_max_stream_windows: Arc<Mutex<usize>>,
16    receive_window: u32,
17    max_receive_window: u32,
18    send_window: u32,
19}
20
21impl FlowController {
22    pub(crate) fn new(
23        receive_window: u32,
24        send_window: u32,
25        accumulated_max_stream_windows: Arc<Mutex<usize>>,
26        rtt: Rtt,
27        config: Arc<Config>,
28    ) -> Self {
29        Self {
30            receive_window,
31            send_window,
32            config,
33            rtt,
34            accumulated_max_stream_windows,
35            max_receive_window: DEFAULT_CREDIT,
36            last_window_update: Instant::now(),
37        }
38    }
39
40    /// Calculate the number of additional window bytes the receiving side (local) should grant the
41    /// sending side (remote) via a window update message.
42    ///
43    /// Returns `None` if too small to justify a window update message.
44    pub(crate) fn next_window_update(&mut self, buffer_len: usize) -> Option<u32> {
45        self.assert_invariants(buffer_len);
46
47        let bytes_received = self.max_receive_window - self.receive_window;
48        let mut next_window_update =
49            bytes_received.saturating_sub(buffer_len.try_into().unwrap_or(u32::MAX));
50
51        // Don't send an update in case half or more of the window is still available to the sender.
52        if next_window_update < self.max_receive_window / 2 {
53            return None;
54        }
55
56        log::trace!(
57            "received {} mb in {} seconds ({} mbit/s)",
58            next_window_update as f64 / crate::MIB as f64,
59            self.last_window_update.elapsed().as_secs_f64(),
60            next_window_update as f64 / crate::MIB as f64 * 8.0
61                / self.last_window_update.elapsed().as_secs_f64()
62        );
63
64        // Auto-tuning `max_receive_window`
65        //
66        // The ideal `max_receive_window` is equal to the bandwidth-delay-product (BDP), thus
67        // allowing the remote sender to exhaust the entire available bandwidth on a single stream.
68        // Choosing `max_receive_window` too small prevents the remote sender from exhausting the
69        // available bandwidth. Choosing `max_receive_window` to large is wasteful and delays
70        // backpressure from the receiver to the sender on the stream.
71        //
72        // In case the remote sender has exhausted half or more of its credit in less than 2
73        // round-trips, try to double `max_receive_window`.
74        //
75        // For simplicity `max_receive_window` is never decreased.
76        //
77        // This implementation is heavily influenced by QUIC. See document below for rational on the
78        // above strategy.
79        //
80        // https://docs.google.com/document/d/1F2YfdDXKpy20WVKJueEf4abn_LVZHhMUMS5gX6Pgjl4/edit?usp=sharing
81        if self
82            .rtt
83            .get()
84            .map(|rtt| self.last_window_update.elapsed() < rtt * 2)
85            .unwrap_or(false)
86        {
87            let mut accumulated_max_stream_windows = self.accumulated_max_stream_windows.lock();
88
89            // Ideally one can just double it:
90            let new_max = self.max_receive_window.saturating_mul(2);
91
92            // But one has to consider the configured connection limit:
93            let new_max = {
94                let connection_limit: usize = self.max_receive_window as usize +
95                    // the overall configured conneciton limit
96                    (self.config.max_connection_receive_window.unwrap_or(usize::MAX)
97                    // minus the minimum amount of window guaranteed to each stream
98                    - self.config.max_num_streams * DEFAULT_CREDIT as usize
99                    // minus the amount of bytes beyond the minimum amount (`DEFAULT_CREDIT`)
100                    // already allocated by this and other streams on the connection.
101                    - *accumulated_max_stream_windows);
102
103                cmp::min(new_max, connection_limit.try_into().unwrap_or(u32::MAX))
104            };
105
106            // Account for the additional credit on the accumulated connection counter.
107            *accumulated_max_stream_windows += (new_max - self.max_receive_window) as usize;
108            drop(accumulated_max_stream_windows);
109
110            log::debug!(
111                "old window_max: {} mb, new window_max: {} mb",
112                self.max_receive_window as f64 / crate::MIB as f64,
113                new_max as f64 / crate::MIB as f64
114            );
115
116            self.max_receive_window = new_max;
117
118            // Recalculate `next_window_update` with the new `max_receive_window`.
119            let bytes_received = self.max_receive_window - self.receive_window;
120            next_window_update =
121                bytes_received.saturating_sub(buffer_len.try_into().unwrap_or(u32::MAX));
122        }
123
124        self.last_window_update = Instant::now();
125        self.receive_window += next_window_update;
126
127        self.assert_invariants(buffer_len);
128
129        Some(next_window_update)
130    }
131
132    fn assert_invariants(&self, buffer_len: usize) {
133        if !cfg!(debug_assertions) {
134            return;
135        }
136
137        let config = &self.config;
138        let rtt = self.rtt.get();
139        let accumulated_max_stream_windows = *self.accumulated_max_stream_windows.lock();
140
141        assert!(
142            buffer_len <= self.max_receive_window as usize,
143            "The current buffer size never exceeds the maximum stream receive window."
144        );
145        assert!(
146            self.receive_window <= self.max_receive_window,
147            "The current window never exceeds the maximum."
148        );
149        assert!(
150            (self.max_receive_window - DEFAULT_CREDIT) as usize
151                <= config.max_connection_receive_window.unwrap_or(usize::MAX)
152                    - config.max_num_streams * DEFAULT_CREDIT as usize,
153            "The maximum never exceeds its maximum portion of the configured connection limit."
154        );
155        assert!(
156            (self.max_receive_window - DEFAULT_CREDIT) as usize
157                <= accumulated_max_stream_windows,
158            "The amount by which the stream maximum exceeds DEFAULT_CREDIT is tracked in accumulated_max_stream_windows."
159        );
160        if rtt.is_none() {
161            assert_eq!(
162                self.max_receive_window, DEFAULT_CREDIT,
163                "The maximum is only increased iff an rtt measurement is available."
164            );
165        }
166    }
167
168    pub(crate) fn send_window(&self) -> u32 {
169        self.send_window
170    }
171
172    pub(crate) fn consume_send_window(&mut self, i: u32) {
173        self.send_window = self
174            .send_window
175            .checked_sub(i)
176            .expect("not exceed send window");
177    }
178
179    pub(crate) fn increase_send_window_by(&mut self, i: u32) {
180        self.send_window = self
181            .send_window
182            .checked_add(i)
183            .expect("send window not to exceed u32");
184    }
185
186    pub(crate) fn receive_window(&self) -> u32 {
187        self.receive_window
188    }
189
190    pub(crate) fn consume_receive_window(&mut self, i: u32) {
191        self.receive_window = self
192            .receive_window
193            .checked_sub(i)
194            .expect("not exceed receive window");
195    }
196}
197
198impl Drop for FlowController {
199    fn drop(&mut self) {
200        let mut accumulated_max_stream_windows = self.accumulated_max_stream_windows.lock();
201
202        debug_assert!(
203            *accumulated_max_stream_windows >= (self.max_receive_window - DEFAULT_CREDIT) as usize,
204            "{accumulated_max_stream_windows} {}",
205            self.max_receive_window
206        );
207
208        *accumulated_max_stream_windows -= (self.max_receive_window - DEFAULT_CREDIT) as usize;
209    }
210}
211
#[cfg(test)]
mod tests {
    use super::*;
    use quickcheck::{GenRange, QuickCheck};
    use web_time::Duration;

    /// Randomly generated controller state together with a matching buffer length.
    #[derive(Debug)]
    struct Input {
        controller: FlowController,
        buffer_len: usize,
    }

    /// Hand-written because `FlowController` does not derive `Clone`.
    impl Clone for Input {
        /// Copies every field; the accumulated counter is copied into a fresh `Arc<Mutex<_>>`
        /// instead of being shared with the original.
        fn clone(&self) -> Self {
            let source = &self.controller;
            let counter_snapshot = *source.accumulated_max_stream_windows.lock();

            let controller = FlowController {
                config: Arc::clone(&source.config),
                accumulated_max_stream_windows: Arc::new(Mutex::new(counter_snapshot)),
                rtt: source.rtt.clone(),
                last_window_update: source.last_window_update,
                receive_window: source.receive_window,
                max_receive_window: source.max_receive_window,
                send_window: source.send_window,
            };

            Self {
                controller,
                buffer_len: self.buffer_len,
            }
        }
    }

    impl quickcheck::Arbitrary for Input {
        /// Generates a controller whose fields satisfy the invariants checked by
        /// `FlowController::assert_invariants`.
        fn arbitrary(g: &mut quickcheck::Gen) -> Self {
            let config = Arc::new(Config::arbitrary(g));
            let rtt = Rtt::arbitrary(g);

            // Connection limit minus the minimum window guaranteed to every stream.
            let max_connection_minus_default =
                config.max_connection_receive_window.unwrap_or(usize::MAX)
                    - (config.max_num_streams * (DEFAULT_CREDIT as usize));

            // Without an RTT measurement the maximum must still be at its initial value.
            let max_receive_window = match rtt.get() {
                None => DEFAULT_CREDIT,
                Some(_) => {
                    let exclusive_upper: u32 = (DEFAULT_CREDIT as usize)
                        .saturating_add(max_connection_minus_default)
                        .try_into()
                        .unwrap_or(u32::MAX)
                        .saturating_add(1);
                    g.gen_range(DEFAULT_CREDIT..exclusive_upper)
                }
            };
            let receive_window = g.gen_range(0..max_receive_window);
            let buffer_len = g.gen_range(0..max_receive_window as usize);

            // The counter has to cover at least this stream's own surplus.
            let own_surplus = (max_receive_window - DEFAULT_CREDIT) as usize;
            let accumulated = g.gen_range(own_surplus..max_connection_minus_default.saturating_add(1));
            let accumulated_max_stream_windows = Arc::new(Mutex::new(accumulated));

            // Last update happened anywhere within the past 24 hours.
            let last_window_update =
                Instant::now() - Duration::from_secs(g.gen_range(0..(60 * 60 * 24)));
            let send_window = g.gen_range(0..u32::MAX);

            let controller = FlowController {
                accumulated_max_stream_windows,
                rtt,
                last_window_update,
                config,
                receive_window,
                max_receive_window,
                send_window,
            };

            Self {
                controller,
                buffer_len,
            }
        }
    }

    /// `next_window_update` must not panic (including its invariant checks) for any valid
    /// input.
    #[test]
    fn next_window_update() {
        fn property(input: Input) {
            let Input {
                mut controller,
                buffer_len,
            } = input;

            controller.next_window_update(buffer_len);
        }

        QuickCheck::new().quickcheck(property as fn(_))
    }
}
303}