sled/pagecache/logger.rs

use std::fs::File;

use super::{
    arr_to_lsn, arr_to_u32, assert_usize, bump_atomic_lsn, header, iobuf,
    lsn_to_arr, maybe_decompress, pread_exact, pread_exact_or_eof, read_blob,
    roll_iobuf, u32_to_arr, Arc, BasedBuf, BlobPointer, DiskPtr, IoBuf, IoBufs,
    LogKind, LogOffset, Lsn, MessageKind, Reservation, Serialize, Snapshot,
    BATCH_MANIFEST_PID, COUNTER_PID, MAX_MSG_HEADER_LEN, META_PID,
    MINIMUM_ITEMS_PER_SEGMENT, SEG_HEADER_LEN,
};

use crate::*;

/// A sequential store which allows users to create
/// reservations placed at known log offsets, used
/// for writing persistent data structures that need
/// to know where to find persisted bits in the future.
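///
/// A minimal lifecycle sketch (not a compiled doctest; `config` and
/// `snapshot` are assumed to come from recovery):
///
/// ```ignore
/// let log = Log::start(config, &snapshot)?;
/// // ... reserve and write entries ...
/// let bytes_flushed = log.flush()?;
/// ```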
#[derive(Debug)]
pub struct Log {
    /// iobufs is the underlying lock-free IO write buffer.
    pub(crate) iobufs: Arc<IoBufs>,
    pub(crate) config: RunningConfig,
}

impl Log {
    /// Start the log, open or create the configured file,
    /// and optionally start the periodic buffer flush thread.
    pub fn start(config: RunningConfig, snapshot: &Snapshot) -> Result<Self> {
        let iobufs = Arc::new(IoBufs::start(config.clone(), snapshot)?);

        Ok(Self { iobufs, config })
    }

    /// Flushes any pending IO buffers to disk to ensure durability.
    /// Returns the number of bytes written during this call.
    pub fn flush(&self) -> Result<usize> {
        iobuf::flush(&self.iobufs)
    }

    /// Return an iterator over the log, starting at
    /// the specified log sequence number.
    pub fn iter_from(&self, lsn: Lsn) -> super::LogIter {
        self.iobufs.iter_from(lsn)
    }

    pub(crate) fn roll_iobuf(&self) -> Result<usize> {
        roll_iobuf(&self.iobufs)
    }

    /// Read a buffer from the disk.
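    ///
    /// A match sketch over the possible results (hypothetical
    /// `pid`, `lsn`, and `ptr` values):
    ///
    /// ```ignore
    /// match log.read(pid, lsn, ptr)? {
    ///     LogRead::Inline(header, bytes, _len) => { /* payload lived on-log */ }
    ///     LogRead::Blob(header, bytes, blob_ptr, _len) => { /* spilled to a blob file */ }
    ///     other => { /* canceled, corrupted, dangling, etc. */ }
    /// }
    /// ```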
    pub fn read(&self, pid: PageId, lsn: Lsn, ptr: DiskPtr) -> Result<LogRead> {
        trace!("reading log lsn {} ptr {}", lsn, ptr);

        let expected_segment_number = SegmentNumber(
            u64::try_from(lsn).unwrap()
                / u64::try_from(self.config.segment_size).unwrap(),
        );

        if ptr.is_inline() {
            iobuf::make_durable(&self.iobufs, lsn)?;
            let f = &self.config.file;
            read_message(&**f, ptr.lid(), expected_segment_number, &self.config)
        } else {
            // we skip reading from the inline log here,
            // because the inline record may no longer
            // exist there, and read the blob file directly.
            let (_, blob_ptr) = ptr.blob();
            iobuf::make_durable(&self.iobufs, blob_ptr)?;
            read_blob(blob_ptr, &self.config).map(|(kind, buf)| {
                let header = MessageHeader {
                    kind,
                    pid,
                    segment_number: expected_segment_number,
                    crc32: 0,
                    len: 0,
                };
                LogRead::Blob(header, buf, blob_ptr, 0)
            })
        }
    }

    /// Returns the current stable offset written to disk.
    pub fn stable_offset(&self) -> Lsn {
        self.iobufs.stable()
    }

    /// Blocks until the specified log sequence number has
    /// been made stable on disk. Returns the number of
    /// bytes written during this call. This is appropriate
    /// as a full consistency barrier for all data written
    /// up until this point.
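    ///
    /// A sketch of a durability barrier, assuming `lsn` came
    /// from an earlier reservation:
    ///
    /// ```ignore
    /// let bytes_written = log.make_stable(lsn)?;
    /// // everything at or below `lsn` is now durable
    /// ```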
    pub fn make_stable(&self, lsn: Lsn) -> Result<usize> {
        iobuf::make_stable(&self.iobufs, lsn)
    }

    /// Reserve a replacement buffer for a previously written
    /// blob write. This ensures the message header has the
    /// proper blob flag set.
    pub(super) fn rewrite_blob_pointer(
        &self,
        pid: PageId,
        blob_pointer: BlobPointer,
        guard: &Guard,
    ) -> Result<Reservation<'_>> {
        self.reserve_inner(
            LogKind::Replace,
            pid,
            &blob_pointer,
            Some(blob_pointer),
            guard,
        )
    }

    /// Tries to claim a reservation for writing a buffer to a
    /// particular location in stable storage, which may either be
    /// completed or aborted later. Useful for maintaining
    /// linearizability across CAS operations that may need to
    /// persist part of their operation.
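    ///
    /// A reservation sketch (hypothetical `pid`, `item`, and
    /// `guard`; the completion API is elided):
    ///
    /// ```ignore
    /// let reservation = log.reserve(LogKind::Replace, pid, &item, &guard)?;
    /// // ... either complete or abort the reservation later ...
    /// ```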
    #[allow(unused)]
    pub fn reserve<T: Serialize + Debug>(
        &self,
        log_kind: LogKind,
        pid: PageId,
        item: &T,
        guard: &Guard,
    ) -> Result<Reservation<'_>> {
        #[cfg(feature = "compression")]
        {
            if self.config.use_compression && pid != BATCH_MANIFEST_PID {
                use zstd::block::compress;

                let buf = item.serialize();

                let _measure = Measure::new(&M.compress);

                let compressed_buf =
                    compress(&buf, self.config.compression_factor).unwrap();

                return self.reserve_inner(
                    log_kind,
                    pid,
                    &IVec::from(compressed_buf),
                    None,
                    guard,
                );
            }
        }

        self.reserve_inner(log_kind, pid, item, None, guard)
    }

    fn reserve_inner<T: Serialize + Debug>(
        &self,
        log_kind: LogKind,
        pid: PageId,
        item: &T,
        blob_rewrite: Option<Lsn>,
        _: &Guard,
    ) -> Result<Reservation<'_>> {
        let _measure = Measure::new(&M.reserve_lat);

        let serialized_len = item.serialized_size();
        let max_buf_len =
            u64::try_from(MAX_MSG_HEADER_LEN).unwrap() + serialized_len;

        M.reserve_sz.measure(max_buf_len);

        let max_buf_size = (self.config.segment_size
            / MINIMUM_ITEMS_PER_SEGMENT)
            - SEG_HEADER_LEN;

        let over_blob_threshold =
            max_buf_len > u64::try_from(max_buf_size).unwrap();

        assert!(!(over_blob_threshold && blob_rewrite.is_some()));

        let mut printed = false;
        macro_rules! trace_once {
            ($($msg:expr),*) => {
                if !printed {
                    trace!($($msg),*);
                    printed = true;
                }
            };
        }

        let backoff = Backoff::new();

        let kind = match (
            pid,
            log_kind,
            over_blob_threshold || blob_rewrite.is_some(),
        ) {
            (COUNTER_PID, LogKind::Replace, false) => MessageKind::Counter,
            (META_PID, LogKind::Replace, true) => MessageKind::BlobMeta,
            (META_PID, LogKind::Replace, false) => MessageKind::InlineMeta,
            (BATCH_MANIFEST_PID, LogKind::Skip, false) => {
                MessageKind::BatchManifest
            }
            (_, LogKind::Free, false) => MessageKind::Free,
            (_, LogKind::Replace, true) => MessageKind::BlobNode,
            (_, LogKind::Replace, false) => MessageKind::InlineNode,
            (_, LogKind::Link, true) => MessageKind::BlobLink,
            (_, LogKind::Link, false) => MessageKind::InlineLink,
            other => unreachable!(
                "unexpected combination of PageId, \
                 LogKind, and blob status: {:?}",
                other
            ),
        };

        loop {
            M.log_reservation_attempted();

            // don't continue if the system
            // has encountered an issue.
            if let Err(e) = self.config.global_error() {
                let intervals = self.iobufs.intervals.lock();

                // having held the mutex makes this linearized
                // with the notify below.
                drop(intervals);

                let _notified = self.iobufs.interval_updated.notify_all();
                return Err(e);
            }

            // load current header value
            let iobuf = self.iobufs.current_iobuf();
            let header = iobuf.get_header();
            let buf_offset = header::offset(header);
            let reservation_lsn =
                iobuf.lsn + Lsn::try_from(buf_offset).unwrap();

            // skip if already sealed
            if header::is_sealed(header) {
                // already sealed; start over and hope the
                // current iobuf has been swapped in by the sealer.
                trace_once!("io buffer already sealed, spinning");

                backoff.snooze();

                continue;
            }

            // figure out how big the header + buf will be.
            // this is variable because of varints used
            // in the header.
            let message_header = MessageHeader {
                crc32: 0,
                kind,
                segment_number: SegmentNumber(
                    u64::try_from(iobuf.lsn).unwrap()
                        / u64::try_from(self.config.segment_size).unwrap(),
                ),
                pid,
                len: if over_blob_threshold {
                    reservation_lsn.serialized_size()
                } else {
                    serialized_len
                },
            };

            let inline_buf_len = if over_blob_threshold {
                usize::try_from(
                    message_header.serialized_size()
                        + reservation_lsn.serialized_size(),
                )
                .unwrap()
            } else {
                usize::try_from(
                    message_header.serialized_size() + serialized_len,
                )
                .unwrap()
            };

            trace!("reserving buf of len {}", inline_buf_len);

            // try to claim space
            let prospective_size = buf_offset + inline_buf_len;
            // we don't reserve anything if we're within the last
            // MAX_MSG_HEADER_LEN bytes of the buffer. during
            // recovery, we assume that nothing can begin here,
            // because headers are dynamically sized.
            let red_zone = iobuf.capacity - buf_offset < MAX_MSG_HEADER_LEN;
            let would_overflow = prospective_size > iobuf.capacity || red_zone;
            if would_overflow {
                // This buffer is too full to accept our write!
                // Try to seal the buffer, and maybe write it if
                // there are zero writers.
                trace_once!("io buffer too full, spinning");
                iobuf::maybe_seal_and_write_iobuf(
                    &self.iobufs,
                    &iobuf,
                    header,
                    true,
                )?;
                backoff.spin();
                continue;
            }

            // attempt to claim by incrementing an unsealed header
            let bumped_offset = header::bump_offset(header, inline_buf_len);

            // check for maxed out IO buffer writers
            if header::n_writers(bumped_offset) == header::MAX_WRITERS {
                trace_once!(
                    "spinning because our buffer has {} writers already",
                    header::MAX_WRITERS
                );
                backoff.snooze();
                continue;
            }

            let claimed = header::incr_writers(bumped_offset);

            if iobuf.cas_header(header, claimed).is_err() {
                // CAS failed, start over
                trace_once!("CAS failed while claiming buffer slot, spinning");
                backoff.spin();
                continue;
            }

            let log_offset = iobuf.offset;

            // if we're giving out a reservation,
            // the writer count should be positive
            assert_ne!(header::n_writers(claimed), 0);

            // should never have claimed a sealed buffer
            assert!(!header::is_sealed(claimed));

            // MAX is used to signify unreadiness of
            // the underlying IO buffer, and if it's
            // still set here, the buffer counters
            // used to choose this IO buffer
            // were incremented in a racy way.
            assert_ne!(
                log_offset,
                LogOffset::max_value(),
                "claimed an unready iobuf with lsn {}\n{:?}",
                reservation_lsn,
                self
            );

            let destination = iobuf.get_mut_range(buf_offset, inline_buf_len);
            let reservation_lid = log_offset + buf_offset as LogOffset;

            trace!(
                "reserved {} bytes at lsn {} lid {}",
                inline_buf_len,
                reservation_lsn,
                reservation_lid,
            );

            bump_atomic_lsn(
                &self.iobufs.max_reserved_lsn,
                reservation_lsn + inline_buf_len as Lsn - 1,
            );

            let blob_id =
                if over_blob_threshold { Some(reservation_lsn) } else { None };

            self.iobufs.encapsulate(
                item,
                message_header,
                destination,
                blob_id,
            )?;

            M.log_reservation_success();

            let pointer = if let Some(blob_id) = blob_id {
                DiskPtr::new_blob(reservation_lid, blob_id)
            } else if let Some(blob_rewrite) = blob_rewrite {
                DiskPtr::new_blob(reservation_lid, blob_rewrite)
            } else {
                DiskPtr::new_inline(reservation_lid)
            };

            return Ok(Reservation {
                iobuf,
                log: self,
                buf: destination,
                flushed: false,
                lsn: reservation_lsn,
                pointer,
                is_blob_rewrite: blob_rewrite.is_some(),
                header_len: usize::try_from(message_header.serialized_size())
                    .unwrap(),
            });
        }
    }

    /// Called by Reservation on termination (completion or abort).
    /// Handles departure from shared state, and possibly writing
    /// the buffer to stable storage if necessary. The writer that
    /// decrements the writer count to zero on a sealed buffer is
    /// responsible for scheduling its write to disk.
    pub(super) fn exit_reservation(&self, iobuf: &Arc<IoBuf>) -> Result<()> {
        let mut header = iobuf.get_header();

        // Decrement writer count, retrying until successful.
        loop {
            let new_hv = header::decr_writers(header);
            match iobuf.cas_header(header, new_hv) {
                Ok(new) => {
                    header = new;
                    break;
                }
                Err(new) => {
                    // we failed to decr, retry
                    header = new;
                }
            }
        }

        // Succeeded in decrementing writers; if we decremented writers
        // to 0 and it's sealed then we should write it to storage.
        if header::n_writers(header) == 0 && header::is_sealed(header) {
            if let Err(e) = self.config.global_error() {
                let intervals = self.iobufs.intervals.lock();

                // having held the mutex makes this linearized
                // with the notify below.
                drop(intervals);

                let _notified = self.iobufs.interval_updated.notify_all();
                return Err(e);
            }

            let lsn = iobuf.lsn;
            trace!(
                "asynchronously writing iobuf with lsn {} \
                 to log from exit_reservation",
                lsn
            );
            let iobufs = self.iobufs.clone();
            let iobuf = iobuf.clone();
            threadpool::spawn(move || {
                if let Err(e) = iobufs.write_to_log(&iobuf) {
                    error!(
                        "hit error while writing iobuf with lsn {}: {:?}",
                        lsn, e
                    );
                    iobufs.config.set_global_error(e);
                }
            })?;

            Ok(())
        } else {
            Ok(())
        }
    }
}

impl Drop for Log {
    fn drop(&mut self) {
        // don't do any more IO if we're crashing
        if self.config.global_error().is_err() {
            return;
        }

        if let Err(e) = iobuf::flush(&self.iobufs) {
            error!("failed to flush from Log::drop: {}", e);
        }

        if !self.config.temporary {
            self.config.file.sync_all().unwrap();
        }

        debug!("Log dropped");
    }
}

/// All log messages are prepended with this header
#[derive(Debug, Copy, Clone, PartialEq)]
pub struct MessageHeader {
    pub crc32: u32,
    pub kind: MessageKind,
    pub segment_number: SegmentNumber,
    pub pid: PageId,
    pub len: u64,
}

/// The index of a segment in the log, computed by
/// dividing an Lsn by the configured segment size.
#[derive(Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Debug)]
#[repr(transparent)]
pub struct SegmentNumber(pub u64);

impl std::ops::Deref for SegmentNumber {
    type Target = u64;

    fn deref(&self) -> &u64 {
        &self.0
    }
}

/// A segment's header contains the segment's base LSN
/// and the maximum stable LSN at the time it was written.
#[derive(Debug, Copy, Clone, PartialEq)]
pub struct SegmentHeader {
    pub lsn: Lsn,
    pub max_stable_lsn: Lsn,
    pub ok: bool,
}

/// The result of a read of a log message
#[derive(Debug)]
pub enum LogRead {
    /// Successful read, entirely on-log
    Inline(MessageHeader, Vec<u8>, u32),
    /// Successful read, spilled to its own blob file
    Blob(MessageHeader, Vec<u8>, BlobPointer, u32),
    /// A cancelled message was encountered
    Canceled(u32),
    /// A padding message used to show that a segment was filled
    Cap(SegmentNumber),
    /// This log message was not readable due to corruption
    Corrupted,
    /// This blob file is no longer available
    DanglingBlob(MessageHeader, BlobPointer, u32),
    /// This data may only be read if at least this future location is stable
    BatchManifest(Lsn, u32),
}

impl LogRead {
    /// Return true if we read a successful Inline or Blob value.
    pub fn is_successful(&self) -> bool {
        match *self {
            LogRead::Inline(..) | LogRead::Blob(..) => true,
            _ => false,
        }
    }

    /// Return the underlying data read from a log read, if successful.
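    ///
    /// A sketch of the common read path (hypothetical `pid`,
    /// `lsn`, and `ptr` values):
    ///
    /// ```ignore
    /// if let Some(bytes) = log.read(pid, lsn, ptr)?.into_data() {
    ///     // `bytes` is the message payload
    /// }
    /// ```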
    pub fn into_data(self) -> Option<Vec<u8>> {
        match self {
            LogRead::Blob(_, buf, _, _) | LogRead::Inline(_, buf, _) => {
                Some(buf)
            }
            _ => None,
        }
    }
}

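// On disk, a segment header occupies SEG_HEADER_LEN bytes laid out as
// crc32 (4 bytes), then lsn (8 bytes), then max_stable_lsn (8 bytes),
// as written by the conversion into a byte array below. The stored crc
// and LSNs are XORed with constant masks, a guard against zero-filled
// or torn regions decoding as plausible headers.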
impl From<[u8; SEG_HEADER_LEN]> for SegmentHeader {
    fn from(buf: [u8; SEG_HEADER_LEN]) -> Self {
        #[allow(unsafe_code)]
        unsafe {
            let crc32_header =
                arr_to_u32(buf.get_unchecked(0..4)) ^ 0xFFFF_FFFF;

            let xor_lsn = arr_to_lsn(buf.get_unchecked(4..12));
            let lsn = xor_lsn ^ 0x7FFF_FFFF_FFFF_FFFF;

            let xor_max_stable_lsn = arr_to_lsn(buf.get_unchecked(12..20));
            let max_stable_lsn = xor_max_stable_lsn ^ 0x7FFF_FFFF_FFFF_FFFF;

            let crc32_tested = crc32(&buf[4..20]);

            let ok = crc32_tested == crc32_header;

            if !ok {
                debug!(
                    "segment with lsn {} had computed crc {}, \
                     but stored crc {}",
                    lsn, crc32_tested, crc32_header
                );
            }

            Self { lsn, max_stable_lsn, ok }
        }
    }
}

impl From<SegmentHeader> for [u8; SEG_HEADER_LEN] {
    fn from(header: SegmentHeader) -> [u8; SEG_HEADER_LEN] {
        let mut buf = [0; SEG_HEADER_LEN];

        let xor_lsn = header.lsn ^ 0x7FFF_FFFF_FFFF_FFFF;
        let lsn_arr = lsn_to_arr(xor_lsn);

        let xor_max_stable_lsn = header.max_stable_lsn ^ 0x7FFF_FFFF_FFFF_FFFF;
        let highest_stable_lsn_arr = lsn_to_arr(xor_max_stable_lsn);

        #[allow(unsafe_code)]
        unsafe {
            std::ptr::copy_nonoverlapping(
                lsn_arr.as_ptr(),
                buf.as_mut_ptr().add(4),
                std::mem::size_of::<u64>(),
            );
            std::ptr::copy_nonoverlapping(
                highest_stable_lsn_arr.as_ptr(),
                buf.as_mut_ptr().add(12),
                std::mem::size_of::<u64>(),
            );
        }

        let crc32 = u32_to_arr(crc32(&buf[4..20]) ^ 0xFFFF_FFFF);

        #[allow(unsafe_code)]
        unsafe {
            std::ptr::copy_nonoverlapping(
                crc32.as_ptr(),
                buf.as_mut_ptr(),
                std::mem::size_of::<u32>(),
            );
        }

        buf
    }
}

pub(crate) fn read_segment_header(
    file: &File,
    lid: LogOffset,
) -> Result<SegmentHeader> {
    trace!("reading segment header at {}", lid);

    let mut seg_header_buf = [0; SEG_HEADER_LEN];
    pread_exact(file, &mut seg_header_buf, lid)?;
    let segment_header = SegmentHeader::from(seg_header_buf);

    if segment_header.lsn < Lsn::try_from(lid).unwrap() {
        debug!(
            "segment had lsn {} but we expected something \
             greater than or equal to the base lid {}",
            segment_header.lsn, lid
        );
    }

    Ok(segment_header)
}

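/// Positioned-read access used by `read_message`, implemented both by
/// the on-disk log `File` and by `BasedBuf`, an in-memory buffer paired
/// with the log offset it starts at.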
pub(crate) trait ReadAt {
    fn pread_exact(&self, dst: &mut [u8], at: u64) -> std::io::Result<()>;

    fn pread_exact_or_eof(
        &self,
        dst: &mut [u8],
        at: u64,
    ) -> std::io::Result<usize>;
}

impl ReadAt for File {
    fn pread_exact(&self, dst: &mut [u8], at: u64) -> std::io::Result<()> {
        pread_exact(self, dst, at)
    }

    fn pread_exact_or_eof(
        &self,
        dst: &mut [u8],
        at: u64,
    ) -> std::io::Result<usize> {
        pread_exact_or_eof(self, dst, at)
    }
}

impl ReadAt for BasedBuf {
    fn pread_exact(&self, dst: &mut [u8], mut at: u64) -> std::io::Result<()> {
        if at < self.offset
            || u64::try_from(dst.len()).unwrap() + at
                > u64::try_from(self.buf.len()).unwrap() + self.offset
        {
            return Err(std::io::Error::new(
                std::io::ErrorKind::UnexpectedEof,
                "failed to fill buffer",
            ));
        }
        at -= self.offset;
        let at_usize = usize::try_from(at).unwrap();
        let to_usize = at_usize + dst.len();
        dst.copy_from_slice(self.buf[at_usize..to_usize].as_ref());
        Ok(())
    }

    fn pread_exact_or_eof(
        &self,
        dst: &mut [u8],
        mut at: u64,
    ) -> std::io::Result<usize> {
        if at < self.offset
            || u64::try_from(self.buf.len()).unwrap() < at - self.offset
        {
            return Err(std::io::Error::new(
                std::io::ErrorKind::UnexpectedEof,
                "failed to fill buffer",
            ));
        }
        at -= self.offset;

        let at_usize = usize::try_from(at).unwrap();

        let len = std::cmp::min(dst.len(), self.buf.len() - at_usize);

        let start = at_usize;
        let end = start + len;
        dst[..len].copy_from_slice(self.buf[start..end].as_ref());
        Ok(len)
    }
}

/// Read a message from the disk at a known log offset.
pub(crate) fn read_message<R: ReadAt>(
    file: &R,
    lid: LogOffset,
    expected_segment_number: SegmentNumber,
    config: &Config,
) -> Result<LogRead> {
    let _measure = Measure::new(&M.read);
    let segment_len = config.segment_size;
    let seg_start = lid / segment_len as LogOffset * segment_len as LogOffset;
    trace!("reading message from segment: {} at lid: {}", seg_start, lid);
    assert!(seg_start + SEG_HEADER_LEN as LogOffset <= lid);
    assert!(
        (seg_start + segment_len as LogOffset) - lid
            >= MAX_MSG_HEADER_LEN as LogOffset,
        "tried to read a message from the red zone"
    );

    // read enough bytes to cover the largest possible
    // message header, and often some of the body too.
    let msg_header_buf = &mut [0; 128];
    let _read_bytes = file.pread_exact_or_eof(msg_header_buf, lid)?;
    let header_cursor = &mut msg_header_buf.as_ref();
    let len_before = header_cursor.len();
    let header = MessageHeader::deserialize(header_cursor)?;
    let len_after = header_cursor.len();
    trace!("read message header at lid {}: {:?}", lid, header);
    // the header is varint-encoded, so its on-disk size is
    // however many bytes deserialization consumed.
    let message_offset = len_before - len_after;

    let ceiling = seg_start + segment_len as LogOffset;

    assert!(lid + message_offset as LogOffset <= ceiling);

    let max_possible_len =
        assert_usize(ceiling - lid - message_offset as LogOffset);

    if header.len > max_possible_len as u64 {
        trace!(
            "read a corrupted message with impossibly long length {:?}",
            header
        );
        return Ok(LogRead::Corrupted);
    }

    // the number of payload bytes recorded in the header
    let body_len = usize::try_from(header.len).unwrap();

    if header.kind == MessageKind::Corrupted {
        trace!(
            "read a corrupted message with Corrupted MessageKind: {:?}",
            header
        );
        return Ok(LogRead::Corrupted);
    }

    // perform crc check on everything that isn't Corrupted
    let mut buf = vec![0; body_len];

    if body_len > len_after {
        // we have to read more data from disk
        file.pread_exact(&mut buf, lid + message_offset as LogOffset)?;
    } else {
        // we already read this data in the initial read
        buf.copy_from_slice(header_cursor[..body_len].as_ref());
    }

    let crc32 = calculate_message_crc32(
        msg_header_buf[..message_offset].as_ref(),
        &buf,
    );

    if crc32 != header.crc32 {
        trace!("read a message with a bad checksum with header {:?}", header);
        return Ok(LogRead::Corrupted);
    }

    let inline_len = u32::try_from(message_offset).unwrap()
        + u32::try_from(header.len).unwrap();

    if header.segment_number != expected_segment_number {
        debug!(
            "header {:?} does not contain expected segment_number {:?}",
            header, expected_segment_number
        );
        return Ok(LogRead::Corrupted);
    }

    match header.kind {
        MessageKind::Canceled => {
            trace!("read a canceled message of len {}", header.len);
            Ok(LogRead::Canceled(inline_len))
        }
        MessageKind::Cap => {
            trace!("read pad in segment number {:?}", header.segment_number);
            Ok(LogRead::Cap(header.segment_number))
        }
        MessageKind::BlobLink
        | MessageKind::BlobNode
        | MessageKind::BlobMeta => {
            let id = arr_to_lsn(&buf);

            match read_blob(id, config) {
                Ok((kind, buf)) => {
                    assert_eq!(header.kind, kind);
                    trace!(
                        "read a successful blob message for blob {} in segment number {:?}",
                        id,
                        header.segment_number,
                    );

                    Ok(LogRead::Blob(header, buf, id, inline_len))
                }
                Err(Error::Io(ref e))
                    if e.kind() == std::io::ErrorKind::NotFound =>
                {
                    debug!(
                        "underlying blob file not found for blob {} in segment number {:?}",
                        id, header.segment_number,
                    );
                    Ok(LogRead::DanglingBlob(header, id, inline_len))
                }
                Err(other_e) => {
                    debug!("failed to read blob: {:?}", other_e);
                    Err(other_e)
                }
            }
        }
        MessageKind::InlineLink
        | MessageKind::InlineNode
        | MessageKind::InlineMeta
        | MessageKind::Free
        | MessageKind::Counter => {
            trace!("read a successful inline message");
            let buf = if config.use_compression {
                maybe_decompress(buf)?
            } else {
                buf
            };

            Ok(LogRead::Inline(header, buf, inline_len))
        }
        MessageKind::BatchManifest => {
            assert_eq!(buf.len(), std::mem::size_of::<Lsn>());
            let max_lsn = arr_to_lsn(&buf);
            Ok(LogRead::BatchManifest(max_lsn, inline_len))
        }
        MessageKind::Corrupted => unreachable!(
            "corrupted messages should have been \
             handled before the crc check above"
        ),
    }
}