sled/pagecache/
blob_io.rs1use crate::pagecache::*;
2use crate::*;
3
4pub(crate) fn read_blob(
5 blob_ptr: Lsn,
6 config: &Config,
7) -> Result<(MessageKind, Vec<u8>)> {
8 let path = config.blob_path(blob_ptr);
9 let f_res = std::fs::OpenOptions::new().read(true).open(&path);
10
11 if let Err(e) = &f_res {
12 debug!("failed to open file for blob read at {}: {:?}", blob_ptr, e);
13 }
14
15 let mut f = f_res?;
16
17 let mut crc_expected_bytes = [0_u8; std::mem::size_of::<u32>()];
18
19 if let Err(e) = f.read_exact(&mut crc_expected_bytes) {
20 debug!(
21 "failed to read the initial CRC bytes in the blob at {}: {:?}",
22 blob_ptr, e,
23 );
24 return Err(e.into());
25 }
26
27 let crc_expected = arr_to_u32(&crc_expected_bytes);
28
29 let mut kind_byte = [0_u8];
30
31 if let Err(e) = f.read_exact(&mut kind_byte) {
32 debug!(
33 "failed to read the initial CRC bytes in the blob at {}: {:?}",
34 blob_ptr, e,
35 );
36 return Err(e.into());
37 }
38
39 let mut buf = vec![];
40
41 if let Err(e) = f.read_to_end(&mut buf) {
42 debug!(
43 "failed to read data after the CRC bytes in blob at {}: {:?}",
44 blob_ptr, e,
45 );
46 return Err(e.into());
47 }
48
49 let mut hasher = crc32fast::Hasher::new();
50 hasher.update(&kind_byte);
51 hasher.update(&buf);
52 let crc_actual = hasher.finalize();
53
54 if crc_expected == crc_actual {
55 let buf =
56 if config.use_compression { maybe_decompress(buf)? } else { buf };
57 Ok((MessageKind::from(kind_byte[0]), buf))
58 } else {
59 warn!("blob {} failed crc check!", blob_ptr);
60
61 Err(Error::corruption(Some(DiskPtr::Blob(0, blob_ptr))))
62 }
63}
64
65pub(crate) fn write_blob<T: Serialize>(
66 config: &Config,
67 kind: MessageKind,
68 id: Lsn,
69 item: &T,
70) -> Result<()> {
71 let path = config.blob_path(id);
72 let mut f =
73 std::fs::OpenOptions::new().write(true).create_new(true).open(&path)?;
74
75 let kind_buf = &[kind.into()];
76
77 let mut hasher = crc32fast::Hasher::new();
78 hasher.update(kind_buf);
79
80 let data = {
81 let _ = Measure::new(&M.serialize);
82 item.serialize()
83 };
84
85 hasher.update(&data);
86 let crc = u32_to_arr(hasher.finalize());
87
88 io_fail!(config, "write_blob write crc");
89 f.write_all(&crc)?;
90 io_fail!(config, "write_blob write kind_byte");
91 f.write_all(kind_buf)?;
92 io_fail!(config, "write_blob write buf");
93 f.write_all(&data)
94 .map(|r| {
95 trace!("successfully wrote blob at {:?}", path);
96 r
97 })
98 .map_err(|e| e.into())
99}
100
101pub(crate) fn gc_blobs(config: &Config, stable_lsn: Lsn) -> Result<()> {
102 let mut base_dir = config.get_path();
103 base_dir.push("blobs");
104 let blob_dir = base_dir;
105 let blobs = std::fs::read_dir(blob_dir)?;
106
107 debug!("gc_blobs removing any blob with an lsn above {}", stable_lsn);
108
109 let mut to_remove = vec![];
110 for blob in blobs {
111 let path = blob?.path();
112 let lsn_str = path.file_name().unwrap().to_str().unwrap();
113 let lsn_res: std::result::Result<Lsn, _> = lsn_str.parse();
114
115 if let Err(e) = lsn_res {
116 warn!(
117 "blobs directory contains \
118 unparsable path ({:?}): {}",
119 path, e
120 );
121 continue;
122 }
123
124 let lsn = lsn_res.unwrap();
125
126 if lsn >= stable_lsn {
127 to_remove.push(path);
128 }
129 }
130
131 if !to_remove.is_empty() {
132 warn!(
133 "removing {} blobs that have \
134 a higher lsn than our stable log: {:?}",
135 to_remove.len(),
136 stable_lsn
137 );
138 }
139
140 for path in to_remove {
141 std::fs::remove_file(&path)?;
142 }
143
144 Ok(())
145}
146
147pub(crate) fn remove_blob(id: Lsn, config: &Config) -> Result<()> {
148 let path = config.blob_path(id);
149 if let Err(e) = std::fs::remove_file(&path) {
150 debug!("removing blob at {:?} failed: {}", path, e);
151 } else {
152 trace!("successfully removed blob at {:?}", path);
153 }
154
155 Ok(())
157}