1use std::{
4 collections::VecDeque,
5 sync::atomic::{AtomicBool, Ordering::Relaxed},
6 thread,
7 time::{Duration, Instant},
8};
9
10use parking_lot::{Condvar, Mutex};
11
12use crate::{
13 debug_delay, warn, Acquire, AtomicUsize, Error, Lazy, OneShot, Result,
14 SeqCst,
15};
16
// Hard cap on total pool threads. Windows gets a smaller cap;
// NOTE(review): the reason for the platform split is not visible here —
// presumably OS resource limits, confirm against project history.
#[cfg(windows)]
const MAX_THREADS: usize = 16;

#[cfg(not(windows))]
const MAX_THREADS: usize = 128;

// Target number of idle (waiting) worker threads to keep available.
const DESIRED_WAITING_THREADS: usize = 7;

// Threads currently counted as waiting for work (maintained in
// Queue::recv_timeout and perform_work).
static WAITING_THREAD_COUNT: AtomicUsize = AtomicUsize::new(0);
// Live pool threads: incremented in spawn_new_thread, decremented when a
// worker's closure exits.
static TOTAL_THREAD_COUNT: AtomicUsize = AtomicUsize::new(0);
// Monotonic counter used only to build unique thread names.
static SPAWNS: AtomicUsize = AtomicUsize::new(0);
// True while one spawn attempt is in flight; throttles concurrent growth.
static SPAWNING: AtomicBool = AtomicBool::new(false);

