sled/
db.rs

1use std::ops::Deref;
2
3use crate::*;
4
/// The `sled` embedded database! Implements
/// `Deref<Target = sled::Tree>` to refer to
/// a default keyspace / namespace / bucket.
#[derive(Clone)]
pub struct Db {
    #[doc(hidden)]
    pub context: Context,
    // The default keyspace; `Deref` forwards all `Tree` methods to it.
    pub(crate) default: Tree,
    // Every open tree, keyed by name. Shared (via `Arc`) across all
    // clones of this `Db` handle.
    tenants: Arc<RwLock<FastMap8<IVec, Tree>>>,
}
15
16/// Opens a `Db` with a default configuration at the
17/// specified path. This will create a new storage
18/// directory at the specified path if it does
19/// not already exist. You can use the `Db::was_recovered`
20/// method to determine if your database was recovered
21/// from a previous instance. You can use `Config::create_new`
22/// if you want to increase the chances that the database
23/// will be freshly created.
24pub fn open<P: AsRef<std::path::Path>>(path: P) -> Result<Db> {
25    Config::new().path(path).open()
26}
27
#[allow(unsafe_code)]
// SAFETY(review): `Db` is composed of `Context`, `Tree`, and an
// `Arc<RwLock<..>>` map; this asserts all of them may be moved across
// threads — confirm none of the inner types hold thread-local state.
unsafe impl Send for Db {}

#[allow(unsafe_code)]
// SAFETY(review): concurrent shared access appears to be mediated by
// the interior `RwLock` and atomics — confirm `Context` and `Tree`
// uphold the same guarantee.
unsafe impl Sync for Db {}
33
34impl Deref for Db {
35    type Target = Tree;
36
37    fn deref(&self) -> &Tree {
38        &self.default
39    }
40}
41
42impl Debug for Db {
43    fn fmt(
44        &self,
45        f: &mut fmt::Formatter<'_>,
46    ) -> std::result::Result<(), fmt::Error> {
47        let tenants = self.tenants.read();
48        writeln!(f, "Db {{")?;
49        for (raw_name, tree) in tenants.iter() {
50            let name = std::str::from_utf8(raw_name)
51                .ok()
52                .map_or_else(|| format!("{:?}", raw_name), String::from);
53            write!(f, "    Tree: {:?} contents: {:?}", name, tree)?;
54        }
55        write!(f, "}}")?;
56        Ok(())
57    }
58}
59
60impl Db {
    #[doc(hidden)]
    #[deprecated(since = "0.30.2", note = "replaced by `sled::open`")]
    pub fn open<P: AsRef<std::path::Path>>(path: P) -> Result<Self> {
        // Retained for backwards compatibility; identical to the free
        // function `sled::open`.
        Config::new().path(path).open()
    }
66
    /// Builds a `Db` on top of an already-validated `RunningConfig`:
    /// starts the context, optionally spawns the background flusher,
    /// opens the default tree, and re-opens all persisted trees.
    pub(crate) fn start_inner(config: RunningConfig) -> Result<Self> {
        // Records tree startup latency in the global metrics registry.
        let _measure = Measure::new(&M.tree_start);

        let context = Context::start(config)?;

        // Spawn a periodic background flusher (when `flush_every_ms` is
        // configured), but only on targets where the flusher thread is
        // supported — and never under miri.
        #[cfg(all(
            not(miri),
            any(
                windows,
                target_os = "linux",
                target_os = "macos",
                target_os = "dragonfly",
                target_os = "freebsd",
                target_os = "openbsd",
                target_os = "netbsd",
            )
        ))]
        {
            let flusher_pagecache = context.pagecache.clone();
            let flusher = context.flush_every_ms.map(move |fem| {
                flusher::Flusher::new(
                    "log flusher".to_owned(),
                    flusher_pagecache,
                    fem,
                )
            });
            *context.flusher.lock() = flusher;
        }

        // create or open the default tree
        let guard = pin();
        let default =
            meta::open_tree(&context, DEFAULT_TREE_ID.to_vec(), &guard)?;

        let ret = Self {
            context: context.clone(),
            default,
            tenants: Arc::new(RwLock::new(FastMap8::default())),
        };

        let mut tenants = ret.tenants.write();

        // Re-open every named tree recorded in the persisted metadata,
        // seeding each handle with its recovered root page id.
        for (id, root) in context.pagecache.get_meta(&guard)?.tenants() {
            let tree = Tree(Arc::new(TreeInner {
                tree_id: id.clone(),
                subscribers: Subscribers::default(),
                context: context.clone(),
                root: AtomicU64::new(root),
                merge_operator: RwLock::new(None),
            }));
            // Recovered metadata must never contain duplicate names.
            assert!(tenants.insert(id, tree).is_none());
        }

        drop(tenants);

        // Expensive extra integrity checks, compiled in only for
        // `event_log` test builds.
        #[cfg(feature = "event_log")]
        {
            for (_name, tree) in ret.tenants.read().iter() {
                tree.verify_integrity().unwrap();
            }
            ret.context.event_log.verify();
        }

        Ok(ret)
    }
