1use std::{
2 collections::HashMap,
3 fs,
4 fs::File,
5 io,
6 io::{BufRead, BufReader, ErrorKind, Read, Seek, Write},
7 ops::Deref,
8 path::{Path, PathBuf},
9 sync::atomic::AtomicUsize,
10};
11
12use crate::pagecache::{arr_to_u32, u32_to_arr, Lsn};
13use crate::*;
14
/// Directory used when no explicit path is configured: it is the initial
/// value of `Inner::path` and the value `Inner::get_path` compares against
/// when deciding whether a temporary database should use `tmp_path`.
const DEFAULT_PATH: &str = "default.sled";
16
/// Operating mode selected with `Config::mode`.
///
/// The builder documentation describes the choice as running in "small" or
/// "fast" mode; how each variant changes engine behavior is decided outside
/// this file.
#[derive(Debug, Clone, Copy)]
pub enum Mode {
    /// The default mode (see `Inner::default`). Presumably the "small"
    /// setting that favors lower space usage — confirm against users of
    /// `Mode`.
    LowSpace,
    /// Presumably the "fast" setting that favors throughput — confirm
    /// against users of `Mode`.
    HighThroughput,
}
33
/// The subset of the configuration that is persisted in the `conf` file.
///
/// `Config::verify_config` compares these values against the running
/// configuration and refuses to open the database when any of them differ.
#[derive(Debug, Eq, PartialEq, Clone, Copy)]
struct StorageParameters {
    /// Segment size the database was created with.
    pub segment_size: usize,
    /// Whether the database was created with compression enabled.
    pub use_compression: bool,
    /// `(major, minor)` version recorded when the file was written.
    pub version: (usize, usize),
}
42
43impl StorageParameters {
44 pub fn serialize(&self) -> Vec<u8> {
45 let mut out = vec![];
46
47 writeln!(&mut out, "segment_size: {}", self.segment_size).unwrap();
48 writeln!(&mut out, "use_compression: {}", self.use_compression)
49 .unwrap();
50 writeln!(&mut out, "version: {}.{}", self.version.0, self.version.1)
51 .unwrap();
52
53 out
54 }
55
56 pub fn deserialize(bytes: &[u8]) -> Result<StorageParameters> {
57 let reader = BufReader::new(bytes);
58
59 let mut lines = HashMap::new();
60
61 for line in reader.lines() {
62 let line = if let Ok(l) = line {
63 l
64 } else {
65 error!(
66 "failed to parse persisted config as UTF-8. \
67 This changed in sled version 0.29"
68 );
69 return Err(Error::Unsupported(
70 "failed to open database that may \
71 have been created using a sled version \
72 earlier than 0.29"
73 .to_string(),
74 ));
75 };
76 let mut split = line.split(": ").map(String::from);
77 let k = if let Some(k) = split.next() {
78 k
79 } else {
80 error!("failed to parse persisted config line: {}", line);
81 return Err(Error::corruption(None));
82 };
83 let v = if let Some(v) = split.next() {
84 v
85 } else {
86 error!("failed to parse persisted config line: {}", line);
87 return Err(Error::corruption(None));
88 };
89 lines.insert(k, v);
90 }
91
92 let segment_size: usize = if let Some(raw) = lines.get("segment_size") {
93 if let Ok(parsed) = raw.parse() {
94 parsed
95 } else {
96 error!("failed to parse segment_size value: {}", raw);
97 return Err(Error::corruption(None));
98 }
99 } else {
100 error!(
101 "failed to retrieve required configuration parameter: segment_size"
102 );
103 return Err(Error::corruption(None));
104 };
105
106 let use_compression: bool = if let Some(raw) =
107 lines.get("use_compression")
108 {
109 if let Ok(parsed) = raw.parse() {
110 parsed
111 } else {
112 error!("failed to parse use_compression value: {}", raw);
113 return Err(Error::corruption(None));
114 }
115 } else {
116 error!(
117 "failed to retrieve required configuration parameter: use_compression"
118 );
119 return Err(Error::corruption(None));
120 };
121
122 let version: (usize, usize) = if let Some(raw) = lines.get("version") {
123 let mut split = raw.split('.');
124 let major = if let Some(raw_major) = split.next() {
125 if let Ok(parsed_major) = raw_major.parse() {
126 parsed_major
127 } else {
128 error!(
129 "failed to parse major version value from line: {}",
130 raw
131 );
132 return Err(Error::corruption(None));
133 }
134 } else {
135 error!("failed to parse major version value: {}", raw);
136 return Err(Error::corruption(None));
137 };
138
139 let minor = if let Some(raw_minor) = split.next() {
140 if let Ok(parsed_minor) = raw_minor.parse() {
141 parsed_minor
142 } else {
143 error!(
144 "failed to parse minor version value from line: {}",
145 raw
146 );
147 return Err(Error::corruption(None));
148 }
149 } else {
150 error!("failed to parse minor version value: {}", raw);
151 return Err(Error::corruption(None));
152 };
153
154 (major, minor)
155 } else {
156 error!(
157 "failed to retrieve required configuration parameter: version"
158 );
159 return Err(Error::corruption(None));
160 };
161
162 Ok(StorageParameters { segment_size, use_compression, version })
163 }
164}
165
/// Builder-style handle to the database configuration.
///
/// The settings live in an `Inner` behind an `Arc`, so cloning a `Config`
/// only bumps a reference count. The setter methods take `self` by value
/// and return the updated `Config`.
#[derive(Default, Debug, Clone)]
pub struct Config(Arc<Inner>);
178
/// Lets fields and methods of `Inner` be used directly on a `Config`.
impl Deref for Config {
    type Target = Inner;