// A unit of work submitted to the pool.
type Work = Box<dyn FnOnce() + Send + 'static>;
32
/// A simple shared work queue: a mutex-guarded FIFO of tasks plus a
/// condvar used to wake blocked workers when work arrives.
struct Queue {
    // Signalled by `send`; waited on in `recv_timeout`.
    cv: Condvar,
    // FIFO of pending tasks.
    mu: Mutex<VecDeque<Work>>,
}
37
38impl Queue {
39 fn recv_timeout(&self, duration: Duration) -> Option<Work> {
40 let mut queue = self.mu.lock();
41
42 let cutoff = Instant::now() + duration;
43
44 while queue.is_empty() {
45 WAITING_THREAD_COUNT.fetch_add(1, SeqCst);
46 let res = self.cv.wait_until(&mut queue, cutoff);
47 WAITING_THREAD_COUNT.fetch_sub(1, SeqCst);
48 if res.timed_out() {
49 break;
50 }
51 }
52
53 queue.pop_front()
54 }
55
56 fn try_recv(&self) -> Option<Work> {
57 let mut queue = self.mu.lock();
58 queue.pop_front()
59 }
60
61 fn send(&self, work: Work) -> usize {
62 let mut queue = self.mu.lock();
63 queue.push_back(work);
64
65 let len = queue.len();
66
67 drop(queue);
70
71 self.cv.notify_all();
72
73 len
74 }
75}
76
// Lazily-initialized global work queue; `init_queue` also pre-spawns the
// immortal worker threads.
static QUEUE: Lazy<Queue, fn() -> Queue> = Lazy::new(init_queue);
// Latched true if a worker panics or an immortal worker exits; later
// spawn attempts then fail fast with a ReportableBug error.
static BROKEN: AtomicBool = AtomicBool::new(false);
79
80fn init_queue() -> Queue {
81 debug_delay();
82 for _ in 0..DESIRED_WAITING_THREADS {
83 debug_delay();
84 if let Err(e) = spawn_new_thread(true) {
85 log::error!("failed to initialize threadpool: {:?}", e);
86 }
87 }
88 Queue { cv: Condvar::new(), mu: Mutex::new(VecDeque::new()) }
89}
90
/// Main loop for one pool worker thread.
///
/// Immortal workers loop forever. A mortal worker exits only once it has
/// performed at least 5 tasks AND has seen the pool overshoot
/// `DESIRED_WAITING_THREADS` idle threads on 3 consecutive iterations
/// (the `||` chain below requires BOTH exit conditions to hold).
fn perform_work(is_immortal: bool) {
    let wait_limit = Duration::from_secs(1);

    let mut performed = 0;
    let mut contiguous_overshoots = 0;

    while is_immortal || performed < 5 || contiguous_overshoots < 3 {
        debug_delay();
        // Block up to one second for work; recv_timeout counts this
        // thread as waiting only while it is parked on the condvar.
        let task_res = QUEUE.recv_timeout(wait_limit);

        if let Some(task) = task_res {
            // Mark busy for the duration of the task.
            // NOTE(review): recv_timeout already removed this thread from
            // the waiting count when it unblocked, so this sub/add pair
            // assumes the thread is otherwise counted as waiting — verify
            // the counter cannot drift or wrap over a worker's lifetime.
            WAITING_THREAD_COUNT.fetch_sub(1, SeqCst);
            (task)();
            WAITING_THREAD_COUNT.fetch_add(1, SeqCst);
            performed += 1;
        }

        // Drain any remaining backlog without re-parking on the condvar.
        while let Some(task) = QUEUE.try_recv() {
            debug_delay();
            WAITING_THREAD_COUNT.fetch_sub(1, SeqCst);
            (task)();
            WAITING_THREAD_COUNT.fetch_add(1, SeqCst);
            performed += 1;
        }

        debug_delay();

        let waiting = WAITING_THREAD_COUNT.load(Acquire);

        // Count consecutive iterations where the pool has more idle
        // threads than desired; mortal workers use this as their
        // retirement signal.
        if waiting > DESIRED_WAITING_THREADS {
            contiguous_overshoots += 1;
        } else {
            contiguous_overshoots = 0;
        }
    }
}
127
128fn maybe_spawn_new_thread() -> Result<()> {
132 debug_delay();
133 let total_workers = TOTAL_THREAD_COUNT.load(Acquire);
134 debug_delay();
135 let waiting_threads = WAITING_THREAD_COUNT.load(Acquire);
136
137 if waiting_threads >= DESIRED_WAITING_THREADS
138 || total_workers >= MAX_THREADS
139 {
140 return Ok(());
141 }
142
143 if !SPAWNING.compare_and_swap(false, true, SeqCst) {
144 spawn_new_thread(false)?;
145 }
146
147 Ok(())
148}
149
150fn spawn_new_thread(is_immortal: bool) -> Result<()> {
151 if BROKEN.load(Relaxed) {
152 return Err(Error::ReportableBug(
153 "IO thread unexpectedly panicked. please report \
154 this bug on the sled github repo."
155 .to_string(),
156 ));
157 }
158
159 let spawn_id = SPAWNS.fetch_add(1, SeqCst);
160
161 TOTAL_THREAD_COUNT.fetch_add(1, SeqCst);
162 let spawn_res = thread::Builder::new()
163 .name(format!("sled-io-{}", spawn_id))
164 .spawn(move || {
165 SPAWNING.store(false, SeqCst);
166 debug_delay();
167 let res = std::panic::catch_unwind(|| perform_work(is_immortal));
168 TOTAL_THREAD_COUNT.fetch_sub(1, SeqCst);
169 if is_immortal || res.is_err() {
170 log::error!(
172 "IO thread unexpectedly terminated.
173 please report this error at the sled github repo. {:?}",
174 res
175 );
176 BROKEN.store(true, SeqCst);
177 }
178 });
179
180 if let Err(e) = spawn_res {
181 static E: AtomicBool = AtomicBool::new(false);
182
183 SPAWNING.store(false, SeqCst);
184
185 if !E.compare_and_swap(false, true, Relaxed) {
186 warn!(
188 "Failed to dynamically increase the threadpool size: {:?}.",
189 e,
190 )
191 }
192 }
193
194 Ok(())
195}
196
197pub fn spawn<F, R>(work: F) -> Result<OneShot<R>>
199where
200 F: FnOnce() -> R + Send + 'static,
201 R: Send + 'static + Sized,
202{
203 let (promise_filler, promise) = OneShot::pair();
204 let task = move || {
205 let result = (work)();
206 promise_filler.fill(result);
207 };
208
209 let depth = QUEUE.send(Box::new(task));
210
211 if depth > DESIRED_WAITING_THREADS {
212 maybe_spawn_new_thread()?;
213 }
214
215 Ok(promise)
216}