// sled/pagecache/iobuf.rs

1use std::{
2    alloc::{alloc, dealloc, Layout},
3    cell::UnsafeCell,
4    sync::atomic::AtomicPtr,
5};
6
7use crate::{pagecache::*, *};
8
// Failpoint check used throughout the IO path: when the named
// failpoint `$e` is active, poison the system with a global error,
// wake any threads blocked on `interval_updated`, and early-return
// `Err(Error::FailPoint)` from the enclosing function. Compiles to
// nothing unless the "failpoints" feature is enabled.
macro_rules! io_fail {
    ($self:expr, $e:expr) => {
        #[cfg(feature = "failpoints")]
        {
            debug_delay();
            if crate::fail::is_active($e) {
                $self.config.set_global_error(Error::FailPoint);
                // wake up any waiting threads so they don't stall forever
                let _mu = $self.intervals.lock();

                // having held the mutex makes this linearized
                // with the notify below: any waiter either sees the
                // global error before sleeping or is woken here.
                drop(_mu);

                let _notified = $self.interval_updated.notify_all();
                return Err(Error::FailPoint);
            }
        };
    };
}
29
// A raw heap allocation of `.1` bytes aligned to 8192 (see
// `AlignedBuf::new`), used as the backing storage for an IO buffer.
// Field 0 is the pointer returned by `alloc`, field 1 the length.
struct AlignedBuf(*mut u8, usize);

// SAFETY: AlignedBuf uniquely owns its allocation; the raw pointer is
// not aliased by the type itself, so moving it to another thread is
// sound. NOTE(review): concurrent access to the bytes is presumably
// coordinated by IoBuf's header protocol — confirm against callers.
#[allow(unsafe_code)]
unsafe impl Send for AlignedBuf {}

// SAFETY: see note on the Send impl above — shared references to
// AlignedBuf expose only the pointer/length pair, not the bytes.
#[allow(unsafe_code)]
unsafe impl Sync for AlignedBuf {}
37
38impl AlignedBuf {
39    fn new(len: usize) -> AlignedBuf {
40        let layout = Layout::from_size_align(len, 8192).unwrap();
41        let ptr = unsafe { alloc(layout) };
42
43        assert!(!ptr.is_null(), "failed to allocate critical IO buffer");
44
45        AlignedBuf(ptr, len)
46    }
47}
48
impl Drop for AlignedBuf {
    fn drop(&mut self) {
        // Rebuild the exact layout used in `AlignedBuf::new`:
        // `self.1` is the original length, 8192 the alignment.
        let layout = Layout::from_size_align(self.1, 8192).unwrap();
        // SAFETY: `self.0` was returned by `alloc` with precisely this
        // layout in `AlignedBuf::new`, and is deallocated only here.
        unsafe {
            dealloc(self.0, layout);
        }
    }
}
57
pub(crate) struct IoBuf {
    // shared backing storage; interior mutability is required because
    // reservations hand out disjoint &mut subranges via get_mut_range
    buf: Arc<UnsafeCell<AlignedBuf>>,
    // packed atomic word carrying seal/maxed flags, salt, and write
    // offset, manipulated through header::* helpers
    header: CachePadded<AtomicU64>,
    // byte offset into `buf` where this IoBuf's usable region begins
    // (non-zero when reusing a partially filled segment tip)
    base: usize,
    // file offset this buffer will be written to
    pub offset: LogOffset,
    // log sequence number of the first byte of this buffer
    pub lsn: Lsn,
    // number of usable bytes (segment_size - base at construction)
    pub capacity: usize,
    // max stable lsn recorded in the segment header; propagated to
    // max_header_stable_lsn after this buffer is durably written
    stored_max_stable_lsn: Lsn,
}
67
// SAFETY: the raw-pointer-bearing UnsafeCell<AlignedBuf> is the only
// non-auto-Sync field; NOTE(review): soundness presumably rests on the
// reservation protocol never creating overlapping mutable ranges (see
// the `get_mut_range` safety docs) — confirm against reservation code.
#[allow(unsafe_code)]
unsafe impl Sync for IoBuf {}