    fn deref(&self) -> &Inner {
        &self.0
    }
}
186
/// The actual configuration values shared behind `Config`'s `Arc`.
///
/// Dropping an `Inner` has side effects: see its `Drop` implementation.
#[doc(hidden)]
#[derive(Debug, Clone)]
pub struct Inner {
    /// Maximum size in bytes for the system page cache.
    #[doc(hidden)]
    pub cache_capacity: u64,
    /// Flush interval in milliseconds; defaults to `Some(500)`.
    /// NOTE(review): the meaning of `None` is decided outside this file.
    #[doc(hidden)]
    pub flush_every_ms: Option<u64>,
    /// Segment size; `validate` requires a power of two between 256 and
    /// `1 << 24` inclusive.
    #[doc(hidden)]
    pub segment_size: usize,
    /// Configured base directory (see `get_path` for the effective one).
    #[doc(hidden)]
    pub path: PathBuf,
    /// Attempt to exclusively open the database, failing if it already exists.
    #[doc(hidden)]
    pub create_new: bool,
    /// Operating mode, see `Mode`.
    #[doc(hidden)]
    pub mode: Mode,
    /// When set, the storage directory is removed when this value is dropped.
    #[doc(hidden)]
    pub temporary: bool,
    /// Whether to use zstd compression.
    #[doc(hidden)]
    pub use_compression: bool,
    /// zstd compression factor; `validate` requires 1 through 22.
    #[doc(hidden)]
    pub compression_factor: i32,
    /// Print a performance profile when this value is dropped.
    #[doc(hidden)]
    pub print_profile_on_drop: bool,
    /// Must be above 0 (checked by `validate`).
    /// NOTE(review): the unit of this interval is not visible in this file.
    #[doc(hidden)]
    pub idgen_persist_interval: u64,
    /// `(major, minor)` of this crate, persisted and checked on open.
    #[doc(hidden)]
    pub version: (usize, usize),
    // Generated once per `Inner::default()`; used instead of `path` when the
    // database is temporary and `path` was left at `DEFAULT_PATH`.
    tmp_path: PathBuf,
    // Shared error slot: `set_global_error` only stores into it while it is
    // null, `reset_global_error` clears it.
    pub(crate) global_error: Arc<Atomic<Error>>,
    /// Event log, only present with the `event_log` feature.
    #[cfg(feature = "event_log")]
    pub event_log: Arc<event_log::EventLog>,
}
220
221impl Default for Inner {
222 fn default() -> Self {
223 Self {
224 path: PathBuf::from(DEFAULT_PATH),
226 tmp_path: Config::gen_temp_path(),
227 create_new: false,
228 cache_capacity: 1024 * 1024 * 1024, mode: Mode::LowSpace,
230 use_compression: false,
231 compression_factor: 5,
232 temporary: false,
233 version: crate_version(),
234
235 segment_size: 512 * 1024, print_profile_on_drop: false,
238 flush_every_ms: Some(500),
239 idgen_persist_interval: 1_000_000,
240 global_error: Arc::new(Atomic::default()),
241 #[cfg(feature = "event_log")]
242 event_log: Arc::new(crate::event_log::EventLog::default()),
243 }
244 }
245}
246
247impl Inner {
248 #[doc(hidden)]
250 pub fn get_path(&self) -> PathBuf {
251 if self.temporary && self.path == PathBuf::from(DEFAULT_PATH) {
252 self.tmp_path.clone()
253 } else {
254 self.path.clone()
255 }
256 }
257
258 pub(crate) fn blob_path(&self, id: Lsn) -> PathBuf {
259 self.get_path().join("blobs").join(format!("{}", id))
260 }
261
262 fn db_path(&self) -> PathBuf {
263 self.get_path().join("db")
264 }
265
266 fn config_path(&self) -> PathBuf {
267 self.get_path().join("conf")
268 }
269
270 pub(crate) fn normalize<T>(&self, value: T) -> T
271 where
272 T: Copy
273 + TryFrom<usize>
274 + std::ops::Div<Output = T>
275 + std::ops::Mul<Output = T>,
276 <T as std::convert::TryFrom<usize>>::Error: Debug,
277 {
278 let segment_size: T = T::try_from(self.segment_size).unwrap();
279 value / segment_size * segment_size
280 }
281}
282
// Guard used by the validation routines: unless `$cond` holds, returns
// `Err(Error::Unsupported(..))` built from `$msg` out of the *enclosing*
// function. `$msg` may be a `&str` or a `String` (anything with `to_owned`
// yielding a `String`).
macro_rules! supported {
    ($cond:expr, $msg:expr) => {
        if !$cond {
            return Err(Error::Unsupported($msg.to_owned()));
        }
    };
}
290
// Generates one consuming setter per `(field, type, description)` triple.
// Each setter is named after the `Inner` field it assigns and uses the
// description as its doc comment.
macro_rules! builder {
    ($(($name:ident, $t:ty, $desc:expr)),*) => {
        $(
            #[doc=$desc]
            pub fn $name(mut self, to: $t) -> Self {
                // More than one strong reference means another handle to
                // this configuration exists; log it, then continue.
                if Arc::strong_count(&self.0) != 1 {
                    error!(
                        "config has already been used to start \
                         the system and probably should not be \
                         mutated",
                    );
                }
                // `make_mut` clones the `Inner` when it is shared, so other
                // handles do not observe this change.
                let m = Arc::make_mut(&mut self.0);
                m.$name = to;
                self
            }
        )*
    }
}
310
311impl Config {
312 pub fn new() -> Config {
314 Config::default()
315 }
316
317 pub fn path<P: AsRef<Path>>(mut self, path: P) -> Config {
319 let m = Arc::get_mut(&mut self.0).unwrap();
320 m.path = path.as_ref().to_path_buf();
321 self
322 }
323
324 #[doc(hidden)]
328 pub fn segment_size(mut self, segment_size: usize) -> Config {
329 if Arc::strong_count(&self.0) != 1 {
330 error!(
331 "config has already been used to start \
332 the system and probably should not be \
333 mutated",
334 );
335 }
336 let m = Arc::make_mut(&mut self.0);
337 m.segment_size = segment_size;
338 self
339 }
340
341 pub fn open(&self) -> Result<Db> {
343 self.validate()?;
345
346 let mut config = self.clone();
347 config.limit_cache_max_memory();
348
349 let file = config.open_file()?;
350
351 let config = RunningConfig { inner: config, file: Arc::new(file) };
353
354 Db::start_inner(config)
355 }
356
    /// Deprecated no-op: ignores its argument and returns `self` unchanged.
    #[doc(hidden)]
    #[deprecated(
        since = "0.31.0",
        note = "this does nothing for now. maybe it will come back in the future."
    )]
    pub const fn segment_cleanup_skew(self, _: usize) -> Self {
        self
    }

    /// Deprecated no-op: ignores its argument and returns `self` unchanged.
    #[doc(hidden)]
    #[deprecated(
        since = "0.31.0",
        note = "this does nothing for now. maybe it will come back in the future."
    )]
    pub const fn segment_cleanup_threshold(self, _: u8) -> Self {
        self
    }

    /// Deprecated no-op: ignores its argument and returns `self` unchanged.
    #[doc(hidden)]
    #[deprecated(
        since = "0.31.0",
        note = "this does nothing for now. maybe it will come back in the future."
    )]
    pub const fn snapshot_after_ops(self, _: u64) -> Self {
        self
    }

    /// Deprecated no-op: ignores its argument and returns `self` unchanged.
    #[doc(hidden)]
    #[deprecated(
        since = "0.31.0",
        note = "this does nothing for now. maybe it will come back in the future."
    )]
    pub fn snapshot_path<P>(self, _: P) -> Self {
        self
    }