132
133    /// Open or create a new disk-backed Tree with its own keyspace,
134    /// accessible from the `Db` via the provided identifier.
135    pub fn open_tree<V: AsRef<[u8]>>(&self, name: V) -> Result<Tree> {
136        let name_ref = name.as_ref();
137        let tenants = self.tenants.read();
138        if let Some(tree) = tenants.get(name_ref) {
139            return Ok(tree.clone());
140        }
141        drop(tenants);
142
143        let guard = pin();
144
145        let mut tenants = self.tenants.write();
146
147        // we need to check this again in case another
148        // thread opened it concurrently.
149        if let Some(tree) = tenants.get(name_ref) {
150            return Ok(tree.clone());
151        }
152
153        let tree = meta::open_tree(&self.context, name_ref.to_vec(), &guard)?;
154
155        assert!(tenants.insert(name_ref.into(), tree.clone()).is_none());
156
157        Ok(tree)
158    }
159
    /// Remove a disk-backed collection.
    ///
    /// Returns `Ok(false)` if no tree with that name was open.
    pub fn drop_tree<V: AsRef<[u8]>>(&self, name: V) -> Result<bool> {
        let name_ref = name.as_ref();
        // The default tree backs the `Db` itself and must never be
        // dropped.
        if name_ref == DEFAULT_TREE_ID {
            return Err(Error::Unsupported(
                "cannot remove the core structures".into(),
            ));
        }
        trace!("dropping tree {:?}", name_ref,);

        let mut tenants = self.tenants.write();

        let tree = if let Some(tree) = tenants.remove(&*name_ref) {
            tree
        } else {
            return Ok(false);
        };

        let guard = pin();

        let mut root_id =
            Some(self.context.pagecache.meta_pid_for_name(name_ref, &guard)?);

        // Walk the leftmost edge of the tree, collecting one page id per
        // level; this chain is later handed to `gc_pages` to reclaim the
        // whole tree's pages.
        let mut leftmost_chain: Vec<PageId> = vec![root_id.unwrap()];
        let mut cursor = root_id.unwrap();
        while let Some(view) = self.view_for_pid(cursor, &guard)? {
            if let Some(index) = view.data.index_ref() {
                let leftmost_child = index.pointers[0];
                leftmost_chain.push(leftmost_child);
                cursor = leftmost_child;
            } else {
                // Reached a leaf; the chain is complete.
                break;
            }
        }

        // CAS the tree's root out of the persistent metadata, retrying
        // with the actual root if a concurrent root change raced us.
        loop {
            let res = self
                .context
                .pagecache
                .cas_root_in_meta(name_ref, root_id, None, &guard)?;

            if let Err(actual_root) = res {
                root_id = actual_root;
            } else {
                break;
            }
        }

        // NOTE(review): `u64::max_value()` appears to serve as a
        // poisoned/invalid root marker for lingering handles — confirm.
        tree.root.store(u64::max_value(), SeqCst);

        // drop writer lock
        drop(tenants);

        tree.gc_pages(leftmost_chain)?;

        guard.flush();

        Ok(true)
    }
