1#![allow(unused_results)]
58
59use std::{collections::BTreeSet, mem};
60
61use super::PageState;
62
63use crate::pagecache::*;
64use crate::*;
65
/// A bookkeeping operation applied to the `SegmentAccountant` after a
/// page mutation has been written into the log.
#[derive(Debug, Clone)]
pub(crate) enum SegmentOp {
    /// A new frag was appended (linked) onto an existing page.
    Link {
        pid: PageId,
        cache_info: CacheInfo,
    },
    /// A page was completely rewritten, replacing all of its previous
    /// on-disk locations with a single new one.
    Replace {
        pid: PageId,
        lsn: Lsn,
        old_cache_infos: Vec<CacheInfo>,
        new_cache_info: CacheInfo,
    },
}
80
/// Tracks the lifecycle and occupancy of every segment-sized region of
/// the log file: which pages live where, how much of each segment is
/// still live, and when segments may be cleaned, freed, or truncated.
#[derive(Debug)]
pub(crate) struct SegmentAccountant {
    config: RunningConfig,

    // per-segment metadata, indexed by segment id (offset / segment_size)
    segments: Vec<Segment>,

    // base offsets of segments that are currently free for reuse
    free: BTreeSet<LogOffset>,
    // offset one past the end of the highest allocated segment; new
    // segments are allocated here when the free list is empty
    tip: LogOffset,
    // highest lsn known to be fully stable; -1 before anything stabilizes
    max_stabilized_lsn: Lsn,
    segment_cleaner: SegmentCleaner,
    // maps each non-free segment's base lsn to its base log offset
    ordering: BTreeMap<Lsn, LogOffset>,
    // in-flight asynchronous file truncations, keyed by truncation point
    async_truncations: BTreeMap<LogOffset, OneShot<Result<()>>>,
}
100
/// A shared queue of pages that should be rewritten so that their
/// mostly-dead segments can be freed, keyed by segment base offset.
#[derive(Debug, Clone, Default)]
pub(crate) struct SegmentCleaner {
    inner: Arc<Mutex<BTreeMap<LogOffset, BTreeSet<PageId>>>>,
}
105
106impl SegmentCleaner {
107 pub(crate) fn pop(&self) -> Option<(PageId, LogOffset)> {
108 let mut inner = self.inner.lock();
109 let offset = {
110 let (offset, pids) = inner.iter_mut().next()?;
111 if !pids.is_empty() {
112 let pid = pids.iter().next().copied().unwrap();
113 pids.remove(&pid);
114 return Some((pid, *offset));
115 }
116 *offset
117 };
118 inner.remove(&offset);
119 None
120 }
121
122 fn add_pids(&self, offset: LogOffset, pids: BTreeSet<PageId>) {
123 let mut inner = self.inner.lock();
124 let prev = inner.insert(offset, pids);
125 assert!(prev.is_none());
126 }
127
128 fn remove_pids(&self, offset: LogOffset) {
129 let mut inner = self.inner.lock();
130 inner.remove(&offset);
131 }
132}
133
impl Drop for SegmentAccountant {
    fn drop(&mut self) {
        // Record each segment's utilization (live bytes) into the
        // shutdown metrics histogram. Free and Draining segments are
        // counted as 0 since they carry no surviving data.
        for segment in &self.segments {
            let segment_utilization = match segment {
                Segment::Free(_) | Segment::Draining(_) => 0,
                Segment::Active(Active { rss, .. })
                | Segment::Inactive(Inactive { rss, .. }) => *rss,
            };
            M.segment_utilization_shutdown.measure(segment_utilization as u64);
        }
    }
}
146
/// State for a segment that holds no live data and may be reused.
#[derive(Debug, Clone, Default)]
struct Free {
    // the base lsn this segment held before it was freed, if any;
    // used to purge its stale entry from the lsn -> offset ordering
    previous_lsn: Option<Lsn>,
}
151
/// State for a segment that is currently receiving new writes.
#[derive(Debug, Clone, Default)]
struct Active {
    // base lsn of this segment
    lsn: Lsn,
    // resident set size: bytes of data written into this segment
    rss: usize,
    // bytes already replaced at later lsns; subtracted from rss when
    // the segment deactivates
    deferred_replaced_rss: usize,
    // pids already replaced at later lsns; removed from `pids` when
    // the segment deactivates
    deferred_replaced_pids: BTreeSet<PageId>,
    // pids with data resident in this segment
    pids: BTreeSet<PageId>,
    // highest lsn at which any resident item was replaced
    latest_replacement_lsn: Lsn,
    // lsns of other segments whose freeing is deferred until this
    // segment deactivates
    can_free_upon_deactivation: FastSet8<Lsn>,
    // blob pointers whose removal is deferred until deactivation
    deferred_rm_blob: FastSet8<BlobPointer>,
}
163
/// State for a fully-written segment that no longer accepts writes.
#[derive(Debug, Clone, Default)]
struct Inactive {
    // base lsn of this segment
    lsn: Lsn,
    // bytes of live data remaining in this segment
    rss: usize,
    // pids with live data still resident here
    pids: BTreeSet<PageId>,
    // number of pids that were resident when the segment deactivated
    max_pids: usize,
    // how many of those pids have since been replaced elsewhere
    replaced_pids: usize,
    // highest lsn at which any resident item was replaced
    latest_replacement_lsn: Lsn,
}
173
/// State for a mostly-dead segment whose remaining pages are being
/// rewritten elsewhere so the segment can be freed.
#[derive(Debug, Clone, Default)]
struct Draining {
    // base lsn of this segment
    lsn: Lsn,
    // number of pids resident when the segment deactivated
    max_pids: usize,
    // how many of those pids have been replaced; the segment can be
    // freed once this reaches max_pids
    replaced_pids: usize,
    // highest lsn at which any resident item was replaced
    latest_replacement_lsn: Lsn,
}
181
/// The lifecycle state of one segment-sized region of the log file.
///
/// Segments advance `Free -> Active -> Inactive -> Draining` and back
/// to `Free` once all of their live data has been replaced elsewhere
/// (see the transition methods on `impl Segment`).
#[derive(Clone, Debug)]
enum Segment {
    /// Unused and available for (re)allocation.
    Free(Free),

