sled/
flusher.rs

1use std::thread;
2use std::time::Duration;
3
4use parking_lot::{Condvar, Mutex};
5
6use super::*;
7
/// Lifecycle of the background flusher thread.
///
/// Within this file the state only moves forward:
/// `Running` -> `ShuttingDown` -> `ShutDown`.
#[derive(Debug, Clone, Copy)]
pub(crate) enum ShutdownState {
    /// Initial state set by `Flusher::new`; the thread flushes periodically.
    Running,
    /// Set when the `Flusher` is dropped; the thread stops sleeping between
    /// flushes and exits once a flush reports that nothing was written.
    ShuttingDown,
    /// Set by the flusher thread itself right before it exits, whether it
    /// finished normally or bailed out on an error.
    ShutDown,
}

impl ShutdownState {
    /// Returns `true` only for `ShutdownState::Running`.
    fn is_running(self) -> bool {
        matches!(self, ShutdownState::Running)
    }

    /// Returns `true` only for `ShutdownState::ShutDown`.
    fn is_shutdown(self) -> bool {
        matches!(self, ShutdownState::ShutDown)
    }
}
24
25#[derive(Debug)]
26pub(crate) struct Flusher {
27    shutdown: Arc<Mutex<ShutdownState>>,
28    sc: Arc<Condvar>,
29    join_handle: Mutex<Option<std::thread::JoinHandle<()>>>,
30}
31
32impl Flusher {
33    /// Spawns a thread that periodically calls `callback` until dropped.
34    pub(crate) fn new(
35        name: String,
36        pagecache: Arc<PageCache>,
37        flush_every_ms: u64,
38    ) -> Self {
39        #[allow(clippy::mutex_atomic)] // mutex used in CondVar below
40        let shutdown = Arc::new(Mutex::new(ShutdownState::Running));
41        let sc = Arc::new(Condvar::new());
42
43        let join_handle = thread::Builder::new()
44            .name(name)
45            .spawn({
46                let shutdown = shutdown.clone();
47                let sc = sc.clone();
48                move || run(&shutdown, &sc, &pagecache, flush_every_ms)
49            })
50            .unwrap();
51
52        Self { shutdown, sc, join_handle: Mutex::new(Some(join_handle)) }
53    }
54}
55
56fn run(
57    shutdown: &Arc<Mutex<ShutdownState>>,
58    sc: &Arc<Condvar>,
59    pagecache: &Arc<PageCache>,
60    flush_every_ms: u64,
61) {
62    let flush_every = Duration::from_millis(flush_every_ms);
63    let mut shutdown = shutdown.lock();
64    let mut wrote_data = false;
65    while shutdown.is_running() || wrote_data {
66        let before = std::time::Instant::now();
67        let cc = concurrency_control::read();
68        match pagecache.log.roll_iobuf() {
69            Ok(0) => {
70                wrote_data = false;
71                if !shutdown.is_running() {
72                    break;
73                }
74            }
75            Ok(_) => {
76                wrote_data = true;
77                if !shutdown.is_running() {
78                    // loop right away if we're in
79                    // shutdown mode, to flush data
80                    // more quickly.
81                    continue;
82                }
83            }
84            Err(e) => {
85                error!("failed to flush from periodic flush thread: {}", e);
86
87                #[cfg(feature = "failpoints")]
88                pagecache.set_failpoint(e);
89
90                *shutdown = ShutdownState::ShutDown;
91
92                // having held the mutex makes this linearized
93                // with the notify below.
94                drop(shutdown);
95
96                let _notified = sc.notify_all();
97                return;
98            }
99        }
100        drop(cc);
101
102        // so we can spend a little effort
103        // cleaning up the segments. try not to
104        // spend more than half of our sleep
105        // time rewriting pages though.
106        //
107        // this looks weird because it's a rust-style do-while
108        // where the conditional is the full body
109        while {
110            let made_progress = match pagecache.attempt_gc() {
111                Err(e) => {
112                    error!(
113                        "failed to clean file from periodic flush thread: {}",
114                        e
115                    );
116
117                    #[cfg(feature = "failpoints")]
118                    pagecache.set_failpoint(e);
119
120                    *shutdown = ShutdownState::ShutDown;
121
122                    // having held the mutex makes this linearized
123                    // with the notify below.
124                    drop(shutdown);
125
126                    let _notified = sc.notify_all();
127                    return;
128                }
129                Ok(false) => false,
130                Ok(true) => true,
131            };
132            made_progress
133                && shutdown.is_running()
134                && before.elapsed() < flush_every / 2
135        } {}
136
137        if let Err(e) = pagecache.config.file.sync_all() {
138            error!("failed to fsync from periodic flush thread: {}", e);
139        }
140
141        let sleep_duration = flush_every
142            .checked_sub(before.elapsed())
143            .unwrap_or_else(|| Duration::from_millis(1));
144
145        if shutdown.is_running() {
146            // only sleep before the next flush if we are
147            // running normally. if we're shutting down,
148            // flush faster.
149            sc.wait_for(&mut shutdown, sleep_duration);
150        }
151    }
152
153    *shutdown = ShutdownState::ShutDown;
154
155    // having held the mutex makes this linearized
156    // with the notify below.
157    drop(shutdown);
158
159    let _notified = sc.notify_all();
160}
161
162impl Drop for Flusher {
163    fn drop(&mut self) {
164        let mut shutdown = self.shutdown.lock();
165        if shutdown.is_running() {
166            *shutdown = ShutdownState::ShuttingDown;
167            let _notified = self.sc.notify_all();
168        }
169
170        while !shutdown.is_shutdown() {
171            let _ = self.sc.wait_for(&mut shutdown, Duration::from_millis(100));
172        }
173
174        let mut join_handle_opt = self.join_handle.lock();
175        if let Some(join_handle) = join_handle_opt.take() {
176            if let Err(e) = join_handle.join() {
177                error!("error joining Periodic thread: {:?}", e);
178            }
179        }
180    }
181}