219
220    /// Returns the trees names saved in this Db.
221    pub fn tree_names(&self) -> Vec<IVec> {
222        let tenants = self.tenants.read();
223        tenants.iter().map(|(name, _)| name.clone()).collect()
224    }
225
    /// Returns `true` if the database was
    /// recovered from a previous process.
    /// Note that database state is only
    /// guaranteed to be present up to the
    /// last call to `flush`! Otherwise state
    /// is synced to disk periodically if the
    /// `sync_every_ms` configuration option
    /// is set to `Some(number_of_ms_between_syncs)`
    /// or if the IO buffer gets filled to
    /// capacity before being rotated.
    pub fn was_recovered(&self) -> bool {
        // Recovery status is determined at startup and tracked by the
        // shared context.
        self.context.was_recovered()
    }
239
    /// Generate a monotonic ID. Not guaranteed to be
    /// contiguous. Written to disk every `idgen_persist_interval`
    /// operations, followed by a blocking flush. During recovery, we
    /// take the last recovered generated ID and add 2x
    /// the `idgen_persist_interval` to it. While persisting, if the
    /// previous persisted counter wasn't synced to disk yet, we will do
    /// a blocking flush to fsync the latest counter, ensuring
    /// that we will never give out the same counter twice.
    pub fn generate_id(&self) -> Result<u64> {
        // The counter lives in the shared context so all handles
        // draw from the same sequence.
        self.context.generate_id()
    }