    /// Currently accepting new writes.
    Active(Active),

    /// Fully written and stable; no longer accepting writes.
    Inactive(Inactive),

    /// Being emptied out so it can be freed.
    Draining(Draining),
}
205
206impl Default for Segment {
207 fn default() -> Self {
208 Segment::Free(Free { previous_lsn: None })
209 }
210}
211
212impl Segment {
213 fn is_free(&self) -> bool {
214 if let Segment::Free(_) = self {
215 true
216 } else {
217 false
218 }
219 }
220
221 fn is_active(&self) -> bool {
222 if let Segment::Active { .. } = self {
223 true
224 } else {
225 false
226 }
227 }
228
229 fn is_inactive(&self) -> bool {
230 if let Segment::Inactive { .. } = self {
231 true
232 } else {
233 false
234 }
235 }
236
237 fn free_to_active(&mut self, new_lsn: Lsn) {
238 trace!("setting Segment to Active with new lsn {:?}", new_lsn,);
239 assert!(self.is_free());
240
241 *self = Segment::Active(Active {
242 lsn: new_lsn,
243 rss: 0,
244 deferred_replaced_rss: 0,
245 deferred_replaced_pids: BTreeSet::default(),
246 pids: BTreeSet::default(),
247 latest_replacement_lsn: 0,
248 can_free_upon_deactivation: FastSet8::default(),
249 deferred_rm_blob: FastSet8::default(),
250 })
251 }
252
253 fn active_to_inactive(
257 &mut self,
258 lsn: Lsn,
259 config: &Config,
260 ) -> Result<FastSet8<Lsn>> {
261 trace!("setting Segment with lsn {:?} to Inactive", self.lsn());
262
263 let (inactive, ret) = if let Segment::Active(active) = self {
264 assert!(lsn >= active.lsn);
265
266 for ptr in &active.deferred_rm_blob {
268 trace!(
269 "removing blob {} while transitioning \
270 segment lsn {:?} to Inactive",
271 ptr,
272 active.lsn,
273 );
274 remove_blob(*ptr, config)?;
275 }
276
277 let inactive = Segment::Inactive(Inactive {
278 lsn: active.lsn,
279 rss: active
280 .rss
281 .checked_sub(active.deferred_replaced_rss)
282 .unwrap(),
283 max_pids: active.pids.len(),
284 replaced_pids: active.deferred_replaced_pids.len(),
285 pids: &active.pids - &active.deferred_replaced_pids,
286 latest_replacement_lsn: active.latest_replacement_lsn,
287 });
288
289 let can_free = mem::replace(
290 &mut active.can_free_upon_deactivation,
291 Default::default(),
292 );
293
294 (inactive, can_free)
295 } else {
296 panic!("called active_to_inactive on {:?}", self);
297 };
298
299 *self = inactive;
300 Ok(ret)
301 }
302
303 fn inactive_to_draining(&mut self, lsn: Lsn) -> BTreeSet<PageId> {
304 trace!("setting Segment with lsn {:?} to Draining", self.lsn());
305
306 if let Segment::Inactive(inactive) = self {
307 assert!(lsn >= inactive.lsn);
308 let ret = mem::replace(&mut inactive.pids, Default::default());
309 *self = Segment::Draining(Draining {
310 lsn: inactive.lsn,
311 max_pids: inactive.max_pids,
312 replaced_pids: inactive.replaced_pids,
313 latest_replacement_lsn: inactive.latest_replacement_lsn,
314 });
315 ret
316 } else {
317 panic!("called inactive_to_draining on {:?}", self);
318 }
319 }
320
321 fn defer_free_lsn(&mut self, lsn: Lsn) {
322 if let Segment::Active(active) = self {
323 active.can_free_upon_deactivation.insert(lsn);
324 } else {
325 panic!("called defer_free_lsn on segment {:?}", self);
326 }
327 }
328
329 fn draining_to_free(&mut self, lsn: Lsn) -> Lsn {
330 trace!("setting Segment with lsn {:?} to Free", self.lsn());
331
332 if let Segment::Draining(draining) = self {
333 let old_lsn = draining.lsn;
334 assert!(lsn >= old_lsn);
335 let replacement_lsn = draining.latest_replacement_lsn;
336 *self = Segment::Free(Free { previous_lsn: Some(old_lsn) });
337 replacement_lsn
338 } else {
339 panic!("called draining_to_free on {:?}", self);
340 }
341 }
342
343 fn recovery_ensure_initialized(&mut self, lsn: Lsn) {
344 if self.is_free() {
345 trace!("(snapshot) recovering segment with base lsn {}", lsn);
346 self.free_to_active(lsn);
347 }
348 }
349
350 fn lsn(&self) -> Lsn {
351 match self {
352 Segment::Active(Active { lsn, .. })
353 | Segment::Inactive(Inactive { lsn, .. })
354 | Segment::Draining(Draining { lsn, .. }) => *lsn,
355 Segment::Free(_) => panic!("called lsn on Segment::Free"),
356 }
357 }
358
359 fn insert_pid(&mut self, pid: PageId, lsn: Lsn, size: usize) {
362 if let Segment::Active(active) = self {
367 assert_eq!(
368 lsn, active.lsn,
369 "insert_pid specified lsn {} for pid {} in segment {:?}",
370 lsn, pid, active
371 );
372 active.pids.insert(pid);
373 active.rss += size;
374 } else {
375 panic!("called insert_pid on {:?}", self);
376 }
377 }
378
379 fn remove_pid(&mut self, pid: PageId, replacement_lsn: Lsn, sz: usize) {
380 trace!(
381 "removing pid {} at lsn {} from segment {:?}",
382 pid,
383 replacement_lsn,
384 self
385 );
386 match self {
387 Segment::Active(active) => {
388 assert!(active.lsn <= replacement_lsn);
389 if replacement_lsn != active.lsn {
390 active.deferred_replaced_pids.insert(pid);
391 }
392 active.deferred_replaced_rss += sz;
393 if replacement_lsn > active.latest_replacement_lsn {
394 active.latest_replacement_lsn = replacement_lsn;
395 }
396 }
397 Segment::Inactive(Inactive {
398 pids,
399 lsn,
400 rss,
401 latest_replacement_lsn,
402 replaced_pids,
403 ..
404 }) => {
405 assert!(*lsn <= replacement_lsn);
406 if replacement_lsn != *lsn {
407 pids.remove(&pid);
408 *replaced_pids += 1;
409 }
410 *rss = rss.checked_sub(sz).unwrap();
411 if replacement_lsn > *latest_replacement_lsn {
412 *latest_replacement_lsn = replacement_lsn;
413 }
414 }
415 Segment::Draining(Draining {
416 lsn,
417 latest_replacement_lsn,
418 replaced_pids,
419 ..
420 }) => {
421 assert!(*lsn <= replacement_lsn);
422 if replacement_lsn != *lsn {
423 *replaced_pids += 1;
424 }
425 if replacement_lsn > *latest_replacement_lsn {
426 *latest_replacement_lsn = replacement_lsn;
427 }
428 }
429 Segment::Free(_) => {
430 panic!("called remove pid {} on Segment::Free", pid)
431 }
432 }
433 }
434
435 fn remove_blob(
436 &mut self,
437 blob_ptr: BlobPointer,
438 config: &Config,
439 ) -> Result<()> {
440 match self {
441 Segment::Active(active) => {
442 active.deferred_rm_blob.insert(blob_ptr);
446 }
447 Segment::Inactive(_) | Segment::Draining(_) => {
448 trace!(
449 "directly removing blob {} that was referred-to \
450 in a segment that has already been marked as Inactive \
451 or Draining.",
452 blob_ptr,
453 );
454 remove_blob(blob_ptr, config)?;
455 }
456 Segment::Free(_) => panic!("remove_blob called on a Free Segment"),
457 }
458
459 Ok(())
460 }
461
462 fn can_free(&self) -> bool {
463 if let Segment::Draining(draining) = self {
464 draining.replaced_pids == draining.max_pids
465 } else {
466 false
467 }
468 }
469}
470
impl SegmentAccountant {
    /// Create a new `SegmentAccountant` from a recovered `Snapshot`,
    /// rebuilding per-segment occupancy, the free list, and the
    /// lsn -> offset ordering.
    pub(super) fn start(
        config: RunningConfig,
        snapshot: &Snapshot,
        segment_cleaner: SegmentCleaner,
    ) -> Result<Self> {
        let _measure = Measure::new(&M.start_segment_accountant);
        let mut ret = Self {
            config,
            segments: vec![],
            free: BTreeSet::default(),
            tip: 0,
            // -1 so the first stabilization at lsn 0 is not ignored
            max_stabilized_lsn: -1,
            segment_cleaner,
            ordering: BTreeMap::default(),
            async_truncations: BTreeMap::default(),
        };

        ret.initialize_from_snapshot(snapshot)?;

        // sanity check: the tip must sit above everything on the free
        // list, otherwise recovery mis-accounted the file length
        if let Some(max_free) = ret.free.iter().max() {
            assert!(
                ret.tip > *max_free,
                "expected recovered tip {} to \
                 be above max item in recovered \
                 free list {:?}",
                ret.tip,
                ret.free
            );
        }

        debug!(
            "SA starting with tip {} stable {} free {:?}",
            ret.tip, ret.max_stabilized_lsn, ret.free,
        );

        Ok(ret)
    }