392
393 #[doc(hidden)]
394 pub fn flush_every_ms(mut self, every_ms: Option<u64>) -> Self {
395 if Arc::strong_count(&self.0) != 1 {
396 error!(
397 "config has already been used to start \
398 the system and probably should not be \
399 mutated",
400 );
401 }
402 let m = Arc::make_mut(&mut self.0);
403 m.flush_every_ms = every_ms;
404 self
405 }
406
407 #[doc(hidden)]
408 pub fn idgen_persist_interval(mut self, interval: u64) -> Self {
409 if Arc::strong_count(&self.0) != 1 {
410 error!(
411 "config has already been used to start \
412 the system and probably should not be \
413 mutated",
414 );
415 }
416 let m = Arc::make_mut(&mut self.0);
417 m.idgen_persist_interval = interval;
418 self
419 }
420
421 #[doc(hidden)]
430 #[deprecated(since = "0.29.0", note = "use Config::open instead")]
431 pub fn build(mut self) -> RunningConfig {
432 self.validate().unwrap();
434
435 self.limit_cache_max_memory();
436
437 let file = self.open_file().unwrap_or_else(|e| {
438 panic!("open file at {:?}: {}", self.db_path(), e);
439 });
440
441 RunningConfig { inner: self, file: Arc::new(file) }
443 }
444
445 fn gen_temp_path() -> PathBuf {
446 use std::time::SystemTime;
447
448 static SALT_COUNTER: AtomicUsize = AtomicUsize::new(0);
449
450 let seed = SALT_COUNTER.fetch_add(1, SeqCst) as u128;
451
452 let now = SystemTime::now()
453 .duration_since(SystemTime::UNIX_EPOCH)
454 .unwrap()
455 .as_nanos()
456 << 48;
457
458 #[cfg(not(miri))]
459 let pid = u128::from(std::process::id());
460
461 #[cfg(miri)]
462 let pid = 0;
463
464 let salt = (pid << 16) + now + seed;
465
466 if cfg!(target_os = "linux") {
467 format!("/dev/shm/pagecache.tmp.{}", salt).into()
469 } else {
470 std::env::temp_dir().join(format!("pagecache.tmp.{}", salt))
471 }
472 }
473
474 fn limit_cache_max_memory(&mut self) {
475 let limit = sys_limits::get_memory_limit();
476 if limit > 0 && self.cache_capacity > limit {
477 let m = Arc::make_mut(&mut self.0);
478 m.cache_capacity = limit;
479 error!(
480 "cache capacity is limited to the cgroup memory \
481 limit: {} bytes",
482 self.cache_capacity
483 );
484 }
485 }
486
    // Setters generated by `builder!`: one consuming method per entry,
    // named after the `Inner` field it assigns. The string becomes the
    // method's documentation.
    builder!(
        (
            cache_capacity,
            u64,
            "maximum size in bytes for the system page cache"
        ),
        (
            mode,
            Mode,
            "specify whether the system should run in \"small\" or \"fast\" mode"
        ),
        (use_compression, bool, "whether to use zstd compression"),
        (
            compression_factor,
            i32,
            "the compression factor to use with zstd compression. Ranges from 1 up to 22. Levels >= 20 are 'ultra'."
        ),
        (
            temporary,
            bool,
            "deletes the database after drop. if no path is set, uses /dev/shm on linux"
        ),
        (
            create_new,
            bool,
            "attempts to exclusively open the database, failing if it already exists"
        ),
        (
            print_profile_on_drop,
            bool,
            "print a performance profile when the Config is dropped"
        )
    );
