// sled/config.rs

1use std::{
2    collections::HashMap,
3    fs,
4    fs::File,
5    io,
6    io::{BufRead, BufReader, ErrorKind, Read, Seek, Write},
7    ops::Deref,
8    path::{Path, PathBuf},
9    sync::atomic::AtomicUsize,
10};
11
12use crate::pagecache::{arr_to_u32, u32_to_arr, Lsn};
13use crate::*;
14
/// Database directory used when no `path` is configured. A `temporary`
/// config whose `path` is still this value uses a generated temporary
/// directory instead.
const DEFAULT_PATH: &str = "default.sled";
16
/// The high-level database mode, according to
/// the trade-offs of the RUM conjecture
/// (read, update and memory overheads cannot all be minimized at once).
///
/// Selected with the `Config::mode` builder; the default is `LowSpace`.
#[derive(Debug, Clone, Copy)]
pub enum Mode {
    /// In this mode, the database will make
    /// decisions that favor using less space
    /// instead of supporting the highest possible
    /// write throughput. This mode will also
    /// rewrite data more frequently as it
    /// strives to reduce fragmentation.
    LowSpace,
    /// In this mode, the database will try
    /// to maximize write throughput while
    /// potentially using more disk space.
    HighThroughput,
}
33
/// A persisted configuration about high-level
/// storage file information.
///
/// Written to the database's `conf` file (the text produced by
/// `serialize`, followed by a 4-byte CRC32 of that text) whenever no
/// readable `conf` file exists. On every later open it is compared against
/// the current `Config`; a mismatch in any field makes opening fail with
/// `Error::Unsupported`.
#[derive(Debug, Eq, PartialEq, Clone, Copy)]
struct StorageParameters {
    /// Segment / IO buffer size in bytes.
    pub segment_size: usize,
    /// Whether zstd compression was enabled.
    pub use_compression: bool,
    /// `(major, minor)` crate version that wrote the database.
    pub version: (usize, usize),
}
42
43impl StorageParameters {
44    pub fn serialize(&self) -> Vec<u8> {
45        let mut out = vec![];
46
47        writeln!(&mut out, "segment_size: {}", self.segment_size).unwrap();
48        writeln!(&mut out, "use_compression: {}", self.use_compression)
49            .unwrap();
50        writeln!(&mut out, "version: {}.{}", self.version.0, self.version.1)
51            .unwrap();
52
53        out
54    }
55
56    pub fn deserialize(bytes: &[u8]) -> Result<StorageParameters> {
57        let reader = BufReader::new(bytes);
58
59        let mut lines = HashMap::new();
60
61        for line in reader.lines() {
62            let line = if let Ok(l) = line {
63                l
64            } else {
65                error!(
66                    "failed to parse persisted config as UTF-8. \
67                     This changed in sled version 0.29"
68                );
69                return Err(Error::Unsupported(
70                    "failed to open database that may \
71                     have been created using a sled version \
72                     earlier than 0.29"
73                        .to_string(),
74                ));
75            };
76            let mut split = line.split(": ").map(String::from);
77            let k = if let Some(k) = split.next() {
78                k
79            } else {
80                error!("failed to parse persisted config line: {}", line);
81                return Err(Error::corruption(None));
82            };
83            let v = if let Some(v) = split.next() {
84                v
85            } else {
86                error!("failed to parse persisted config line: {}", line);
87                return Err(Error::corruption(None));
88            };
89            lines.insert(k, v);
90        }
91
92        let segment_size: usize = if let Some(raw) = lines.get("segment_size") {
93            if let Ok(parsed) = raw.parse() {
94                parsed
95            } else {
96                error!("failed to parse segment_size value: {}", raw);
97                return Err(Error::corruption(None));
98            }
99        } else {
100            error!(
101                "failed to retrieve required configuration parameter: segment_size"
102            );
103            return Err(Error::corruption(None));
104        };
105
106        let use_compression: bool = if let Some(raw) =
107            lines.get("use_compression")
108        {
109            if let Ok(parsed) = raw.parse() {
110                parsed
111            } else {
112                error!("failed to parse use_compression value: {}", raw);
113                return Err(Error::corruption(None));
114            }
115        } else {
116            error!(
117                "failed to retrieve required configuration parameter: use_compression"
118            );
119            return Err(Error::corruption(None));
120        };
121
122        let version: (usize, usize) = if let Some(raw) = lines.get("version") {
123            let mut split = raw.split('.');
124            let major = if let Some(raw_major) = split.next() {
125                if let Ok(parsed_major) = raw_major.parse() {
126                    parsed_major
127                } else {
128                    error!(
129                        "failed to parse major version value from line: {}",
130                        raw
131                    );
132                    return Err(Error::corruption(None));
133                }
134            } else {
135                error!("failed to parse major version value: {}", raw);
136                return Err(Error::corruption(None));
137            };
138
139            let minor = if let Some(raw_minor) = split.next() {
140                if let Ok(parsed_minor) = raw_minor.parse() {
141                    parsed_minor
142                } else {
143                    error!(
144                        "failed to parse minor version value from line: {}",
145                        raw
146                    );
147                    return Err(Error::corruption(None));
148                }
149            } else {
150                error!("failed to parse minor version value: {}", raw);
151                return Err(Error::corruption(None));
152            };
153
154            (major, minor)
155        } else {
156            error!(
157                "failed to retrieve required configuration parameter: version"
158            );
159            return Err(Error::corruption(None));
160        };
161
162        Ok(StorageParameters { segment_size, use_compression, version })
163    }
164}
165
/// Top-level configuration for the system.
///
/// A `Config` is a cheap-to-clone handle to shared settings (`Arc<Inner>`)
/// and exposes them through `Deref<Target = Inner>`. Setters take `self` by
/// value and return the updated `Config`, so they chain as shown below.
///
/// # Examples
///
/// ```
/// let _config = sled::Config::default()
///     .path("/path/to/data".to_owned())
///     .cache_capacity(10_000_000_000)
///     .flush_every_ms(Some(1000));
/// ```
#[derive(Default, Debug, Clone)]
pub struct Config(Arc<Inner>);
178
179impl Deref for Config {
180    type Target = Inner;
181
182    fn deref(&self) -> &Inner {
183        &self.0
184    }
185}
186
/// The settings shared (behind an `Arc`) by every handle to a `Config`.
///
/// Fields are public but hidden from rustdoc; set them through the
/// `Config` builder methods.
#[doc(hidden)]
#[derive(Debug, Clone)]
pub struct Inner {
    /// Page cache size limit in bytes (default 1 GiB). When the database is
    /// opened it is lowered to the memory limit from `sys_limits`, if that
    /// limit is smaller.
    #[doc(hidden)]
    pub cache_capacity: u64,
    /// Flush interval in milliseconds (default `Some(500)`).
    /// NOTE(review): `None` presumably disables periodic flushing — confirm.
    #[doc(hidden)]
    pub flush_every_ms: Option<u64>,
    /// Segment / IO buffer size in bytes (default 512 KiB). Opening requires
    /// a power of two in `256..=1 << 24` that matches the persisted value.
    #[doc(hidden)]
    pub segment_size: usize,
    /// Database directory (default `DEFAULT_PATH`); read via `get_path`.
    #[doc(hidden)]
    pub path: PathBuf,
    /// If set, opening fails when the `db` file already exists.
    #[doc(hidden)]
    pub create_new: bool,
    /// Space/throughput trade-off (default `Mode::LowSpace`).
    #[doc(hidden)]
    pub mode: Mode,
    /// If set, the database directory is removed when an `Inner` holding
    /// this flag is dropped.
    #[doc(hidden)]
    pub temporary: bool,
    /// Whether zstd compression is used. It requires the `compression`
    /// feature and must not change across restarts.
    #[doc(hidden)]
    pub use_compression: bool,
    /// zstd compression level; must be in `1..=22` (default 5).
    #[doc(hidden)]
    pub compression_factor: i32,
    /// Print the global performance profile when this `Inner` is dropped.
    #[doc(hidden)]
    pub print_profile_on_drop: bool,
    /// Must be greater than 0 (default 1_000_000).
    /// NOTE(review): judging by the name, this sets how often ID-generator
    /// state is persisted — confirm against the id generator.
    #[doc(hidden)]
    pub idgen_persist_interval: u64,
    /// `(major, minor)` crate version; must match the version persisted by
    /// an existing database.
    #[doc(hidden)]
    pub version: (usize, usize),
    /// Generated scratch directory, used instead of `path` by temporary
    /// databases that keep the default path.
    tmp_path: PathBuf,
    /// First error recorded by `Config::set_global_error`, or null.
    pub(crate) global_error: Arc<Atomic<Error>>,
    #[cfg(feature = "event_log")]
    /// an event log for concurrent debugging
    pub event_log: Arc<event_log::EventLog>,
}
220
221impl Default for Inner {
222    fn default() -> Self {
223        Self {
224            // generally useful
225            path: PathBuf::from(DEFAULT_PATH),
226            tmp_path: Config::gen_temp_path(),
227            create_new: false,
228            cache_capacity: 1024 * 1024 * 1024, // 1gb
229            mode: Mode::LowSpace,
230            use_compression: false,
231            compression_factor: 5,
232            temporary: false,
233            version: crate_version(),
234
235            // useful in testing
236            segment_size: 512 * 1024, // 512kb in bytes
237            print_profile_on_drop: false,
238            flush_every_ms: Some(500),
239            idgen_persist_interval: 1_000_000,
240            global_error: Arc::new(Atomic::default()),
241            #[cfg(feature = "event_log")]
242            event_log: Arc::new(crate::event_log::EventLog::default()),
243        }
244    }
245}
246
247impl Inner {
248    // Get the path of the database
249    #[doc(hidden)]
250    pub fn get_path(&self) -> PathBuf {
251        if self.temporary && self.path == PathBuf::from(DEFAULT_PATH) {
252            self.tmp_path.clone()
253        } else {
254            self.path.clone()
255        }
256    }
257
258    pub(crate) fn blob_path(&self, id: Lsn) -> PathBuf {
259        self.get_path().join("blobs").join(format!("{}", id))
260    }
261
262    fn db_path(&self) -> PathBuf {
263        self.get_path().join("db")
264    }
265
266    fn config_path(&self) -> PathBuf {
267        self.get_path().join("conf")
268    }
269
270    pub(crate) fn normalize<T>(&self, value: T) -> T
271    where
272        T: Copy
273            + TryFrom<usize>
274            + std::ops::Div<Output = T>
275            + std::ops::Mul<Output = T>,
276        <T as std::convert::TryFrom<usize>>::Error: Debug,
277    {
278        let segment_size: T = T::try_from(self.segment_size).unwrap();
279        value / segment_size * segment_size
280    }
281}
282
/// Early-returns `Err(Error::Unsupported(message))` from the enclosing
/// function when `$condition` is false.
///
/// `$message` may be a `&str` or a `String` (it goes through `to_owned`).
/// It is only evaluated when the check fails.
macro_rules! supported {
    ($condition:expr, $message:expr) => {{
        let holds: bool = $condition;
        if !holds {
            return Err(Error::Unsupported($message.to_owned()));
        }
    }};
}
290
/// Generates by-value setters for `Config`.
///
/// For each `(field, Type, "doc")` triple it emits
/// `pub fn field(self, to: Type) -> Self`, which stores `to` into the
/// matching `Inner` field via `Arc::make_mut`. An error is logged when the
/// `Arc` is shared, because that usually means the config was already used
/// to start a database.
macro_rules! builder {
    ($(($field:ident, $field_ty:ty, $doc:expr)),*) => {
        $(
            #[doc = $doc]
            pub fn $field(mut self, to: $field_ty) -> Self {
                let shared_elsewhere = Arc::strong_count(&self.0) > 1;
                if shared_elsewhere {
                    error!(
                        "config has already been used to start \
                         the system and probably should not be \
                         mutated",
                    );
                }
                Arc::make_mut(&mut self.0).$field = to;
                self
            }
        )*
    };
}
310
311impl Config {
312    /// Returns a default `Config`
313    pub fn new() -> Config {
314        Config::default()
315    }
316
317    /// Set the path of the database (builder).
318    pub fn path<P: AsRef<Path>>(mut self, path: P) -> Config {
319        let m = Arc::get_mut(&mut self.0).unwrap();
320        m.path = path.as_ref().to_path_buf();
321        self
322    }
323
324    /// A testing-only method for reducing the io-buffer size
325    /// to trigger correctness-critical behavior more often
326    /// by shrinking the buffer size. Don't rely on this.
327    #[doc(hidden)]
328    pub fn segment_size(mut self, segment_size: usize) -> Config {
329        if Arc::strong_count(&self.0) != 1 {
330            error!(
331                "config has already been used to start \
332                 the system and probably should not be \
333                 mutated",
334            );
335        }
336        let m = Arc::make_mut(&mut self.0);
337        m.segment_size = segment_size;
338        self
339    }
340
341    /// Opens a `Db` based on the provided config.
342    pub fn open(&self) -> Result<Db> {
343        // only validate, setup directory, and open file once
344        self.validate()?;
345
346        let mut config = self.clone();
347        config.limit_cache_max_memory();
348
349        let file = config.open_file()?;
350
351        // seal config in a Config
352        let config = RunningConfig { inner: config, file: Arc::new(file) };
353
354        Db::start_inner(config)
355    }
356
357    #[doc(hidden)]
358    #[deprecated(
359        since = "0.31.0",
360        note = "this does nothing for now. maybe it will come back in the future."
361    )]
362    pub const fn segment_cleanup_skew(self, _: usize) -> Self {
363        self
364    }
365
366    #[doc(hidden)]
367    #[deprecated(
368        since = "0.31.0",
369        note = "this does nothing for now. maybe it will come back in the future."
370    )]
371    pub const fn segment_cleanup_threshold(self, _: u8) -> Self {
372        self
373    }
374
375    #[doc(hidden)]
376    #[deprecated(
377        since = "0.31.0",
378        note = "this does nothing for now. maybe it will come back in the future."
379    )]
380    pub const fn snapshot_after_ops(self, _: u64) -> Self {
381        self
382    }
383
384    #[doc(hidden)]
385    #[deprecated(
386        since = "0.31.0",
387        note = "this does nothing for now. maybe it will come back in the future."
388    )]
389    pub fn snapshot_path<P>(self, _: P) -> Self {
390        self
391    }
392
393    #[doc(hidden)]
394    pub fn flush_every_ms(mut self, every_ms: Option<u64>) -> Self {
395        if Arc::strong_count(&self.0) != 1 {
396            error!(
397                "config has already been used to start \
398                 the system and probably should not be \
399                 mutated",
400            );
401        }
402        let m = Arc::make_mut(&mut self.0);
403        m.flush_every_ms = every_ms;
404        self
405    }
406
407    #[doc(hidden)]
408    pub fn idgen_persist_interval(mut self, interval: u64) -> Self {
409        if Arc::strong_count(&self.0) != 1 {
410            error!(
411                "config has already been used to start \
412                 the system and probably should not be \
413                 mutated",
414            );
415        }
416        let m = Arc::make_mut(&mut self.0);
417        m.idgen_persist_interval = interval;
418        self
419    }
420
421    /// Finalize the configuration.
422    ///
423    /// # Panics
424    ///
425    /// This function will panic if it is not possible
426    /// to open the files for performing database IO,
427    /// or if the provided configuration fails some
428    /// basic sanity checks.
429    #[doc(hidden)]
430    #[deprecated(since = "0.29.0", note = "use Config::open instead")]
431    pub fn build(mut self) -> RunningConfig {
432        // only validate, setup directory, and open file once
433        self.validate().unwrap();
434
435        self.limit_cache_max_memory();
436
437        let file = self.open_file().unwrap_or_else(|e| {
438            panic!("open file at {:?}: {}", self.db_path(), e);
439        });
440
441        // seal config in a Config
442        RunningConfig { inner: self, file: Arc::new(file) }
443    }
444
445    fn gen_temp_path() -> PathBuf {
446        use std::time::SystemTime;
447
448        static SALT_COUNTER: AtomicUsize = AtomicUsize::new(0);
449
450        let seed = SALT_COUNTER.fetch_add(1, SeqCst) as u128;
451
452        let now = SystemTime::now()
453            .duration_since(SystemTime::UNIX_EPOCH)
454            .unwrap()
455            .as_nanos()
456            << 48;
457
458        #[cfg(not(miri))]
459        let pid = u128::from(std::process::id());
460
461        #[cfg(miri)]
462        let pid = 0;
463
464        let salt = (pid << 16) + now + seed;
465
466        if cfg!(target_os = "linux") {
467            // use shared memory for temporary linux files
468            format!("/dev/shm/pagecache.tmp.{}", salt).into()
469        } else {
470            std::env::temp_dir().join(format!("pagecache.tmp.{}", salt))
471        }
472    }
473
474    fn limit_cache_max_memory(&mut self) {
475        let limit = sys_limits::get_memory_limit();
476        if limit > 0 && self.cache_capacity > limit {
477            let m = Arc::make_mut(&mut self.0);
478            m.cache_capacity = limit;
479            error!(
480                "cache capacity is limited to the cgroup memory \
481                 limit: {} bytes",
482                self.cache_capacity
483            );
484        }
485    }
486
487    builder!(
488        (
489            cache_capacity,
490            u64,
491            "maximum size in bytes for the system page cache"
492        ),
493        (
494            mode,
495            Mode,
496            "specify whether the system should run in \"small\" or \"fast\" mode"
497        ),
498        (use_compression, bool, "whether to use zstd compression"),
499        (
500            compression_factor,
501            i32,
502            "the compression factor to use with zstd compression. Ranges from 1 up to 22. Levels >= 20 are 'ultra'."
503        ),
504        (
505            temporary,
506            bool,
507            "deletes the database after drop. if no path is set, uses /dev/shm on linux"
508        ),
509        (
510            create_new,
511            bool,
512            "attempts to exclusively open the database, failing if it already exists"
513        ),
514        (
515            print_profile_on_drop,
516            bool,
517            "print a performance profile when the Config is dropped"
518        )
519    );
520
521    // panics if config options are outside of advised range
522    fn validate(&self) -> Result<()> {
523        supported!(
524            self.segment_size.count_ones() == 1,
525            "segment_size should be a power of 2"
526        );
527        supported!(
528            self.segment_size >= 256,
529            "segment_size should be hundreds of kb at minimum, and we won't start if below 256"
530        );
531        supported!(
532            self.segment_size <= 1 << 24,
533            "segment_size should be <= 16mb"
534        );
535        if self.use_compression {
536            supported!(
537                cfg!(feature = "compression"),
538                "the 'compression' feature must be enabled"
539            );
540        }
541        supported!(
542            self.compression_factor >= 1,
543            "compression_factor must be >= 1"
544        );
545        supported!(
546            self.compression_factor <= 22,
547            "compression_factor must be <= 22"
548        );
549        supported!(
550            self.idgen_persist_interval > 0,
551            "idgen_persist_interval must be above 0"
552        );
553        Ok(())
554    }
555
556    fn open_file(&self) -> Result<File> {
557        let blob_dir: PathBuf = self.get_path().join("blobs");
558
559        if !blob_dir.exists() {
560            fs::create_dir_all(blob_dir)?;
561        }
562
563        self.verify_config()?;
564
565        // open the data file
566        let mut options = fs::OpenOptions::new();
567
568        let _ = options.create(true);
569        let _ = options.read(true);
570        let _ = options.write(true);
571
572        if self.create_new {
573            options.create_new(true);
574        }
575
576        self.try_lock(options.open(&self.db_path())?)
577    }
578
579    fn try_lock(&self, file: File) -> Result<File> {
580        #[cfg(all(
581            not(miri),
582            any(windows, target_os = "linux", target_os = "macos")
583        ))]
584        {
585            use fs2::FileExt;
586
587            let try_lock = if cfg!(feature = "testing") {
588                // we block here because during testing
589                // there are many filesystem race condition
590                // that happen, causing locks to be held
591                // for long periods of time, so we should
592                // block to wait on reopening files.
593                file.lock_exclusive()
594            } else {
595                file.try_lock_exclusive()
596            };
597
598            if let Err(e) = try_lock {
599                return Err(Error::Io(io::Error::new(
600                    ErrorKind::Other,
601                    format!(
602                        "could not acquire lock on {:?}: {:?}",
603                        self.db_path().to_string_lossy(),
604                        e
605                    ),
606                )));
607            }
608        }
609
610        Ok(file)
611    }
612
613    fn verify_config(&self) -> Result<()> {
614        match self.read_config() {
615            Ok(Some(old)) => {
616                supported!(
617                    self.use_compression == old.use_compression,
618                    format!(
619                        "cannot change compression values across restarts. \
620                         old value of use_compression loaded from disk: {}, \
621                         currently set value: {}.",
622                        old.use_compression, self.use_compression,
623                    )
624                );
625
626                supported!(
627                    self.segment_size == old.segment_size,
628                    format!(
629                        "cannot change the io buffer size across restarts. \
630                         please change it back to {}",
631                        old.segment_size
632                    )
633                );
634
635                supported!(
636                    self.version == old.version,
637                    format!(
638                        "This database was created using \
639                         pagecache version {}.{}, but our pagecache \
640                         version is {}.{}. Please perform an upgrade \
641                         using the sled::Db::export and sled::Db::import \
642                         methods.",
643                        old.version.0,
644                        old.version.1,
645                        self.version.0,
646                        self.version.1,
647                    )
648                );
649                Ok(())
650            }
651            Ok(None) => self.write_config(),
652            Err(e) => Err(e),
653        }
654    }
655
656    fn serialize(&self) -> Vec<u8> {
657        let persisted_config = StorageParameters {
658            version: self.version,
659            segment_size: self.segment_size,
660            use_compression: self.use_compression,
661        };
662
663        persisted_config.serialize()
664    }
665
666    fn write_config(&self) -> Result<()> {
667        let bytes = self.serialize();
668        let crc: u32 = crc32(&*bytes);
669        let crc_arr = u32_to_arr(crc);
670
671        let path = self.config_path();
672
673        let mut f =
674            fs::OpenOptions::new().write(true).create(true).open(path)?;
675
676        io_fail!(self, "write_config bytes");
677        f.write_all(&*bytes)?;
678        io_fail!(self, "write_config crc");
679        f.write_all(&crc_arr)?;
680        io_fail!(self, "write_config post");
681        Ok(())
682    }
683
684    fn read_config(&self) -> Result<Option<StorageParameters>> {
685        let path = self.config_path();
686
687        let f_res = fs::OpenOptions::new().read(true).open(&path);
688
689        let mut f = match f_res {
690            Err(ref e) if e.kind() == ErrorKind::NotFound => {
691                return Ok(None);
692            }
693            Err(other) => {
694                return Err(other.into());
695            }
696            Ok(f) => f,
697        };
698
699        if f.metadata()?.len() <= 8 {
700            warn!("empty/corrupt configuration file found");
701            return Ok(None);
702        }
703
704        let mut buf = vec![];
705        let _ = f.read_to_end(&mut buf)?;
706        let len = buf.len();
707        let _ = buf.split_off(len - 4);
708
709        let mut crc_arr = [0_u8; 4];
710        let _ = f.seek(io::SeekFrom::End(-4))?;
711        f.read_exact(&mut crc_arr)?;
712        let crc_expected = arr_to_u32(&crc_arr);
713
714        let crc_actual = crc32(&*buf);
715
716        if crc_expected != crc_actual {
717            warn!(
718                "crc for settings file {:?} failed! \
719                 can't verify that config is safe",
720                path
721            );
722        }
723
724        StorageParameters::deserialize(&buf).map(Some)
725    }
726
727    /// Return the global error if one was encountered during
728    /// an asynchronous IO operation.
729    #[doc(hidden)]
730    pub fn global_error(&self) -> Result<()> {
731        let guard = pin();
732        let ge = self.global_error.load(Acquire, &guard);
733        if ge.is_null() {
734            Ok(())
735        } else {
736            #[allow(unsafe_code)]
737            unsafe {
738                Err(ge.deref().clone())
739            }
740        }
741    }
742
743    pub(crate) fn reset_global_error(&self) {
744        let guard = pin();
745        let old = self.global_error.swap(Shared::default(), SeqCst, &guard);
746        if !old.is_null() {
747            let guard = pin();
748            #[allow(unsafe_code)]
749            unsafe {
750                guard.defer_destroy(old);
751            }
752        }
753    }
754
755    pub(crate) fn set_global_error(&self, error_value: Error) {
756        let guard = pin();
757        let error = Owned::new(error_value);
758
759        let expected_old = Shared::null();
760
761        let _ = self.global_error.compare_and_set(
762            expected_old,
763            error,
764            SeqCst,
765            &guard,
766        );
767    }
768
769    #[cfg(feature = "failpoints")]
770    #[cfg(feature = "event_log")]
771    #[doc(hidden)]
772    // truncate the underlying file for corruption testing purposes.
773    pub fn truncate_corrupt(&self, new_len: u64) {
774        self.event_log.reset();
775        let path = self.db_path();
776        let f = std::fs::OpenOptions::new().write(true).open(path).unwrap();
777        f.set_len(new_len).expect("should be able to truncate");
778    }
779}
780
/// A Configuration that has an associated opened
/// file.
///
/// Produced by `Config::open` (or the deprecated `Config::build`) after
/// validation, and dereferences to the `Config` it was opened with.
// The type name intentionally repeats the module name.
#[allow(clippy::module_name_repetitions)]
#[derive(Debug, Clone)]
pub struct RunningConfig {
    /// The configuration this database was opened with.
    inner: Config,
    /// The opened `db` data file, shared by all clones. On Windows, Linux
    /// and macOS (outside miri) it was exclusively locked when opened.
    pub(crate) file: Arc<File>,
}
789
// SAFETY(review): no justification is visible in this file. The fields are
// a `Config` (wrapping an `Arc<Inner>`) and an `Arc<File>`, and `File` is
// itself `Send + Sync`. These manual impls therefore only matter if the
// `Arc` type in scope, or some field of `Inner` (e.g. `Arc<Atomic<Error>>`
// or the event log), is not automatically `Send`/`Sync`. Confirm those
// types are safe to share across threads before relying on this.
#[allow(unsafe_code)]
unsafe impl Send for RunningConfig {}

