// sled/pagecache/snapshot.rs

1#[cfg(feature = "zstd")]
2use zstd::block::{compress, decompress};
3
4use crate::*;
5
6use super::{
7    arr_to_u32, gc_blobs, pwrite_all, raw_segment_iter_from, u32_to_arr,
8    u64_to_arr, BasedBuf, DiskPtr, LogIter, LogKind, LogOffset, Lsn,
9    MessageKind,
10};
11
/// A snapshot of the state required to quickly restart
/// the `PageCache` and `SegmentAccountant`.
#[derive(PartialEq, Debug, Default)]
#[cfg_attr(test, derive(Clone))]
pub struct Snapshot {
    /// The lsn of the last message already included in this snapshot;
    /// recovery resumes reading the log from here. `None` for a fresh
    /// database.
    pub stable_lsn: Option<Lsn>,
    /// The base `LogOffset` of the segment that was actively being
    /// written when this snapshot was taken, if recovery stopped
    /// mid-segment. `None` causes a fresh segment to be allocated on
    /// initialization.
    pub active_segment: Option<LogOffset>,
    /// The page table: per-page recovered state, indexed by `PageId`.
    pub pt: Vec<PageState>,
}
24
#[derive(Clone, Debug, PartialEq)]
pub enum PageState {
    /// Present signifies a page that has some data.
    ///
    /// It has two parts. The base and the fragments.
    /// `base` is separated to guarantee that it will
    /// always have at least one because it is
    /// correct by construction.
    /// The third element in each tuple is the on-log
    /// size for the corresponding write. If things
    /// are pretty large, they spill into the blobs
    /// directory, but still get a small pointer that
    /// gets written into the log. The sizes are used
    /// for the garbage collection statistics on
    /// segments. The lsn and the DiskPtr can be used
    /// for actually reading the item off the disk,
    /// and the size tells us how much storage it uses
    /// on the disk.
    Present {
        base: (Lsn, DiskPtr, u64),
        frags: Vec<(Lsn, DiskPtr, u64)>,
    },

    /// This is a free page, recording the lsn and location of the
    /// message that freed it.
    Free(Lsn, DiskPtr),
    /// Placeholder for a page slot that has not had any state
    /// recovered for it yet (created when the page table is resized
    /// past the highest pid seen so far).
    Uninitialized,
}
52
53impl PageState {
54    fn push(&mut self, item: (Lsn, DiskPtr, u64)) {
55        match *self {
56            PageState::Present { base, ref mut frags } => {
57                if frags.last().map_or(base.0, |f| f.0) < item.0 {
58                    frags.push(item)
59                } else {
60                    debug!(
61                        "skipping merging item {:?} into \
62                        existing PageState::Present({:?})",
63                        item, frags
64                    );
65                }
66            }
67            _ => panic!("pushed frags to {:?}", self),
68        }
69    }
70
71    pub(crate) fn is_free(&self) -> bool {
72        match *self {
73            PageState::Free(_, _) => true,
74            _ => false,
75        }
76    }
77
78    #[cfg(feature = "testing")]
79    fn offsets(&self) -> Vec<LogOffset> {
80        match *self {
81            PageState::Present { base, ref frags } => {
82                let mut offsets = vec![base.1.lid()];
83                for (_, ptr, _) in frags {
84                    offsets.push(ptr.lid());
85                }
86                offsets
87            }
88            PageState::Free(_, ptr) => vec![ptr.lid()],
89            PageState::Uninitialized => {
90                panic!("called offsets on Uninitialized")
91            }
92        }
93    }
94}
95
impl Snapshot {
    /// Translate the recovered snapshot state into the `(lid, lsn)`
    /// coordinates from which the log should resume writing.
    ///
    /// Returns `(None, None)` for an empty database. When an active
    /// segment was recovered, returns the exact in-segment offset
    /// corresponding to `stable_lsn`. Otherwise, returns only the lsn
    /// of the next segment boundary, causing a fresh segment to be
    /// allocated on initialization.
    pub fn recovered_coords(
        &self,
        segment_size: usize,
    ) -> (Option<LogOffset>, Option<Lsn>) {
        if self.stable_lsn.is_none() {
            return (None, None);
        }

        let stable_lsn = self.stable_lsn.unwrap();

        if let Some(base_offset) = self.active_segment {
            // mid-segment recovery: resume at the same in-segment
            // progress that stable_lsn has made.
            let progress = stable_lsn % segment_size as Lsn;
            let offset = base_offset + LogOffset::try_from(progress).unwrap();

            (Some(offset), Some(stable_lsn))
        } else {
            // round stable_lsn up to the next segment boundary; a new
            // segment will be allocated there.
            let lsn_idx = stable_lsn / segment_size as Lsn
                + if stable_lsn % segment_size as Lsn == 0 { 0 } else { 1 };
            let next_lsn = lsn_idx * segment_size as Lsn;
            (None, Some(next_lsn))
        }
    }