    /// Build the initial `Segment` vector by replaying the snapshot's
    /// page table: one entry per segment-sized chunk of the file, with
    /// each recovered page location inserted into its segment.
    fn initial_segments(&self, snapshot: &Snapshot) -> Result<Vec<Segment>> {
        let segment_size = self.config.segment_size;
        let file_len = self.config.file.metadata()?.len();
        // round up so that a partial trailing segment gets a slot
        let number_of_segments =
            usize::try_from(file_len / segment_size as u64).unwrap()
                + if file_len % segment_size as u64 == 0 { 0 } else { 1 };

        let mut segments = vec![Segment::default(); number_of_segments];

        if let Some(tip_lid) = snapshot.active_segment {
            let tip_idx =
                usize::try_from(tip_lid / segment_size as LogOffset).unwrap();
            // the active segment may sit exactly one past the measured
            // file length if nothing has been written into it yet
            if tip_idx == number_of_segments {
                segments.push(Segment::default());
            }
            if tip_idx > number_of_segments is never expected; anything
            past one-slot growth indicates corruption
            if segments.len() <= tip_idx {
                error!("failed to properly initialize segments, suspected disk corruption");
                return Err(Error::corruption(None));
            }
            trace!(
                "setting segment for tip_lid {} to stable_lsn {}",
                tip_lid,
                self.config.normalize(snapshot.stable_lsn.unwrap_or(0))
            );
            segments[tip_idx].recovery_ensure_initialized(
                self.config.normalize(snapshot.stable_lsn.unwrap_or(0)),
            );
        }

        // registers one recovered (pid, lsn, size) location with the
        // segment that contains `lid`
        let add =
            |pid, lsn: Lsn, sz, lid: LogOffset, segments: &mut Vec<Segment>| {
                let idx = assert_usize(lid / segment_size as LogOffset);
                trace!(
                    "adding lsn: {} lid: {} sz: {} for \
                     pid {} to segment {} during SA recovery",
                    lsn,
                    lid,
                    sz,
                    pid,
                    idx
                );
                let segment_lsn = self.config.normalize(lsn);
                segments[idx].recovery_ensure_initialized(segment_lsn);
                segments[idx].insert_pid(
                    pid,
                    segment_lsn,
                    usize::try_from(sz).unwrap(),
                );
            };

        for (pid, state) in snapshot.pt.iter().enumerate() {
            match state {
                PageState::Present { base, frags } => {
                    add(
                        pid as PageId,
                        base.0,
                        base.2,
                        base.1.lid(),
                        &mut segments,
                    );
                    for (lsn, ptr, sz) in frags {
                        add(pid as PageId, *lsn, *sz, ptr.lid(), &mut segments);
                    }
                }
                PageState::Free(lsn, ptr) => {
                    // freed pages still occupy a small tombstone record
                    add(
                        pid as PageId,
                        *lsn,
                        u64::try_from(MAX_MSG_HEADER_LEN).unwrap(),
                        ptr.lid(),
                        &mut segments,
                    );
                }
                _ => panic!("tried to recover pagestate from a {:?}", state),
            }
        }

        Ok(segments)
    }

