1use std::thread;
2use std::time::Duration;
3
4use parking_lot::{Condvar, Mutex};
5
6use super::*;
7
/// Lifecycle of the background flush thread, shared between the thread
/// and the `Flusher` handle that owns it.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum ShutdownState {
    /// The flush thread is performing its periodic work.
    Running,
    /// Shutdown has been requested but the flush thread has not yet
    /// acknowledged it.
    ShuttingDown,
    /// The flush thread has finished (or is about to return) and will do
    /// no further work.
    ShutDown,
}
14
15impl ShutdownState {
16 fn is_running(self) -> bool {
17 if let ShutdownState::Running = self { true } else { false }
18 }
19
20 fn is_shutdown(self) -> bool {
21 if let ShutdownState::ShutDown = self { true } else { false }
22 }
23}
24
25#[derive(Debug)]
26pub(crate) struct Flusher {
27 shutdown: Arc<Mutex<ShutdownState>>,
28 sc: Arc<Condvar>,
29 join_handle: Mutex<Option<std::thread::JoinHandle<()>>>,
30}
31
32impl Flusher {
33 pub(crate) fn new(
35 name: String,
36 pagecache: Arc<PageCache>,
37 flush_every_ms: u64,
38 ) -> Self {
39 #[allow(clippy::mutex_atomic)] let shutdown = Arc::new(Mutex::new(ShutdownState::Running));
41 let sc = Arc::new(Condvar::new());
42
43 let join_handle = thread::Builder::new()
44 .name(name)
45 .spawn({
46 let shutdown = shutdown.clone();
47 let sc = sc.clone();
48 move || run(&shutdown, &sc, &pagecache, flush_every_ms)
49 })
50 .unwrap();
51
52 Self { shutdown, sc, join_handle: Mutex::new(Some(join_handle)) }
53 }
54}
55
56fn run(
57 shutdown: &Arc<Mutex<ShutdownState>>,
58 sc: &Arc<Condvar>,
59 pagecache: &Arc<PageCache>,
60 flush_every_ms: u64,
61) {
62 let flush_every = Duration::from_millis(flush_every_ms);
63 let mut shutdown = shutdown.lock();
64 let mut wrote_data = false;
65 while shutdown.is_running() || wrote_data {
66 let before = std::time::Instant::now();
67 let cc = concurrency_control::read();
68 match pagecache.log.roll_iobuf() {
69 Ok(0) => {
70 wrote_data = false;
71 if !shutdown.is_running() {
72 break;
73 }
74 }
75 Ok(_) => {
76 wrote_data = true;
77 if !shutdown.is_running() {
78 continue;
82 }
83 }
84 Err(e) => {
85 error!("failed to flush from periodic flush thread: {}", e);
86
87 #[cfg(feature = "failpoints")]
88 pagecache.set_failpoint(e);
89
90 *shutdown = ShutdownState::ShutDown;
91
92 drop(shutdown);
95
96 let _notified = sc.notify_all();
97 return;
98 }
99 }
100 drop(cc);
101
102 while {
110 let made_progress = match pagecache.attempt_gc() {
111 Err(e) => {
112 error!(
113 "failed to clean file from periodic flush thread: {}",
114 e
115 );
116
117 #[cfg(feature = "failpoints")]
118 pagecache.set_failpoint(e);
119
120 *shutdown = ShutdownState::ShutDown;
121
122 drop(shutdown);
125
126 let _notified = sc.notify_all();
127 return;
128 }
129 Ok(false) => false,
130 Ok(true) => true,
131 };
132 made_progress
133 && shutdown.is_running()
134 && before.elapsed() < flush_every / 2
135 } {}
136
137 if let Err(e) = pagecache.config.file.sync_all() {
138 error!("failed to fsync from periodic flush thread: {}", e);
139 }
140
141 let sleep_duration = flush_every
142 .checked_sub(before.elapsed())
143 .unwrap_or_else(|| Duration::from_millis(1));
144
145 if shutdown.is_running() {
146 sc.wait_for(&mut shutdown, sleep_duration);
150 }
151 }
152
153 *shutdown = ShutdownState::ShutDown;
154
155 drop(shutdown);
158
159 let _notified = sc.notify_all();
160}
161
162impl Drop for Flusher {
163 fn drop(&mut self) {
164 let mut shutdown = self.shutdown.lock();
165 if shutdown.is_running() {
166 *shutdown = ShutdownState::ShuttingDown;
167 let _notified = self.sc.notify_all();
168 }
169
170 while !shutdown.is_shutdown() {
171 let _ = self.sc.wait_for(&mut shutdown, Duration::from_millis(100));
172 }
173
174 let mut join_handle_opt = self.join_handle.lock();
175 if let Some(join_handle) = join_handle_opt.take() {
176 if let Err(e) = join_handle.join() {
177 error!("error joining Periodic thread: {:?}", e);
178 }
179 }
180 }
181}