sled/pagecache/
reservation.rs

1use crate::{pagecache::*, *};
2
3/// A pending log reservation which can be aborted or completed.
4/// NB the holder should quickly call `complete` or `abort` as
5/// taking too long to decide will cause the underlying IO
6/// buffer to become blocked.
7#[derive(Debug)]
8pub struct Reservation<'a> {
9    pub(super) log: &'a Log,
10    pub(super) iobuf: Arc<IoBuf>,
11    pub(super) buf: &'a mut [u8],
12    pub(super) flushed: bool,
13    pub(super) pointer: DiskPtr,
14    pub(super) lsn: Lsn,
15    pub(super) is_blob_rewrite: bool,
16    pub(super) header_len: usize,
17}
18
19impl<'a> Drop for Reservation<'a> {
20    fn drop(&mut self) {
21        // We auto-abort if the user never uses a reservation.
22        if !self.flushed {
23            if let Err(e) = self.flush(false) {
24                self.log.config.set_global_error(e);
25            }
26        }
27    }
28}
29
30impl<'a> Reservation<'a> {
31    /// Cancel the reservation, placing a failed flush on disk, returning
32    /// the (cancelled) log sequence number and file offset.
33    pub fn abort(mut self) -> Result<(Lsn, DiskPtr)> {
34        if self.pointer.is_blob() && !self.is_blob_rewrite {
35            // we don't want to remove this blob if something
36            // else may still be using it.
37
38            trace!(
39                "removing blob for aborted reservation at lsn {}",
40                self.pointer
41            );
42
43            remove_blob(self.pointer.blob().1, &self.log.config)?;
44        }
45
46        self.flush(false)
47    }
48
49    /// Complete the reservation, placing the buffer on disk. returns
50    /// the log sequence number of the write, and the file offset.
51    pub fn complete(mut self) -> Result<(Lsn, DiskPtr)> {
52        self.flush(true)
53    }
54
55    /// Get the log sequence number for this update.
56    pub const fn lsn(&self) -> Lsn {
57        self.lsn
58    }
59
60    /// Get the underlying storage location for the written value.
61    /// Note that an blob write still has a pointer in the
62    /// log at the provided lid location.
63    pub const fn pointer(&self) -> DiskPtr {
64        self.pointer
65    }
66
67    /// Returns the length of the on-log reservation.
68    pub(crate) fn reservation_len(&self) -> usize {
69        self.buf.len()
70    }
71
72    /// Refills the reservation buffer with new data.
73    /// Must supply a buffer of an identical length
74    /// as the one initially provided. Don't use this
75    /// on messages subject to compression etc...
76    ///
77    /// # Panics
78    ///
79    /// Will panic if the reservation is not the correct
80    /// size to hold a serialized Lsn.
81    #[doc(hidden)]
82    pub fn mark_writebatch(self, peg_lsn: Lsn) -> Result<(Lsn, DiskPtr)> {
83        trace!(
84            "writing batch required stable lsn {} into \
85             BatchManifest at lid {} peg_lsn {}",
86            peg_lsn,
87            self.pointer.lid(),
88            self.lsn
89        );
90
91        if self.lsn == peg_lsn {
92            // this can happen because high-level tree updates
93            // may result in no work happening.
94            self.abort()
95        } else {
96            self.buf[4] = MessageKind::BatchManifest.into();
97
98            let buf = lsn_to_arr(peg_lsn);
99
100            let dst = &mut self.buf[self.header_len..];
101
102            dst.copy_from_slice(&buf);
103
104            let mut intervals = self.log.iobufs.intervals.lock();
105            intervals.mark_batch((self.lsn, peg_lsn));
106            drop(intervals);
107
108            self.complete()
109        }
110    }
111
112    fn flush(&mut self, valid: bool) -> Result<(Lsn, DiskPtr)> {
113        if self.flushed {
114            panic!("flushing already-flushed reservation!");
115        }
116
117        self.flushed = true;
118
119        if !valid {
120            // don't actually zero the message, still check its hash
121            // on recovery to find corruption.
122            self.buf[4] = MessageKind::Canceled.into();
123        }
124
125        let crc32 = calculate_message_crc32(
126            self.buf[..self.header_len].as_ref(),
127            &self.buf[self.header_len..],
128        );
129        let crc32_arr = u32_to_arr(crc32);
130
131        #[allow(unsafe_code)]
132        unsafe {
133            std::ptr::copy_nonoverlapping(
134                crc32_arr.as_ptr(),
135                self.buf.as_mut_ptr(),
136                std::mem::size_of::<u32>(),
137            );
138        }
139        self.log.exit_reservation(&self.iobuf)?;
140
141        Ok((self.lsn(), self.pointer()))
142    }
143}