    /// Install the recovered segments, zero out garbage (free)
    /// segments on disk, and rebuild `self.tip`, `self.free`, and
    /// `self.ordering`.
    fn initialize_from_snapshot(&mut self, snapshot: &Snapshot) -> Result<()> {
        let segment_size = self.config.segment_size;
        let segments = self.initial_segments(snapshot)?;

        self.segments = segments;

        let mut to_free = vec![];
        let mut maybe_clean = vec![];

        let currently_active_segment = snapshot
            .active_segment
            .map(|tl| usize::try_from(tl / segment_size as LogOffset).unwrap());

        for (idx, segment) in self.segments.iter_mut().enumerate() {
            let segment_base = idx as LogOffset * segment_size as LogOffset;

            if segment_base >= self.tip {
                // the tip is the end of the highest segment seen so far
                self.tip = segment_base + segment_size as LogOffset;
                trace!(
                    "raised self.tip to {} during SA initialization",
                    self.tip
                );
            }

            if segment.is_free() {
                to_free.push(segment_base);
                continue;
            }

            let segment_lsn = segment.lsn();

            // never schedule cleaning for the currently-active segment
            if let Some(tip_idx) = currently_active_segment {
                if tip_idx != idx {
                    maybe_clean.push((idx, segment_lsn));
                }
            }
        }

        for segment_base in to_free {
            self.free_segment(segment_base)?;
            io_fail!(self.config, "zero garbage segment SA");
            // overwrite freed segments with a poison pattern so stale
            // messages can never be re-read as valid
            pwrite_all(
                &self.config.file,
                &*vec![MessageKind::Corrupted.into(); self.config.segment_size],
                segment_base,
            )?;
        }

        // free_segment may have launched async truncations; wait for
        // all of them before continuing initialization
        for (_, promise) in self.async_truncations.split_off(&0) {
            promise.wait().expect("threadpool should not crash")?;
        }

        for (idx, segment_lsn) in maybe_clean {
            self.possibly_clean_or_free_segment(idx, segment_lsn)?;
        }

        trace!("initialized self.segments to {:?}", self.segments);

        // rebuild the lsn -> offset index from the non-free segments
        self.ordering = self
            .segments
            .iter()
            .enumerate()
            .filter_map(|(id, s)| {
                if s.is_free() {
                    None
                } else {
                    Some((s.lsn(), id as LogOffset * segment_size as LogOffset))
                }
            })
            .collect();

        trace!("initialized self.ordering to {:?}", self.ordering);

        Ok(())
    }

