// sled/db.rs
1use std::ops::Deref;
2
3use crate::*;
4
/// The `sled` embedded database! Implements
/// `Deref<Target = sled::Tree>` to refer to
/// a default keyspace / namespace / bucket.
#[derive(Clone)]
pub struct Db {
    #[doc(hidden)]
    pub context: Context,
    // The default keyspace; exposed to callers through the `Deref` impl.
    pub(crate) default: Tree,
    // All currently-open trees, keyed by their raw name bytes. Guarded by
    // an `RwLock` so `open_tree` / `drop_tree` can race safely.
    tenants: Arc<RwLock<FastMap8<IVec, Tree>>>,
}
15
16/// Opens a `Db` with a default configuration at the
17/// specified path. This will create a new storage
18/// directory at the specified path if it does
19/// not already exist. You can use the `Db::was_recovered`
20/// method to determine if your database was recovered
21/// from a previous instance. You can use `Config::create_new`
22/// if you want to increase the chances that the database
23/// will be freshly created.
24pub fn open<P: AsRef<std::path::Path>>(path: P) -> Result<Db> {
25 Config::new().path(path).open()
26}
27
#[allow(unsafe_code)]
// SAFETY: `Db` is manually asserted to be sendable across threads; its
// fields are shared handles (`Context`, a `Tree`, and an
// `Arc<RwLock<..>>`). NOTE(review): this bypasses the compiler's
// auto-trait analysis — confirm every field is in fact `Send`.
unsafe impl Send for Db {}

#[allow(unsafe_code)]
// SAFETY: shared access to the tenants map is mediated by the `RwLock`.
// NOTE(review): as above, confirm every field is in fact `Sync`.
unsafe impl Sync for Db {}
33
34impl Deref for Db {
35 type Target = Tree;
36
37 fn deref(&self) -> &Tree {
38 &self.default
39 }
40}
41
42impl Debug for Db {
43 fn fmt(
44 &self,
45 f: &mut fmt::Formatter<'_>,
46 ) -> std::result::Result<(), fmt::Error> {
47 let tenants = self.tenants.read();
48 writeln!(f, "Db {{")?;
49 for (raw_name, tree) in tenants.iter() {
50 let name = std::str::from_utf8(raw_name)
51 .ok()
52 .map_or_else(|| format!("{:?}", raw_name), String::from);
53 write!(f, " Tree: {:?} contents: {:?}", name, tree)?;
54 }
55 write!(f, "}}")?;
56 Ok(())
57 }
58}
59
impl Db {
    /// Deprecated constructor; use the free function `sled::open` instead.
    #[doc(hidden)]
    #[deprecated(since = "0.30.2", note = "replaced by `sled::open`")]
    pub fn open<P: AsRef<std::path::Path>>(path: P) -> Result<Self> {
        Config::new().path(path).open()
    }