// SAFETY(review): same reasoning as the `Send` impl directly above, applied
// to sharing `&RunningConfig` between threads.
#[allow(unsafe_code)]
unsafe impl Sync for RunningConfig {}
795
796impl Deref for RunningConfig {
797    type Target = Config;
798
799    fn deref(&self) -> &Config {
800        &self.inner
801    }
802}
803
804impl Drop for Inner {
805    fn drop(&mut self) {
806        if self.print_profile_on_drop {
807            M.print_profile();
808        }
809
810        if !self.temporary {
811            return;
812        }
813
814        // Our files are temporary, so nuke them.
815        debug!("removing temporary storage file {:?}", self.get_path());
816        let _res = fs::remove_dir_all(&self.get_path());
817    }
818}
819
820impl RunningConfig {
821    // returns the snapshot file paths for this system
822    #[doc(hidden)]
823    pub fn get_snapshot_files(&self) -> io::Result<Vec<PathBuf>> {
824        let path = self.get_path().join("snap.");
825
826        let absolute_path: PathBuf = if Path::new(&path).is_absolute() {
827            path
828        } else {
829            std::env::current_dir()?.join(path)
830        };
831
832        let filter = |dir_entry: io::Result<fs::DirEntry>| {
833            if let Ok(de) = dir_entry {
834                let path_buf = de.path();
835                let path = path_buf.as_path();
836                let path_str = &*path.to_string_lossy();
837                if path_str.starts_with(&*absolute_path.to_string_lossy())
838                    && !path_str.ends_with(".in___motion")
839                {
840                    Some(path.to_path_buf())
841                } else {
842                    None
843                }
844            } else {
845                None
846            }
847        };
848
849        let snap_dir = Path::new(&absolute_path).parent().unwrap();
850
851        if !snap_dir.exists() {
852            fs::create_dir_all(snap_dir)?;
853        }
854
855        Ok(snap_dir.read_dir()?.filter_map(filter).collect())
856    }
857}
858
859fn crate_version() -> (usize, usize) {
860    let vsn = env!("CARGO_PKG_VERSION");
861    let mut parts = vsn.split('.');
862    let major = parts.next().unwrap().parse().unwrap();
863    let minor = parts.next().unwrap().parse().unwrap();
864    (major, minor)
865}