    /// Mark the segment starting at `lid` as reusable, and lazily
    /// truncate the file when freed segments form a suffix of it.
    fn free_segment(&mut self, lid: LogOffset) -> Result<()> {
        debug!("freeing segment {}", lid);
        trace!("free list before free {:?}", self.free);
        self.segment_cleaner.remove_pids(lid);

        let idx = self.segment_id(lid);
        assert!(
            self.tip > lid,
            "freed a segment at {} above our current file tip {}, \
             please report this bug!",
            lid,
            self.tip,
        );
        assert!(self.segments[idx].is_free());
        assert!(!self.free.contains(&lid), "double-free of a segment occurred");

        self.free.insert(lid);

        // if the segment had previously held data, drop its stale lsn
        // from the ordering index
        if let Segment::Free(Free { previous_lsn: Some(last_lsn) }) =
            self.segments[idx]
        {
            trace!(
                "removing segment {} with lsn {} from ordering",
                lid,
                last_lsn
            );
            self.ordering.remove(&last_lsn);
        }

        // keep one free segment around for reuse rather than
        // truncating the file eagerly on every free
        let laziness_factor = 1;

        // shrink the file while its final segment is on the free list
        while self.tip != 0 && self.free.len() > laziness_factor {
            let last_segment = self.tip - self.config.segment_size as LogOffset;
            if self.free.contains(&last_segment) {
                self.free.remove(&last_segment);
                self.truncate(last_segment)?;
            } else {
                break;
            }
        }

        Ok(())
    }

