sled/pagecache/
blob_io.rs

1use crate::pagecache::*;
2use crate::*;
3
4pub(crate) fn read_blob(
5    blob_ptr: Lsn,
6    config: &Config,
7) -> Result<(MessageKind, Vec<u8>)> {
8    let path = config.blob_path(blob_ptr);
9    let f_res = std::fs::OpenOptions::new().read(true).open(&path);
10
11    if let Err(e) = &f_res {
12        debug!("failed to open file for blob read at {}: {:?}", blob_ptr, e);
13    }
14
15    let mut f = f_res?;
16
17    let mut crc_expected_bytes = [0_u8; std::mem::size_of::<u32>()];
18
19    if let Err(e) = f.read_exact(&mut crc_expected_bytes) {
20        debug!(
21            "failed to read the initial CRC bytes in the blob at {}: {:?}",
22            blob_ptr, e,
23        );
24        return Err(e.into());
25    }
26
27    let crc_expected = arr_to_u32(&crc_expected_bytes);
28
29    let mut kind_byte = [0_u8];
30
31    if let Err(e) = f.read_exact(&mut kind_byte) {
32        debug!(
33            "failed to read the initial CRC bytes in the blob at {}: {:?}",
34            blob_ptr, e,
35        );
36        return Err(e.into());
37    }
38
39    let mut buf = vec![];
40
41    if let Err(e) = f.read_to_end(&mut buf) {
42        debug!(
43            "failed to read data after the CRC bytes in blob at {}: {:?}",
44            blob_ptr, e,
45        );
46        return Err(e.into());
47    }
48
49    let mut hasher = crc32fast::Hasher::new();
50    hasher.update(&kind_byte);
51    hasher.update(&buf);
52    let crc_actual = hasher.finalize();
53
54    if crc_expected == crc_actual {
55        let buf =
56            if config.use_compression { maybe_decompress(buf)? } else { buf };
57        Ok((MessageKind::from(kind_byte[0]), buf))
58    } else {
59        warn!("blob {} failed crc check!", blob_ptr);
60
61        Err(Error::corruption(Some(DiskPtr::Blob(0, blob_ptr))))
62    }
63}
64
65pub(crate) fn write_blob<T: Serialize>(
66    config: &Config,
67    kind: MessageKind,
68    id: Lsn,
69    item: &T,
70) -> Result<()> {
71    let path = config.blob_path(id);
72    let mut f =
73        std::fs::OpenOptions::new().write(true).create_new(true).open(&path)?;
74
75    let kind_buf = &[kind.into()];
76
77    let mut hasher = crc32fast::Hasher::new();
78    hasher.update(kind_buf);
79
80    let data = {
81        let _ = Measure::new(&M.serialize);
82        item.serialize()
83    };
84
85    hasher.update(&data);
86    let crc = u32_to_arr(hasher.finalize());
87
88    io_fail!(config, "write_blob write crc");
89    f.write_all(&crc)?;
90    io_fail!(config, "write_blob write kind_byte");
91    f.write_all(kind_buf)?;
92    io_fail!(config, "write_blob write buf");
93    f.write_all(&data)
94        .map(|r| {
95            trace!("successfully wrote blob at {:?}", path);
96            r
97        })
98        .map_err(|e| e.into())
99}
100
101pub(crate) fn gc_blobs(config: &Config, stable_lsn: Lsn) -> Result<()> {
102    let mut base_dir = config.get_path();
103    base_dir.push("blobs");
104    let blob_dir = base_dir;
105    let blobs = std::fs::read_dir(blob_dir)?;
106
107    debug!("gc_blobs removing any blob with an lsn above {}", stable_lsn);
108
109    let mut to_remove = vec![];
110    for blob in blobs {
111        let path = blob?.path();
112        let lsn_str = path.file_name().unwrap().to_str().unwrap();
113        let lsn_res: std::result::Result<Lsn, _> = lsn_str.parse();
114
115        if let Err(e) = lsn_res {
116            warn!(
117                "blobs directory contains \
118                 unparsable path ({:?}): {}",
119                path, e
120            );
121            continue;
122        }
123
124        let lsn = lsn_res.unwrap();
125
126        if lsn >= stable_lsn {
127            to_remove.push(path);
128        }
129    }
130
131    if !to_remove.is_empty() {
132        warn!(
133            "removing {} blobs that have \
134             a higher lsn than our stable log: {:?}",
135            to_remove.len(),
136            stable_lsn
137        );
138    }
139
140    for path in to_remove {
141        std::fs::remove_file(&path)?;
142    }
143
144    Ok(())
145}
146
147pub(crate) fn remove_blob(id: Lsn, config: &Config) -> Result<()> {
148    let path = config.blob_path(id);
149    if let Err(e) = std::fs::remove_file(&path) {
150        debug!("removing blob at {:?} failed: {}", path, e);
151    } else {
152        trace!("successfully removed blob at {:?}", path);
153    }
154
155    // TODO return a future
156    Ok(())
157}