sled/
threadpool.rs

1//! A simple adaptive threadpool that returns a oneshot future.
2
3use std::{
4    collections::VecDeque,
5    sync::atomic::{AtomicBool, Ordering::Relaxed},
6    thread,
7    time::{Duration, Instant},
8};
9
10use parking_lot::{Condvar, Mutex};
11
12use crate::{
13    debug_delay, warn, Acquire, AtomicUsize, Error, Lazy, OneShot, Result,
14    SeqCst,
15};
16
17// This is lower for CI reasons.
18#[cfg(windows)]
19const MAX_THREADS: usize = 16;
20
21#[cfg(not(windows))]
22const MAX_THREADS: usize = 128;
23
24const DESIRED_WAITING_THREADS: usize = 7;
25
26static WAITING_THREAD_COUNT: AtomicUsize = AtomicUsize::new(0);
27static TOTAL_THREAD_COUNT: AtomicUsize = AtomicUsize::new(0);
28static SPAWNS: AtomicUsize = AtomicUsize::new(0);
29static SPAWNING: AtomicBool = AtomicBool::new(false);
30
31type Work = Box<dyn FnOnce() + Send + 'static>;
32
33struct Queue {
34    cv: Condvar,
35    mu: Mutex<VecDeque<Work>>,
36}
37
38impl Queue {
39    fn recv_timeout(&self, duration: Duration) -> Option<Work> {
40        let mut queue = self.mu.lock();
41
42        let cutoff = Instant::now() + duration;
43
44        while queue.is_empty() {
45            WAITING_THREAD_COUNT.fetch_add(1, SeqCst);
46            let res = self.cv.wait_until(&mut queue, cutoff);
47            WAITING_THREAD_COUNT.fetch_sub(1, SeqCst);
48            if res.timed_out() {
49                break;
50            }
51        }
52
53        queue.pop_front()
54    }
55
56    fn try_recv(&self) -> Option<Work> {
57        let mut queue = self.mu.lock();
58        queue.pop_front()
59    }
60
61    fn send(&self, work: Work) -> usize {
62        let mut queue = self.mu.lock();
63        queue.push_back(work);
64
65        let len = queue.len();
66
67        // having held the mutex makes this linearized
68        // with the notify below.
69        drop(queue);
70
71        self.cv.notify_all();
72
73        len
74    }
75}
76
77static QUEUE: Lazy<Queue, fn() -> Queue> = Lazy::new(init_queue);
78static BROKEN: AtomicBool = AtomicBool::new(false);
79
80fn init_queue() -> Queue {
81    debug_delay();
82    for _ in 0..DESIRED_WAITING_THREADS {
83        debug_delay();
84        if let Err(e) = spawn_new_thread(true) {
85            log::error!("failed to initialize threadpool: {:?}", e);
86        }
87    }
88    Queue { cv: Condvar::new(), mu: Mutex::new(VecDeque::new()) }
89}
90
91fn perform_work(is_immortal: bool) {
92    let wait_limit = Duration::from_secs(1);
93
94    let mut performed = 0;
95    let mut contiguous_overshoots = 0;
96
97    while is_immortal || performed < 5 || contiguous_overshoots < 3 {
98        debug_delay();
99        let task_res = QUEUE.recv_timeout(wait_limit);
100
101        if let Some(task) = task_res {
102            WAITING_THREAD_COUNT.fetch_sub(1, SeqCst);
103            (task)();
104            WAITING_THREAD_COUNT.fetch_add(1, SeqCst);
105            performed += 1;
106        }
107
108        while let Some(task) = QUEUE.try_recv() {
109            debug_delay();
110            WAITING_THREAD_COUNT.fetch_sub(1, SeqCst);
111            (task)();
112            WAITING_THREAD_COUNT.fetch_add(1, SeqCst);
113            performed += 1;
114        }
115
116        debug_delay();
117
118        let waiting = WAITING_THREAD_COUNT.load(Acquire);
119
120        if waiting > DESIRED_WAITING_THREADS {
121            contiguous_overshoots += 1;
122        } else {
123            contiguous_overshoots = 0;
124        }
125    }
126}
127
128// Create up to MAX_THREADS dynamic blocking task worker threads.
129// Dynamic threads will terminate themselves if they don't
130// receive any work after one second.
131fn maybe_spawn_new_thread() -> Result<()> {
132    debug_delay();
133    let total_workers = TOTAL_THREAD_COUNT.load(Acquire);
134    debug_delay();
135    let waiting_threads = WAITING_THREAD_COUNT.load(Acquire);
136
137    if waiting_threads >= DESIRED_WAITING_THREADS
138        || total_workers >= MAX_THREADS
139    {
140        return Ok(());
141    }
142
143    if !SPAWNING.compare_and_swap(false, true, SeqCst) {
144        spawn_new_thread(false)?;
145    }
146
147    Ok(())
148}
149
150fn spawn_new_thread(is_immortal: bool) -> Result<()> {
151    if BROKEN.load(Relaxed) {
152        return Err(Error::ReportableBug(
153            "IO thread unexpectedly panicked. please report \
154            this bug on the sled github repo."
155                .to_string(),
156        ));
157    }
158
159    let spawn_id = SPAWNS.fetch_add(1, SeqCst);
160
161    TOTAL_THREAD_COUNT.fetch_add(1, SeqCst);
162    let spawn_res = thread::Builder::new()
163        .name(format!("sled-io-{}", spawn_id))
164        .spawn(move || {
165            SPAWNING.store(false, SeqCst);
166            debug_delay();
167            let res = std::panic::catch_unwind(|| perform_work(is_immortal));
168            TOTAL_THREAD_COUNT.fetch_sub(1, SeqCst);
169            if is_immortal || res.is_err() {
170                // IO thread panicked, shut down the system
171                log::error!(
172                    "IO thread unexpectedly terminated.
173                    please report this error at the sled github repo. {:?}",
174                    res
175                );
176                BROKEN.store(true, SeqCst);
177            }
178        });
179
180    if let Err(e) = spawn_res {
181        static E: AtomicBool = AtomicBool::new(false);
182
183        SPAWNING.store(false, SeqCst);
184
185        if !E.compare_and_swap(false, true, Relaxed) {
186            // only execute this once
187            warn!(
188                "Failed to dynamically increase the threadpool size: {:?}.",
189                e,
190            )
191        }
192    }
193
194    Ok(())
195}
196
197/// Spawn a function on the threadpool.
198pub fn spawn<F, R>(work: F) -> Result<OneShot<R>>
199where
200    F: FnOnce() -> R + Send + 'static,
201    R: Send + 'static + Sized,
202{
203    let (promise_filler, promise) = OneShot::pair();
204    let task = move || {
205        let result = (work)();
206        promise_filler.fill(result);
207    };
208
209    let depth = QUEUE.send(Box::new(task));
210
211    if depth > DESIRED_WAITING_THREADS {
212        maybe_spawn_new_thread()?;
213    }
214
215    Ok(promise)
216}