// sled/pagecache/iterator.rs
1use std::{collections::BTreeMap, io};
2
3use super::{
4    pread_exact_or_eof, read_message, read_segment_header, BasedBuf, DiskPtr,
5    LogKind, LogOffset, LogRead, Lsn, SegmentHeader, SegmentNumber,
6    MAX_MSG_HEADER_LEN, SEG_HEADER_LEN,
7};
8use crate::*;
9
/// An iterator over the messages stored in the log, used during
/// recovery and snapshot generation.
#[derive(Debug)]
pub struct LogIter {
    pub config: RunningConfig,
    /// Segments still to be visited, keyed by segment base `Lsn`,
    /// mapped to their on-disk file offset. `read_segment` pops the
    /// lowest entry as iteration advances.
    pub segments: BTreeMap<Lsn, LogOffset>,
    /// The currently loaded segment buffer, if any. Cleared when the
    /// remaining space in the segment is too small for another message,
    /// which also signals the snapshot logic that the segment was
    /// exhausted.
    pub segment_base: Option<BasedBuf>,
    /// Iteration stops once `cur_lsn` exceeds this bound, when set.
    /// It may point at the start of the first corrupt message found in
    /// a previous recovery sweep.
    pub max_lsn: Option<Lsn>,
    /// The lsn of the next message to read; set by `read_segment` and
    /// advanced after each message.
    pub cur_lsn: Option<Lsn>,
    /// When true, corrupted messages are treated like segment caps
    /// (skip to the next segment) rather than terminating iteration.
    pub last_stage: bool,
}
19
20impl Iterator for LogIter {
21    type Item = (LogKind, PageId, Lsn, DiskPtr, u64);
22
23    fn next(&mut self) -> Option<Self::Item> {
24        // If segment is None, get next on segment_iter, panic
25        // if we can't read something we expect to be able to,
26        // return None if there are no more remaining segments.
27        loop {
28            let remaining_seg_too_small_for_msg = !valid_entry_offset(
29                LogOffset::try_from(self.cur_lsn.unwrap_or(0)).unwrap(),
30                self.config.segment_size,
31            );
32
33            if remaining_seg_too_small_for_msg {
34                // clearing this also communicates to code in
35                // the snapshot generation logic that there was
36                // no more available space for a message in the
37                // last read segment
38                self.segment_base = None;
39            }
40
41            if self.segment_base.is_none() {
42                if let Err(e) = self.read_segment() {
43                    debug!("unable to load new segment: {:?}", e);
44                    return None;
45                }
46            }
47
48            let lsn = self.cur_lsn.unwrap();
49
50            // self.segment_base is `Some` now.
51            let _measure = Measure::new(&M.read_segment_message);
52
53            // NB this inequality must be greater than or equal to the
54            // max_lsn. max_lsn may be set to the beginning of the first
55            // corrupt message encountered in the previous sweep of recovery.
56            if let Some(max_lsn) = self.max_lsn {
57                if let Some(cur_lsn) = self.cur_lsn {
58                    if cur_lsn > max_lsn {
59                        // all done
60                        debug!("hit max_lsn {} in iterator, stopping", max_lsn);
61                        return None;
62                    }
63                }
64            }
65
66            let segment_base = &self.segment_base.as_ref().unwrap();
67
68            let lid = segment_base.offset
69                + LogOffset::try_from(lsn % self.config.segment_size as Lsn)
70                    .unwrap();
71
72            let expected_segment_number = SegmentNumber(
73                u64::try_from(lsn).unwrap()
74                    / u64::try_from(self.config.segment_size).unwrap(),
75            );
76
77            match read_message(
78                &**segment_base,
79                lid,
80                expected_segment_number,
81                &self.config,
82            ) {
83                Ok(LogRead::Blob(header, _buf, blob_ptr, inline_len)) => {
84                    trace!("read blob flush in LogIter::next");
85                    self.cur_lsn = Some(lsn + Lsn::from(inline_len));
86
87                    return Some((
88                        LogKind::from(header.kind),
89                        header.pid,
90                        lsn,
91                        DiskPtr::Blob(lid, blob_ptr),
92                        u64::from(inline_len),
93                    ));
94                }
95                Ok(LogRead::Inline(header, _buf, inline_len)) => {
96                    trace!(
97                        "read inline flush with header {:?} in LogIter::next",
98                        header,
99                    );
100                    self.cur_lsn = Some(lsn + Lsn::from(inline_len));
101
102                    return Some((
103                        LogKind::from(header.kind),
104                        header.pid,
105                        lsn,
106                        DiskPtr::Inline(lid),
107                        u64::from(inline_len),
108                    ));
109                }
110                Ok(LogRead::BatchManifest(last_lsn_in_batch, inline_len)) => {
111                    if let Some(max_lsn) = self.max_lsn {
112                        if last_lsn_in_batch > max_lsn {
113                            debug!(
114                                "cutting recovery short due to torn batch. \
115                            required stable lsn: {} actual max possible lsn: {}",
116                                last_lsn_in_batch,
117                                self.max_lsn.unwrap()
118                            );
119                            return None;
120                        }
121                    }
122                    self.cur_lsn = Some(lsn + Lsn::from(inline_len));
123                    continue;
124                }
125                Ok(LogRead::Canceled(inline_len)) => {
126                    trace!("read zeroed in LogIter::next");
127                    self.cur_lsn = Some(lsn + Lsn::from(inline_len));
128                }
129                Ok(LogRead::Corrupted) => {
130                    trace!(
131                        "read corrupted msg in LogIter::next as lid {} lsn {}",
132                        lid,
133                        lsn
134                    );
135                    if self.last_stage {
136                        // this happens when the second half of a freed segment
137                        // is overwritten before its segment header. it's fine
138                        // to just treat it like a cap
139                        // because any already applied
140                        // state can be assumed to be replaced later on by
141                        // the stabilized state that came afterwards.
142                        let _taken = self.segment_base.take().unwrap();
143
144                        continue;
145                    } else {
146                        // found a tear
147                        return None;
148                    }
149                }
150                Ok(LogRead::Cap(_segment_number)) => {
151                    trace!("read cap in LogIter::next");
152                    let _taken = self.segment_base.take().unwrap();
153
154                    continue;
155                }
156                Ok(LogRead::DanglingBlob(_, blob_ptr, inline_len)) => {
157                    debug!(
158                        "encountered dangling blob \
159                         pointer at lsn {} ptr {}",
160                        lsn, blob_ptr
161                    );
162                    self.cur_lsn = Some(lsn + Lsn::from(inline_len));
163                    continue;
164                }
165                Err(e) => {
166                    debug!(
167                        "failed to read log message at lid {} \
168                         with expected lsn {} during iteration: {}",
169                        lid, lsn, e
170                    );
171                    return None;
172                }
173            }
174        }
175    }
176}
177
178impl LogIter {
179    /// read a segment of log messages. Only call after
180    /// pausing segment rewriting on the segment accountant!
181    fn read_segment(&mut self) -> Result<()> {
182        let _measure = Measure::new(&M.segment_read);
183        if self.segments.is_empty() {
184            return Err(io::Error::new(
185                io::ErrorKind::Other,
186                "no segments remaining to iterate over",
187            )
188            .into());
189        }
190
191        let first_ref = self.segments.iter().next().unwrap();
192        let (lsn, offset) = (*first_ref.0, *first_ref.1);
193
194        if let Some(max_lsn) = self.max_lsn {
195            if lsn > max_lsn {
196                return Err(io::Error::new(
197                    io::ErrorKind::Other,
198                    "next segment is above our configured max_lsn",
199                )
200                .into());
201            }
202        }
203
204        assert!(
205            lsn + (self.config.segment_size as Lsn)
206                >= self.cur_lsn.unwrap_or(0),
207            "caller is responsible for providing segments \
208             that contain the initial cur_lsn value or higher"
209        );
210
211        trace!(
212            "LogIter::read_segment lsn: {:?} cur_lsn: {:?}",
213            lsn,
214            self.cur_lsn
215        );
216        // we add segment_len to this check because we may be getting the
217        // initial segment that is a bit behind where we left off before.
218        assert!(
219            lsn + self.config.segment_size as Lsn >= self.cur_lsn.unwrap_or(0)
220        );
221        let f = &self.config.file;
222        let segment_header = read_segment_header(f, offset)?;
223        if offset % self.config.segment_size as LogOffset != 0 {
224            debug!("segment offset not divisible by segment length");
225            return Err(Error::corruption(None));
226        }
227        if segment_header.lsn % self.config.segment_size as Lsn != 0 {
228            debug!(
229                "expected a segment header lsn that is divisible \
230                 by the segment_size ({}) instead it was {}",
231                self.config.segment_size, segment_header.lsn
232            );
233            return Err(Error::corruption(None));
234        }
235
236        if segment_header.lsn != lsn {
237            // this page was torn, nothing to read
238            debug!(
239                "segment header lsn ({}) != expected lsn ({})",
240                segment_header.lsn, lsn
241            );
242            return Err(io::Error::new(
243                io::ErrorKind::Other,
244                "encountered torn segment",
245            )
246            .into());
247        }
248
249        trace!("read segment header {:?}", segment_header);
250
251        let mut buf = vec![0; self.config.segment_size];
252        let size = pread_exact_or_eof(f, &mut buf, offset)?;
253
254        trace!("setting stored segment buffer length to {} after read", size);
255        buf.truncate(size);
256
257        self.cur_lsn = Some(segment_header.lsn + SEG_HEADER_LEN as Lsn);
258
259        self.segment_base = Some(BasedBuf { buf, offset });
260
261        // NB this should only happen after we've successfully read
262        // the header, because we want to zero the segment if we
263        // fail to read that, and we use the remaining segment
264        // list to perform zeroing off of.
265        self.segments.remove(&lsn);
266
267        Ok(())
268    }
269}
270
271fn valid_entry_offset(lid: LogOffset, segment_len: usize) -> bool {
272    let seg_start = lid / segment_len as LogOffset * segment_len as LogOffset;
273
274    let max_lid =
275        seg_start + segment_len as LogOffset - MAX_MSG_HEADER_LEN as LogOffset;
276
277    let min_lid = seg_start + SEG_HEADER_LEN as LogOffset;
278
279    lid >= min_lid && lid <= max_lid
280}
281
282// Scan the log file if we don't know of any Lsn offsets yet,
283// and recover the order of segments, and the highest Lsn.
284fn scan_segment_headers_and_tail(
285    min: Lsn,
286    config: &RunningConfig,
287) -> Result<(BTreeMap<Lsn, LogOffset>, Lsn)> {
288    fn fetch(
289        idx: u64,
290        min: Lsn,
291        config: &RunningConfig,
292    ) -> Option<(LogOffset, SegmentHeader)> {
293        let segment_len = u64::try_from(config.segment_size).unwrap();
294        let base_lid = idx * segment_len;
295        let segment = read_segment_header(&config.file, base_lid).ok()?;
296        trace!(
297            "SA scanned header at lid {} during startup: {:?}",
298            base_lid,
299            segment
300        );
301        if segment.ok && segment.lsn >= min {
302            assert_ne!(segment.lsn, Lsn::max_value());
303            Some((base_lid, segment))
304        } else {
305            trace!(
306                "not using segment at lid {}, ok: {} lsn: {} min lsn: {}",
307                base_lid,
308                segment.ok,
309                segment.lsn,
310                min
311            );
312            None
313        }
314    };
315
316    let segment_len = LogOffset::try_from(config.segment_size).unwrap();
317
318    let f = &config.file;
319    let file_len = f.metadata()?.len();
320    let segments = (file_len / segment_len)
321        + if file_len % segment_len
322            < LogOffset::try_from(SEG_HEADER_LEN).unwrap()
323        {
324            0
325        } else {
326            1
327        };
328
329    trace!(
330        "file len: {} segment len {} segments: {}",
331        file_len,
332        segment_len,
333        segments
334    );
335
336    // scatter
337    let header_promises_res: Result<Vec<_>> = (0..segments)
338        .map({
339            // let config = config.clone();
340            move |idx| {
341                threadpool::spawn({
342                    let config = config.clone();
343                    move || fetch(idx, min, &config)
344                })
345            }
346        })
347        .collect();
348
349    let header_promises = header_promises_res?;
350
351    // gather
352    let mut headers: Vec<(LogOffset, SegmentHeader)> = vec![];
353    for promise in header_promises {
354        let read_attempt =
355            promise.wait().expect("thread pool should not crash");
356
357        if let Some(completed_result) = read_attempt {
358            headers.push(completed_result);
359        }
360    }
361
362    // find max stable LSN recorded in segment headers
363    let mut ordering = BTreeMap::new();
364    let mut max_header_stable_lsn = min;
365
366    for (lid, header) in headers {
367        max_header_stable_lsn =
368            std::cmp::max(header.max_stable_lsn, max_header_stable_lsn);
369
370        if let Some(old) = ordering.insert(header.lsn, lid) {
371            assert_eq!(
372                old, lid,
373                "duplicate segment LSN {} detected at both {} and {}, \
374                 one should have been zeroed out during recovery",
375                header.lsn, old, lid
376            );
377        }
378    }
379
380    debug!(
381        "ordering before clearing tears: {:?}, \
382         max_header_stable_lsn: {}",
383        ordering, max_header_stable_lsn
384    );
385
386    // Check that the segments above max_header_stable_lsn
387    // properly link their previous segment pointers.
388    let end_of_last_contiguous_message_in_unstable_tail =
389        check_contiguity_in_unstable_tail(
390            max_header_stable_lsn,
391            &ordering,
392            config,
393        )?;
394
395    Ok((ordering, end_of_last_contiguous_message_in_unstable_tail))
396}
397
398// This ensures that the last <# io buffers> segments on
399// disk connect via their previous segment pointers in
400// the header. This is important because we expect that
401// the last <# io buffers> segments will join up, and we
402// never reuse buffers within this safety range.
403fn check_contiguity_in_unstable_tail(
404    max_header_stable_lsn: Lsn,
405    ordering: &BTreeMap<Lsn, LogOffset>,
406    config: &RunningConfig,
407) -> Result<Lsn> {
408    let segment_size = config.segment_size as Lsn;
409
410    // -1..(2 *  segment_size) - 1 => 0
411    // otherwise the floor of the buffer
412    let lowest_lsn_in_tail: Lsn =
413        std::cmp::max(0, (max_header_stable_lsn / segment_size) * segment_size);
414
415    let mut expected_present = lowest_lsn_in_tail;
416    let mut missing_item_in_tail = None;
417
418    let logical_tail = ordering
419        .range(lowest_lsn_in_tail..)
420        .map(|(lsn, lid)| (*lsn, *lid))
421        .take_while(|(lsn, _lid)| {
422            let matches = expected_present == *lsn;
423            if !matches {
424                debug!(
425                    "failed to find expected segment \
426                     at lsn {}, tear detected",
427                    expected_present
428                );
429                missing_item_in_tail = Some(expected_present);
430            }
431            expected_present += segment_size;
432            matches
433        })
434        .collect();
435
436    debug!(
437        "in clean_tail_tears, found missing item in tail: {:?} \
438         and we'll scan segments {:?} above lowest lsn {}",
439        missing_item_in_tail, logical_tail, lowest_lsn_in_tail
440    );
441
442    let mut iter = LogIter {
443        config: config.clone(),
444        segments: logical_tail,
445        segment_base: None,
446        max_lsn: missing_item_in_tail,
447        cur_lsn: None,
448        last_stage: false,
449    };
450
451    // run the iterator to completion
452    while let Some(_) = iter.next() {}
453
454    // `cur_lsn` is set to the beginning
455    // of the next message
456    let end_of_last_message = iter.cur_lsn.unwrap_or(0) - 1;
457
458    debug!(
459        "filtering out segments after detected tear at (lsn, lid) {:?}",
460        end_of_last_message,
461    );
462
463    Ok(end_of_last_message)
464}
465
466/// Returns a log iterator, the max stable lsn,
467/// and a set of segments that can be
468/// zeroed after the new snapshot is written,
469/// but no sooner, otherwise it is not crash-safe.
470pub fn raw_segment_iter_from(
471    lsn: Lsn,
472    config: &RunningConfig,
473) -> Result<LogIter> {
474    let segment_len = config.segment_size as Lsn;
475    let normalized_lsn = lsn / segment_len * segment_len;
476
477    let (ordering, end_of_last_msg) =
478        scan_segment_headers_and_tail(normalized_lsn, config)?;
479
480    // find the last stable tip, to properly handle batch manifests.
481    let tip_segment_iter: BTreeMap<_, _> = ordering
482        .iter()
483        .next_back()
484        .map(|(a, b)| (*a, *b))
485        .into_iter()
486        .collect();
487
488    trace!(
489        "trying to find the max stable tip for \
490         bounding batch manifests with segment iter {:?} \
491         of segments >= first_tip {}",
492        tip_segment_iter,
493        end_of_last_msg,
494    );
495
496    trace!(
497        "generated iterator over segments {:?} with lsn >= {}",
498        ordering,
499        normalized_lsn,
500    );
501
502    let ordering = ordering;
503
504    let segments = ordering
505        .into_iter()
506        .filter(move |&(l, _)| l >= normalized_lsn)
507        .collect();
508
509    Ok(LogIter {
510        config: config.clone(),
511        max_lsn: Some(end_of_last_msg),
512        cur_lsn: None,
513        segment_base: None,
514        segments,
515        last_stage: true,
516    })
517}