// SAFETY: all fields are owned; see the note on the Sync impl above.
#[allow(unsafe_code)]
unsafe impl Send for IoBuf {}
73
74impl IoBuf {
75    /// # Safety
76    ///
77    /// This operation provides access to a mutable buffer of
78    /// uninitialized memory. For this to be correct, we must
79    /// ensure that:
80    /// 1. overlapping mutable slices are never created.
81    /// 2. a read to any subslice of this slice only happens
82    ///    after a write has initialized that memory
83    ///
84    /// It is intended that the log reservation code guarantees
85    /// that no two `Reservation` objects will hold overlapping
86    /// mutable slices to our io buffer.
87    ///
88    /// It is intended that the `write_to_log` function only
89    /// tries to write initialized bytes to the underlying storage.
90    ///
91    /// It is intended that the `write_to_log` function will
92    /// initialize any yet-to-be-initialized bytes before writing
93    /// the buffer to storage. #1040 added logic that was intended
94    /// to meet this requirement.
95    ///
96    /// The safety of this method was discussed in #1044.
97    pub(crate) fn get_mut_range(
98        &self,
99        at: usize,
100        len: usize,
101    ) -> &'static mut [u8] {
102        let buf_ptr = self.buf.get();
103
104        unsafe {
105            assert!((*buf_ptr).1 >= at + len);
106            std::slice::from_raw_parts_mut(
107                (*buf_ptr).0.add(self.base + at),
108                len,
109            )
110        }
111    }
112
113    // This is called upon the initialization of a fresh segment.
114    // We write a new segment header to the beginning of the buffer
115    // for assistance during recovery. The caller is responsible
116    // for ensuring that the IoBuf's capacity has been set properly.
117    fn store_segment_header(
118        &mut self,
119        last: Header,
120        lsn: Lsn,
121        max_stable_lsn: Lsn,
122    ) {
123        debug!("storing lsn {} in beginning of buffer", lsn);
124        assert!(self.capacity >= SEG_HEADER_LEN);
125
126        self.stored_max_stable_lsn = max_stable_lsn;
127
128        self.lsn = lsn;
129
130        let header = SegmentHeader { lsn, max_stable_lsn, ok: true };
131        let header_bytes: [u8; SEG_HEADER_LEN] = header.into();
132
133        #[allow(unsafe_code)]
134        unsafe {
135            std::ptr::copy_nonoverlapping(
136                header_bytes.as_ptr(),
137                (*self.buf.get()).0,
138                SEG_HEADER_LEN,
139            );
140        }
141
142        // ensure writes to the buffer land after our header.
143        let last_salt = header::salt(last);
144        let new_salt = header::bump_salt(last_salt);
145        let bumped = header::bump_offset(new_salt, SEG_HEADER_LEN);
146        self.set_header(bumped);
147    }
148
149    pub(crate) fn get_header(&self) -> Header {
150        debug_delay();
151        self.header.load(Acquire)
152    }
153
154    pub(crate) fn set_header(&self, new: Header) {
155        debug_delay();
156        self.header.store(new, Release);
157    }
158
159    pub(crate) fn cas_header(
160        &self,
161        old: Header,
162        new: Header,
163    ) -> std::result::Result<Header, Header> {
164        debug_delay();
165        let res = self.header.compare_and_swap(old, new, SeqCst);
166        if res == old {
167            Ok(new)
168        } else {
169            Err(res)
170        }
171    }
172}
173
#[derive(Debug)]
pub(crate) struct StabilityIntervals {
    // disjoint inclusive (low, high) lsn ranges that have been
    // fsynced but are not yet contiguous with `stable_lsn`;
    // kept reverse-sorted by `mark_fsync` so the lowest range
    // is at the end for cheap popping
    fsynced_ranges: Vec<(Lsn, Lsn)>,
    // in-flight atomic batches, keyed by their low lsn mapping to
    // their high lsn; stability must not be advertised into a batch
    // until the whole batch is durable
    batches: BTreeMap<Lsn, Lsn>,
    // highest lsn such that everything at or below it is durable
    stable_lsn: Lsn,
}
180
impl StabilityIntervals {
    // Creates tracking state with `lsn` as the initial stable point
    // (callers pass -1 for a fresh file).
    fn new(lsn: Lsn) -> StabilityIntervals {
        StabilityIntervals {
            stable_lsn: lsn,
            fsynced_ranges: vec![],
            batches: BTreeMap::default(),
        }
    }

    // Records an atomic batch spanning `interval` (low, high). While
    // present, `mark_fsync` will not report stability at or beyond
    // the batch's low lsn until the whole batch is durable.
    pub(crate) fn mark_batch(&mut self, interval: (Lsn, Lsn)) {
        assert!(interval.0 > self.stable_lsn);
        self.batches.insert(interval.0, interval.1);
    }

    // Records that the inclusive lsn range `interval` has been
    // fsynced, merges/advances the contiguous stable frontier, and
    // returns the lsn (if any) that may now be advertised as stable,
    // capped so that partially-durable batches are never exposed.
    fn mark_fsync(&mut self, interval: (Lsn, Lsn)) -> Option<Lsn> {
        trace!(
            "pushing interval {:?} into fsynced_ranges {:?}",
            interval,
            self.fsynced_ranges
        );
        // fast path: try to coalesce with the last-pushed range when
        // the new interval directly precedes or follows it
        if let Some((low, high)) = self.fsynced_ranges.last_mut() {
            if *low == interval.1 + 1 {
                *low = interval.0
            } else if *high + 1 == interval.0 {
                *high = interval.1
            } else {
                self.fsynced_ranges.push(interval);
            }
        } else {
            self.fsynced_ranges.push(interval);
        }

        #[cfg(any(test, feature = "event_log", feature = "lock_free_delays"))]
        assert!(
            self.fsynced_ranges.len() < 10000,
            "intervals is getting strangely long... {:?}",
            self
        );

        // reverse sort, so the lowest interval sits at the tail
        self.fsynced_ranges
            .sort_unstable_by_key(|&range| std::cmp::Reverse(range));

        // pop ranges off the tail while they are contiguous with the
        // current stable frontier, advancing it
        while let Some(&(low, high)) = self.fsynced_ranges.last() {
            assert!(low <= high);
            let cur_stable = self.stable_lsn;
            assert!(
                low > cur_stable,
                "somehow, we marked offset {} stable while \
                 interval {}-{} had not yet been applied!",
                cur_stable,
                low,
                high
            );
            if cur_stable + 1 == low {
                debug!("new highest interval: {} - {}", low, high);
                self.fsynced_ranges.pop().unwrap();
                self.stable_lsn = high;
            } else {
                break;
            }
        }

        let mut batch_stable_lsn = None;

        // batches must be atomically recoverable, which
        // means that we should wait until the entire
        // batch has been stabilized before any parts
        // of the batch are allowed to be reused
        // due to having marked them as stable.
        while let Some((low, high)) =
            self.batches.iter().map(|(l, h)| (*l, *h)).next()
        {
            assert!(
                low < high,
                "expected batch low mark {} to be below high mark {}",
                low,
                high
            );

            if high <= self.stable_lsn {
                // the entire batch has been written to disk
                // and fsynced, so we can propagate its stability
                // through the `batch_stable_lsn` variable.
                if let Some(bsl) = batch_stable_lsn {
                    assert!(bsl < high);
                }
                batch_stable_lsn = Some(high);
                self.batches.remove(&low).unwrap();
            } else {
                if low <= self.stable_lsn {
                    // the batch has not been fully written
                    // to disk, but we can communicate that
                    // the region before the batch has
                    // stabilized.
                    batch_stable_lsn = Some(low - 1);
                }
                break;
            }
        }

        // with no batches outstanding, the full frontier is safe to
        // advertise; otherwise expose only up to the batch-capped lsn
        if self.batches.is_empty() {
            Some(self.stable_lsn)
        } else {
            batch_stable_lsn
        }
    }
}
289
pub(crate) struct IoBufs {
    pub config: RunningConfig,