    /// Apply a single deferred `SegmentOp` produced by the pagecache.
    pub(super) fn apply_op(&mut self, op: &SegmentOp) -> Result<()> {
        use SegmentOp::*;
        match op {
            Link { pid, cache_info } => self.mark_link(*pid, *cache_info),
            Replace { pid, lsn, old_cache_infos, new_cache_info } => {
                self.mark_replace(*pid, *lsn, old_cache_infos, *new_cache_info)?
            }
        }
        Ok(())
    }

    /// Account for a page being completely rewritten: record the new
    /// location, remove the page's old locations from their segments,
    /// and possibly trigger cleaning or freeing of those segments.
    pub(super) fn mark_replace(
        &mut self,
        pid: PageId,
        lsn: Lsn,
        old_cache_infos: &[CacheInfo],
        new_cache_info: CacheInfo,
    ) -> Result<()> {
        let _measure = Measure::new(&M.accountant_mark_replace);

        self.mark_link(pid, new_cache_info);

        trace!(
            "mark_replace pid {} from cache infos {:?} to cache info {:?} with lsn {}",
            pid,
            old_cache_infos,
            new_cache_info,
            self.config.normalize(lsn)
        );

        let new_idx = self.segment_id(new_cache_info.pointer.lid());

        // NOTE(review): when the page consisted of exactly one blob
        // frag, its blob file removal appears to be handled elsewhere
        // and is skipped here — confirm against the pagecache's
        // replace path.
        let schedule_rm_blob = !(old_cache_infos.len() == 1
            && old_cache_infos[0].pointer.is_blob());

        // total replaced bytes, grouped per old segment index
        let mut removals = FastMap8::default();

        for old_cache_info in old_cache_infos {
            let old_ptr = &old_cache_info.pointer;
            let old_lid = old_ptr.lid();

            if schedule_rm_blob && old_ptr.is_blob() {
                trace!(
                    "queueing blob removal for {} in our own segment",
                    old_ptr
                );
                // removal is deferred while the target segment is
                // Active (see Segment::remove_blob)
                self.segments[new_idx]
                    .remove_blob(old_ptr.blob().1, &self.config)?;
            }

            let old_idx = self.segment_id(old_lid);
            let entry = removals.entry(old_idx).or_insert(0);
            *entry += old_cache_info.log_size;
        }

        for (old_idx, replaced_size) in removals {
            self.segments[old_idx].remove_pid(
                pid,
                self.config.normalize(lsn),
                usize::try_from(replaced_size).unwrap(),
            );
            self.possibly_clean_or_free_segment(old_idx, new_cache_info.lsn)?;
        }

        Ok(())
    }

    /// Account for a page being linked to a new location in the log:
    /// record the pid and its size in the containing segment.
    pub(super) fn mark_link(&mut self, pid: PageId, cache_info: CacheInfo) {
        let _measure = Measure::new(&M.accountant_mark_link);

        trace!("mark_link pid {} at cache info {:?}", pid, cache_info);
        let idx = self.segment_id(cache_info.pointer.lid());

        let segment = &mut self.segments[idx];

        // round the lsn down to the base lsn of its segment
        let segment_lsn = cache_info.lsn / self.config.segment_size as Lsn
            * self.config.segment_size as Lsn;

        assert_eq!(
            segment.lsn(),
            segment_lsn,
            "segment somehow got reused by the time a link was \
             marked on it. expected lsn: {} actual: {}",
            segment_lsn,
            segment.lsn()
        );

        segment.insert_pid(
            pid,
            segment_lsn,
            usize::try_from(cache_info.log_size).unwrap(),
        );
    }