520
521 fn validate(&self) -> Result<()> {
523 supported!(
524 self.segment_size.count_ones() == 1,
525 "segment_size should be a power of 2"
526 );
527 supported!(
528 self.segment_size >= 256,
529 "segment_size should be hundreds of kb at minimum, and we won't start if below 256"
530 );
531 supported!(
532 self.segment_size <= 1 << 24,
533 "segment_size should be <= 16mb"
534 );
535 if self.use_compression {
536 supported!(
537 cfg!(feature = "compression"),
538 "the 'compression' feature must be enabled"
539 );
540 }
541 supported!(
542 self.compression_factor >= 1,
543 "compression_factor must be >= 1"
544 );
545 supported!(
546 self.compression_factor <= 22,
547 "compression_factor must be <= 22"
548 );
549 supported!(
550 self.idgen_persist_interval > 0,
551 "idgen_persist_interval must be above 0"
552 );
553 Ok(())
554 }
555
556 fn open_file(&self) -> Result<File> {
557 let blob_dir: PathBuf = self.get_path().join("blobs");
558
559 if !blob_dir.exists() {
560 fs::create_dir_all(blob_dir)?;
561 }
562
563 self.verify_config()?;
564
565 let mut options = fs::OpenOptions::new();
567
568 let _ = options.create(true);
569 let _ = options.read(true);
570 let _ = options.write(true);
571
572 if self.create_new {
573 options.create_new(true);
574 }
575
576 self.try_lock(options.open(&self.db_path())?)
577 }
578
579 fn try_lock(&self, file: File) -> Result<File> {
580 #[cfg(all(
581 not(miri),
582 any(windows, target_os = "linux", target_os = "macos")
583 ))]
584 {
585 use fs2::FileExt;
586
587 let try_lock = if cfg!(feature = "testing") {
588 file.lock_exclusive()
594 } else {
595 file.try_lock_exclusive()
596 };
597
598 if let Err(e) = try_lock {
599 return Err(Error::Io(io::Error::new(
600 ErrorKind::Other,
601 format!(
602 "could not acquire lock on {:?}: {:?}",
603 self.db_path().to_string_lossy(),
604 e
605 ),
606 )));
607 }
608 }
609
610 Ok(file)
611 }
612
613 fn verify_config(&self) -> Result<()> {
614 match self.read_config() {
615 Ok(Some(old)) => {
616 supported!(
617 self.use_compression == old.use_compression,
618 format!(
619 "cannot change compression values across restarts. \
620 old value of use_compression loaded from disk: {}, \
621 currently set value: {}.",
622 old.use_compression, self.use_compression,
623 )
624 );
625
626 supported!(
627 self.segment_size == old.segment_size,
628 format!(
629 "cannot change the io buffer size across restarts. \
630 please change it back to {}",
631 old.segment_size
632 )
633 );
634
635 supported!(
636 self.version == old.version,
637 format!(
638 "This database was created using \
639 pagecache version {}.{}, but our pagecache \
640 version is {}.{}. Please perform an upgrade \
641 using the sled::Db::export and sled::Db::import \
642 methods.",
643 old.version.0,
644 old.version.1,
645 self.version.0,
646 self.version.1,
647 )
648 );
649 Ok(())
650 }
651 Ok(None) => self.write_config(),
652 Err(e) => Err(e),
653 }
654 }
655
656 fn serialize(&self) -> Vec<u8> {
657 let persisted_config = StorageParameters {
658 version: self.version,
659 segment_size: self.segment_size,
660 use_compression: self.use_compression,
661 };
662
663 persisted_config.serialize()
664 }
665
666 fn write_config(&self) -> Result<()> {
667 let bytes = self.serialize();
668 let crc: u32 = crc32(&*bytes);
669 let crc_arr = u32_to_arr(crc);
670
671 let path = self.config_path();
672
673 let mut f =
674 fs::OpenOptions::new().write(true).create(true).open(path)?;
675
676 io_fail!(self, "write_config bytes");
677 f.write_all(&*bytes)?;
678 io_fail!(self, "write_config crc");
679 f.write_all(&crc_arr)?;
680 io_fail!(self, "write_config post");
681 Ok(())
682 }
683
684 fn read_config(&self) -> Result<Option<StorageParameters>> {
685 let path = self.config_path();
686
687 let f_res = fs::OpenOptions::new().read(true).open(&path);
688
689 let mut f = match f_res {
690 Err(ref e) if e.kind() == ErrorKind::NotFound => {
691 return Ok(None);
692 }
693 Err(other) => {
694 return Err(other.into());
695 }
696 Ok(f) => f,
697 };
698
699 if f.metadata()?.len() <= 8 {
700 warn!("empty/corrupt configuration file found");
701 return Ok(None);
702 }
703
704 let mut buf = vec![];
705 let _ = f.read_to_end(&mut buf)?;
706 let len = buf.len();
707 let _ = buf.split_off(len - 4);
708
709 let mut crc_arr = [0_u8; 4];
710 let _ = f.seek(io::SeekFrom::End(-4))?;
711 f.read_exact(&mut crc_arr)?;
712 let crc_expected = arr_to_u32(&crc_arr);
713
714 let crc_actual = crc32(&*buf);
715
716 if crc_expected != crc_actual {
717 warn!(
718 "crc for settings file {:?} failed! \
719 can't verify that config is safe",
720 path
721 );
722 }
723
724 StorageParameters::deserialize(&buf).map(Some)
725 }
726
727 #[doc(hidden)]
730 pub fn global_error(&self) -> Result<()> {
731 let guard = pin();
732 let ge = self.global_error.load(Acquire, &guard);
733 if ge.is_null() {
734 Ok(())
735 } else {
736 #[allow(unsafe_code)]
737 unsafe {
738 Err(ge.deref().clone())
739 }
740 }
741 }
742
    /// Clears the recorded global error, if there is one.
    pub(crate) fn reset_global_error(&self) {
        let guard = pin();
        // Swap in `Shared::default()` (presumably the null pointer — confirm
        // against the epoch library in use) and take the previous value.
        let old = self.global_error.swap(Shared::default(), SeqCst, &guard);
        if !old.is_null() {
            // A second guard is pinned here and shadows the first one.
            let guard = pin();
            // Destruction of the old error is deferred instead of being
            // done immediately, since `global_error` may have loaded the
            // same pointer under its own guard.
            #[allow(unsafe_code)]
            unsafe {
                guard.defer_destroy(old);
            }
        }
    }
