1#[cfg(feature = "zstd")]
2use zstd::block::{compress, decompress};
3
4use crate::*;
5
6use super::{
7 arr_to_u32, gc_blobs, pwrite_all, raw_segment_iter_from, u32_to_arr,
8 u64_to_arr, BasedBuf, DiskPtr, LogIter, LogKind, LogOffset, Lsn,
9 MessageKind,
10};
11
/// Recovery state built by replaying the log: a page table plus the
/// log position that the replay has reached.
#[derive(PartialEq, Debug, Default)]
#[cfg_attr(test, derive(Clone))]
pub struct Snapshot {
    /// Log sequence number the snapshot has been advanced to.
    /// `None` in a default snapshot, i.e. before any recovery progress
    /// has been recorded.
    pub stable_lsn: Option<Lsn>,
    /// Base file offset of the segment that was still being filled when
    /// the snapshot was taken, or `None` when no partially filled
    /// segment is tracked.
    pub active_segment: Option<LogOffset>,
    /// Page table, indexed by page id.
    pub pt: Vec<PageState>,
}
24
/// The recovered state of a single slot in the snapshot's page table.
#[derive(Clone, Debug, PartialEq)]
pub enum PageState {
    /// A page that has data in the log.
    ///
    /// `base` is recorded when a `LogKind::Replace` item is applied and
    /// `frags` collects the `LogKind::Link` items applied afterwards.
    /// Each tuple is `(lsn, disk pointer, size)` as passed to
    /// `Snapshot::apply`.
    Present {
        base: (Lsn, DiskPtr, u64),
        frags: Vec<(Lsn, DiskPtr, u64)>,
    },