    /// If the segment at `idx` has dropped below the cleanup
    /// threshold, queue its remaining pages for rewriting; if it has
    /// no live pages left, free it — or defer freeing until the
    /// segment holding its replacements has deactivated.
    fn possibly_clean_or_free_segment(
        &mut self,
        idx: usize,
        lsn: Lsn,
    ) -> Result<()> {
        let cleanup_threshold = SEGMENT_CLEANUP_THRESHOLD;

        let segment_start = (idx * self.config.segment_size) as LogOffset;

        if let Segment::Inactive(inactive) = &mut self.segments[idx] {
            // live percentage of the segment, 0..=100
            let live_pct = inactive.rss * 100 / self.config.segment_size;

            let can_drain = live_pct <= cleanup_threshold;

            if can_drain {
                trace!(
                    "SA inserting {} into to_clean from possibly_clean_or_free_segment",
                    segment_start
                );
                let to_clean = self.segments[idx].inactive_to_draining(lsn);
                self.segment_cleaner.add_pids(segment_start, to_clean);
            }
        }

        let segment_lsn = self.segments[idx].lsn();

        if self.segments[idx].can_free() {
            // every pid in the segment has been replaced elsewhere
            let replacement_lsn = self.segments[idx].draining_to_free(lsn);

            // if the segment that received the replacements is still
            // Active, defer reuse of this one until that segment
            // deactivates (see defer_free_lsn / active_to_inactive)
            if self.ordering.contains_key(&replacement_lsn) {
                let replacement_lid = self.ordering[&replacement_lsn];
                let replacement_idx = usize::try_from(
                    replacement_lid / self.config.segment_size as u64,
                )
                .unwrap();

                if self.segments[replacement_idx].is_active() {
                    trace!(
                        "deferring free of segment {} in possibly_clean_or_free_segment",
                        segment_start
                    );
                    self.segments[replacement_idx].defer_free_lsn(segment_lsn);
                } else {
                    assert!(replacement_lsn <= self.max_stabilized_lsn);
                    self.free_segment(segment_start)?;
                }
            } else {
                self.free_segment(segment_start)?;
            }
        }

        Ok(())
    }

    /// Called when lsns up to `stable_lsn` have become durable:
    /// deactivates every segment whose entire lsn range is now stable.
    pub(super) fn stabilize(&mut self, stable_lsn: Lsn) -> Result<()> {
        let segment_size = self.config.segment_size as Lsn;
        // only segments strictly below the one containing `stable_lsn`
        // are completely stable, hence the `- 1`
        let lsn = ((stable_lsn / segment_size) - 1) * segment_size;
        trace!(
            "stabilize({}), normalized: {}, last: {}",
            stable_lsn,
            lsn,
            self.max_stabilized_lsn
        );
        if self.max_stabilized_lsn >= lsn {
            trace!(
                "expected stabilization lsn {} \
                 to be greater than the previous value of {}",
                lsn,
                self.max_stabilized_lsn
            );
            return Ok(());
        }

        // half-open range of newly-stabilized segment base lsns
        let bounds = (
            std::ops::Bound::Excluded(self.max_stabilized_lsn),
            std::ops::Bound::Included(lsn),
        );

        let can_deactivate = self
            .ordering
            .range(bounds)
            .map(|(lsn, _lid)| *lsn)
            .collect::<Vec<_>>();

        // raise the high-water mark before deactivating the segments
        self.max_stabilized_lsn = lsn;

        for lsn in can_deactivate {
            self.deactivate_segment(lsn)?;
        }

        Ok(())
    }

    /// Transition the segment with base `lsn` from Active to
    /// Inactive, free segments whose freeing was deferred on it, and
    /// opportunistically drain a cold Inactive segment when free
    /// segments dominate.
    fn deactivate_segment(&mut self, lsn: Lsn) -> Result<()> {
        let lid = self.ordering[&lsn];
        let idx = self.segment_id(lid);

        trace!(
            "deactivating segment with lid {} lsn {}: {:?}",
            lid,
            lsn,
            self.segments[idx]
        );

        let freeable_segments = if self.segments[idx].is_active() {
            self.segments[idx].active_to_inactive(lsn, &self.config)?
        } else {
            Default::default()
        };

        for lsn in freeable_segments {
            let segment_start = self.ordering[&lsn];
            // a segment must never defer-free itself
            assert_ne!(segment_start, lid);
            self.free_segment(segment_start)?;
        }

        self.possibly_clean_or_free_segment(idx, lsn)?;

        // heuristic: when free segments reach the cleanup threshold
        // relative to inactive ones, drain the highest-offset
        // inactive segment to compact the file
        let free_segs = self.segments.iter().filter(|s| s.is_free()).count();
        let inactive_segs =
            self.segments.iter().filter(|s| s.is_inactive()).count();
        let free_ratio = (free_segs * 100) / (1 + free_segs + inactive_segs);

        if free_ratio >= SEGMENT_CLEANUP_THRESHOLD && inactive_segs > 5 {
            let last_index =
                self.segments.iter().rposition(Segment::is_inactive).unwrap();

            let segment_start =
                (last_index * self.config.segment_size) as LogOffset;

            let to_clean = self.segments[last_index].inactive_to_draining(lsn);
            self.segment_cleaner.add_pids(segment_start, to_clean);
        }

        Ok(())
    }