754
755 pub(crate) fn set_global_error(&self, error_value: Error) {
756 let guard = pin();
757 let error = Owned::new(error_value);
758
759 let expected_old = Shared::null();
760
761 let _ = self.global_error.compare_and_set(
762 expected_old,
763 error,
764 SeqCst,
765 &guard,
766 );
767 }
768
769 #[cfg(feature = "failpoints")]
770 #[cfg(feature = "event_log")]
771 #[doc(hidden)]
772 pub fn truncate_corrupt(&self, new_len: u64) {
774 self.event_log.reset();
775 let path = self.db_path();
776 let f = std::fs::OpenOptions::new().write(true).open(path).unwrap();
777 f.set_len(new_len).expect("should be able to truncate");
778 }
779}
780
/// A `Config` that has been validated and paired with its opened database
/// file. Produced by `Config::open` and `Config::build`.
#[allow(clippy::module_name_repetitions)]
#[derive(Debug, Clone)]
pub struct RunningConfig {
    // The configuration this instance was started from.
    inner: Config,
    // Handle to the opened `db` file, shared between clones.
    pub(crate) file: Arc<File>,
}
789
// NOTE(review): these manual impls assert that sharing and sending a
// `RunningConfig` across threads is sound. The justification is not visible
// in this file — confirm which field (if any) prevents the auto traits from
// applying before relying on or extending this type.
#[allow(unsafe_code)]
unsafe impl Send for RunningConfig {}

