sled/pagecache/
reservation.rs1use crate::{pagecache::*, *};
2
3#[derive(Debug)]
8pub struct Reservation<'a> {
9 pub(super) log: &'a Log,
10 pub(super) iobuf: Arc<IoBuf>,
11 pub(super) buf: &'a mut [u8],
12 pub(super) flushed: bool,
13 pub(super) pointer: DiskPtr,
14 pub(super) lsn: Lsn,
15 pub(super) is_blob_rewrite: bool,
16 pub(super) header_len: usize,
17}
18
19impl<'a> Drop for Reservation<'a> {
20 fn drop(&mut self) {
21 if !self.flushed {
23 if let Err(e) = self.flush(false) {
24 self.log.config.set_global_error(e);
25 }
26 }
27 }
28}
29
30impl<'a> Reservation<'a> {
31 pub fn abort(mut self) -> Result<(Lsn, DiskPtr)> {
34 if self.pointer.is_blob() && !self.is_blob_rewrite {
35 trace!(
39 "removing blob for aborted reservation at lsn {}",
40 self.pointer
41 );
42
43 remove_blob(self.pointer.blob().1, &self.log.config)?;
44 }
45
46 self.flush(false)
47 }
48
49 pub fn complete(mut self) -> Result<(Lsn, DiskPtr)> {
52 self.flush(true)
53 }
54
55 pub const fn lsn(&self) -> Lsn {
57 self.lsn
58 }
59
60 pub const fn pointer(&self) -> DiskPtr {
64 self.pointer
65 }
66
67 pub(crate) fn reservation_len(&self) -> usize {
69 self.buf.len()
70 }
71
72 #[doc(hidden)]
82 pub fn mark_writebatch(self, peg_lsn: Lsn) -> Result<(Lsn, DiskPtr)> {
83 trace!(
84 "writing batch required stable lsn {} into \
85 BatchManifest at lid {} peg_lsn {}",
86 peg_lsn,
87 self.pointer.lid(),
88 self.lsn
89 );
90
91 if self.lsn == peg_lsn {
92 self.abort()
95 } else {
96 self.buf[4] = MessageKind::BatchManifest.into();
97
98 let buf = lsn_to_arr(peg_lsn);
99
100 let dst = &mut self.buf[self.header_len..];
101
102 dst.copy_from_slice(&buf);
103
104 let mut intervals = self.log.iobufs.intervals.lock();
105 intervals.mark_batch((self.lsn, peg_lsn));
106 drop(intervals);
107
108 self.complete()
109 }
110 }
111
112 fn flush(&mut self, valid: bool) -> Result<(Lsn, DiskPtr)> {
113 if self.flushed {
114 panic!("flushing already-flushed reservation!");
115 }
116
117 self.flushed = true;
118
119 if !valid {
120 self.buf[4] = MessageKind::Canceled.into();
123 }
124
125 let crc32 = calculate_message_crc32(
126 self.buf[..self.header_len].as_ref(),
127 &self.buf[self.header_len..],
128 );
129 let crc32_arr = u32_to_arr(crc32);
130
131 #[allow(unsafe_code)]
132 unsafe {
133 std::ptr::copy_nonoverlapping(
134 crc32_arr.as_ptr(),
135 self.buf.as_mut_ptr(),
136 std::mem::size_of::<u32>(),
137 );
138 }
139 self.log.exit_reservation(&self.iobuf)?;
140
141 Ok((self.lsn(), self.pointer()))
142 }
143}