    /// Apply a single recovered log message to the in-memory page
    /// table, growing the table if `pid` is past its current end.
    ///
    /// Returns `Error::Corruption` when internal invariants are
    /// violated, or when the message kind is `Corrupted`/`Skip`.
    fn apply(
        &mut self,
        log_kind: LogKind,
        pid: PageId,
        lsn: Lsn,
        disk_ptr: DiskPtr,
        sz: u64,
    ) -> Result<()> {
        trace!(
            "trying to deserialize buf for pid {} ptr {} lsn {}",
            pid,
            disk_ptr,
            lsn
        );
        let _measure = Measure::new(&M.snapshot_apply);

        // speculatively grow the page table when pid is past its end;
        // remember whether we grew so a dangling Link can undo it below.
        let pushed = if self.pt.len() <= usize::try_from(pid).unwrap() {
            self.pt.resize(
                usize::try_from(pid + 1).unwrap(),
                PageState::Uninitialized,
            );
            true
        } else {
            false
        };

        match log_kind {
            LogKind::Replace => {
                trace!(
                    "compact of pid {} at ptr {} lsn {}",
                    pid,
                    disk_ptr,
                    lsn,
                );

                let pid_usize = usize::try_from(pid).unwrap();

                // a Replace supersedes all previous state for the page
                self.pt[pid_usize] = PageState::Present {
                    base: (lsn, disk_ptr, sz),
                    frags: vec![],
                };
            }
            LogKind::Link => {
                // Because we rewrite pages over time, we may have relocated
                // a page's initial Compact to a later segment. We should skip
                // over pages here unless we've encountered a Compact for them.
                if let Some(lids @ PageState::Present { .. }) =
                    self.pt.get_mut(usize::try_from(pid).unwrap())
                {
                    trace!(
                        "append of pid {} at lid {} lsn {}",
                        pid,
                        disk_ptr,
                        lsn,
                    );

                    lids.push((lsn, disk_ptr, sz));
                } else {
                    trace!(
                        "skipping dangling append of pid {} at lid {} lsn {}",
                        pid,
                        disk_ptr,
                        lsn,
                    );
                    // roll back the speculative resize above so the
                    // table doesn't end with a dangling Uninitialized
                    // entry for this skipped Link.
                    if pushed {
                        let old = self.pt.pop().unwrap();
                        if old != PageState::Uninitialized {
                            error!("expected previous page state to be uninitialized");
                            return Err(Error::corruption(None));
                        }
                    }
                }
            }
            LogKind::Free => {
                trace!("free of pid {} at ptr {} lsn {}", pid, disk_ptr, lsn);
                self.pt[usize::try_from(pid).unwrap()] =
                    PageState::Free(lsn, disk_ptr);
            }
            LogKind::Corrupted | LogKind::Skip => {
                error!(
                    "unexpected messagekind in snapshot application for pid {}: {:?}",
                    pid, log_kind
                );
                return Err(Error::corruption(None));
            }
        }

        Ok(())
    }
}
210
/// Advance `snapshot` to the tip of the log: fold every message from
/// `iter` into the page table, determine the recovered log tip, zero
/// any torn bytes past it, persist the updated snapshot when progress
/// was made, and garbage collect blob files past the stable lsn.
fn advance_snapshot(
    mut iter: LogIter,
    mut snapshot: Snapshot,
    config: &RunningConfig,
) -> Result<Snapshot> {
    let _measure = Measure::new(&M.advance_snapshot);

    trace!("building on top of old snapshot: {:?}", snapshot);

    let old_stable_lsn = snapshot.stable_lsn;

    while let Some((log_kind, pid, lsn, ptr, sz)) = iter.next() {
        trace!(
            "in advance_snapshot looking at item with pid {} lsn {} ptr {}",
            pid,
            lsn,
            ptr
        );

        if lsn < snapshot.stable_lsn.unwrap_or(-1) {
            // don't process already-processed Lsn's. stable_lsn is for the last
            // item ALREADY INCLUDED lsn in the snapshot.
            trace!(
                "continuing in advance_snapshot, lsn {} ptr {} stable_lsn {:?}",
                lsn,
                ptr,
                snapshot.stable_lsn
            );
            continue;
        }

        // the iterator's max_lsn bounds valid messages; anything at or
        // beyond it indicates corruption.
        if lsn >= iter.max_lsn.unwrap() {
            error!("lsn {} >= iter max_lsn {}", lsn, iter.max_lsn.unwrap());
            return Err(Error::corruption(None));
        }

        snapshot.apply(log_kind, pid, lsn, ptr, sz)?;
    }

    // `snapshot.tip_lid` can be set based on 4 possibilities for the tip of the
    // log:
    // 1. an empty DB - tip set to None, causing a fresh segment to be
    //    allocated on initialization
    // 2. the recovered tip is at the end of a
    //    segment with less space left than would fit MAX_MSG_HEADER_LEN -
    //    tip set to None, causing a fresh segment to be allocated on
    //    initialization, as in #1 above
    // 3. the recovered tip is in the middle of a segment - both set to the end
    //    of the last valid message, causing the system to be initialized to
    //    that point without allocating a new segment
    // 4. the recovered tip is at the beginning of a new segment, but without
    //    any valid messages in it yet. treat as #3 above, but also take care
    //    in the SA initialization to properly initialize any segment tracking
    //    state despite not having any pages currently residing there.

    let no_recovery_progress = iter.cur_lsn.is_none()
        || iter.cur_lsn.unwrap() <= snapshot.stable_lsn.unwrap_or(0);
    let db_is_empty = no_recovery_progress && snapshot.stable_lsn.is_none();

    // testing-only: remembers where we shredded the segment tail, so
    // we can later assert no live page location was destroyed.
    #[cfg(feature = "testing")]
    let mut shred_point = None;

    let snapshot = if db_is_empty {
        trace!("db is empty, returning default snapshot");
        if snapshot != Snapshot::default() {
            error!("expected snapshot to be Snapshot::default");
            return Err(Error::corruption(None));
        }
        snapshot
    } else if iter.cur_lsn.is_none() {
        trace!(
            "no recovery progress happened since the last snapshot \
            was generated, returning the previous one"
        );
        snapshot
    } else {
        let iterated_lsn = iter.cur_lsn.unwrap();

        let segment_progress: Lsn = iterated_lsn % (config.segment_size as Lsn);

        // progress should never be below the SEG_HEADER_LEN if the segment_base
        // is set. progress can only be 0 if we've maxed out the
        // previous segment, unsetting the iterator segment_base in the
        // process.
        let monotonic = segment_progress >= SEG_HEADER_LEN as Lsn
            || (segment_progress == 0 && iter.segment_base.is_none());
        if !monotonic {
            error!("expected segment progress {} to be above SEG_HEADER_LEN or == 0, cur_lsn: {}",
                segment_progress,
                iterated_lsn,
            );
            return Err(Error::corruption(None));
        }

        let (stable_lsn, active_segment) = if segment_progress
            + MAX_MSG_HEADER_LEN as Lsn
            >= config.segment_size as Lsn
        {
            // situation #2: not enough room left in the recovered
            // segment for even one message header; bump stable_lsn to
            // the next segment boundary and force a fresh allocation.
            let bumped =
                config.normalize(iterated_lsn) + config.segment_size as Lsn;
            trace!("bumping snapshot.stable_lsn to {}", bumped);
            (bumped, None)
        } else {
            if let Some(BasedBuf { offset, .. }) = iter.segment_base {
                // either situation 3 or situation 4. we need to zero the
                // tail of the segment after the recovered tip
                let shred_len = config.segment_size
                    - usize::try_from(segment_progress).unwrap()
                    - 1;
                let shred_zone = vec![MessageKind::Corrupted.into(); shred_len];
                let shred_base =
                    offset + LogOffset::try_from(segment_progress).unwrap();

                #[cfg(feature = "testing")]
                {
                    shred_point = Some(shred_base);
                }

                debug!(
                    "zeroing the end of the recovered segment at lsn {} between lids {} and {}",
                    config.normalize(iterated_lsn),
                    shred_base,
                    shred_base + shred_len as LogOffset
                );
                pwrite_all(&config.file, &shred_zone, shred_base)?;
                config.file.sync_all()?;
            }
            (iterated_lsn, iter.segment_base.map(|bb| bb.offset))
        };

        // the recovered tip must never move backwards relative to the
        // snapshot we started from.
        if stable_lsn < snapshot.stable_lsn.unwrap_or(0) {
            error!(
                "unexpected corruption encountered in storage snapshot file. \
                stable lsn {} should be >= snapshot.stable_lsn {}",
                stable_lsn,
                snapshot.stable_lsn.unwrap_or(0),
            );
            return Err(Error::corruption(None));
        }

        snapshot.stable_lsn = Some(stable_lsn);
        snapshot.active_segment = active_segment;

        snapshot
    };

    trace!("generated snapshot: {:?}", snapshot);

    if snapshot.stable_lsn < old_stable_lsn {
        error!("unexpected corruption encountered in storage snapshot file");
        return Err(Error::corruption(None));
    }

    // only persist when we actually advanced past the old snapshot
    if snapshot.stable_lsn > old_stable_lsn {
        write_snapshot(config, &snapshot)?;
    }

    // testing-only: build segment base -> {(pid, lid)} so we can assert
    // that shredding/zeroing never destroyed a live page location.
    #[cfg(feature = "testing")]
    let reverse_segments = {
        use std::collections::{HashMap, HashSet};
        let shred_base = shred_point.unwrap_or(LogOffset::max_value());
        let mut reverse_segments = HashMap::new();
        for (pid, page) in snapshot.pt.iter().enumerate() {
            let offsets = page.offsets();
            for offset in offsets {
                let segment = config.normalize(offset);
                if segment == config.normalize(shred_base) {
                    assert!(
                        offset < shred_base,
                        "we shredded the location for pid {}
                        with locations {:?}
                        by zeroing the file tip after lid {}",
                        pid,
                        page,
                        shred_base
                    );
                }
                let entry = reverse_segments
                    .entry(segment)
                    .or_insert_with(HashSet::new);
                entry.insert((pid, offset));
            }
        }
        reverse_segments
    };

    // destroy any segments the iterator flagged as torn
    for (lsn, to_zero) in &iter.segments {
        debug!("zeroing torn segment at lsn {} lid {}", lsn, to_zero);

        #[cfg(feature = "testing")]
        {
            if let Some(pids) = reverse_segments.get(to_zero) {
                assert!(
                    pids.is_empty(),
                    "expected segment that we're zeroing at lid {} \
                    lsn {} \
                    to contain no pages, but it contained pids {:?}",
                    to_zero,
                    lsn,
                    pids
                );
            }
        }

        // NB we intentionally corrupt this header to prevent any segment
        // from being allocated which would duplicate its LSN, messing
        // up recovery in the future.
        io_fail!(config, "segment initial free zero");
        pwrite_all(
            &config.file,
            &*vec![MessageKind::Corrupted.into(); config.segment_size],
            *to_zero,
        )?;
        if !config.temporary {
            config.file.sync_all()?;
        }
    }

    // remove all blob files larger than our stable offset
    if let Some(stable_lsn) = snapshot.stable_lsn {
        gc_blobs(config, stable_lsn)?;
    }

    #[cfg(feature = "event_log")]
    config.event_log.recovered_lsn(snapshot.stable_lsn.unwrap_or(0));

    Ok(snapshot)
}
439
440/// Read a `Snapshot` or generate a default, then advance it to
441/// the tip of the data file, if present.
442pub fn read_snapshot_or_default(config: &RunningConfig) -> Result<Snapshot> {
443    // NB we want to error out if the read snapshot was corrupted.
444    // We only use a default Snapshot when there is no snapshot found.
445    let last_snap = read_snapshot(config)?.unwrap_or_else(Snapshot::default);
446
447    let log_iter =
448        raw_segment_iter_from(last_snap.stable_lsn.unwrap_or(0), config)?;
449
450    let res = advance_snapshot(log_iter, last_snap, config)?;
451
452    Ok(res)
453}
454
/// Read a `Snapshot` from disk.
/// Returns an error if the read snapshot was corrupted.
/// Returns `Ok(None)` if no snapshot file has been written yet.
///
/// On-disk layout: `[payload][8-byte little-endian decompressed
/// length][4-byte crc32 of the payload]`, where the payload is the
/// serialized `Snapshot`, zstd-compressed when compression is enabled.
fn read_snapshot(config: &RunningConfig) -> Result<Option<Snapshot>> {
    let mut candidates = config.get_snapshot_files()?;
    if candidates.is_empty() {
        debug!("no previous snapshot found");
        return Ok(None);
    }

    // snapshot file names embed the stable lsn in fixed-width hex
    // (see write_snapshot), so a lexicographic sort puts the most
    // recent snapshot last.
    candidates.sort();
    let path = candidates.pop().unwrap();

    let mut f = std::fs::OpenOptions::new().read(true).open(&path)?;

    let mut buf = vec![];
    let _read = f.read_to_end(&mut buf)?;
    let len = buf.len();
    // must contain at least the 12-byte trailer (8-byte len + 4-byte crc)
    if len <= 12 {
        warn!("empty/corrupt snapshot file found");
        return Err(Error::corruption(None));
    }

    let mut len_expected_bytes = [0; 8];
    len_expected_bytes.copy_from_slice(&buf[len - 12..len - 4]);

    let mut crc_expected_bytes = [0; 4];
    crc_expected_bytes.copy_from_slice(&buf[len - 4..]);

    // truncate the trailer off so buf is just the crc-covered payload
    let _ = buf.split_off(len - 12);
    let crc_expected: u32 = arr_to_u32(&crc_expected_bytes);

    let crc_actual = crc32(&buf);

    if crc_expected != crc_actual {
        warn!("corrupt snapshot file found, crc does not match expected");
        return Err(Error::corruption(None));
    }

    #[cfg(feature = "zstd")]
    let bytes = if config.use_compression {
        use std::convert::TryInto;

        // the recorded decompressed length lets us presize the output
        let len_expected: u64 =
            u64::from_le_bytes(len_expected_bytes.as_ref().try_into().unwrap());

        // NOTE(review): a decompression failure panics here instead of
        // returning Error::corruption, even though the crc was already
        // verified — consider mapping the error instead.
        decompress(&*buf, usize::try_from(len_expected).unwrap()).unwrap()
    } else {
        buf
    };

    #[cfg(not(feature = "zstd"))]
    let bytes = buf;

    Snapshot::deserialize(&mut bytes.as_slice()).map(Some)
}
511
/// Serialize `snapshot` and persist it to disk.
///
/// Writes `[payload][8-byte decompressed length][4-byte crc32 of the
/// payload]` to a `snap.<lsn>.generating` temp file, syncs it, then
/// renames it to its final `snap.<lsn>` name before removing any
/// older snapshot files.
fn write_snapshot(config: &RunningConfig, snapshot: &Snapshot) -> Result<()> {
    trace!("writing snapshot {:?}", snapshot);

    let raw_bytes = snapshot.serialize();
    // recorded in the trailer so read_snapshot can presize the
    // decompression output buffer
    let decompressed_len = raw_bytes.len();

    #[cfg(feature = "zstd")]
    let bytes = if config.use_compression {
        compress(&*raw_bytes, config.compression_factor).unwrap()
    } else {
        raw_bytes
    };

    #[cfg(not(feature = "zstd"))]
    let bytes = raw_bytes;

    let crc32: [u8; 4] = u32_to_arr(crc32(&bytes));
    let len_bytes: [u8; 8] = u64_to_arr(decompressed_len as u64);

    // ".generating" marks an in-progress write so a crash mid-write
    // never leaves a half-written file under the final name
    // (presumably get_snapshot_files skips such names — verify)
    let path_1_suffix =
        format!("snap.{:016X}.generating", snapshot.stable_lsn.unwrap_or(0));

    let mut path_1 = config.get_path();
    path_1.push(path_1_suffix);

    let path_2_suffix =
        format!("snap.{:016X}", snapshot.stable_lsn.unwrap_or(0));

    let mut path_2 = config.get_path();
    path_2.push(path_2_suffix);

    let parent = path_1.parent().unwrap();
    std::fs::create_dir_all(parent)?;
    let mut f =
        std::fs::OpenOptions::new().write(true).create(true).open(&path_1)?;

    // write the snapshot bytes, followed by the expected decompressed
    // length and a crc32 checksum at the end
    io_fail!(config, "snap write");
    f.write_all(&*bytes)?;
    io_fail!(config, "snap write len");
    f.write_all(&len_bytes)?;
    io_fail!(config, "snap write crc");
    f.write_all(&crc32)?;
    io_fail!(config, "snap write post");
    f.sync_all()?;

    trace!("wrote snapshot to {}", path_1.to_string_lossy());

    // publish the completed snapshot under its final name
    io_fail!(config, "snap write mv");
    std::fs::rename(&path_1, &path_2)?;
    io_fail!(config, "snap write mv post");

    trace!("renamed snapshot to {}", path_2.to_string_lossy());

    // clean up any old snapshots
    let candidates = config.get_snapshot_files()?;
    for path in candidates {
        let path_str = path.file_name().unwrap().to_str().unwrap();
        if !path_2.to_string_lossy().ends_with(&*path_str) {
            debug!("removing old snapshot file {:?}", path);

            io_fail!(config, "snap write rm old");

            // best-effort removal: a concurrent snapshot writer may
            // have already removed this file
            if let Err(e) = std::fs::remove_file(&path) {
                // TODO should this just be a try return?
                warn!(
                    "failed to remove old snapshot file, maybe snapshot race? {}",
                    e
                );
            }
        }
    }
    Ok(())
}
585}