    /// A page whose most recent applied item was a `LogKind::Free`,
    /// together with the lsn and disk pointer of that item.
    Free(Lsn, DiskPtr),
    /// Filler value used when the page table is grown to make room for
    /// a page id that has not been seen before.
    Uninitialized,
}
52
53impl PageState {
54 fn push(&mut self, item: (Lsn, DiskPtr, u64)) {
55 match *self {
56 PageState::Present { base, ref mut frags } => {
57 if frags.last().map_or(base.0, |f| f.0) < item.0 {
58 frags.push(item)
59 } else {
60 debug!(
61 "skipping merging item {:?} into \
62 existing PageState::Present({:?})",
63 item, frags
64 );
65 }
66 }
67 _ => panic!("pushed frags to {:?}", self),
68 }
69 }
70
71 pub(crate) fn is_free(&self) -> bool {
72 match *self {
73 PageState::Free(_, _) => true,
74 _ => false,
75 }
76 }
77
78 #[cfg(feature = "testing")]
79 fn offsets(&self) -> Vec<LogOffset> {
80 match *self {
81 PageState::Present { base, ref frags } => {
82 let mut offsets = vec![base.1.lid()];
83 for (_, ptr, _) in frags {
84 offsets.push(ptr.lid());
85 }
86 offsets
87 }
88 PageState::Free(_, ptr) => vec![ptr.lid()],
89 PageState::Uninitialized => {
90 panic!("called offsets on Uninitialized")
91 }
92 }
93 }
94}
95
96impl Snapshot {
97 pub fn recovered_coords(
98 &self,
99 segment_size: usize,
100 ) -> (Option<LogOffset>, Option<Lsn>) {
101 if self.stable_lsn.is_none() {
102 return (None, None);
103 }
104
105 let stable_lsn = self.stable_lsn.unwrap();
106
107 if let Some(base_offset) = self.active_segment {
108 let progress = stable_lsn % segment_size as Lsn;
109 let offset = base_offset + LogOffset::try_from(progress).unwrap();
110
111 (Some(offset), Some(stable_lsn))
112 } else {
113 let lsn_idx = stable_lsn / segment_size as Lsn
114 + if stable_lsn % segment_size as Lsn == 0 { 0 } else { 1 };
115 let next_lsn = lsn_idx * segment_size as Lsn;
116 (None, Some(next_lsn))
117 }
118 }
119
120 fn apply(
121 &mut self,
122 log_kind: LogKind,
123 pid: PageId,
124 lsn: Lsn,
125 disk_ptr: DiskPtr,
126 sz: u64,
127 ) -> Result<()> {
128 trace!(
129 "trying to deserialize buf for pid {} ptr {} lsn {}",
130 pid,
131 disk_ptr,
132 lsn
133 );
134 let _measure = Measure::new(&M.snapshot_apply);
135
136 let pushed = if self.pt.len() <= usize::try_from(pid).unwrap() {
137 self.pt.resize(
138 usize::try_from(pid + 1).unwrap(),
139 PageState::Uninitialized,
140 );
141 true
142 } else {
143 false
144 };
145
146 match log_kind {
147 LogKind::Replace => {
148 trace!(
149 "compact of pid {} at ptr {} lsn {}",
150 pid,
151 disk_ptr,
152 lsn,
153 );
154
155 let pid_usize = usize::try_from(pid).unwrap();
156
157 self.pt[pid_usize] = PageState::Present {
158 base: (lsn, disk_ptr, sz),
159 frags: vec![],
160 };
161 }
162 LogKind::Link => {
163 if let Some(lids @ PageState::Present { .. }) =
167 self.pt.get_mut(usize::try_from(pid).unwrap())
168 {
169 trace!(
170 "append of pid {} at lid {} lsn {}",
171 pid,
172 disk_ptr,
173 lsn,
174 );
175
176 lids.push((lsn, disk_ptr, sz));
177 } else {
178 trace!(
179 "skipping dangling append of pid {} at lid {} lsn {}",
180 pid,
181 disk_ptr,
182 lsn,
183 );
184 if pushed {
185 let old = self.pt.pop().unwrap();
186 if old != PageState::Uninitialized {
187 error!("expected previous page state to be uninitialized");
188 return Err(Error::corruption(None));
189 }
190 }
191 }
192 }
193 LogKind::Free => {
194 trace!("free of pid {} at ptr {} lsn {}", pid, disk_ptr, lsn);
195 self.pt[usize::try_from(pid).unwrap()] =
196 PageState::Free(lsn, disk_ptr);
197 }
198 LogKind::Corrupted | LogKind::Skip => {
199 error!(
200 "unexpected messagekind in snapshot application for pid {}: {:?}",
201 pid, log_kind
202 );
203 return Err(Error::corruption(None));
204 }
205 }
206
207 Ok(())
208 }
209}
210
/// Replays the log items yielded by `iter` on top of `snapshot`, fixes up
/// the tail of the log file, and returns the advanced snapshot.
///
/// In order, this function:
///
/// 1. applies every item whose lsn is not below the snapshot's stable
///    lsn to the page table,
/// 2. derives the new `stable_lsn` / `active_segment` from the position
///    the iterator stopped at, overwriting the unread tail of the
///    current segment when one is still active,
/// 3. writes the snapshot to disk if its stable lsn moved forward,
/// 4. overwrites every segment listed in `iter.segments`,
/// 5. calls `gc_blobs` with the new stable lsn.
///
/// # Errors
///
/// Returns a corruption error when an item's lsn reaches `iter.max_lsn`,
/// when the recovered position is inconsistent (see the checks inline),
/// or when the stable lsn would move backwards. I/O failures from the
/// file writes, syncs and `write_snapshot` are propagated.
///
/// # Panics
///
/// Panics if `iter.max_lsn` is `None` while items are being yielded.
fn advance_snapshot(
    mut iter: LogIter,
    mut snapshot: Snapshot,
    config: &RunningConfig,
) -> Result<Snapshot> {
    let _measure = Measure::new(&M.advance_snapshot);

    trace!("building on top of old snapshot: {:?}", snapshot);

    // Kept so the result can be checked for regression and is only
    // persisted when it actually advanced.
    let old_stable_lsn = snapshot.stable_lsn;

    while let Some((log_kind, pid, lsn, ptr, sz)) = iter.next() {
        trace!(
            "in advance_snapshot looking at item with pid {} lsn {} ptr {}",
            pid,
            lsn,
            ptr
        );

        // Items strictly below the stable lsn are skipped. Without a
        // stable lsn the -1 fallback lets every non-negative lsn through.
        if lsn < snapshot.stable_lsn.unwrap_or(-1) {
            trace!(
                "continuing in advance_snapshot, lsn {} ptr {} stable_lsn {:?}",
                lsn,
                ptr,
                snapshot.stable_lsn
            );
            continue;
        }

        // The iterator must never hand out an item at or past its own
        // upper bound.
        if lsn >= iter.max_lsn.unwrap() {
            error!("lsn {} >= iter max_lsn {}", lsn, iter.max_lsn.unwrap());
            return Err(Error::corruption(None));
        }

        snapshot.apply(log_kind, pid, lsn, ptr, sz)?;
    }

    // No progress: the iterator never set a position, or it did not get
    // past the snapshot's stable lsn.
    let no_recovery_progress = iter.cur_lsn.is_none()
        || iter.cur_lsn.unwrap() <= snapshot.stable_lsn.unwrap_or(0);
    let db_is_empty = no_recovery_progress && snapshot.stable_lsn.is_none();

    // Testing builds record where the segment tail was overwritten so the
    // page table can be checked against it further down.
    #[cfg(feature = "testing")]
    let mut shred_point = None;

    let snapshot = if db_is_empty {
        trace!("db is empty, returning default snapshot");
        // Nothing was recovered and there was no prior stable lsn, so the
        // snapshot must still be untouched.
        if snapshot != Snapshot::default() {
            error!("expected snapshot to be Snapshot::default");
            return Err(Error::corruption(None));
        }
        snapshot
    } else if iter.cur_lsn.is_none() {
        trace!(
            "no recovery progress happened since the last snapshot \
             was generated, returning the previous one"
        );
        snapshot
    } else {
        let iterated_lsn = iter.cur_lsn.unwrap();

        // Position of the iterator inside its segment.
        let segment_progress: Lsn = iterated_lsn % (config.segment_size as Lsn);

        // Valid positions are either past the segment header, or exactly
        // on a segment boundary with no segment base set on the iterator.
        let monotonic = segment_progress >= SEG_HEADER_LEN as Lsn
            || (segment_progress == 0 && iter.segment_base.is_none());
        if !monotonic {
            error!("expected segment progress {} to be above SEG_HEADER_LEN or == 0, cur_lsn: {}",
                segment_progress,
                iterated_lsn,
            );
            return Err(Error::corruption(None));
        }

        let (stable_lsn, active_segment) = if segment_progress
            + MAX_MSG_HEADER_LEN as Lsn
            >= config.segment_size as Lsn
        {
            // Not even a maximum-size message header fits in the rest of
            // this segment: move the stable lsn to the start of the next
            // segment and track no active segment.
            let bumped =
                config.normalize(iterated_lsn) + config.segment_size as Lsn;
            trace!("bumping snapshot.stable_lsn to {}", bumped);
            (bumped, None)
        } else {
            if let Some(BasedBuf { offset, .. }) = iter.segment_base {
                // Overwrite everything after the recovered position with
                // the `Corrupted` message-kind byte. The length stops one
                // byte short of the end of the segment.
                let shred_len = config.segment_size
                    - usize::try_from(segment_progress).unwrap()
                    - 1;
                let shred_zone = vec![MessageKind::Corrupted.into(); shred_len];
                let shred_base =
                    offset + LogOffset::try_from(segment_progress).unwrap();

                #[cfg(feature = "testing")]
                {
                    shred_point = Some(shred_base);
                }

                debug!(
                    "zeroing the end of the recovered segment at lsn {} between lids {} and {}",
                    config.normalize(iterated_lsn),
                    shred_base,
                    shred_base + shred_len as LogOffset
                );
                pwrite_all(&config.file, &shred_zone, shred_base)?;
                config.file.sync_all()?;
            }
            (iterated_lsn, iter.segment_base.map(|bb| bb.offset))
        };

        // The recovered position may never be behind what the previous
        // snapshot already claimed.
        if stable_lsn < snapshot.stable_lsn.unwrap_or(0) {
            error!(
                "unexpected corruption encountered in storage snapshot file. \
                 stable lsn {} should be >= snapshot.stable_lsn {}",
                stable_lsn,
                snapshot.stable_lsn.unwrap_or(0),
            );
            return Err(Error::corruption(None));
        }

        snapshot.stable_lsn = Some(stable_lsn);
        snapshot.active_segment = active_segment;

        snapshot
    };

    trace!("generated snapshot: {:?}", snapshot);

    if snapshot.stable_lsn < old_stable_lsn {
        error!("unexpected corruption encountered in storage snapshot file");
        return Err(Error::corruption(None));
    }

    // Only persist when the snapshot actually moved forward.
    if snapshot.stable_lsn > old_stable_lsn {
        write_snapshot(config, &snapshot)?;
    }

    // Testing builds index every offset referenced by the page table by
    // its normalized segment, asserting along the way that none of them
    // lies in the region overwritten above.
    #[cfg(feature = "testing")]
    let reverse_segments = {
        use std::collections::{HashMap, HashSet};
        let shred_base = shred_point.unwrap_or(LogOffset::max_value());
        let mut reverse_segments = HashMap::new();
        for (pid, page) in snapshot.pt.iter().enumerate() {
            let offsets = page.offsets();
            for offset in offsets {
                let segment = config.normalize(offset);
                if segment == config.normalize(shred_base) {
                    assert!(
                        offset < shred_base,
                        "we shredded the location for pid {}
 with locations {:?}
 by zeroing the file tip after lid {}",
                        pid,
                        page,
                        shred_base
                    );
                }
                let entry = reverse_segments
                    .entry(segment)
                    .or_insert_with(HashSet::new);
                entry.insert((pid, offset));
            }
        }
        reverse_segments
    };

    // Overwrite every segment the iterator reported in `iter.segments`
    // with a full segment of the `Corrupted` message-kind byte.
    for (lsn, to_zero) in &iter.segments {
        debug!("zeroing torn segment at lsn {} lid {}", lsn, to_zero);

        // In testing builds, make sure no recovered page still points
        // into a segment that is about to be overwritten.
        #[cfg(feature = "testing")]
        {
            if let Some(pids) = reverse_segments.get(to_zero) {
                assert!(
                    pids.is_empty(),
                    "expected segment that we're zeroing at lid {} \
                     lsn {} \
                     to contain no pages, but it contained pids {:?}",
                    to_zero,
                    lsn,
                    pids
                );
            }
        }

        io_fail!(config, "segment initial free zero");
        pwrite_all(
            &config.file,
            &*vec![MessageKind::Corrupted.into(); config.segment_size],
            *to_zero,
        )?;
        // The sync is skipped for temporary configurations.
        if !config.temporary {
            config.file.sync_all()?;
        }
    }

    // Let the blob garbage collector run against the new stable lsn.
    if let Some(stable_lsn) = snapshot.stable_lsn {
        gc_blobs(config, stable_lsn)?;
    }

    #[cfg(feature = "event_log")]
    config.event_log.recovered_lsn(snapshot.stable_lsn.unwrap_or(0));

    Ok(snapshot)
}
439
440pub fn read_snapshot_or_default(config: &RunningConfig) -> Result<Snapshot> {
443 let last_snap = read_snapshot(config)?.unwrap_or_else(Snapshot::default);
446
447 let log_iter =
448 raw_segment_iter_from(last_snap.stable_lsn.unwrap_or(0), config)?;
449
450 let res = advance_snapshot(log_iter, last_snap, config)?;
451
452 Ok(res)
453}
454
455fn read_snapshot(config: &RunningConfig) -> Result<Option<Snapshot>> {
459 let mut candidates = config.get_snapshot_files()?;
460 if candidates.is_empty() {
461 debug!("no previous snapshot found");
462 return Ok(None);
463 }
464
465 candidates.sort();
466 let path = candidates.pop().unwrap();
467
468 let mut f = std::fs::OpenOptions::new().read(true).open(&path)?;
469
470 let mut buf = vec![];
471 let _read = f.read_to_end(&mut buf)?;
472 let len = buf.len();
473 if len <= 12 {
474 warn!("empty/corrupt snapshot file found");
475 return Err(Error::corruption(None));
476 }
477
478 let mut len_expected_bytes = [0; 8];
479 len_expected_bytes.copy_from_slice(&buf[len - 12..len - 4]);
480
481 let mut crc_expected_bytes = [0; 4];
482 crc_expected_bytes.copy_from_slice(&buf[len - 4..]);
483
484 let _ = buf.split_off(len - 12);
485 let crc_expected: u32 = arr_to_u32(&crc_expected_bytes);
486
487 let crc_actual = crc32(&buf);
488
489 if crc_expected != crc_actual {
490 warn!("corrupt snapshot file found, crc does not match expected");
491 return Err(Error::corruption(None));
492 }
493
494 #[cfg(feature = "zstd")]
495 let bytes = if config.use_compression {
496 use std::convert::TryInto;
497
498 let len_expected: u64 =
499 u64::from_le_bytes(len_expected_bytes.as_ref().try_into().unwrap());
500
501 decompress(&*buf, usize::try_from(len_expected).unwrap()).unwrap()
502 } else {
503 buf
504 };
505
506 #[cfg(not(feature = "zstd"))]
507 let bytes = buf;
508
509 Snapshot::deserialize(&mut bytes.as_slice()).map(Some)
510}
511
512fn write_snapshot(config: &RunningConfig, snapshot: &Snapshot) -> Result<()> {
513 trace!("writing snapshot {:?}", snapshot);
514
515 let raw_bytes = snapshot.serialize();
516 let decompressed_len = raw_bytes.len();
517
518 #[cfg(feature = "zstd")]
519 let bytes = if config.use_compression {
520 compress(&*raw_bytes, config.compression_factor).unwrap()
521 } else {
522 raw_bytes
523 };
524
525 #[cfg(not(feature = "zstd"))]
526 let bytes = raw_bytes;
527
528 let crc32: [u8; 4] = u32_to_arr(crc32(&bytes));
529 let len_bytes: [u8; 8] = u64_to_arr(decompressed_len as u64);
530
531 let path_1_suffix =
532 format!("snap.{:016X}.generating", snapshot.stable_lsn.unwrap_or(0));
533
534 let mut path_1 = config.get_path();
535 path_1.push(path_1_suffix);
536
537 let path_2_suffix =
538 format!("snap.{:016X}", snapshot.stable_lsn.unwrap_or(0));
539
540 let mut path_2 = config.get_path();
541 path_2.push(path_2_suffix);
542
543 let parent = path_1.parent().unwrap();
544 std::fs::create_dir_all(parent)?;
545 let mut f =
546 std::fs::OpenOptions::new().write(true).create(true).open(&path_1)?;
547
548 io_fail!(config, "snap write");
550 f.write_all(&*bytes)?;
551 io_fail!(config, "snap write len");
552 f.write_all(&len_bytes)?;
553 io_fail!(config, "snap write crc");
554 f.write_all(&crc32)?;
555 io_fail!(config, "snap write post");
556 f.sync_all()?;
557
558 trace!("wrote snapshot to {}", path_1.to_string_lossy());
559
560 io_fail!(config, "snap write mv");
561 std::fs::rename(&path_1, &path_2)?;
562 io_fail!(config, "snap write mv post");
563
564 trace!("renamed snapshot to {}", path_2.to_string_lossy());
565
566 let candidates = config.get_snapshot_files()?;
568 for path in candidates {
569 let path_str = path.file_name().unwrap().to_str().unwrap();
570 if !path_2.to_string_lossy().ends_with(&*path_str) {
571 debug!("removing old snapshot file {:?}", path);
572
573 io_fail!(config, "snap write rm old");
574
575 if let Err(e) = std::fs::remove_file(&path) {
576 warn!(
578 "failed to remove old snapshot file, maybe snapshot race? {}",
579 e
580 );
581 }
582 }
583 }
584 Ok(())
585}