    /// Recovers (or creates) a `Db` from an already-validated
    /// configuration: starts the pagecache `Context`, installs the
    /// background flusher on supported platforms, opens the default tree,
    /// and rehydrates every tenant tree recorded in the on-disk metadata.
    pub(crate) fn start_inner(config: RunningConfig) -> Result<Self> {
        // Records tree-start latency into the metrics registry while alive.
        let _measure = Measure::new(&M.tree_start);

        let context = Context::start(config)?;

        // Spawn the periodic log flusher only on platforms where it is
        // supported, and never under miri.
        #[cfg(all(
            not(miri),
            any(
                windows,
                target_os = "linux",
                target_os = "macos",
                target_os = "dragonfly",
                target_os = "freebsd",
                target_os = "openbsd",
                target_os = "netbsd",
            )
        ))]
        {
            let flusher_pagecache = context.pagecache.clone();
            // Only create a flusher when a flush interval is configured.
            let flusher = context.flush_every_ms.map(move |fem| {
                flusher::Flusher::new(
                    "log flusher".to_owned(),
                    flusher_pagecache,
                    fem,
                )
            });
            *context.flusher.lock() = flusher;
        }

        // create or open the default tree
        let guard = pin();
        let default =
            meta::open_tree(&context, DEFAULT_TREE_ID.to_vec(), &guard)?;

        let ret = Self {
            context: context.clone(),
            default,
            tenants: Arc::new(RwLock::new(FastMap8::default())),
        };

        let mut tenants = ret.tenants.write();

        // Rebuild an in-memory `Tree` handle for every tenant recorded in
        // the persisted metadata, pointing each at its recovered root page.
        for (id, root) in context.pagecache.get_meta(&guard)?.tenants() {
            let tree = Tree(Arc::new(TreeInner {
                tree_id: id.clone(),
                subscribers: Subscribers::default(),
                context: context.clone(),
                root: AtomicU64::new(root),
                merge_operator: RwLock::new(None),
            }));
            // Tenant ids in the metadata are unique, so no entry may
            // already be present.
            assert!(tenants.insert(id, tree).is_none());
        }

        drop(tenants);

        // Debugging builds verify every recovered tree up front.
        #[cfg(feature = "event_log")]
        {
            for (_name, tree) in ret.tenants.read().iter() {
                tree.verify_integrity().unwrap();
            }
            ret.context.event_log.verify();
        }

        Ok(ret)
    }

    /// Open or create a new disk-backed Tree with its own keyspace,
    /// accessible from the `Db` via the provided identifier.
    pub fn open_tree<V: AsRef<[u8]>>(&self, name: V) -> Result<Tree> {
        let name_ref = name.as_ref();
        // Fast path: tree already open — a shared read lock suffices.
        let tenants = self.tenants.read();
        if let Some(tree) = tenants.get(name_ref) {
            return Ok(tree.clone());
        }
        drop(tenants);

        let guard = pin();

        let mut tenants = self.tenants.write();

        // we need to check this again in case another
        // thread opened it concurrently.
        if let Some(tree) = tenants.get(name_ref) {
            return Ok(tree.clone());
        }

        let tree = meta::open_tree(&self.context, name_ref.to_vec(), &guard)?;

        // We hold the write lock and just confirmed absence, so the insert
        // must not displace an existing entry.
        assert!(tenants.insert(name_ref.into(), tree.clone()).is_none());

        Ok(tree)
    }

    /// Remove a disk-backed collection.
    ///
    /// Returns `Ok(false)` if no tree with that name exists. Dropping the
    /// default tree is rejected with `Error::Unsupported`.
    pub fn drop_tree<V: AsRef<[u8]>>(&self, name: V) -> Result<bool> {
        let name_ref = name.as_ref();
        // The default tree backs the `Deref` impl and must never be removed.
        if name_ref == DEFAULT_TREE_ID {
            return Err(Error::Unsupported(
                "cannot remove the core structures".into(),
            ));
        }
        trace!("dropping tree {:?}", name_ref,);

        let mut tenants = self.tenants.write();

        let tree = if let Some(tree) = tenants.remove(&*name_ref) {
            tree
        } else {
            return Ok(false);
        };

        let guard = pin();

        let mut root_id =
            Some(self.context.pagecache.meta_pid_for_name(name_ref, &guard)?);

        // Walk the leftmost edge of the tree, collecting one page id per
        // level; the chain is used below to reclaim the tree's pages.
        // NOTE(review): `view_for_pid` resolves through `Deref` to a
        // method on the default tree — presumably it only consults the
        // shared pagecache rather than the default tree's own state;
        // confirm.
        let mut leftmost_chain: Vec<PageId> = vec![root_id.unwrap()];
        let mut cursor = root_id.unwrap();
        while let Some(view) = self.view_for_pid(cursor, &guard)? {
            if let Some(index) = view.data.index_ref() {
                let leftmost_child = index.pointers[0];
                leftmost_chain.push(leftmost_child);
                cursor = leftmost_child;
            } else {
                // Reached a non-index (leaf) node — the chain is complete.
                break;
            }
        }

        // CAS the tree's root out of the metadata (to `None`), retrying
        // with the observed value if another thread moved the root
        // concurrently.
        loop {
            let res = self
                .context
                .pagecache
                .cas_root_in_meta(name_ref, root_id, None, &guard)?;

            if let Err(actual_root) = res {
                root_id = actual_root;
            } else {
                break;
            }
        }

        // NOTE(review): `u64::max_value()` appears to act as a "deleted"
        // sentinel for the in-memory root, so clones of this handle stop
        // resolving pages — confirm readers treat it as such.
        tree.root.store(u64::max_value(), SeqCst);

        // drop writer lock
        drop(tenants);

        // Reclaim the tree's pages after releasing the tenants lock.
        tree.gc_pages(leftmost_chain)?;

        guard.flush();

        Ok(true)
    }

    /// Returns the trees names saved in this Db.
    pub fn tree_names(&self) -> Vec<IVec> {
        let tenants = self.tenants.read();
        tenants.iter().map(|(name, _)| name.clone()).collect()
    }

    /// Returns `true` if the database was
    /// recovered from a previous process.
    /// Note that database state is only
    /// guaranteed to be present up to the
    /// last call to `flush`! Otherwise state
    /// is synced to disk periodically if the
    /// `sync_every_ms` configuration option
    /// is set to `Some(number_of_ms_between_syncs)`
    /// or if the IO buffer gets filled to
    /// capacity before being rotated.
    pub fn was_recovered(&self) -> bool {
        self.context.was_recovered()
    }

    /// Generate a monotonic ID. Not guaranteed to be
    /// contiguous. Written to disk every `idgen_persist_interval`
    /// operations, followed by a blocking flush. During recovery, we
    /// take the last recovered generated ID and add 2x
    /// the `idgen_persist_interval` to it. While persisting, if the
    /// previous persisted counter wasn't synced to disk yet, we will do
    /// a blocking flush to fsync the latest counter, ensuring
    /// that we will never give out the same counter twice.
    pub fn generate_id(&self) -> Result<u64> {
        self.context.generate_id()
    }

    /// A database export method for all collections in the `Db`,
    /// for use in sled version upgrades. Can be used in combination
    /// with the `import` method below on a database running a later
    /// version.
    ///
    /// # Panics
    ///
    /// Panics if any IO problems occur while trying
    /// to perform the export.
    ///
    /// # Examples
    ///
    /// If you want to migrate from one version of sled
    /// to another, you need to pull in both versions
    /// by using version renaming:
    ///
    /// `Cargo.toml`:
    ///
    /// ```toml
    /// [dependencies]
    /// sled = "0.32"
    /// old_sled = { version = "0.31", package = "sled" }
    /// ```
    ///
    /// and in your code, remember that old versions of
    /// sled might have a different way to open them
    /// than the current `sled::open` method:
    ///
    /// ```
    /// # use sled as old_sled;
    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// let old = old_sled::open("my_old_db")?;
    ///
    /// // may be a different version of sled,
    /// // the export type is version agnostic.
    /// let new = sled::open("my_new_db")?;
    ///
    /// let export = old.export();
    /// new.import(export);
    ///
    /// assert_eq!(old.checksum()?, new.checksum()?);
    /// # Ok(()) }
    /// ```
    pub fn export(
        &self,
    ) -> Vec<(CollectionType, CollectionName, impl Iterator<Item = Vec<Vec<u8>>>)>
    {
        let tenants = self.tenants.read();

        let mut ret = vec![];

        for (name, tree) in tenants.iter() {
            // One entry per tree: ("tree", name, lazy key/value iterator).
            // The iterator unwraps IO errors, which is where the documented
            // panic comes from.
            ret.push((
                b"tree".to_vec(),
                name.to_vec(),
                tree.iter().map(|kv_opt| {
                    let kv = kv_opt.unwrap();
                    vec![kv.0.to_vec(), kv.1.to_vec()]
                }),
            ));
        }

        ret
    }

    /// Imports the collections from a previous database.
    ///
    /// # Panics
    ///
    /// Panics if any IO problems occur while trying
    /// to perform the import.
    ///
    /// # Examples
    ///
    /// If you want to migrate from one version of sled
    /// to another, you need to pull in both versions
    /// by using version renaming:
    ///
    /// `Cargo.toml`:
    ///
    /// ```toml
    /// [dependencies]
    /// sled = "0.32"
    /// old_sled = { version = "0.31", package = "sled" }
    /// ```
    ///
    /// and in your code, remember that old versions of
    /// sled might have a different way to open them
    /// than the current `sled::open` method:
    ///
    /// ```
    /// # use sled as old_sled;
    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// let old = old_sled::open("my_old_db")?;
    ///
    /// // may be a different version of sled,
    /// // the export type is version agnostic.
    /// let new = sled::open("my_new_db")?;
    ///
    /// let export = old.export();
    /// new.import(export);
    ///
    /// assert_eq!(old.checksum()?, new.checksum()?);
    /// # Ok(()) }
    /// ```
    pub fn import(
        &self,
        export: Vec<(
            CollectionType,
            CollectionName,
            impl Iterator<Item = Vec<Vec<u8>>>,
        )>,
    ) {
        for (collection_type, collection_name, collection_iter) in export {
            match collection_type {
                // Only the "tree" collection type is understood; any other
                // type is a format mismatch and panics below.
                ref t if t == b"tree" => {
                    let tree = self
                        .open_tree(collection_name)
                        .expect("failed to open new tree during import");
                    for mut kv in collection_iter {
                        // Each record is [key, value]; pop yields the value
                        // first, then the key.
                        let v = kv
                            .pop()
                            .expect("failed to get value from tree export");
                        let k = kv
                            .pop()
                            .expect("failed to get key from tree export");
                        let old = tree.insert(k, v).expect(
                            "failed to insert value during tree import",
                        );
                        // Imports must target fresh trees; overwriting
                        // indicates a key collision with existing data.
                        assert!(
                            old.is_none(),
                            "import is overwriting existing data"
                        );
                    }
                }
                other => panic!("unknown collection type {:?}", other),
            }
        }
    }

    /// Returns the CRC32 of all keys and values
    /// in this Db.
    ///
    /// This is O(N) and locks all underlying Trees
    /// for the duration of the entire scan.
    pub fn checksum(&self) -> Result<u32> {
        // Hold the tenants *write* lock so no tree can be opened or
        // dropped during the scan.
        let tenants_mu = self.tenants.write();

        // we use a btreemap to ensure lexicographic
        // iteration over tree names to have consistent
        // checksums.
        let tenants: BTreeMap<_, _> = tenants_mu.iter().collect();

        let mut hasher = crc32fast::Hasher::new();
        let mut locks = vec![];

        // Acquire the global concurrency-control write lock, kept alive in
        // `locks` until the end of the scan, to block concurrent mutations.
        locks.push(concurrency_control::write());

        for (name, tree) in &tenants {
            // Fold the tree name in as well, so identically-valued trees
            // under different names produce different checksums.
            hasher.update(name);

            let mut iter = tree.iter();
            while let Some(kv_res) = iter.next_inner() {
                let (k, v) = kv_res?;
                hasher.update(&k);
                hasher.update(&v);
            }
        }

        Ok(hasher.finalize())
    }

    /// Returns the on-disk size of the storage files
    /// for this database.
    pub fn size_on_disk(&self) -> Result<u64> {
        self.context.pagecache.size_on_disk()
    }

    /// Traverses all files and calculates their total physical
    /// size, then traverses all pages and calculates their
    /// total logical size, then divides the physical size
    /// by the logical size.
    #[doc(hidden)]
    pub fn space_amplification(&self) -> Result<f64> {
        self.context.pagecache.space_amplification()
    }
}
439
/// These types provide the information that allows an entire
/// system to be exported and imported to facilitate
/// major upgrades. They consist entirely
/// of standard library types to be forward compatible.
/// NB: these definitions are expensive to change, because
/// they impact the migration path.
type CollectionType = Vec<u8>;
type CollectionName = Vec<u8>;