// sled/pagecache/segment.rs

//! The `SegmentAccountant` is an allocator for equally-
//! sized chunks of the underlying storage file (segments).
//!
//! It must maintain these critical safety properties:
//!
//! A. We must not overwrite existing segments when they
//!    contain the most-recent stable state for a page.
//! B. We must not overwrite existing segments when active
//!    threads may have references to `LogOffset`s that point
//!    into those segments.
//!
//! To complicate matters, the `PageCache` only knows
//! when it has put a page into an IO buffer, but it
//! doesn't keep track of when that IO buffer is
//! stabilized (until write coalescing is implemented).
//!
//! To address these safety concerns, we rely on
//! these techniques:
//!
//! 1. We delay the reuse of any existing segment
//!    by ensuring that we never deactivate a
//!    segment until all data written into it, as
//!    well as all data written to earlier segments,
//!    has been written to disk and fsynced.
//! 2. We use an `epoch::Guard::defer()` from
//!    `IoBufs::write_to_log` that guarantees
//!    that segment deactivation is deferred
//!    until all threads that may have witnessed
//!    pointers into a segment marked for future
//!    reuse have finished their operations.
//!
//! Because IO buffers may be written out-of-order,
//! another concern is the correct recovery of
//! segments. If there is data loss in recently
//! written segments, we must be careful to preserve
//! linearizability in the log. To do this, we must
//! detect "torn segments" that could not be fully
//! written before a crash happened.
//!
//! But what if we wrote a later segment before we
//! were able to write its immediate predecessor,
//! and then a crash happened? We must preserve
//! linearizability, so we must not recover the later
//! segment when its predecessor was lost in the crash.
//!
//! 3. We solve this case by keeping a concept of
//!    an "unstable tail" of segments that, during recovery,
//!    must appear consecutively among the recovered
//!    segments with the highest LSNs. We
//!    prevent reuse of segments while they remain in
//!    this "unstable tail" by only allowing them to be
//!    reallocated after a later segment has written
//!    a "stable consecutive lsn" into its own header
//!    that is higher than theirs.
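//!
//! A rough sketch of the lifecycle each segment moves through,
//! as implemented by the crate-private `Segment` state machine
//! below (`lsn` and `config` are stand-ins here; not a doctest):
//!
//! ```ignore
//! let mut segment = Segment::default();           // starts as Segment::Free
//! segment.free_to_active(lsn);                    // begins accepting new pages
//! let deferred = segment.active_to_inactive(lsn, &config)?;
//! let pids_to_relocate = segment.inactive_to_draining(lsn);
//! let replacement_lsn = segment.draining_to_free(lsn);
//! ```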

#![allow(unused_results)]

use std::{collections::BTreeSet, mem};

use super::PageState;

use crate::pagecache::*;
use crate::*;

/// An operation that can be applied asynchronously.
#[derive(Debug, Clone)]
pub(crate) enum SegmentOp {
    Link {
        pid: PageId,
        cache_info: CacheInfo,
    },
    Replace {
        pid: PageId,
        lsn: Lsn,
        old_cache_infos: Vec<CacheInfo>,
        new_cache_info: CacheInfo,
    },
}

/// The segment accountant keeps track of the logical blocks
/// of storage. It scans through all segments quickly during
/// recovery and attempts to locate torn segments.
#[derive(Debug)]
pub(crate) struct SegmentAccountant {
    // static or one-time set
    config: RunningConfig,

    // TODO these should be sharded to improve performance
    segments: Vec<Segment>,

    // TODO put behind a single mutex
    free: BTreeSet<LogOffset>,
    tip: LogOffset,
    max_stabilized_lsn: Lsn,
    segment_cleaner: SegmentCleaner,
    ordering: BTreeMap<Lsn, LogOffset>,
    async_truncations: BTreeMap<LogOffset, OneShot<Result<()>>>,
}

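/// A shared queue of segments that are ready to have their
/// remaining live pages relocated, mapping each segment's base
/// `LogOffset` to the set of `PageId`s still resident in it.
/// Callers use `pop` to pull one (page, segment) pair at a
/// time for rewriting until the segment can be freed.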
#[derive(Debug, Clone, Default)]
pub(crate) struct SegmentCleaner {
    inner: Arc<Mutex<BTreeMap<LogOffset, BTreeSet<PageId>>>>,
}

impl SegmentCleaner {
    pub(crate) fn pop(&self) -> Option<(PageId, LogOffset)> {
        let mut inner = self.inner.lock();
        let offset = {
            // look at the first (lowest-offset) segment in the queue
            let (offset, pids) = inner.iter_mut().next()?;
            if !pids.is_empty() {
                // hand out one of its remaining pages for relocation
                let pid = pids.iter().next().copied().unwrap();
                pids.remove(&pid);
                return Some((pid, *offset));
            }
            *offset
        };
        // the first segment had no pages left; drop its empty entry
        inner.remove(&offset);
        None
    }

    fn add_pids(&self, offset: LogOffset, pids: BTreeSet<PageId>) {
        let mut inner = self.inner.lock();
        let prev = inner.insert(offset, pids);
        assert!(prev.is_none());
    }

    fn remove_pids(&self, offset: LogOffset) {
        let mut inner = self.inner.lock();
        inner.remove(&offset);
    }
}

impl Drop for SegmentAccountant {
    fn drop(&mut self) {
        for segment in &self.segments {
            let segment_utilization = match segment {
                Segment::Free(_) | Segment::Draining(_) => 0,
                Segment::Active(Active { rss, .. })
                | Segment::Inactive(Inactive { rss, .. }) => *rss,
            };
            M.segment_utilization_shutdown.measure(segment_utilization as u64);
        }
    }
}

#[derive(Debug, Clone, Default)]
struct Free {
    previous_lsn: Option<Lsn>,
}

#[derive(Debug, Clone, Default)]
struct Active {
    lsn: Lsn,
    rss: usize,
    deferred_replaced_rss: usize,
    deferred_replaced_pids: BTreeSet<PageId>,
    pids: BTreeSet<PageId>,
    latest_replacement_lsn: Lsn,
    can_free_upon_deactivation: FastSet8<Lsn>,
    deferred_rm_blob: FastSet8<BlobPointer>,
}

#[derive(Debug, Clone, Default)]
struct Inactive {
    lsn: Lsn,
    rss: usize,
    pids: BTreeSet<PageId>,
    max_pids: usize,
    replaced_pids: usize,
    latest_replacement_lsn: Lsn,
}

#[derive(Debug, Clone, Default)]
struct Draining {
    lsn: Lsn,
    max_pids: usize,
    replaced_pids: usize,
    latest_replacement_lsn: Lsn,
}

/// A `Segment` holds the bookkeeping information for
/// a contiguous block of the disk. It may contain many
/// fragments from different pages. Over time, we track
/// when segments become reusable and allow them to be
/// overwritten for new data.
#[derive(Clone, Debug)]
enum Segment {
    /// the segment is marked for reuse and should never
    /// receive new pids
    Free(Free),

    /// the segment is being written to or actively recovered, and
    /// will have pages assigned to it
    Active(Active),

    /// the segment is no longer being written to or recovered, and
    /// will have pages marked as relocated from it
    Inactive(Inactive),

    /// the segment is having its resident pages relocated before
    /// becoming free
    Draining(Draining),
}

impl Default for Segment {
    fn default() -> Self {
        Segment::Free(Free { previous_lsn: None })
    }
}

impl Segment {
    fn is_free(&self) -> bool {
        matches!(self, Segment::Free(_))
    }

    fn is_active(&self) -> bool {
        matches!(self, Segment::Active { .. })
    }

    fn is_inactive(&self) -> bool {
        matches!(self, Segment::Inactive { .. })
    }

    fn free_to_active(&mut self, new_lsn: Lsn) {
        trace!("setting Segment to Active with new lsn {:?}", new_lsn);
        assert!(self.is_free());

        *self = Segment::Active(Active {
            lsn: new_lsn,
            rss: 0,
            deferred_replaced_rss: 0,
            deferred_replaced_pids: BTreeSet::default(),
            pids: BTreeSet::default(),
            latest_replacement_lsn: 0,
            can_free_upon_deactivation: FastSet8::default(),
            deferred_rm_blob: FastSet8::default(),
        })
    }

    /// Transitions a segment to being in the `Inactive` state.
    /// Returns the set of segment LSNs whose freeing was deferred
    /// until this Segment was deactivated.
    fn active_to_inactive(
        &mut self,
        lsn: Lsn,
        config: &Config,
    ) -> Result<FastSet8<Lsn>> {
        trace!("setting Segment with lsn {:?} to Inactive", self.lsn());

        let (inactive, ret) = if let Segment::Active(active) = self {
            assert!(lsn >= active.lsn);

            // now we can push any deferred blob removals to the removed set
            for ptr in &active.deferred_rm_blob {
                trace!(
                    "removing blob {} while transitioning \
                     segment lsn {:?} to Inactive",
                    ptr,
                    active.lsn,
                );
                remove_blob(*ptr, config)?;
            }

            let inactive = Segment::Inactive(Inactive {
                lsn: active.lsn,
                rss: active
                    .rss
                    .checked_sub(active.deferred_replaced_rss)
                    .unwrap(),
                max_pids: active.pids.len(),
                replaced_pids: active.deferred_replaced_pids.len(),
                pids: &active.pids - &active.deferred_replaced_pids,
                latest_replacement_lsn: active.latest_replacement_lsn,
            });

            let can_free = mem::take(&mut active.can_free_upon_deactivation);

            (inactive, can_free)
        } else {
            panic!("called active_to_inactive on {:?}", self);
        };

        *self = inactive;
        Ok(ret)
    }

    fn inactive_to_draining(&mut self, lsn: Lsn) -> BTreeSet<PageId> {
        trace!("setting Segment with lsn {:?} to Draining", self.lsn());

        if let Segment::Inactive(inactive) = self {
            assert!(lsn >= inactive.lsn);
            let ret = mem::take(&mut inactive.pids);
            *self = Segment::Draining(Draining {
                lsn: inactive.lsn,
                max_pids: inactive.max_pids,
                replaced_pids: inactive.replaced_pids,
                latest_replacement_lsn: inactive.latest_replacement_lsn,
            });
            ret
        } else {
            panic!("called inactive_to_draining on {:?}", self);
        }
    }

    fn defer_free_lsn(&mut self, lsn: Lsn) {
        if let Segment::Active(active) = self {
            active.can_free_upon_deactivation.insert(lsn);
        } else {
            panic!("called defer_free_lsn on segment {:?}", self);
        }
    }

    fn draining_to_free(&mut self, lsn: Lsn) -> Lsn {
        trace!("setting Segment with lsn {:?} to Free", self.lsn());

        if let Segment::Draining(draining) = self {
            let old_lsn = draining.lsn;
            assert!(lsn >= old_lsn);
            let replacement_lsn = draining.latest_replacement_lsn;
            *self = Segment::Free(Free { previous_lsn: Some(old_lsn) });
            replacement_lsn
        } else {
            panic!("called draining_to_free on {:?}", self);
        }
    }

    fn recovery_ensure_initialized(&mut self, lsn: Lsn) {
        if self.is_free() {
            trace!("(snapshot) recovering segment with base lsn {}", lsn);
            self.free_to_active(lsn);
        }
    }

    fn lsn(&self) -> Lsn {
        match self {
            Segment::Active(Active { lsn, .. })
            | Segment::Inactive(Inactive { lsn, .. })
            | Segment::Draining(Draining { lsn, .. }) => *lsn,
            Segment::Free(_) => panic!("called lsn on Segment::Free"),
        }
    }

    /// Add a pid to the Segment. The caller must provide
    /// the Segment's LSN.
    fn insert_pid(&mut self, pid: PageId, lsn: Lsn, size: usize) {
        // if this breaks, maybe we didn't implement the transition
        // logic right in write_to_log, and maybe a thread is
        // using the SA to add pids AFTER their calls to
        // res.complete() worked.
        if let Segment::Active(active) = self {
            assert_eq!(
                lsn, active.lsn,
                "insert_pid specified lsn {} for pid {} in segment {:?}",
                lsn, pid, active
            );
            active.pids.insert(pid);
            active.rss += size;
        } else {
            panic!("called insert_pid on {:?}", self);
        }
    }

    fn remove_pid(&mut self, pid: PageId, replacement_lsn: Lsn, sz: usize) {
        trace!(
            "removing pid {} at lsn {} from segment {:?}",
            pid,
            replacement_lsn,
            self
        );
        match self {
            Segment::Active(active) => {
                assert!(active.lsn <= replacement_lsn);
                if replacement_lsn != active.lsn {
                    active.deferred_replaced_pids.insert(pid);
                }
                active.deferred_replaced_rss += sz;
                if replacement_lsn > active.latest_replacement_lsn {
                    active.latest_replacement_lsn = replacement_lsn;
                }
            }
            Segment::Inactive(Inactive {
                pids,
                lsn,
                rss,
                latest_replacement_lsn,
                replaced_pids,
                ..
            }) => {
                assert!(*lsn <= replacement_lsn);
                if replacement_lsn != *lsn {
                    pids.remove(&pid);
                    *replaced_pids += 1;
                }
                *rss = rss.checked_sub(sz).unwrap();
                if replacement_lsn > *latest_replacement_lsn {
                    *latest_replacement_lsn = replacement_lsn;
                }
            }
            Segment::Draining(Draining {
                lsn,
                latest_replacement_lsn,
                replaced_pids,
                ..
            }) => {
                assert!(*lsn <= replacement_lsn);
                if replacement_lsn != *lsn {
                    *replaced_pids += 1;
                }
                if replacement_lsn > *latest_replacement_lsn {
                    *latest_replacement_lsn = replacement_lsn;
                }
            }
            Segment::Free(_) => {
                panic!("called remove pid {} on Segment::Free", pid)
            }
        }
    }

    fn remove_blob(
        &mut self,
        blob_ptr: BlobPointer,
        config: &Config,
    ) -> Result<()> {
        match self {
            Segment::Active(active) => {
                // we have received a removal before
                // transferring this segment to Inactive, so
                // we defer this blob's removal until the transfer.
                active.deferred_rm_blob.insert(blob_ptr);
            }
            Segment::Inactive(_) | Segment::Draining(_) => {
                trace!(
                    "directly removing blob {} that was referred-to \
                     in a segment that has already been marked as Inactive \
                     or Draining.",
                    blob_ptr,
                );
                remove_blob(blob_ptr, config)?;
            }
            Segment::Free(_) => panic!("remove_blob called on a Free Segment"),
        }

        Ok(())
    }

    fn can_free(&self) -> bool {
        if let Segment::Draining(draining) = self {
            draining.replaced_pids == draining.max_pids
        } else {
            false
        }
    }
}

impl SegmentAccountant {
    /// Create a new `SegmentAccountant` from previously recovered segments.
    pub(super) fn start(
        config: RunningConfig,
        snapshot: &Snapshot,
        segment_cleaner: SegmentCleaner,
    ) -> Result<Self> {
        let _measure = Measure::new(&M.start_segment_accountant);
        let mut ret = Self {
            config,
            segments: vec![],
            free: BTreeSet::default(),
            tip: 0,
            max_stabilized_lsn: -1,
            segment_cleaner,
            ordering: BTreeMap::default(),
            async_truncations: BTreeMap::default(),
        };

        ret.initialize_from_snapshot(snapshot)?;

        if let Some(max_free) = ret.free.iter().max() {
            assert!(
                ret.tip > *max_free,
                "expected recovered tip {} to \
                 be above max item in recovered \
                 free list {:?}",
                ret.tip,
                ret.free
            );
        }

        debug!(
            "SA starting with tip {} stable {} free {:?}",
            ret.tip, ret.max_stabilized_lsn, ret.free,
        );

        Ok(ret)
    }

    fn initial_segments(&self, snapshot: &Snapshot) -> Result<Vec<Segment>> {
        let segment_size = self.config.segment_size;
        let file_len = self.config.file.metadata()?.len();
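        // ceiling division: a partially filled trailing segment
        // still occupies a slot (illustrative numbers: a 17 MB
        // file with 8 MB segments yields 3 segment slots)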
        let number_of_segments =
            usize::try_from(file_len / segment_size as u64).unwrap()
                + if file_len % segment_size as u64 == 0 { 0 } else { 1 };

        // generate segments from snapshot lids
        let mut segments = vec![Segment::default(); number_of_segments];

        // sometimes the current segment is still empty, after only
        // recovering the segment header but no valid messages yet
        if let Some(tip_lid) = snapshot.active_segment {
            let tip_idx =
                usize::try_from(tip_lid / segment_size as LogOffset).unwrap();
            if tip_idx == number_of_segments {
                segments.push(Segment::default());
            }
            if segments.len() <= tip_idx {
                error!("failed to properly initialize segments, suspected disk corruption");
                return Err(Error::corruption(None));
            }
            trace!(
                "setting segment for tip_lid {} to stable_lsn {}",
                tip_lid,
                self.config.normalize(snapshot.stable_lsn.unwrap_or(0))
            );
            segments[tip_idx].recovery_ensure_initialized(
                self.config.normalize(snapshot.stable_lsn.unwrap_or(0)),
            );
        }

        let add =
            |pid, lsn: Lsn, sz, lid: LogOffset, segments: &mut Vec<Segment>| {
                let idx = assert_usize(lid / segment_size as LogOffset);
                trace!(
                    "adding lsn: {} lid: {} sz: {} for \
                     pid {} to segment {} during SA recovery",
                    lsn,
                    lid,
                    sz,
                    pid,
                    idx
                );
                let segment_lsn = self.config.normalize(lsn);
                segments[idx].recovery_ensure_initialized(segment_lsn);
                segments[idx].insert_pid(
                    pid,
                    segment_lsn,
                    usize::try_from(sz).unwrap(),
                );
            };

        for (pid, state) in snapshot.pt.iter().enumerate() {
            match state {
                PageState::Present { base, frags } => {
                    add(
                        pid as PageId,
                        base.0,
                        base.2,
                        base.1.lid(),
                        &mut segments,
                    );
                    for (lsn, ptr, sz) in frags {
                        add(pid as PageId, *lsn, *sz, ptr.lid(), &mut segments);
                    }
                }
                PageState::Free(lsn, ptr) => {
                    add(
                        pid as PageId,
                        *lsn,
                        u64::try_from(MAX_MSG_HEADER_LEN).unwrap(),
                        ptr.lid(),
                        &mut segments,
                    );
                }
                _ => panic!("tried to recover pagestate from a {:?}", state),
            }
        }

        Ok(segments)
    }

    fn initialize_from_snapshot(&mut self, snapshot: &Snapshot) -> Result<()> {
        let segment_size = self.config.segment_size;
        let segments = self.initial_segments(snapshot)?;

        self.segments = segments;

        let mut to_free = vec![];
        let mut maybe_clean = vec![];

        let currently_active_segment = snapshot
            .active_segment
            .map(|tl| usize::try_from(tl / segment_size as LogOffset).unwrap());

        for (idx, segment) in self.segments.iter_mut().enumerate() {
            let segment_base = idx as LogOffset * segment_size as LogOffset;

            if segment_base >= self.tip {
                // bump the tip above the end of any segment we have seen
                self.tip = segment_base + segment_size as LogOffset;
                trace!(
                    "raised self.tip to {} during SA initialization",
                    self.tip
                );
            }

            if segment.is_free() {
                // this segment was not used in the recovered
                // snapshot, so we can assume it is free
                to_free.push(segment_base);
                continue;
            }

            let segment_lsn = segment.lsn();

            if let Some(tip_idx) = currently_active_segment {
                if tip_idx != idx {
                    maybe_clean.push((idx, segment_lsn));
                }
            }
        }

        for segment_base in to_free {
            self.free_segment(segment_base)?;
            io_fail!(self.config, "zero garbage segment SA");
            pwrite_all(
                &self.config.file,
                &*vec![MessageKind::Corrupted.into(); self.config.segment_size],
                segment_base,
            )?;
        }

        // we want to complete all truncations because
        // they could cause calls to `next` to block.
        for (_, promise) in self.async_truncations.split_off(&0) {
            promise.wait().expect("threadpool should not crash")?;
        }

        for (idx, segment_lsn) in maybe_clean {
            self.possibly_clean_or_free_segment(idx, segment_lsn)?;
        }

        trace!("initialized self.segments to {:?}", self.segments);

        self.ordering = self
            .segments
            .iter()
            .enumerate()
            .filter_map(|(id, s)| {
                if s.is_free() {
                    None
                } else {
                    Some((s.lsn(), id as LogOffset * segment_size as LogOffset))
                }
            })
            .collect();

        trace!("initialized self.ordering to {:?}", self.ordering);

        Ok(())
    }

    fn free_segment(&mut self, lid: LogOffset) -> Result<()> {
        debug!("freeing segment {}", lid);
        trace!("free list before free {:?}", self.free);
        self.segment_cleaner.remove_pids(lid);

        let idx = self.segment_id(lid);
        assert!(
            self.tip > lid,
            "freed a segment at {} above our current file tip {}, \
             please report this bug!",
            lid,
            self.tip,
        );
        assert!(self.segments[idx].is_free());
        assert!(!self.free.contains(&lid), "double-free of a segment occurred");

        self.free.insert(lid);

        // remove the old ordering from our list
        if let Segment::Free(Free { previous_lsn: Some(last_lsn) }) =
            self.segments[idx]
        {
            trace!(
                "removing segment {} with lsn {} from ordering",
                lid,
                last_lsn
            );
            self.ordering.remove(&last_lsn);
        }

        // we want to avoid aggressive truncation because it can cause
        // blocking if we allocate a segment that was just truncated.
        let laziness_factor = 1;

        // truncate if possible
        while self.tip != 0 && self.free.len() > laziness_factor {
            let last_segment = self.tip - self.config.segment_size as LogOffset;
            if self.free.contains(&last_segment) {
                self.free.remove(&last_segment);
                self.truncate(last_segment)?;
            } else {
                break;
            }
        }

        Ok(())
    }

    /// Asynchronously apply a GC-related operation. Used in a flat-combining
    /// style that allows callers to avoid blocking while sending these
    /// messages to this module.
    pub(super) fn apply_op(&mut self, op: &SegmentOp) -> Result<()> {
        use SegmentOp::*;
        match op {
            Link { pid, cache_info } => self.mark_link(*pid, *cache_info),
            Replace { pid, lsn, old_cache_infos, new_cache_info } => {
                self.mark_replace(*pid, *lsn, old_cache_infos, *new_cache_info)?
            }
        }
        Ok(())
    }
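
    // A hypothetical caller sketch (illustration only; `op_queue` is
    // a stand-in, not part of this module): the owner of the
    // `SegmentAccountant` drains a queue of deferred `SegmentOp`s
    // and applies them in one critical section:
    //
    //     while let Some(op) = op_queue.pop() {
    //         sa.apply_op(&op)?;
    //     }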

    /// Called by the `PageCache` when a page has been rewritten completely.
    /// We mark all of the old segments that contained the previous state
    /// of the page, and if the old segments are empty or drained enough to
    /// begin accelerated cleaning, we mark them as such.
    pub(super) fn mark_replace(
        &mut self,
        pid: PageId,
        lsn: Lsn,
        old_cache_infos: &[CacheInfo],
        new_cache_info: CacheInfo,
    ) -> Result<()> {
        let _measure = Measure::new(&M.accountant_mark_replace);

        self.mark_link(pid, new_cache_info);

        trace!(
            "mark_replace pid {} from cache infos {:?} to cache info {:?} with lsn {}",
            pid,
            old_cache_infos,
            new_cache_info,
            self.config.normalize(lsn)
        );

        let new_idx = self.segment_id(new_cache_info.pointer.lid());

        // Do we need to schedule any blob cleanups?
        // Not if we just moved the pointer without changing
        // the underlying blob, as is the case with a single Blob
        // with nothing else.
        let schedule_rm_blob = !(old_cache_infos.len() == 1
            && old_cache_infos[0].pointer.is_blob());

        let mut removals = FastMap8::default();

        for old_cache_info in old_cache_infos {
            let old_ptr = &old_cache_info.pointer;
            let old_lid = old_ptr.lid();

            if schedule_rm_blob && old_ptr.is_blob() {
                trace!(
                    "queueing blob removal for {} in our own segment",
                    old_ptr
                );
                self.segments[new_idx]
                    .remove_blob(old_ptr.blob().1, &self.config)?;
            }

            let old_idx = self.segment_id(old_lid);
            let entry = removals.entry(old_idx).or_insert(0);
            *entry += old_cache_info.log_size;
        }

        for (old_idx, replaced_size) in removals {
            self.segments[old_idx].remove_pid(
                pid,
                self.config.normalize(lsn),
                usize::try_from(replaced_size).unwrap(),
            );
            self.possibly_clean_or_free_segment(old_idx, new_cache_info.lsn)?;
        }

        Ok(())
    }

    /// Called from `PageCache` when some state has been added
    /// to a logical page at a particular offset. We ensure the
    /// page is present in the segment's page set.
    pub(super) fn mark_link(&mut self, pid: PageId, cache_info: CacheInfo) {
        let _measure = Measure::new(&M.accountant_mark_link);

        trace!("mark_link pid {} at cache info {:?}", pid, cache_info);
        let idx = self.segment_id(cache_info.pointer.lid());

        let segment = &mut self.segments[idx];

        let segment_lsn = cache_info.lsn / self.config.segment_size as Lsn
            * self.config.segment_size as Lsn;

        // if a race happened and our Lsn no longer applies,
        // this assert will catch the segment's reuse
        assert_eq!(
            segment.lsn(),
            segment_lsn,
            "segment somehow got reused by the time a link was \
             marked on it. expected lsn: {} actual: {}",
            segment_lsn,
            segment.lsn()
        );

        segment.insert_pid(
            pid,
            segment_lsn,
            usize::try_from(cache_info.log_size).unwrap(),
        );
    }

    fn possibly_clean_or_free_segment(
        &mut self,
        idx: usize,
        lsn: Lsn,
    ) -> Result<()> {
        let cleanup_threshold = SEGMENT_CLEANUP_THRESHOLD;

        let segment_start = (idx * self.config.segment_size) as LogOffset;

        if let Segment::Inactive(inactive) = &mut self.segments[idx] {
            let live_pct = inactive.rss * 100 / self.config.segment_size;

            let can_drain = live_pct <= cleanup_threshold;

            if can_drain {
                // can be cleaned
                trace!(
                    "SA inserting {} into to_clean from possibly_clean_or_free_segment",
                    segment_start
                );
                let to_clean = self.segments[idx].inactive_to_draining(lsn);
                self.segment_cleaner.add_pids(segment_start, to_clean);
            }
        }

        let segment_lsn = self.segments[idx].lsn();

        if self.segments[idx].can_free() {
            // can be reused immediately
            let replacement_lsn = self.segments[idx].draining_to_free(lsn);

            if self.ordering.contains_key(&replacement_lsn) {
                let replacement_lid = self.ordering[&replacement_lsn];
                let replacement_idx = usize::try_from(
                    replacement_lid / self.config.segment_size as u64,
                )
                .unwrap();

                if self.segments[replacement_idx].is_active() {
                    trace!(
                        "deferring free of segment {} in possibly_clean_or_free_segment",
                        segment_start
                    );
                    self.segments[replacement_idx].defer_free_lsn(segment_lsn);
                } else {
                    assert!(replacement_lsn <= self.max_stabilized_lsn);
                    self.free_segment(segment_start)?;
                }
            } else {
                // replacement segment has already been freed, so we can
                // go right to freeing this one too
                self.free_segment(segment_start)?;
            }
        }

        Ok(())
    }

    pub(super) fn stabilize(&mut self, stable_lsn: Lsn) -> Result<()> {
        let segment_size = self.config.segment_size as Lsn;
        let lsn = ((stable_lsn / segment_size) - 1) * segment_size;
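        // this rounds stable_lsn down to a segment boundary and then
        // steps back one more full segment, so only segments entirely
        // below the stable point get deactivated. e.g. with an
        // (illustrative) segment_size of 1024, a stable_lsn of 3500
        // yields lsn 2048: segments based at 2048 and below qualify.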
        trace!(
            "stabilize({}), normalized: {}, last: {}",
            stable_lsn,
            lsn,
            self.max_stabilized_lsn
        );
        if self.max_stabilized_lsn >= lsn {
            trace!(
                "expected stabilization lsn {} \
                 to be greater than the previous value of {}",
                lsn,
                self.max_stabilized_lsn
            );
            return Ok(());
        }

        let bounds = (
            std::ops::Bound::Excluded(self.max_stabilized_lsn),
            std::ops::Bound::Included(lsn),
        );

        let can_deactivate = self
            .ordering
            .range(bounds)
            .map(|(lsn, _lid)| *lsn)
            .collect::<Vec<_>>();

        self.max_stabilized_lsn = lsn;

        for lsn in can_deactivate {
            self.deactivate_segment(lsn)?;
        }

        Ok(())
    }

    /// Called after the trailer of a segment has been written to disk,
    /// indicating that no more pids will be added to a segment. Moves
    /// the segment into the Inactive state.
    ///
    /// # Panics
    /// The provided lsn must correspond to a segment in `ordering`.
    fn deactivate_segment(&mut self, lsn: Lsn) -> Result<()> {
        let lid = self.ordering[&lsn];
        let idx = self.segment_id(lid);

        trace!(
            "deactivating segment with lid {} lsn {}: {:?}",
            lid,
            lsn,
            self.segments[idx]
        );

        let freeable_segments = if self.segments[idx].is_active() {
            self.segments[idx].active_to_inactive(lsn, &self.config)?
        } else {
            Default::default()
        };

        for lsn in freeable_segments {
            let segment_start = self.ordering[&lsn];
            assert_ne!(segment_start, lid);
            self.free_segment(segment_start)?;
        }

        self.possibly_clean_or_free_segment(idx, lsn)?;

        // if we have a lot of free segments in our whole file,
        // let's start relocating the current tip to boil it down
        let free_segs = self.segments.iter().filter(|s| s.is_free()).count();
        let inactive_segs =
            self.segments.iter().filter(|s| s.is_inactive()).count();
        let free_ratio = (free_segs * 100) / (1 + free_segs + inactive_segs);
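        // worked example (illustrative numbers): with 40 free and
        // 20 inactive segments, free_ratio = 4000 / 61 = 65, so if
        // SEGMENT_CLEANUP_THRESHOLD were 50 we would drain the
        // highest inactive segment to shrink the file's tail.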

        if free_ratio >= SEGMENT_CLEANUP_THRESHOLD && inactive_segs > 5 {
            let last_index =
                self.segments.iter().rposition(Segment::is_inactive).unwrap();

            let segment_start =
                (last_index * self.config.segment_size) as LogOffset;

            let to_clean = self.segments[last_index].inactive_to_draining(lsn);
            self.segment_cleaner.add_pids(segment_start, to_clean);
        }

        Ok(())
    }

    fn bump_tip(&mut self) -> Result<LogOffset> {
        let lid = self.tip;

        let truncations = self.async_truncations.split_off(&lid);

        for (_at, truncation) in truncations {
            match truncation.wait().unwrap() {
                Ok(()) => {}
                Err(error) => {
                    error!("failed to shrink file: {:?}", error);
                    return Err(error);
                }
            }
        }

        self.tip += self.config.segment_size as LogOffset;

        trace!("advancing file tip from {} to {}", lid, self.tip);

        Ok(lid)
    }

    /// Returns the next offset to write a new segment in.
    pub(super) fn next(&mut self, lsn: Lsn) -> Result<LogOffset> {
        let _measure = Measure::new(&M.accountant_next);

        assert_eq!(
            lsn % self.config.segment_size as Lsn,
            0,
            "unaligned Lsn provided to next!"
        );

        trace!("evaluating free list {:?} in SA::next", &self.free);

        // reuse the lowest segment from the free list,
        // or bump the tip and extend the file
        let safe = self.free.iter().next().copied();

        let lid = if let Some(next) = safe {
            self.free.remove(&next);
            next
        } else {
            self.bump_tip()?
        };

        // pin lsn to this segment
        let idx = self.segment_id(lid);

        self.segments[idx].free_to_active(lsn);

        self.ordering.insert(lsn, lid);

        debug!(
            "segment accountant returning offset: {} for lsn {} \
             on deck: {:?}",
            lid, lsn, self.free,
        );

        assert!(
            lsn >= Lsn::try_from(lid).unwrap(),
            "lsn {} should always be greater than or equal to lid {}",
            lsn,
            lid
        );

        Ok(lid)
    }

    /// Returns a snapshot of the current segment log sequence
    /// numbers and their corresponding file offsets, starting
    /// from the given lsn.
    pub(super) fn segment_snapshot_iter_from(
        &mut self,
        lsn: Lsn,
    ) -> BTreeMap<Lsn, LogOffset> {
        assert!(
            !self.ordering.is_empty(),
            "expected ordering to have been initialized already"
        );

        let normalized_lsn = self.config.normalize(lsn);

        trace!(
            "generated iterator over {:?} where lsn >= {}",
            self.ordering,
            normalized_lsn
        );

        self.ordering
            .iter()
            .filter_map(move |(l, r)| {
                if *l >= normalized_lsn {
                    Some((*l, *r))
                } else {
                    None
                }
            })
            .collect()
    }

    // truncate the file to the desired length
    fn truncate(&mut self, at: LogOffset) -> Result<()> {
        trace!("asynchronously truncating file to length {}", at);

        assert_eq!(
            at % self.config.segment_size as LogOffset,
            0,
            "new length must be io-buf-len aligned"
        );

        self.tip = at;

        assert!(!self.free.contains(&at), "double-free of a segment occurred");

        let (completer, promise) = OneShot::pair();

        let config = self.config.clone();

        io_fail!(&config, "file truncation");
        threadpool::spawn(move || {
            debug!("truncating file to length {}", at);
            let res = config
                .file
                .set_len(at)
                .and_then(|_| config.file.sync_all())
                .map_err(|e| e.into());
            completer.fill(res);
        })?;

        if self.async_truncations.insert(at, promise).is_some() {
            panic!(
                "somehow segment {} was truncated before \
                 the previous truncation completed",
                at
            );
        }

        Ok(())
    }

    fn segment_id(&mut self, lid: LogOffset) -> SegmentId {
        let idx = assert_usize(lid / self.config.segment_size as LogOffset);

        // TODO never resize like this, make it a single
        // responsibility when the tip is bumped / truncated.
        if self.segments.len() < idx + 1 {
            self.segments.resize(idx + 1, Segment::default());
        }

        idx
    }
}