use std::fs::File;

use super::{
    arr_to_lsn, arr_to_u32, assert_usize, bump_atomic_lsn, header, iobuf,
    lsn_to_arr, maybe_decompress, pread_exact, pread_exact_or_eof, read_blob,
    roll_iobuf, u32_to_arr, Arc, BasedBuf, BlobPointer, DiskPtr, IoBuf, IoBufs,
    LogKind, LogOffset, Lsn, MessageKind, Reservation, Serialize, Snapshot,
    BATCH_MANIFEST_PID, COUNTER_PID, MAX_MSG_HEADER_LEN, META_PID,
    MINIMUM_ITEMS_PER_SEGMENT, SEG_HEADER_LEN,
};

use crate::*;

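/// A sequential store that hands out reservations at known log offsets and
/// durably flushes them to the underlying file through a set of `IoBufs`
/// write buffers, supporting the durability guarantees of the rest of the
/// system.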
#[derive(Debug)]
pub struct Log {
    pub(crate) iobufs: Arc<IoBufs>,
    pub(crate) config: RunningConfig,
}

impl Log {
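    /// Start the log, recovering the in-memory write buffers from the
    /// provided `Snapshot`.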
    pub fn start(config: RunningConfig, snapshot: &Snapshot) -> Result<Self> {
        let iobufs = Arc::new(IoBufs::start(config.clone(), snapshot)?);

        Ok(Self { iobufs, config })
    }

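    /// Flush the next io buffer to disk, returning how many bytes were
    /// written.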
    pub fn flush(&self) -> Result<usize> {
        iobuf::flush(&self.iobufs)
    }

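    /// Iterate over log messages, starting at the provided log sequence
    /// number.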
    pub fn iter_from(&self, lsn: Lsn) -> super::LogIter {
        self.iobufs.iter_from(lsn)
    }

    pub(crate) fn roll_iobuf(&self) -> Result<usize> {
        roll_iobuf(&self.iobufs)
    }

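    /// Read a message at a known lsn and pointer, first making the
    /// referenced data durable. Inline pointers are read from the log
    /// itself; blob pointers are resolved through their external blob.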
    pub fn read(&self, pid: PageId, lsn: Lsn, ptr: DiskPtr) -> Result<LogRead> {
        trace!("reading log lsn {} ptr {}", lsn, ptr);

        let expected_segment_number = SegmentNumber(
            u64::try_from(lsn).unwrap()
                / u64::try_from(self.config.segment_size).unwrap(),
        );

        if ptr.is_inline() {
            iobuf::make_durable(&self.iobufs, lsn)?;
            let f = &self.config.file;
            read_message(&**f, ptr.lid(), expected_segment_number, &self.config)
        } else {
            let (_, blob_ptr) = ptr.blob();
            iobuf::make_durable(&self.iobufs, blob_ptr)?;
            read_blob(blob_ptr, &self.config).map(|(kind, buf)| {
                let header = MessageHeader {
                    kind,
                    pid,
                    segment_number: expected_segment_number,
                    crc32: 0,
                    len: 0,
                };
                LogRead::Blob(header, buf, blob_ptr, 0)
            })
        }
    }

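    /// Return the stable offset: every message at or below this lsn has
    /// been durably written to disk.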
    pub fn stable_offset(&self) -> Lsn {
        self.iobufs.stable()
    }

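    /// Block until all log entries at or below the given lsn are durable,
    /// returning the number of bytes written in the process.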
    pub fn make_stable(&self, lsn: Lsn) -> Result<usize> {
        iobuf::make_stable(&self.iobufs, lsn)
    }

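    /// Write an existing blob pointer into a fresh log message, so the
    /// segment that originally referenced the blob can be relocated without
    /// rewriting the blob itself.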
    pub(super) fn rewrite_blob_pointer(
        &self,
        pid: PageId,
        blob_pointer: BlobPointer,
        guard: &Guard,
    ) -> Result<Reservation<'_>> {
        self.reserve_inner(
            LogKind::Replace,
            pid,
            &blob_pointer,
            Some(blob_pointer),
            guard,
        )
    }

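    /// Reserve space in the log for a message, returning a `Reservation`
    /// that the caller later completes or aborts. When the `compression`
    /// feature is active, everything except the batch manifest is
    /// zstd-compressed before space is reserved.
    ///
    /// A rough usage sketch (hypothetical: `complete` and `abort` are the
    /// expected `Reservation` methods, not defined in this file):
    ///
    /// ```ignore
    /// let res = log.reserve(LogKind::Replace, pid, &item, &guard)?;
    /// // either persist the reserved bytes and learn where they live...
    /// let (lsn, disk_ptr) = res.complete()?;
    /// // ...or abort to mark the reserved space as a canceled message:
    /// // let (lsn, disk_ptr) = res.abort()?;
    /// ```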
    #[allow(unused)]
    pub fn reserve<T: Serialize + Debug>(
        &self,
        log_kind: LogKind,
        pid: PageId,
        item: &T,
        guard: &Guard,
    ) -> Result<Reservation<'_>> {
        #[cfg(feature = "compression")]
        {
            if self.config.use_compression && pid != BATCH_MANIFEST_PID {
                use zstd::block::compress;

                let buf = item.serialize();

                let _measure = Measure::new(&M.compress);

                let compressed_buf =
                    compress(&buf, self.config.compression_factor).unwrap();

                return self.reserve_inner(
                    log_kind,
                    pid,
                    &IVec::from(compressed_buf),
                    None,
                    guard,
                );
            }
        }

        self.reserve_inner(log_kind, pid, item, None, guard)
    }

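    /// Shared reservation core: choose an inline or out-of-line (blob)
    /// representation for the message, pick the corresponding
    /// `MessageKind`, then claim space in the current io buffer.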
    fn reserve_inner<T: Serialize + Debug>(
        &self,
        log_kind: LogKind,
        pid: PageId,
        item: &T,
        blob_rewrite: Option<Lsn>,
        _: &Guard,
    ) -> Result<Reservation<'_>> {
        let _measure = Measure::new(&M.reserve_lat);

        let serialized_len = item.serialized_size();
        let max_buf_len =
            u64::try_from(MAX_MSG_HEADER_LEN).unwrap() + serialized_len;

        M.reserve_sz.measure(max_buf_len);

        let max_buf_size = (self.config.segment_size
            / MINIMUM_ITEMS_PER_SEGMENT)
            - SEG_HEADER_LEN;

        let over_blob_threshold =
            max_buf_len > u64::try_from(max_buf_size).unwrap();

        assert!(!(over_blob_threshold && blob_rewrite.is_some()));

        let mut printed = false;
        macro_rules! trace_once {
            ($($msg:expr),*) => {
                if !printed {
                    trace!($($msg),*);
                    printed = true;
                }
            };
        }

        let backoff = Backoff::new();

        let kind = match (
            pid,
            log_kind,
            over_blob_threshold || blob_rewrite.is_some(),
        ) {
            (COUNTER_PID, LogKind::Replace, false) => MessageKind::Counter,
            (META_PID, LogKind::Replace, true) => MessageKind::BlobMeta,
            (META_PID, LogKind::Replace, false) => MessageKind::InlineMeta,
            (BATCH_MANIFEST_PID, LogKind::Skip, false) => {
                MessageKind::BatchManifest
            }
            (_, LogKind::Free, false) => MessageKind::Free,
            (_, LogKind::Replace, true) => MessageKind::BlobNode,
            (_, LogKind::Replace, false) => MessageKind::InlineNode,
            (_, LogKind::Link, true) => MessageKind::BlobLink,
            (_, LogKind::Link, false) => MessageKind::InlineLink,
            other => unreachable!(
                "unexpected combination of PageId, \
                 LogKind, and blob status: {:?}",
                other
            ),
        };

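        // Lock-free claim protocol: snapshot the current io buffer's
        // header, compute the bumped offset and incremented writer count,
        // and try to CAS the new header in. A sealed buffer, a full buffer,
        // a saturated writer count, or a lost CAS race all back off and
        // retry against fresh state.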
        loop {
            M.log_reservation_attempted();

            if let Err(e) = self.config.global_error() {
                let intervals = self.iobufs.intervals.lock();

                drop(intervals);

                let _notified = self.iobufs.interval_updated.notify_all();
                return Err(e);
            }

            let iobuf = self.iobufs.current_iobuf();
            let header = iobuf.get_header();
            let buf_offset = header::offset(header);
            let reservation_lsn =
                iobuf.lsn + Lsn::try_from(buf_offset).unwrap();

            if header::is_sealed(header) {
                trace_once!("io buffer already sealed, spinning");

                backoff.snooze();

                continue;
            }

            let message_header = MessageHeader {
                crc32: 0,
                kind,
                segment_number: SegmentNumber(
                    u64::try_from(iobuf.lsn).unwrap()
                        / u64::try_from(self.config.segment_size).unwrap(),
                ),
                pid,
                len: if over_blob_threshold {
                    reservation_lsn.serialized_size()
                } else {
                    serialized_len
                },
            };

            let inline_buf_len = if over_blob_threshold {
                usize::try_from(
                    message_header.serialized_size()
                        + reservation_lsn.serialized_size(),
                )
                .unwrap()
            } else {
                usize::try_from(
                    message_header.serialized_size() + serialized_len,
                )
                .unwrap()
            };

            trace!("reserving buf of len {}", inline_buf_len);

            let prospective_size = buf_offset + inline_buf_len;
            let red_zone = iobuf.capacity - buf_offset < MAX_MSG_HEADER_LEN;
            let would_overflow = prospective_size > iobuf.capacity || red_zone;
            if would_overflow {
                trace_once!("io buffer too full, spinning");
                iobuf::maybe_seal_and_write_iobuf(
                    &self.iobufs,
                    &iobuf,
                    header,
                    true,
                )?;
                backoff.spin();
                continue;
            }

            let bumped_offset = header::bump_offset(header, inline_buf_len);

            if header::n_writers(bumped_offset) == header::MAX_WRITERS {
                trace_once!(
                    "spinning because our buffer has {} writers already",
                    header::MAX_WRITERS
                );
                backoff.snooze();
                continue;
            }

            let claimed = header::incr_writers(bumped_offset);

            if iobuf.cas_header(header, claimed).is_err() {
                trace_once!("CAS failed while claiming buffer slot, spinning");
                backoff.spin();
                continue;
            }

            let log_offset = iobuf.offset;

            assert_ne!(header::n_writers(claimed), 0);

            assert!(!header::is_sealed(claimed));

            assert_ne!(
                log_offset,
                LogOffset::max_value(),
                "claimed a slot in an uninitialized iobuf with lsn {}\n{:?}",
                reservation_lsn,
                self
            );

            let destination = iobuf.get_mut_range(buf_offset, inline_buf_len);
            let reservation_lid = log_offset + buf_offset as LogOffset;

            trace!(
                "reserved {} bytes at lsn {} lid {}",
                inline_buf_len,
                reservation_lsn,
                reservation_lid,
            );

            bump_atomic_lsn(
                &self.iobufs.max_reserved_lsn,
                reservation_lsn + inline_buf_len as Lsn - 1,
            );

            let blob_id =
                if over_blob_threshold { Some(reservation_lsn) } else { None };

            self.iobufs.encapsulate(
                item,
                message_header,
                destination,
                blob_id,
            )?;

            M.log_reservation_success();

            let pointer = if let Some(blob_id) = blob_id {
                DiskPtr::new_blob(reservation_lid, blob_id)
            } else if let Some(blob_rewrite) = blob_rewrite {
                DiskPtr::new_blob(reservation_lid, blob_rewrite)
            } else {
                DiskPtr::new_inline(reservation_lid)
            };

            return Ok(Reservation {
                iobuf,
                log: self,
                buf: destination,
                flushed: false,
                lsn: reservation_lsn,
                pointer,
                is_blob_rewrite: blob_rewrite.is_some(),
                header_len: usize::try_from(message_header.serialized_size())
                    .unwrap(),
            });
        }
    }

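    /// Complete or shoot down a reservation: decrement the io buffer's
    /// writer count, and if this drops the last writer from a sealed
    /// buffer, hand the buffer to the threadpool to be written to the log.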
    pub(super) fn exit_reservation(&self, iobuf: &Arc<IoBuf>) -> Result<()> {
        let mut header = iobuf.get_header();

        loop {
            let new_hv = header::decr_writers(header);
            match iobuf.cas_header(header, new_hv) {
                Ok(new) => {
                    header = new;
                    break;
                }
                Err(new) => {
                    header = new;
                }
            }
        }

        if header::n_writers(header) == 0 && header::is_sealed(header) {
            if let Err(e) = self.config.global_error() {
                let intervals = self.iobufs.intervals.lock();

                drop(intervals);

                let _notified = self.iobufs.interval_updated.notify_all();
                return Err(e);
            }

            let lsn = iobuf.lsn;
            trace!(
                "asynchronously writing iobuf with lsn {} \
                 to log from exit_reservation",
                lsn
            );
            let iobufs = self.iobufs.clone();
            let iobuf = iobuf.clone();
            threadpool::spawn(move || {
                if let Err(e) = iobufs.write_to_log(&iobuf) {
                    error!(
                        "hit error while writing iobuf with lsn {}: {:?}",
                        lsn, e
                    );
                    iobufs.config.set_global_error(e);
                }
            })?;

            Ok(())
        } else {
            Ok(())
        }
    }
}

impl Drop for Log {
    fn drop(&mut self) {
        if self.config.global_error().is_err() {
            return;
        }

        if let Err(e) = iobuf::flush(&self.iobufs) {
            error!("failed to flush from Log::drop: {}", e);
        }

        if !self.config.temporary {
            self.config.file.sync_all().unwrap();
        }

        debug!("Log dropped");
    }
}

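/// The header that precedes every message in the log, carrying a checksum,
/// the message kind, the segment it was written into, the page it belongs
/// to, and the length of its payload in bytes.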
#[derive(Debug, Copy, Clone, PartialEq)]
pub struct MessageHeader {
    pub crc32: u32,
    pub kind: MessageKind,
    pub segment_number: SegmentNumber,
    pub pid: PageId,
    pub len: u64,
}

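/// The index of a segment in the log file, computed as an lsn divided by
/// the configured segment size.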
#[derive(Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Debug)]
#[repr(transparent)]
pub struct SegmentNumber(pub u64);

impl std::ops::Deref for SegmentNumber {
    type Target = u64;

    fn deref(&self) -> &u64 {
        &self.0
    }
}

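/// The header written at the base of each segment, recording the lsn the
/// segment begins at, the maximum stable lsn at the time it was written,
/// and (in memory only) whether its checksum was intact when read back.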
#[derive(Debug, Copy, Clone, PartialEq)]
pub struct SegmentHeader {
    pub lsn: Lsn,
    pub max_stable_lsn: Lsn,
    pub ok: bool,
}

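/// The result of reading a message from the log. The trailing `u32` in
/// most variants is the number of inline bytes (header plus payload) that
/// the message occupies in its segment.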
#[derive(Debug)]
pub enum LogRead {
    Inline(MessageHeader, Vec<u8>, u32),
    Blob(MessageHeader, Vec<u8>, BlobPointer, u32),
    Canceled(u32),
    Cap(SegmentNumber),
    Corrupted,
    DanglingBlob(MessageHeader, BlobPointer, u32),
    BatchManifest(Lsn, u32),
}

impl LogRead {
    pub fn is_successful(&self) -> bool {
        matches!(*self, LogRead::Inline(..) | LogRead::Blob(..))
    }

    pub fn into_data(self) -> Option<Vec<u8>> {
        match self {
            LogRead::Blob(_, buf, _, _) | LogRead::Inline(_, buf, _) => {
                Some(buf)
            }
            _ => None,
        }
    }
}

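// On disk, a segment header occupies SEG_HEADER_LEN bytes: bytes 0..4 hold
// a crc32 of bytes 4..20 XOR'd with 0xFFFF_FFFF, bytes 4..12 the lsn XOR'd
// with 0x7FFF_FFFF_FFFF_FFFF, and bytes 12..20 the max stable lsn under
// the same mask, so that zero-filled regions are unlikely to decode as
// valid headers. A round-trip sketch (assuming SEG_HEADER_LEN is 20, as
// the slices below imply):
//
//     let header = SegmentHeader { lsn: 4096, max_stable_lsn: 2048, ok: true };
//     let bytes: [u8; SEG_HEADER_LEN] = header.into();
//     assert_eq!(SegmentHeader::from(bytes), header);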
impl From<[u8; SEG_HEADER_LEN]> for SegmentHeader {
    fn from(buf: [u8; SEG_HEADER_LEN]) -> Self {
        #[allow(unsafe_code)]
        unsafe {
            let crc32_header =
                arr_to_u32(buf.get_unchecked(0..4)) ^ 0xFFFF_FFFF;

            let xor_lsn = arr_to_lsn(buf.get_unchecked(4..12));
            let lsn = xor_lsn ^ 0x7FFF_FFFF_FFFF_FFFF;

            let xor_max_stable_lsn = arr_to_lsn(buf.get_unchecked(12..20));
            let max_stable_lsn = xor_max_stable_lsn ^ 0x7FFF_FFFF_FFFF_FFFF;

            let crc32_tested = crc32(&buf[4..20]);

            let ok = crc32_tested == crc32_header;

            if !ok {
                debug!(
                    "segment with lsn {} had computed crc {}, \
                     but stored crc {}",
                    lsn, crc32_tested, crc32_header
                );
            }

            Self { lsn, max_stable_lsn, ok }
        }
    }
}

impl From<SegmentHeader> for [u8; SEG_HEADER_LEN] {
    fn from(header: SegmentHeader) -> [u8; SEG_HEADER_LEN] {
        let mut buf = [0; SEG_HEADER_LEN];

        let xor_lsn = header.lsn ^ 0x7FFF_FFFF_FFFF_FFFF;
        let lsn_arr = lsn_to_arr(xor_lsn);

        let xor_max_stable_lsn =
            header.max_stable_lsn ^ 0x7FFF_FFFF_FFFF_FFFF;
        let highest_stable_lsn_arr = lsn_to_arr(xor_max_stable_lsn);

        #[allow(unsafe_code)]
        unsafe {
            std::ptr::copy_nonoverlapping(
                lsn_arr.as_ptr(),
                buf.as_mut_ptr().add(4),
                std::mem::size_of::<u64>(),
            );
            std::ptr::copy_nonoverlapping(
                highest_stable_lsn_arr.as_ptr(),
                buf.as_mut_ptr().add(12),
                std::mem::size_of::<u64>(),
            );
        }

        let crc32 = u32_to_arr(crc32(&buf[4..20]) ^ 0xFFFF_FFFF);

        #[allow(unsafe_code)]
        unsafe {
            std::ptr::copy_nonoverlapping(
                crc32.as_ptr(),
                buf.as_mut_ptr(),
                std::mem::size_of::<u32>(),
            );
        }

        buf
    }
}

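/// Read and validate the segment header stored at the given base offset,
/// logging (without failing) when the recorded lsn is lower than the
/// offset implies it should be.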
pub(crate) fn read_segment_header(
    file: &File,
    lid: LogOffset,
) -> Result<SegmentHeader> {
    trace!("reading segment header at {}", lid);

    let mut seg_header_buf = [0; SEG_HEADER_LEN];
    pread_exact(file, &mut seg_header_buf, lid)?;
    let segment_header = SegmentHeader::from(seg_header_buf);

    if segment_header.lsn < Lsn::try_from(lid).unwrap() {
        debug!(
            "segment had lsn {} but we expected something \
             greater, as the base lid is {}",
            segment_header.lsn, lid
        );
    }

    Ok(segment_header)
}

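/// A positioned-read abstraction implemented for both `File` and the
/// in-memory `BasedBuf`, so `read_message` can operate on either.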
pub(crate) trait ReadAt {
    fn pread_exact(&self, dst: &mut [u8], at: u64) -> std::io::Result<()>;

    fn pread_exact_or_eof(
        &self,
        dst: &mut [u8],
        at: u64,
    ) -> std::io::Result<usize>;
}

impl ReadAt for File {
    fn pread_exact(&self, dst: &mut [u8], at: u64) -> std::io::Result<()> {
        pread_exact(self, dst, at)
    }

    fn pread_exact_or_eof(
        &self,
        dst: &mut [u8],
        at: u64,
    ) -> std::io::Result<usize> {
        pread_exact_or_eof(self, dst, at)
    }
}

impl ReadAt for BasedBuf {
    fn pread_exact(&self, dst: &mut [u8], mut at: u64) -> std::io::Result<()> {
        if at < self.offset
            || u64::try_from(dst.len()).unwrap() + at
                > u64::try_from(self.buf.len()).unwrap() + self.offset
        {
            return Err(std::io::Error::new(
                std::io::ErrorKind::UnexpectedEof,
                "failed to fill buffer",
            ));
        }
        at -= self.offset;
        let at_usize = usize::try_from(at).unwrap();
        let to_usize = at_usize + dst.len();
        dst.copy_from_slice(self.buf[at_usize..to_usize].as_ref());
        Ok(())
    }

    fn pread_exact_or_eof(
        &self,
        dst: &mut [u8],
        mut at: u64,
    ) -> std::io::Result<usize> {
        if at < self.offset
            || u64::try_from(self.buf.len()).unwrap() < at - self.offset
        {
            return Err(std::io::Error::new(
                std::io::ErrorKind::UnexpectedEof,
                "failed to fill buffer",
            ));
        }
        at -= self.offset;

        let at_usize = usize::try_from(at).unwrap();

        let len = std::cmp::min(dst.len(), self.buf.len() - at_usize);

        let start = at_usize;
        let end = start + len;
        dst[..len].copy_from_slice(self.buf[start..end].as_ref());
        Ok(len)
    }
}

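/// Read the message stored at a known log offset, checking that it falls
/// inside its segment (and outside the trailing red zone), that its length
/// is possible, that its crc32 matches, and that it carries the expected
/// segment number, before dispatching on its kind.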
pub(crate) fn read_message<R: ReadAt>(
    file: &R,
    lid: LogOffset,
    expected_segment_number: SegmentNumber,
    config: &Config,
) -> Result<LogRead> {
    let _measure = Measure::new(&M.read);
    let segment_len = config.segment_size;
    let seg_start = lid / segment_len as LogOffset * segment_len as LogOffset;
    trace!("reading message from segment: {} at lid: {}", seg_start, lid);
    assert!(seg_start + SEG_HEADER_LEN as LogOffset <= lid);
    assert!(
        (seg_start + segment_len as LogOffset) - lid
            >= MAX_MSG_HEADER_LEN as LogOffset,
        "tried to read a message from the red zone"
    );

    let msg_header_buf = &mut [0; 128];
    let _read_bytes = file.pread_exact_or_eof(msg_header_buf, lid)?;
    let header_cursor = &mut msg_header_buf.as_ref();
    let len_before = header_cursor.len();
    let header = MessageHeader::deserialize(header_cursor)?;
    let len_after = header_cursor.len();
    trace!("read message header at lid {}: {:?}", lid, header);
    let message_offset = len_before - len_after;

    let ceiling = seg_start + segment_len as LogOffset;

    assert!(lid + message_offset as LogOffset <= ceiling);

    let max_possible_len =
        assert_usize(ceiling - lid - message_offset as LogOffset);

    if header.len > max_possible_len as u64 {
        trace!(
            "read a corrupted message with impossibly long length {:?}",
            header
        );
        return Ok(LogRead::Corrupted);
    }

    let header_len = usize::try_from(header.len).unwrap();

    if header.kind == MessageKind::Corrupted {
        trace!(
            "read a corrupted message with Corrupted MessageKind: {:?}",
            header
        );
        return Ok(LogRead::Corrupted);
    }

    let mut buf = vec![0; header_len];

    if header_len > len_after {
        file.pread_exact(&mut buf, lid + message_offset as LogOffset)?;
    } else {
        buf.copy_from_slice(header_cursor[..header_len].as_ref());
    }

    let crc32 = calculate_message_crc32(
        msg_header_buf[..message_offset].as_ref(),
        &buf,
    );

    if crc32 != header.crc32 {
        trace!("read a message with a bad checksum with header {:?}", header);
        return Ok(LogRead::Corrupted);
    }

    let inline_len = u32::try_from(message_offset).unwrap()
        + u32::try_from(header.len).unwrap();

    if header.segment_number != expected_segment_number {
        debug!(
            "header {:?} does not contain expected segment_number {:?}",
            header, expected_segment_number
        );
        return Ok(LogRead::Corrupted);
    }

    match header.kind {
        MessageKind::Canceled => {
789 trace!("read failed of len {}", header.len);
            Ok(LogRead::Canceled(inline_len))
        }
        MessageKind::Cap => {
            trace!("read pad in segment number {:?}", header.segment_number);
            Ok(LogRead::Cap(header.segment_number))
        }
        MessageKind::BlobLink
        | MessageKind::BlobNode
        | MessageKind::BlobMeta => {
            let id = arr_to_lsn(&buf);

            match read_blob(id, config) {
                Ok((kind, buf)) => {
                    assert_eq!(header.kind, kind);
                    trace!(
                        "read a successful blob message for blob {} in segment number {:?}",
                        id,
                        header.segment_number,
                    );

                    Ok(LogRead::Blob(header, buf, id, inline_len))
                }
                Err(Error::Io(ref e))
                    if e.kind() == std::io::ErrorKind::NotFound =>
                {
                    debug!(
                        "underlying blob file not found for blob {} in segment number {:?}",
                        id, header.segment_number,
                    );
                    Ok(LogRead::DanglingBlob(header, id, inline_len))
                }
                Err(other_e) => {
                    debug!("failed to read blob: {:?}", other_e);
                    Err(other_e)
                }
            }
        }
        MessageKind::InlineLink
        | MessageKind::InlineNode
        | MessageKind::InlineMeta
        | MessageKind::Free
        | MessageKind::Counter => {
            trace!("read a successful inline message");
            let buf = if config.use_compression {
                maybe_decompress(buf)?
            } else {
                buf
            };

            Ok(LogRead::Inline(header, buf, inline_len))
        }
        MessageKind::BatchManifest => {
            assert_eq!(buf.len(), std::mem::size_of::<Lsn>());
            let max_lsn = arr_to_lsn(&buf);
            Ok(LogRead::BatchManifest(max_lsn, inline_len))
        }
        MessageKind::Corrupted => unreachable!(
            "corrupted should have been handled \
             before reading message length above"
        ),
    }
}