    // A pointer to the current IoBuf. This relies on crossbeam-epoch
    // for garbage collection when it gets swapped out, to ensure that
    // no witnessing threads experience use-after-free.
    // mutated from the maybe_seal_and_write_iobuf method.
    // finally dropped in the Drop impl, without using crossbeam-epoch,
    // because if this drops, all witnessing threads should be done.
    pub iobuf: AtomicPtr<IoBuf>,

    // Pending intervals that have been written to stable storage, but may be
    // higher than the current value of `stable` due to interesting thread
    // interleavings.
    pub intervals: Mutex<StabilityIntervals>,
    // Notified whenever `intervals` may have advanced, waking threads
    // blocked in make_stable.
    pub interval_updated: Condvar,

    // The highest CONTIGUOUS log sequence number that has been written to
    // stable storage. This may be lower than the length of the underlying
    // file, and there may be buffers that have been written out-of-order
    // to stable storage due to interesting thread interleavings.
    pub stable_lsn: AtomicLsn,
    pub max_reserved_lsn: AtomicLsn,
    // highest lsn recorded in a durably-written segment header; bumped
    // via a deferred closure in write_to_log
    pub max_header_stable_lsn: Arc<AtomicLsn>,
    // segment allocation/reuse bookkeeping; accessed via with_sa or
    // try_lock, with misses pushed onto deferred_segment_ops
    pub segment_accountant: Mutex<SegmentAccountant>,
    pub segment_cleaner: SegmentCleaner,
    // lock-free queue of SegmentOps applied in bulk by whichever
    // thread next holds the accountant lock
    deferred_segment_ops: stack::Stack<SegmentOp>,
    // serializes io_uring submissions so a write and its linked
    // sync_file_range are ordered (see write_to_log)
    #[cfg(feature = "io_uring")]
    pub submission_mutex: Mutex<()>,
    #[cfg(feature = "io_uring")]
    pub io_uring: rio::Rio,
}
322
impl Drop for IoBufs {
    fn drop(&mut self) {
        // Reclaim the +1 strong count that `start` leaked via
        // `Arc::into_raw`. Swapping in null makes a double-drop loud.
        let ptr = self.iobuf.swap(std::ptr::null_mut(), SeqCst);
        assert!(!ptr.is_null());
        // SAFETY: `ptr` came from `Arc::into_raw` in `start` (or a
        // subsequent swap that preserved the leaked count), and no
        // other thread can still be using it once IoBufs drops.
        unsafe {
            Arc::from_raw(ptr);
        }
    }
}
332
333/// `IoBufs` is a set of lock-free buffers for coordinating
334/// writes to underlying storage.
335impl IoBufs {
    /// Recovers (or freshly initializes) the IO buffer machinery from
    /// a snapshot: starts the SegmentAccountant, determines the next
    /// write position (lid) and lsn, and seeds the first IoBuf at the
    /// segment tip — writing a segment header when the tip is fresh.
    pub fn start(config: RunningConfig, snapshot: &Snapshot) -> Result<IoBufs> {
        let segment_cleaner = SegmentCleaner::default();

        let mut segment_accountant: SegmentAccountant =
            SegmentAccountant::start(
                config.clone(),
                snapshot,
                segment_cleaner.clone(),
            )?;

        let segment_size = config.segment_size;

        let (recovered_lid, recovered_lsn) =
            snapshot.recovered_coords(config.segment_size);

        // resolve the (file offset, lsn) pair to resume writing at;
        // a recovered lid without a recovered lsn is impossible
        let (next_lid, next_lsn) = match (recovered_lid, recovered_lsn) {
            (Some(next_lid), Some(next_lsn)) => {
                debug!(
                    "starting log at recovered active \
                    offset {}, recovered lsn {}",
                    next_lid, next_lsn
                );
                (next_lid, next_lsn)
            }
            (None, None) => {
                debug!("starting log for a totally fresh system");
                let next_lsn = 0;
                let next_lid = segment_accountant.next(next_lsn)?;
                (next_lid, next_lsn)
            }
            (None, Some(next_lsn)) => {
                let next_lid = segment_accountant.next(next_lsn)?;
                debug!(
                    "starting log at clean offset {}, recovered lsn {}",
                    next_lid, next_lsn
                );
                (next_lid, next_lsn)
            }
            (Some(_), None) => unreachable!(),
        };

        assert!(next_lsn >= Lsn::try_from(next_lid).unwrap());

        debug!(
            "starting IoBufs with next_lsn: {} \
             next_lid: {}",
            next_lsn, next_lid
        );

        // we want stable to begin at -1 if the 0th byte
        // of our file has not yet been written.
        let stable = next_lsn - 1;

        // the tip offset is not completely full yet, reuse it
        let base = assert_usize(next_lid % segment_size as LogOffset);

        let mut iobuf = IoBuf {
            buf: Arc::new(UnsafeCell::new(AlignedBuf::new(segment_size))),
            header: CachePadded::new(AtomicU64::new(0)),
            base,
            offset: next_lid,
            lsn: next_lsn,
            capacity: segment_size - base,
            stored_max_stable_lsn: -1,
        };

        // a brand-new segment needs its recovery header; a recovered
        // active segment already has one on disk
        if snapshot.active_segment.is_none() {
            iobuf.store_segment_header(0, next_lsn, stable);
        }

        Ok(IoBufs {
            config,

            // leak one strong count into the AtomicPtr; reclaimed in
            // the Drop impl above
            iobuf: AtomicPtr::new(Arc::into_raw(Arc::new(iobuf)) as *mut IoBuf),

            intervals: Mutex::new(StabilityIntervals::new(stable)),
            interval_updated: Condvar::new(),

            stable_lsn: AtomicLsn::new(stable),
            max_reserved_lsn: AtomicLsn::new(stable),
            max_header_stable_lsn: Arc::new(AtomicLsn::new(next_lsn)),
            segment_accountant: Mutex::new(segment_accountant),
            segment_cleaner,
            deferred_segment_ops: stack::Stack::default(),
            #[cfg(feature = "io_uring")]
            submission_mutex: Mutex::new(()),
            #[cfg(feature = "io_uring")]
            io_uring: rio::new()?,
        })
    }
426
427    pub(in crate::pagecache) fn sa_mark_link(
428        &self,
429        pid: PageId,
430        cache_info: CacheInfo,
431        guard: &Guard,
432    ) {
433        let op = SegmentOp::Link { pid, cache_info };
434        self.deferred_segment_ops.push(op, guard);
435    }
436
437    pub(in crate::pagecache) fn sa_mark_replace(
438        &self,
439        pid: PageId,
440        lsn: Lsn,
441        old_cache_infos: &[CacheInfo],
442        new_cache_info: CacheInfo,
443        guard: &Guard,
444    ) -> Result<()> {
445        debug_delay();
446        if let Some(mut sa) = self.segment_accountant.try_lock() {
447            let start = clock();
448            sa.mark_replace(pid, lsn, old_cache_infos, new_cache_info)?;
449            for op in self.deferred_segment_ops.take_iter(guard) {
450                sa.apply_op(op)?;
451            }
452            M.accountant_hold.measure(clock() - start);
453        } else {
454            let op = SegmentOp::Replace {
455                pid,
456                lsn,
457                old_cache_infos: old_cache_infos.to_vec(),
458                new_cache_info,
459            };
460            self.deferred_segment_ops.push(op, guard);
461        }
462        Ok(())
463    }
464
465    pub(in crate::pagecache) fn sa_stabilize(
466        &self,
467        lsn: Lsn,
468        guard: &Guard,
469    ) -> Result<()> {
470        self.with_sa(|sa| {
471            for op in self.deferred_segment_ops.take_iter(guard) {
472                sa.apply_op(op)?;
473            }
474            sa.stabilize(lsn)?;
475            Ok(())
476        })
477    }
478
479    /// `SegmentAccountant` access for coordination with the `PageCache`
480    pub(in crate::pagecache) fn with_sa<B, F>(&self, f: F) -> B
481    where
482        F: FnOnce(&mut SegmentAccountant) -> B,
483    {
484        let start = clock();
485
486        debug_delay();
487        let mut sa = self.segment_accountant.lock();
488
489        let locked_at = clock();
490
491        M.accountant_lock.measure(locked_at - start);
492
493        let ret = f(&mut sa);
494
495        drop(sa);
496
497        M.accountant_hold.measure(clock() - locked_at);
498
499        ret
500    }
501
502    /// Return an iterator over the log, starting with
503    /// a specified offset.
504    pub(crate) fn iter_from(&self, lsn: Lsn) -> LogIter {
505        trace!("iterating from lsn {}", lsn);
506        let segments = self.with_sa(|sa| sa.segment_snapshot_iter_from(lsn));
507
508        LogIter {
509            config: self.config.clone(),
510            max_lsn: Some(self.stable()),
511            cur_lsn: None,
512            segment_base: None,
513            segments,
514            last_stage: false,
515        }
516    }
517
    /// Returns the last stable offset in storage.
    // Acquire pairs with the store in `mark_interval`, so a reader
    // that observes a bumped stable_lsn also observes the writes
    // that made it stable.
    pub(in crate::pagecache) fn stable(&self) -> Lsn {
        debug_delay();
        self.stable_lsn.load(Acquire)
    }
523
524    // Adds a header to the front of the buffer
525    #[allow(clippy::mut_mut)]
526    pub(crate) fn encapsulate<T: Serialize + Debug>(
527        &self,
528        item: &T,
529        header: MessageHeader,
530        mut out_buf: &mut [u8],
531        blob_id: Option<Lsn>,
532    ) -> Result<()> {
533        // we create this double ref to allow scooting
534        // the slice forward without doing anything
535        // to the argument
536        let out_buf_ref: &mut &mut [u8] = &mut out_buf;
537        {
538            let _ = Measure::new(&M.serialize);
539            header.serialize_into(out_buf_ref);
540        }
541
542        if let Some(blob_id) = blob_id {
543            // write blob to file
544            io_fail!(self, "blob blob write");
545            write_blob(&self.config, header.kind, blob_id, item)?;
546
547            let _ = Measure::new(&M.serialize);
548            blob_id.serialize_into(out_buf_ref);
549        } else {
550            let _ = Measure::new(&M.serialize);
551            item.serialize_into(out_buf_ref);
552        };
553
554        assert_eq!(
555            out_buf_ref.len(),
556            0,
557            "trying to serialize header {:?} \
558             and item {:?} but there were \
559             buffer leftovers at the end",
560            header,
561            item
562        );
563
564        Ok(())
565    }
566
    // Write an IO buffer's data to stable storage and set up the
    // next IO buffer for writing. Pads maxed buffers to the segment
    // boundary, performs the (optionally io_uring-based) write+sync,
    // then publishes the newly durable lsn interval and stabilizes
    // the SegmentAccountant.
    pub(crate) fn write_to_log(&self, iobuf: &IoBuf) -> Result<()> {
        let _measure = Measure::new(&M.write_to_log);
        let header = iobuf.get_header();
        let log_offset = iobuf.offset;
        let base_lsn = iobuf.lsn;
        let capacity = iobuf.capacity;

        let segment_size = self.config.segment_size;

        // lid and lsn must agree on their position within a segment
        assert_eq!(
            Lsn::try_from(log_offset % segment_size as LogOffset).unwrap(),
            base_lsn % segment_size as Lsn
        );

        assert_ne!(
            log_offset,
            LogOffset::max_value(),
            "created reservation for uninitialized slot",
        );

        // only sealed buffers may be written; the header encodes the
        // number of reserved (initialized) bytes
        assert!(header::is_sealed(header));

        let bytes_to_write = header::offset(header);

        trace!(
            "write_to_log log_offset {} lsn {} len {}",
            log_offset,
            base_lsn,
            bytes_to_write
        );

        let maxed = header::is_maxed(header);
        let unused_space = capacity - bytes_to_write;
        let should_pad = maxed && unused_space >= MAX_MSG_HEADER_LEN;

        // a pad is a null message written to the end of a buffer
        // to signify that nothing else will be written into it
        if should_pad {
            let pad_len = unused_space - MAX_MSG_HEADER_LEN;
            let data = iobuf.get_mut_range(bytes_to_write, unused_space);

            let segment_number = SegmentNumber(
                u64::try_from(base_lsn).unwrap()
                    / u64::try_from(self.config.segment_size).unwrap(),
            );

            let header = MessageHeader {
                kind: MessageKind::Cap,
                pid: PageId::max_value(),
                segment_number,
                len: u64::try_from(pad_len).unwrap(),
                crc32: 0,
            };

            let header_bytes = header.serialize();

            // initialize the remainder of this buffer (only pad_len of this
            // will be part of the Cap message)
            let padding_bytes = vec![
                MessageKind::Corrupted.into();
                unused_space - header_bytes.len()
            ];

            #[allow(unsafe_code)]
            unsafe {
                // SAFETY: `data` spans exactly `unused_space` bytes,
                // which covers header_bytes followed by padding_bytes
                std::ptr::copy_nonoverlapping(
                    header_bytes.as_ptr(),
                    data.as_mut_ptr(),
                    header_bytes.len(),
                );
                std::ptr::copy_nonoverlapping(
                    padding_bytes.as_ptr(),
                    data.as_mut_ptr().add(header_bytes.len()),
                    padding_bytes.len(),
                );
            }

            // this has to stay aligned with the hashing
            let crc32_arr = u32_to_arr(calculate_message_crc32(
                &header_bytes,
                &padding_bytes[..pad_len],
            ));

            #[allow(unsafe_code)]
            unsafe {
                // SAFETY: `data` is at least MAX_MSG_HEADER_LEN bytes
                // here, comfortably larger than a u32
                std::ptr::copy_nonoverlapping(
                    crc32_arr.as_ptr(),
                    // the crc32 is the first part of the buffer
                    data.as_mut_ptr(),
                    std::mem::size_of::<u32>(),
                );
            }
        } else if maxed {
            // initialize the remainder of this buffer's red zone
            let data = iobuf.get_mut_range(bytes_to_write, unused_space);

            #[allow(unsafe_code)]
            unsafe {
                // note: this could use slice::fill() if it stabilizes
                std::ptr::write_bytes(
                    data.as_mut_ptr(),
                    MessageKind::Corrupted.into(),
                    unused_space,
                );
            }
        }

        // maxed buffers are written out in full (including pad/red
        // zone) so the whole segment becomes durable at once
        let total_len = if maxed { capacity } else { bytes_to_write };

        let data = iobuf.get_mut_range(0, total_len);
        let stored_max_stable_lsn = iobuf.stored_max_stable_lsn;

        io_fail!(self, "buffer write");
        #[cfg(feature = "io_uring")]
        {
            let mut wrote = 0;
            while wrote < total_len {
                let to_write = &data[wrote..];
                let offset = log_offset + wrote as u64;

                // we take out this mutex to guarantee
                // that our `Link` write operation below
                // is serialized with the following sync.
                // we don't put the `Rio` instance into
                // the `Mutex` because we want to drop the
                // `Mutex` right after beginning the async
                // submission.
                let link_mu = self.submission_mutex.lock();

                // using the `Link` ordering, we specify
                // that `io_uring` should not begin
                // the following `sync_file_range`
                // until the previous write is
                // complete.
                let wrote_completion = self.io_uring.write_at_ordered(
                    &*self.config.file,
                    &to_write,
                    offset,
                    rio::Ordering::Link,
                );

                let sync_completion = self.io_uring.sync_file_range(
                    &*self.config.file,
                    offset,
                    to_write.len(),
                );

                sync_completion.wait()?;

                // TODO we want to move this above the previous `wait`
                // but there seems to be an issue in `rio` that is
                // triggered when multiple threads are submitting
                // events while events from other threads are in play.
                drop(link_mu);

                // writes may be short; loop until all bytes land
                wrote += wrote_completion.wait()?;
            }
        }
        #[cfg(not(feature = "io_uring"))]
        {
            let f = &self.config.file;
            pwrite_all(f, data, log_offset)?;
            if !self.config.temporary {
                #[cfg(target_os = "linux")]
                {
                    use std::os::unix::io::AsRawFd;
                    // sync only the range we just wrote, rather than
                    // paying for a whole-file fsync
                    let ret = unsafe {
                        libc::sync_file_range(
                            f.as_raw_fd(),
                            i64::try_from(log_offset).unwrap(),
                            i64::try_from(total_len).unwrap(),
                            libc::SYNC_FILE_RANGE_WAIT_BEFORE
                                | libc::SYNC_FILE_RANGE_WRITE
                                | libc::SYNC_FILE_RANGE_WAIT_AFTER,
                        )
                    };
                    if ret < 0 {
                        let err = std::io::Error::last_os_error();
                        if let Some(libc::ENOSYS) = err.raw_os_error() {
                            // kernel lacks sync_file_range; fall back
                            // to a full fsync
                            f.sync_all()?;
                        } else {
                            return Err(err.into());
                        }
                    }
                }

                #[cfg(not(target_os = "linux"))]
                f.sync_all()?;
            }
        }
        io_fail!(self, "buffer write post");

        if total_len > 0 {
            // a maxed buffer stabilizes lsns all the way to the next
            // segment boundary, not just the bytes written
            let complete_len = if maxed {
                let lsn_idx = base_lsn / segment_size as Lsn;
                let next_seg_beginning = (lsn_idx + 1) * segment_size as Lsn;
                assert_usize(next_seg_beginning - base_lsn)
            } else {
                total_len
            };

            debug!(
                "wrote lsns {}-{} to disk at offsets {}-{}, maxed {} complete_len {}",
                base_lsn,
                base_lsn + total_len as Lsn - 1,
                log_offset,
                log_offset + total_len as LogOffset - 1,
                maxed,
                complete_len
            );
            self.mark_interval(base_lsn, complete_len);
        }

        M.written_bytes.measure(total_len as u64);

        // NB the below deferred logic is important to ensure
        // that we never actually free a segment until all threads
        // that may have witnessed a DiskPtr that points into it
        // have completed their (crossbeam-epoch)-pinned operations.
        let guard = pin();
        let max_header_stable_lsn = self.max_header_stable_lsn.clone();
        guard.defer(move || {
            trace!("bumping atomic header lsn to {}", stored_max_stable_lsn);
            bump_atomic_lsn(&max_header_stable_lsn, stored_max_stable_lsn)
        });

        guard.flush();

        let current_max_header_stable_lsn =
            self.max_header_stable_lsn.load(Acquire);

        self.sa_stabilize(current_max_header_stable_lsn, &guard)
    }
802
    // It's possible that IO buffers are written out of order!
    // So we need to use this to keep track of them, and only
    // increment self.stable. If we didn't do this, then we would
    // accidentally decrement self.stable sometimes, or bump stable
    // above an offset that corresponds to a buffer that hasn't actually
    // been written yet! It's OK to use a mutex here because it is pretty
    // fast, compared to the other operations on shared state.
    fn mark_interval(&self, whence: Lsn, len: usize) {
        debug!("mark_interval({}, {})", whence, len);
        assert!(
            len > 0,
            "mark_interval called with an empty length at {}",
            whence
        );
        let mut intervals = self.intervals.lock();

        // inclusive lsn range covered by this write
        let interval = (whence, whence + len as Lsn - 1);

        let updated = intervals.mark_fsync(interval);

        if let Some(new_stable_lsn) = updated {
            trace!("mark_interval new highest lsn {}", new_stable_lsn);
            // publish the frontier while still holding the intervals
            // mutex, so waiters cannot observe a stale value after
            // being notified
            self.stable_lsn.store(new_stable_lsn, SeqCst);

            #[cfg(feature = "event_log")]
            {
                // We add 1 because we want it to stay monotonic with recovery
                // LSN, which deals with the next LSN after the last stable one.
                // We need to do this while intervals is held otherwise it
                // may race with another thread that stabilizes something
                // lower.
                self.config.event_log.stabilized_lsn(new_stable_lsn + 1);
            }

            // having held the mutex makes this linearized
            // with the notify below.
            drop(intervals);
        }
        let _notified = self.interval_updated.notify_all();
    }
843
    // Clones an Arc handle to the currently active IoBuf without
    // consuming the +1 strong count leaked into the AtomicPtr.
    pub(in crate::pagecache) fn current_iobuf(&self) -> Arc<IoBuf> {
        // we bump up the ref count, and forget the arc to retain a +1.
        // If we didn't forget it, it would then go back down again,
        // even though we just created a new reference to it, leading
        // to double-frees.
        // SAFETY: the pointer was produced by Arc::into_raw and the
        // leaked strong count keeps it alive; forgetting `arc` below
        // restores that count after we clone.
        let arc = unsafe { Arc::from_raw(self.iobuf.load(Acquire)) };
        #[allow(clippy::mem_forget)]
        std::mem::forget(arc.clone());
        arc
    }
854}
855
856pub(crate) fn roll_iobuf(iobufs: &Arc<IoBufs>) -> Result<usize> {
857    let iobuf = iobufs.current_iobuf();
858    let header = iobuf.get_header();
859    if header::is_sealed(header) {
860        trace!("skipping roll_iobuf due to already-sealed header");
861        return Ok(0);
862    }
863    if header::offset(header) == 0 {
864        trace!("skipping roll_iobuf due to empty segment");
865    } else {
866        trace!("sealing ioubuf from  roll_iobuf");
867        maybe_seal_and_write_iobuf(iobufs, &iobuf, header, false)?;
868    }
869
870    Ok(header::offset(header))
871}
872
/// Blocks until the specified log sequence number has
/// been made stable on disk. Returns the number of
/// bytes written. Suitable as a full consistency
/// barrier.
// Thin wrapper: `partial_durability = false` requires the full
// contiguous-stability frontier to reach `lsn`.
pub(in crate::pagecache) fn make_stable(
    iobufs: &Arc<IoBufs>,
    lsn: Lsn,
) -> Result<usize> {
    make_stable_inner(iobufs, lsn, false)
}
883
/// Blocks until the specified log sequence number
/// has been written to disk. it's assumed that
/// log messages are always written contiguously
/// due to the way reservations manage io buffer
/// tenancy. this is only suitable for use
/// before trying to read a message from the log,
/// so that the system can avoid a full barrier
/// if the desired item has already been made
/// durable.
// Thin wrapper: `partial_durability = true` relaxes the wait relative
// to `make_stable`.
pub(in crate::pagecache) fn make_durable(
    iobufs: &Arc<IoBufs>,
    lsn: Lsn,
) -> Result<usize> {
    make_stable_inner(iobufs, lsn, true)
}
899
/// Shared implementation behind `make_stable` and `make_durable`.
///
/// Loops until `iobufs.stable()` reaches `lsn` (or, when
/// `partial_durability` is set, until an fsynced range covering
/// `lsn` is observed), sealing the current IO buffer when it holds
/// data that would block progress, and otherwise parking on the
/// `interval_updated` condition variable. Returns the number of
/// bytes that became stable during this call, and propagates any
/// global error that has been set on the config.
pub(in crate::pagecache) fn make_stable_inner(
    iobufs: &Arc<IoBufs>,
    lsn: Lsn,
    partial_durability: bool,
) -> Result<usize> {
    let _measure = Measure::new(&M.make_stable);

    // NB before we write the 0th byte of the file, stable is -1
    let first_stable = iobufs.stable();
    if first_stable >= lsn {
        return Ok(0);
    }

    let mut stable = first_stable;

    while stable < lsn {
        // bail out (and wake any other waiters) if the system has
        // entered a global error state.
        if let Err(e) = iobufs.config.global_error() {
            let intervals = iobufs.intervals.lock();

            // having held the mutex makes this linearized
            // with the notify below.
            drop(intervals);

            let _notified = iobufs.interval_updated.notify_all();
            return Err(e);
        }

        let iobuf = iobufs.current_iobuf();
        let header = iobuf.get_header();
        if header::offset(header) == 0
            || header::is_sealed(header)
            || iobuf.lsn > lsn
        {
            // nothing to write, don't bother sealing
            // current IO buffer.
        } else {
            maybe_seal_and_write_iobuf(iobufs, &iobuf, header, false)?;
            stable = iobufs.stable();
            // NB we have to continue here to possibly clear
            // the next io buffer, which may have dirty
            // data we need to flush (and maybe no other
            // thread is still alive to do so)
            continue;
        }

        // block until another thread updates the stable lsn
        let mut waiter = iobufs.intervals.lock();

        // check global error again now that we are holding a mutex
        if let Err(e) = iobufs.config.global_error() {
            // having held the mutex makes this linearized
            // with the notify below.
            drop(waiter);

            let _notified = iobufs.interval_updated.notify_all();
            return Err(e);
        }

        // re-read stability under the lock so the decision to wait
        // is consistent with the notify performed by stabilizers.
        stable = iobufs.stable();

        if partial_durability {
            // partial durability only needs `lsn` itself to be
            // covered, not the entire log prefix.
            if waiter.stable_lsn > lsn {
                return Ok(assert_usize(stable - first_stable));
            }

            for (low, high) in &waiter.fsynced_ranges {
                if *low <= lsn && *high > lsn {
                    return Ok(assert_usize(stable - first_stable));
                }
            }
        }

        if stable < lsn {
            trace!("waiting on cond var for make_stable({})", lsn);

            if cfg!(feature = "event_log") {
                // under the event_log test feature, a 30-second stall
                // is considered a deadlock and aborts loudly instead
                // of hanging the test run.
                let timeout = iobufs
                    .interval_updated
                    .wait_for(&mut waiter, std::time::Duration::from_secs(30));
                if timeout.timed_out() {
                    fn tn() -> String {
                        std::thread::current()
                            .name()
                            .unwrap_or("unknown")
                            .to_owned()
                    }
                    panic!(
                        "{} failed to make_stable after 30 seconds. \
                         waiting to stabilize lsn {}, current stable {} \
                         intervals: {:?}",
                        tn(),
                        lsn,
                        iobufs.stable(),
                        waiter
                    );
                }
            } else {
                iobufs.interval_updated.wait(&mut waiter);
            }
        } else {
            debug!("make_stable({}) returning", lsn);
            break;
        }
    }

    Ok(assert_usize(stable - first_stable))
}
1007
/// Called by users who wish to force the current buffer
/// to flush some pending writes. Returns the number
/// of bytes written during this call.
pub(in crate::pagecache) fn flush(iobufs: &Arc<IoBufs>) -> Result<usize> {
    // take the shared side of the concurrency control lock for
    // the duration of the flush.
    let _cc = concurrency_control::read();
    // Acquire load pairs with whoever advanced max_reserved_lsn,
    // so we stabilize everything reserved before this call.
    let max_reserved_lsn = iobufs.max_reserved_lsn.load(Acquire);
    make_stable(iobufs, max_reserved_lsn)
}
1016
/// Attempt to seal the current IO buffer, possibly
/// writing it to disk if there are no other writers
/// operating on it.
///
/// On a successful seal this installs a fresh `IoBuf` as the
/// active buffer (rolling to a new segment when the old one is
/// maxed, or reusing the same backing allocation at a bumped
/// base offset otherwise), notifies `interval_updated` waiters,
/// and — when no reservation writers remain — schedules the
/// sealed buffer to be written to the log on the threadpool.
pub(in crate::pagecache) fn maybe_seal_and_write_iobuf(
    iobufs: &Arc<IoBufs>,
    iobuf: &Arc<IoBuf>,
    header: Header,
    from_reserve: bool,
) -> Result<()> {
    if header::is_sealed(header) {
        // this buffer is already sealed. nothing to do here.
        return Ok(());
    }

    // NB need to do this before CAS because it can get
    // written and reset by another thread afterward
    let lid = iobuf.offset;
    let lsn = iobuf.lsn;
    let capacity = iobuf.capacity;
    let segment_size = iobufs.config.segment_size;

    if header::offset(header) > capacity {
        // a race happened, nothing we can do
        return Ok(());
    }

    let res_len = header::offset(header);
    // "maxed" means this buffer cannot accept another message of
    // maximum header size, so the next buffer must start a new
    // segment.
    let maxed = from_reserve || capacity - res_len < MAX_MSG_HEADER_LEN;
    let sealed = if maxed {
        trace!("setting maxed to true for iobuf with lsn {}", lsn);
        header::mk_maxed(header::mk_sealed(header))
    } else {
        header::mk_sealed(header)
    };

    // only the thread that wins this CAS proceeds to set up the
    // replacement buffer; losers return and let the winner do it.
    let worked = iobuf.cas_header(header, sealed).is_ok();
    if !worked {
        return Ok(());
    }

    trace!("sealed iobuf with lsn {}", lsn);

    assert!(
        capacity + SEG_HEADER_LEN >= res_len,
        "res_len of {} higher than buffer capacity {}",
        res_len,
        capacity
    );

    assert_ne!(
        lid,
        LogOffset::max_value(),
        "sealing something that should never have \
         been claimed (iobuf lsn {})\n{:?}",
        lsn,
        iobufs
    );

    // open new slot
    let mut next_lsn = lsn;

    let measure_assign_offset = Measure::new(&M.assign_offset);

    let next_offset = if maxed {
        // roll lsn to the next offset
        let lsn_idx = lsn / segment_size as Lsn;
        next_lsn = (lsn_idx + 1) * segment_size as Lsn;

        // mark unused as clear
        debug!(
            "rolling to new segment after clearing {}-{}",
            lid,
            lid + res_len as LogOffset,
        );

        // ask the segment accountant for the next segment's offset;
        // on failure, set the global error and wake all waiters so
        // they observe it instead of stalling.
        match iobufs.with_sa(|sa| sa.next(next_lsn)) {
            Ok(ret) => ret,
            Err(e) => {
                iobufs.config.set_global_error(e.clone());
                let intervals = iobufs.intervals.lock();

                // having held the mutex makes this linearized
                // with the notify below.
                drop(intervals);

                let _notified = iobufs.interval_updated.notify_all();
                return Err(e);
            }
        }
    } else {
        debug!(
            "advancing offset within the current segment from {} to {}",
            lid,
            lid + res_len as LogOffset
        );
        next_lsn += res_len as Lsn;

        lid + res_len as LogOffset
    };

    // NB as soon as the "sealed" bit is 0, this allows new threads
    // to start writing into this buffer, so do that after it's all
    // set up. expect this thread to block until the buffer completes
    // its entire life cycle as soon as we do that.
    let next_iobuf = if maxed {
        // fresh segment: brand-new aligned allocation, zeroed header.
        let mut next_iobuf = IoBuf {
            buf: Arc::new(UnsafeCell::new(AlignedBuf::new(segment_size))),
            header: CachePadded::new(AtomicU64::new(0)),
            base: 0,
            offset: next_offset,
            lsn: next_lsn,
            capacity: segment_size,
            stored_max_stable_lsn: -1,
        };

        next_iobuf.store_segment_header(sealed, next_lsn, iobufs.stable());

        next_iobuf
    } else {
        let new_cap = capacity - res_len;
        assert_ne!(new_cap, 0);
        // bump the salt so stale headers from the previous tenancy
        // of this buffer cannot be confused with the new one.
        let last_salt = header::salt(sealed);
        let new_salt = header::bump_salt(last_salt);

        IoBuf {
            // reuse the previous io buffer
            buf: iobuf.buf.clone(),
            header: CachePadded::new(AtomicU64::new(new_salt)),
            base: iobuf.base + res_len,
            offset: next_offset,
            lsn: next_lsn,
            capacity: new_cap,
            stored_max_stable_lsn: -1,
        }
    };

    // we acquire this mutex to guarantee that any threads that
    // are going to wait on the condition variable will observe
    // the change.
    debug_delay();
    let intervals = iobufs.intervals.lock();
    let old_ptr = iobufs
        .iobuf
        .swap(Arc::into_raw(Arc::new(next_iobuf)) as *mut IoBuf, SeqCst);

    // SAFETY: `old_ptr` was itself created via `Arc::into_raw` when
    // it was stored, so reclaiming it with `Arc::from_raw` here
    // balances that leak exactly once.
    let old_arc = unsafe { Arc::from_raw(old_ptr) };

    // defer the drop so readers that raced `current_iobuf` against
    // this swap are past their critical section before reclamation.
    pin().defer(move || drop(old_arc));

    // having held the mutex makes this linearized
    // with the notify below.
    drop(intervals);

    let _notified = iobufs.interval_updated.notify_all();

    drop(measure_assign_offset);

    // if writers is 0, it's our responsibility to write the buffer.
    if header::n_writers(sealed) == 0 {
        iobufs.config.global_error()?;
        trace!(
            "asynchronously writing iobuf with lsn {} to log from maybe_seal",
            lsn
        );
        let iobufs = iobufs.clone();
        let iobuf = iobuf.clone();
        let _result = threadpool::spawn(move || {
            if let Err(e) = iobufs.write_to_log(&iobuf) {
                error!(
                    "hit error while writing iobuf with lsn {}: {:?}",
                    lsn, e
                );

                // store error before notifying so that waiting threads will see
                // it
                iobufs.config.set_global_error(e);

                let intervals = iobufs.intervals.lock();

                // having held the mutex makes this linearized
                // with the notify below.
                drop(intervals);

                let _notified = iobufs.interval_updated.notify_all();
            }
        })?;

        // under the event_log test feature, writes are made
        // synchronous so interleavings are easier to reason about.
        #[cfg(feature = "event_log")]
        _result.wait();

        Ok(())
    } else {
        Ok(())
    }
}
1212
1213impl Debug for IoBufs {
1214    fn fmt(
1215        &self,
1216        formatter: &mut fmt::Formatter<'_>,
1217    ) -> std::result::Result<(), fmt::Error> {
1218        formatter.write_fmt(format_args!("IoBufs {{ buf: {:?} }}", self.iobuf))
1219    }
1220}
1221
1222impl Debug for IoBuf {
1223    fn fmt(
1224        &self,
1225        formatter: &mut fmt::Formatter<'_>,
1226    ) -> std::result::Result<(), fmt::Error> {
1227        let header = self.get_header();
1228        formatter.write_fmt(format_args!(
1229            "\n\tIoBuf {{ lid: {}, n_writers: {}, offset: \
1230             {}, sealed: {} }}",
1231            self.offset,
1232            header::n_writers(header),
1233            header::offset(header),
1234            header::is_sealed(header)
1235        ))
1236    }
1237}