251
252    /// A database export method for all collections in the `Db`,
253    /// for use in sled version upgrades. Can be used in combination
254    /// with the `import` method below on a database running a later
255    /// version.
256    ///
257    /// # Panics
258    ///
259    /// Panics if any IO problems occur while trying
260    /// to perform the export.
261    ///
262    /// # Examples
263    ///
264    /// If you want to migrate from one version of sled
265    /// to another, you need to pull in both versions
266    /// by using version renaming:
267    ///
268    /// `Cargo.toml`:
269    ///
270    /// ```toml
271    /// [dependencies]
272    /// sled = "0.32"
273    /// old_sled = { version = "0.31", package = "sled" }
274    /// ```
275    ///
276    /// and in your code, remember that old versions of
277    /// sled might have a different way to open them
278    /// than the current `sled::open` method:
279    ///
280    /// ```
281    /// # use sled as old_sled;
282    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
283    /// let old = old_sled::open("my_old_db")?;
284    ///
285    /// // may be a different version of sled,
286    /// // the export type is version agnostic.
287    /// let new = sled::open("my_new_db")?;
288    ///
289    /// let export = old.export();
290    /// new.import(export);
291    ///
292    /// assert_eq!(old.checksum()?, new.checksum()?);
293    /// # Ok(()) }
294    /// ```
295    pub fn export(
296        &self,
297    ) -> Vec<(CollectionType, CollectionName, impl Iterator<Item = Vec<Vec<u8>>>)>
298    {
299        let tenants = self.tenants.read();
300
301        let mut ret = vec![];
302
303        for (name, tree) in tenants.iter() {
304            ret.push((
305                b"tree".to_vec(),
306                name.to_vec(),
307                tree.iter().map(|kv_opt| {
308                    let kv = kv_opt.unwrap();
309                    vec![kv.0.to_vec(), kv.1.to_vec()]
310                }),
311            ));
312        }
313
314        ret
315    }
316
317    /// Imports the collections from a previous database.
318    ///
319    /// # Panics
320    ///
321    /// Panics if any IO problems occur while trying
322    /// to perform the import.
323    ///
324    /// # Examples
325    ///
326    /// If you want to migrate from one version of sled
327    /// to another, you need to pull in both versions
328    /// by using version renaming:
329    ///
330    /// `Cargo.toml`:
331    ///
332    /// ```toml
333    /// [dependencies]
334    /// sled = "0.32"
335    /// old_sled = { version = "0.31", package = "sled" }
336    /// ```
337    ///
338    /// and in your code, remember that old versions of
339    /// sled might have a different way to open them
340    /// than the current `sled::open` method:
341    ///
342    /// ```
343    /// # use sled as old_sled;
344    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
345    /// let old = old_sled::open("my_old_db")?;
346    ///
347    /// // may be a different version of sled,
348    /// // the export type is version agnostic.
349    /// let new = sled::open("my_new_db")?;
350    ///
351    /// let export = old.export();
352    /// new.import(export);
353    ///
354    /// assert_eq!(old.checksum()?, new.checksum()?);
355    /// # Ok(()) }
356    /// ```
357    pub fn import(
358        &self,
359        export: Vec<(
360            CollectionType,
361            CollectionName,
362            impl Iterator<Item = Vec<Vec<u8>>>,
363        )>,
364    ) {
365        for (collection_type, collection_name, collection_iter) in export {
366            match collection_type {
367                ref t if t == b"tree" => {
368                    let tree = self
369                        .open_tree(collection_name)
370                        .expect("failed to open new tree during import");
371                    for mut kv in collection_iter {
372                        let v = kv
373                            .pop()
374                            .expect("failed to get value from tree export");
375                        let k = kv
376                            .pop()
377                            .expect("failed to get key from tree export");
378                        let old = tree.insert(k, v).expect(
379                            "failed to insert value during tree import",
380                        );
381                        assert!(
382                            old.is_none(),
383                            "import is overwriting existing data"
384                        );
385                    }
386                }
387                other => panic!("unknown collection type {:?}", other),
388            }
389        }
390    }
391
    /// Returns the CRC32 of all keys and values
    /// in this Db.
    ///
    /// This is O(N) and locks all underlying Trees
    /// for the duration of the entire scan.
    pub fn checksum(&self) -> Result<u32> {
        // Take the tenants lock exclusively so no tree can be opened
        // or dropped while the checksum is being computed.
        let tenants_mu = self.tenants.write();

        // we use a btreemap to ensure lexicographic
        // iteration over tree names to have consistent
        // checksums.
        let tenants: BTreeMap<_, _> = tenants_mu.iter().collect();

        let mut hasher = crc32fast::Hasher::new();
        let mut locks = vec![];

        // Hold the global concurrency-control write lock for the whole
        // scan (kept alive in `locks`) so tree contents cannot change
        // mid-checksum.
        locks.push(concurrency_control::write());

        for (name, tree) in &tenants {
            // Tree names participate in the checksum, not just contents.
            hasher.update(name);

            let mut iter = tree.iter();
            while let Some(kv_res) = iter.next_inner() {
                let (k, v) = kv_res?;
                hasher.update(&k);
                hasher.update(&v);
            }
        }

        Ok(hasher.finalize())
    }
423
    /// Returns the on-disk size of the storage files
    /// for this database.
    pub fn size_on_disk(&self) -> Result<u64> {
        // Delegated to the pagecache, which owns the storage files.
        self.context.pagecache.size_on_disk()
    }
429
    /// Traverses all files and calculates their total physical
    /// size, then traverses all pages and calculates their
    /// total logical size, then divides the physical size
    /// by the logical size.
    #[doc(hidden)]
    pub fn space_amplification(&self) -> Result<f64> {
        // Delegated to the pagecache, which tracks both sizes.
        self.context.pagecache.space_amplification()
    }
438}
439
/// These types provide the information that allows an entire
/// system to be exported and imported to facilitate
/// major upgrades. They are comprised entirely
/// of standard library types to be forward compatible.
/// NB: these definitions are expensive to change, because
/// they impact the migration path.
type CollectionType = Vec<u8>;
type CollectionName = Vec<u8>;