use std::{collections::BTreeMap, io};

use super::{
    pread_exact_or_eof, read_message, read_segment_header, BasedBuf, DiskPtr,
    LogKind, LogOffset, LogRead, Lsn, SegmentHeader, SegmentNumber,
    MAX_MSG_HEADER_LEN, SEG_HEADER_LEN,
};
use crate::*;
#[derive(Debug)]
pub struct LogIter {
    pub config: RunningConfig,
    pub segments: BTreeMap<Lsn, LogOffset>,
    pub segment_base: Option<BasedBuf>,
    pub max_lsn: Option<Lsn>,
    pub cur_lsn: Option<Lsn>,
    pub last_stage: bool,
}

impl Iterator for LogIter {
    type Item = (LogKind, PageId, Lsn, DiskPtr, u64);

    fn next(&mut self) -> Option<Self::Item> {
        loop {
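            // If the cursor is too close to the end of the current
            // segment to hold even a maximal message header, drop the
            // segment buffer so a fresh segment is loaded below.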
            let remaining_seg_too_small_for_msg = !valid_entry_offset(
                LogOffset::try_from(self.cur_lsn.unwrap_or(0)).unwrap(),
                self.config.segment_size,
            );

            if remaining_seg_too_small_for_msg {
                self.segment_base = None;
            }

            if self.segment_base.is_none() {
                if let Err(e) = self.read_segment() {
                    debug!("unable to load new segment: {:?}", e);
                    return None;
                }
            }

            let lsn = self.cur_lsn.unwrap();

            let _measure = Measure::new(&M.read_segment_message);

            if let Some(max_lsn) = self.max_lsn {
                if let Some(cur_lsn) = self.cur_lsn {
                    if cur_lsn > max_lsn {
                        debug!("hit max_lsn {} in iterator, stopping", max_lsn);
                        return None;
                    }
                }
            }

            let segment_base = &self.segment_base.as_ref().unwrap();

            let lid = segment_base.offset
                + LogOffset::try_from(lsn % self.config.segment_size as Lsn)
                    .unwrap();

            let expected_segment_number = SegmentNumber(
                u64::try_from(lsn).unwrap()
                    / u64::try_from(self.config.segment_size).unwrap(),
            );

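            // Read the message at the current offset and dispatch on
            // what kind of log entry it turned out to be.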
            match read_message(
                &**segment_base,
                lid,
                expected_segment_number,
                &self.config,
            ) {
                Ok(LogRead::Blob(header, _buf, blob_ptr, inline_len)) => {
                    trace!("read blob flush in LogIter::next");
                    self.cur_lsn = Some(lsn + Lsn::from(inline_len));

                    return Some((
                        LogKind::from(header.kind),
                        header.pid,
                        lsn,
                        DiskPtr::Blob(lid, blob_ptr),
                        u64::from(inline_len),
                    ));
                }
                Ok(LogRead::Inline(header, _buf, inline_len)) => {
                    trace!(
                        "read inline flush with header {:?} in LogIter::next",
                        header,
                    );
                    self.cur_lsn = Some(lsn + Lsn::from(inline_len));

                    return Some((
                        LogKind::from(header.kind),
                        header.pid,
                        lsn,
                        DiskPtr::Inline(lid),
                        u64::from(inline_len),
                    ));
                }
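                // A batch manifest names the last LSN in an atomic
                // batch. If that LSN lies beyond our max_lsn, the batch
                // was torn and recovery must stop here.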
                Ok(LogRead::BatchManifest(last_lsn_in_batch, inline_len)) => {
                    if let Some(max_lsn) = self.max_lsn {
                        if last_lsn_in_batch > max_lsn {
                            debug!(
                                "cutting recovery short due to torn batch. \
                                required stable lsn: {} actual max possible lsn: {}",
                                last_lsn_in_batch, max_lsn
                            );
                            return None;
                        }
                    }
                    self.cur_lsn = Some(lsn + Lsn::from(inline_len));
                    continue;
                }
                Ok(LogRead::Canceled(inline_len)) => {
                    trace!("read zeroed in LogIter::next");
                    self.cur_lsn = Some(lsn + Lsn::from(inline_len));
                }
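                // Corruption near the tip of the log is expected after
                // a crash. In the final recovery stage we drop the
                // current segment buffer and move on to the next
                // segment; otherwise we stop iteration entirely.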
                Ok(LogRead::Corrupted) => {
                    trace!(
                        "read corrupted msg in LogIter::next at lid {} lsn {}",
                        lid,
                        lsn
                    );
                    if self.last_stage {
                        let _taken = self.segment_base.take().unwrap();

                        continue;
                    } else {
                        return None;
                    }
                }
                Ok(LogRead::Cap(_segment_number)) => {
                    trace!("read cap in LogIter::next");
                    let _taken = self.segment_base.take().unwrap();

                    continue;
                }
                Ok(LogRead::DanglingBlob(_, blob_ptr, inline_len)) => {
                    debug!(
                        "encountered dangling blob \
                        pointer at lsn {} ptr {}",
                        lsn, blob_ptr
                    );
                    self.cur_lsn = Some(lsn + Lsn::from(inline_len));
                    continue;
                }
                Err(e) => {
                    debug!(
                        "failed to read log message at lid {} \
                        with expected lsn {} during iteration: {}",
                        lid, lsn, e
                    );
                    return None;
                }
            }
        }
    }
}

impl LogIter {
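    /// Loads the lowest remaining segment from `self.segments` into
    /// `segment_base`, validating its header and advancing `cur_lsn`
    /// past the segment header.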
    fn read_segment(&mut self) -> Result<()> {
        let _measure = Measure::new(&M.segment_read);
        if self.segments.is_empty() {
            return Err(io::Error::new(
                io::ErrorKind::Other,
                "no segments remaining to iterate over",
            )
            .into());
        }

        let first_ref = self.segments.iter().next().unwrap();
        let (lsn, offset) = (*first_ref.0, *first_ref.1);

        if let Some(max_lsn) = self.max_lsn {
            if lsn > max_lsn {
                return Err(io::Error::new(
                    io::ErrorKind::Other,
                    "next segment is above our configured max_lsn",
                )
                .into());
            }
        }

        assert!(
            lsn + (self.config.segment_size as Lsn)
                >= self.cur_lsn.unwrap_or(0),
            "caller is responsible for providing segments \
             that contain the initial cur_lsn value or higher"
        );

        trace!(
            "LogIter::read_segment lsn: {:?} cur_lsn: {:?}",
            lsn,
            self.cur_lsn
        );

        let f = &self.config.file;
        let segment_header = read_segment_header(f, offset)?;
        if offset % self.config.segment_size as LogOffset != 0 {
            debug!("segment offset not divisible by segment length");
            return Err(Error::corruption(None));
        }
        if segment_header.lsn % self.config.segment_size as Lsn != 0 {
            debug!(
                "expected a segment header lsn that is divisible \
                 by the segment_size ({}) instead it was {}",
                self.config.segment_size, segment_header.lsn
            );
            return Err(Error::corruption(None));
        }

        if segment_header.lsn != lsn {
            debug!(
                "segment header lsn ({}) != expected lsn ({})",
                segment_header.lsn, lsn
            );
            return Err(io::Error::new(
                io::ErrorKind::Other,
                "encountered torn segment",
            )
            .into());
        }

        trace!("read segment header {:?}", segment_header);

        let mut buf = vec![0; self.config.segment_size];
        let size = pread_exact_or_eof(f, &mut buf, offset)?;

        trace!("setting stored segment buffer length to {} after read", size);
        buf.truncate(size);

        self.cur_lsn = Some(segment_header.lsn + SEG_HEADER_LEN as Lsn);

        self.segment_base = Some(BasedBuf { buf, offset });

        self.segments.remove(&lsn);

        Ok(())
    }
}

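/// Returns `true` if a message could legally begin at `lid`: past the
/// segment header, and with enough room left in the segment for a
/// maximal message header.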
fn valid_entry_offset(lid: LogOffset, segment_len: usize) -> bool {
    let seg_start = lid / segment_len as LogOffset * segment_len as LogOffset;

    let max_lid =
        seg_start + segment_len as LogOffset - MAX_MSG_HEADER_LEN as LogOffset;

    let min_lid = seg_start + SEG_HEADER_LEN as LogOffset;

    lid >= min_lid && lid <= max_lid
}

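/// Scans all segment headers in the file, returning the lsn -> offset
/// ordering of usable segments at or above `min`, along with the LSN
/// of the end of the last contiguous message in the unstable tail.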
fn scan_segment_headers_and_tail(
    min: Lsn,
    config: &RunningConfig,
) -> Result<(BTreeMap<Lsn, LogOffset>, Lsn)> {
    fn fetch(
        idx: u64,
        min: Lsn,
        config: &RunningConfig,
    ) -> Option<(LogOffset, SegmentHeader)> {
        let segment_len = u64::try_from(config.segment_size).unwrap();
        let base_lid = idx * segment_len;
        let segment = read_segment_header(&config.file, base_lid).ok()?;
        trace!(
            "SA scanned header at lid {} during startup: {:?}",
            base_lid,
            segment
        );
        if segment.ok && segment.lsn >= min {
            assert_ne!(segment.lsn, Lsn::max_value());
            Some((base_lid, segment))
        } else {
            trace!(
                "not using segment at lid {}, ok: {} lsn: {} min lsn: {}",
                base_lid,
                segment.ok,
                segment.lsn,
                min
            );
            None
        }
    }

    let segment_len = LogOffset::try_from(config.segment_size).unwrap();

    let f = &config.file;
    let file_len = f.metadata()?.len();
    let segments = (file_len / segment_len)
        + if file_len % segment_len
            < LogOffset::try_from(SEG_HEADER_LEN).unwrap()
        {
            0
        } else {
            1
        };

    trace!(
        "file len: {} segment len {} segments: {}",
        file_len,
        segment_len,
        segments
    );

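    // Read all segment headers concurrently on the threadpool; each
    // spawn returns a promise that is waited on below.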
    let header_promises_res: Result<Vec<_>> = (0..segments)
        .map(move |idx| {
            threadpool::spawn({
                let config = config.clone();
                move || fetch(idx, min, &config)
            })
        })
        .collect();

    let header_promises = header_promises_res?;

    let mut headers: Vec<(LogOffset, SegmentHeader)> = vec![];
    for promise in header_promises {
        let read_attempt =
            promise.wait().expect("thread pool should not crash");

        if let Some(completed_result) = read_attempt {
            headers.push(completed_result);
        }
    }

    let mut ordering = BTreeMap::new();
    let mut max_header_stable_lsn = min;

    for (lid, header) in headers {
        max_header_stable_lsn =
            std::cmp::max(header.max_stable_lsn, max_header_stable_lsn);

        if let Some(old) = ordering.insert(header.lsn, lid) {
            assert_eq!(
                old, lid,
                "duplicate segment LSN {} detected at both {} and {}, \
                 one should have been zeroed out during recovery",
                header.lsn, old, lid
            );
        }
    }

    debug!(
        "ordering before clearing tears: {:?}, \
         max_header_stable_lsn: {}",
        ordering, max_header_stable_lsn
    );

    let end_of_last_contiguous_message_in_unstable_tail =
        check_contiguity_in_unstable_tail(
            max_header_stable_lsn,
            &ordering,
            config,
        )?;

    Ok((ordering, end_of_last_contiguous_message_in_unstable_tail))
}

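/// Walks the segments at or above the highest stable LSN recorded in
/// any header, stopping at the first gap in the expected LSN sequence,
/// then iterates over that contiguous tail to find the end of the last
/// readable message.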
fn check_contiguity_in_unstable_tail(
    max_header_stable_lsn: Lsn,
    ordering: &BTreeMap<Lsn, LogOffset>,
    config: &RunningConfig,
) -> Result<Lsn> {
    let segment_size = config.segment_size as Lsn;

    let lowest_lsn_in_tail: Lsn =
        std::cmp::max(0, (max_header_stable_lsn / segment_size) * segment_size);

    let mut expected_present = lowest_lsn_in_tail;
    let mut missing_item_in_tail = None;

    let logical_tail = ordering
        .range(lowest_lsn_in_tail..)
        .map(|(lsn, lid)| (*lsn, *lid))
        .take_while(|(lsn, _lid)| {
            let matches = expected_present == *lsn;
            if !matches {
                debug!(
                    "failed to find expected segment \
                     at lsn {}, tear detected",
                    expected_present
                );
                missing_item_in_tail = Some(expected_present);
            }
            expected_present += segment_size;
            matches
        })
        .collect();

    debug!(
        "in check_contiguity_in_unstable_tail, found missing item \
         in tail: {:?} and we'll scan segments {:?} above lowest lsn {}",
        missing_item_in_tail, logical_tail, lowest_lsn_in_tail
    );

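    // Scan the contiguous tail with a throwaway LogIter bounded by the
    // first missing segment; draining it advances cur_lsn to just past
    // the last readable message.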
    let mut iter = LogIter {
        config: config.clone(),
        segments: logical_tail,
        segment_base: None,
        max_lsn: missing_item_in_tail,
        cur_lsn: None,
        last_stage: false,
    };

    while iter.next().is_some() {}

    let end_of_last_message = iter.cur_lsn.unwrap_or(0) - 1;

    debug!(
        "filtering out segments after detected tear at lsn {}",
        end_of_last_message,
    );

    Ok(end_of_last_message)
}

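/// Returns a `LogIter` over all recovered segments whose LSN is at or
/// above `lsn` (rounded down to a segment boundary), bounded above by
/// the end of the last contiguous message found in the unstable tail.
///
/// A minimal usage sketch (assumes an already-recovered
/// `RunningConfig` named `config`; internal API, not a doctest):
///
/// ```ignore
/// let iter = raw_segment_iter_from(0, &config)?;
/// for (kind, pid, lsn, ptr, len) in iter {
///     // replay or index each recovered message here
/// }
/// ```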
pub fn raw_segment_iter_from(
    lsn: Lsn,
    config: &RunningConfig,
) -> Result<LogIter> {
    let segment_len = config.segment_size as Lsn;
    let normalized_lsn = lsn / segment_len * segment_len;

    let (ordering, end_of_last_msg) =
        scan_segment_headers_and_tail(normalized_lsn, config)?;

    let tip_segment_iter: BTreeMap<_, _> = ordering
        .iter()
        .next_back()
        .map(|(a, b)| (*a, *b))
        .into_iter()
        .collect();

    trace!(
        "trying to find the max stable tip for \
         bounding batch manifests with segment iter {:?} \
         of segments >= first_tip {}",
        tip_segment_iter,
        end_of_last_msg,
    );

    trace!(
        "generated iterator over segments {:?} with lsn >= {}",
        ordering,
        normalized_lsn,
    );

    let segments = ordering
        .into_iter()
        .filter(move |&(l, _)| l >= normalized_lsn)
        .collect();

    Ok(LogIter {
        config: config.clone(),
        max_lsn: Some(end_of_last_msg),
        cur_lsn: None,
        segment_base: None,
        segments,
        last_stage: true,
    })
}