    /// Allocate a fresh segment at the end of the file, first waiting
    /// for any truncations at or past the old tip to complete.
    fn bump_tip(&mut self) -> Result<LogOffset> {
        let lid = self.tip;

        let truncations = self.async_truncations.split_off(&lid);

        for (_at, truncation) in truncations {
            match truncation.wait().unwrap() {
                Ok(()) => {}
                Err(error) => {
                    error!("failed to shrink file: {:?}", error);
                    return Err(error);
                }
            }
        }

        self.tip += self.config.segment_size as LogOffset;

        trace!("advancing file tip from {} to {}", lid, self.tip);

        Ok(lid)
    }

    /// Return the offset at which a new segment for the given
    /// (segment-aligned) `lsn` should be written: the lowest free
    /// segment if one exists, otherwise the end of the file.
    pub(super) fn next(&mut self, lsn: Lsn) -> Result<LogOffset> {
        let _measure = Measure::new(&M.accountant_next);

        assert_eq!(
            lsn % self.config.segment_size as Lsn,
            0,
            "unaligned Lsn provided to next!"
        );

        trace!("evaluating free list {:?} in SA::next", &self.free);

        // take the lowest freed segment offset, if any
        let safe = self.free.iter().next().copied();

        let lid = if let Some(next) = safe {
            self.free.remove(&next);
            next
        } else {
            self.bump_tip()?
        };

        let idx = self.segment_id(lid);

        self.segments[idx].free_to_active(lsn);

        self.ordering.insert(lsn, lid);

        debug!(
            "segment accountant returning offset: {} for lsn {} \
             on deck: {:?}",
            lid, lsn, self.free,
        );

        // lsns grow at least as fast as physical offsets because
        // offsets are only ever reused, never skipped ahead of lsns
        assert!(
            lsn >= Lsn::try_from(lid).unwrap(),
            "lsn {} should always be greater than or equal to lid {}",
            lsn,
            lid
        );

        Ok(lid)
    }

    /// Return the lsn -> offset mapping for all segments whose base
    /// lsn is at or above the (segment-normalized) `lsn`, used when
    /// iterating the log to build a snapshot.
    pub(super) fn segment_snapshot_iter_from(
        &mut self,
        lsn: Lsn,
    ) -> BTreeMap<Lsn, LogOffset> {
        assert!(
            !self.ordering.is_empty(),
            "expected ordering to have been initialized already"
        );

        let normalized_lsn = self.config.normalize(lsn);

        trace!(
            "generated iterator over {:?} where lsn >= {}",
            self.ordering,
            normalized_lsn
        );

        self.ordering
            .iter()
            .filter_map(move |(l, r)| {
                if *l >= normalized_lsn {
                    Some((*l, *r))
                } else {
                    None
                }
            })
            .collect()
    }

    /// Asynchronously truncate the file to length `at` (which must be
    /// segment-aligned), recording a promise that future allocations
    /// past this point must wait on (see `bump_tip`).
    fn truncate(&mut self, at: LogOffset) -> Result<()> {
        trace!("asynchronously truncating file to length {}", at);

        assert_eq!(
            at % self.config.segment_size as LogOffset,
            0,
            "new length must be io-buf-len aligned"
        );

        self.tip = at;

        assert!(!self.free.contains(&at), "double-free of a segment occurred");

        let (completer, promise) = OneShot::pair();

        let config = self.config.clone();

        io_fail!(&config, "file truncation");
        threadpool::spawn(move || {
            debug!("truncating file to length {}", at);
            let res = config
                .file
                .set_len(at)
                .and_then(|_| config.file.sync_all())
                .map_err(|e| e.into());
            completer.fill(res);
        })?;

        if self.async_truncations.insert(at, promise).is_some() {
            panic!(
                "somehow segment {} was truncated before \
                 the previous truncation completed",
                at
            );
        }

        Ok(())
    }

    /// Map a log offset to its segment index, growing the segment
    /// table if the offset lies past its current end.
    fn segment_id(&mut self, lid: LogOffset) -> SegmentId {
        let idx = assert_usize(lid / self.config.segment_size as LogOffset);

        if self.segments.len() < idx + 1 {
            self.segments.resize(idx + 1, Segment::default());
        }

        idx
    }
}