#[allow(unsafe_code)]
unsafe impl Sync for RunningConfig {}
795
/// Exposes the wrapped `Config` (and, through it, `Inner`) directly on a
/// `RunningConfig`.
impl Deref for RunningConfig {
    type Target = Config;

    fn deref(&self) -> &Config {
        &self.inner
    }
}
803
804impl Drop for Inner {
805 fn drop(&mut self) {
806 if self.print_profile_on_drop {
807 M.print_profile();
808 }
809
810 if !self.temporary {
811 return;
812 }
813
814 debug!("removing temporary storage file {:?}", self.get_path());
816 let _res = fs::remove_dir_all(&self.get_path());
817 }
818}
819
820impl RunningConfig {
821 #[doc(hidden)]
823 pub fn get_snapshot_files(&self) -> io::Result<Vec<PathBuf>> {
824 let path = self.get_path().join("snap.");
825
826 let absolute_path: PathBuf = if Path::new(&path).is_absolute() {
827 path
828 } else {
829 std::env::current_dir()?.join(path)
830 };
831
832 let filter = |dir_entry: io::Result<fs::DirEntry>| {
833 if let Ok(de) = dir_entry {
834 let path_buf = de.path();
835 let path = path_buf.as_path();
836 let path_str = &*path.to_string_lossy();
837 if path_str.starts_with(&*absolute_path.to_string_lossy())
838 && !path_str.ends_with(".in___motion")
839 {
840 Some(path.to_path_buf())
841 } else {
842 None
843 }
844 } else {
845 None
846 }
847 };
848
849 let snap_dir = Path::new(&absolute_path).parent().unwrap();
850
851 if !snap_dir.exists() {
852 fs::create_dir_all(snap_dir)?;
853 }
854
855 Ok(snap_dir.read_dir()?.filter_map(filter).collect())
856 }
857}
858
859fn crate_version() -> (usize, usize) {
860 let vsn = env!("CARGO_PKG_VERSION");
861 let mut parts = vsn.split('.');
862 let major = parts.next().unwrap().parse().unwrap();
863 let minor = parts.next().unwrap().parse().unwrap();
864 (major, minor)
865}