use std::{
    alloc::{alloc, dealloc, Layout},
    cell::UnsafeCell,
    sync::atomic::AtomicPtr,
};

use crate::{pagecache::*, *};

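/// Returns early from the enclosing function with `Err(Error::FailPoint)`
/// when the named failpoint is active (only compiled in with the
/// `failpoints` feature). The global error is set first, and the
/// `intervals` mutex is briefly acquired and released before notifying
/// `interval_updated`, so that threads blocked in `make_stable_inner`
/// wake up and observe the error rather than waiting forever.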
macro_rules! io_fail {
    ($self:expr, $e:expr) => {
        #[cfg(feature = "failpoints")]
        {
            debug_delay();
            if crate::fail::is_active($e) {
                $self.config.set_global_error(Error::FailPoint);
                let _mu = $self.intervals.lock();
                drop(_mu);
                let _notified = $self.interval_updated.notify_all();
                return Err(Error::FailPoint);
            }
        };
    };
}

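/// A raw heap allocation aligned to 8192 bytes, presumably to satisfy
/// alignment requirements for page-aligned or direct I/O. The pointer
/// and length are freed with the same `Layout` on drop, and the
/// `Send`/`Sync` impls below assert that access is externally
/// synchronized by `IoBuf`'s header protocol.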
struct AlignedBuf(*mut u8, usize);

#[allow(unsafe_code)]
unsafe impl Send for AlignedBuf {}

#[allow(unsafe_code)]
unsafe impl Sync for AlignedBuf {}

impl AlignedBuf {
    fn new(len: usize) -> AlignedBuf {
        let layout = Layout::from_size_align(len, 8192).unwrap();
        let ptr = unsafe { alloc(layout) };

        assert!(!ptr.is_null(), "failed to allocate critical IO buffer");

        AlignedBuf(ptr, len)
    }
}

impl Drop for AlignedBuf {
    fn drop(&mut self) {
        let layout = Layout::from_size_align(self.1, 8192).unwrap();
        unsafe {
            dealloc(self.0, layout);
        }
    }
}

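/// A single log write buffer backed by a shared `AlignedBuf`. The
/// atomic `header` word packs the seal bit, maxed bit, writer count,
/// reserved offset, and a salt (see the `header::*` helpers used
/// below); `base` is this buffer's starting position within the shared
/// allocation, and `offset`/`lsn` locate it in the log file and lsn
/// space respectively.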
pub(crate) struct IoBuf {
    buf: Arc<UnsafeCell<AlignedBuf>>,
    header: CachePadded<AtomicU64>,
    base: usize,
    pub offset: LogOffset,
    pub lsn: Lsn,
    pub capacity: usize,
    stored_max_stable_lsn: Lsn,
}

#[allow(unsafe_code)]
unsafe impl Sync for IoBuf {}

#[allow(unsafe_code)]
unsafe impl Send for IoBuf {}

impl IoBuf {
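    /// Returns a mutable slice covering `len` bytes of the shared
    /// buffer starting at `base + at`. The `'static` lifetime is a
    /// deliberate lie: soundness relies on callers never outliving the
    /// buffer's `Arc`, and on the header protocol handing out disjoint
    /// ranges to concurrent writers.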
    pub(crate) fn get_mut_range(
        &self,
        at: usize,
        len: usize,
    ) -> &'static mut [u8] {
        let buf_ptr = self.buf.get();

        unsafe {
            assert!((*buf_ptr).1 >= at + len);
            std::slice::from_raw_parts_mut(
                (*buf_ptr).0.add(self.base + at),
                len,
            )
        }
    }

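    /// Writes a `SegmentHeader` for `lsn` at the front of the buffer
    /// and publishes a fresh header word whose offset already accounts
    /// for `SEG_HEADER_LEN`, with a salt bumped relative to the
    /// previous buffer's header.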
    fn store_segment_header(
        &mut self,
        last: Header,
        lsn: Lsn,
        max_stable_lsn: Lsn,
    ) {
        debug!("storing lsn {} at the beginning of the buffer", lsn);
        assert!(self.capacity >= SEG_HEADER_LEN);

        self.stored_max_stable_lsn = max_stable_lsn;

        self.lsn = lsn;

        let header = SegmentHeader { lsn, max_stable_lsn, ok: true };
        let header_bytes: [u8; SEG_HEADER_LEN] = header.into();

        #[allow(unsafe_code)]
        unsafe {
            std::ptr::copy_nonoverlapping(
                header_bytes.as_ptr(),
                (*self.buf.get()).0,
                SEG_HEADER_LEN,
            );
        }

        // bump the salt and advance the offset past the segment header
        let last_salt = header::salt(last);
        let new_salt = header::bump_salt(last_salt);
        let bumped = header::bump_offset(new_salt, SEG_HEADER_LEN);
        self.set_header(bumped);
    }

    pub(crate) fn get_header(&self) -> Header {
        debug_delay();
        self.header.load(Acquire)
    }

    pub(crate) fn set_header(&self, new: Header) {
        debug_delay();
        self.header.store(new, Release);
    }

    pub(crate) fn cas_header(
        &self,
        old: Header,
        new: Header,
    ) -> std::result::Result<Header, Header> {
        debug_delay();
        self.header
            .compare_exchange(old, new, SeqCst, SeqCst)
            .map(|_| new)
    }
}

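/// Bookkeeping for which contiguous lsn ranges have been fsynced.
/// `stable_lsn` only advances once every lower interval has also been
/// made durable, and `batches` records atomic batch boundaries that
/// must not be reported as stable until the whole batch is durable.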
#[derive(Debug)]
pub(crate) struct StabilityIntervals {
    fsynced_ranges: Vec<(Lsn, Lsn)>,
    batches: BTreeMap<Lsn, Lsn>,
    stable_lsn: Lsn,
}

impl StabilityIntervals {
    fn new(lsn: Lsn) -> StabilityIntervals {
        StabilityIntervals {
            stable_lsn: lsn,
            fsynced_ranges: vec![],
            batches: BTreeMap::default(),
        }
    }

    pub(crate) fn mark_batch(&mut self, interval: (Lsn, Lsn)) {
        assert!(interval.0 > self.stable_lsn);
        self.batches.insert(interval.0, interval.1);
    }

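    /// Records an fsynced interval of lsns, merging it with the most
    /// recently pushed interval when they are adjacent, and retires
    /// intervals that sit directly above `stable_lsn`. Returns the
    /// batch-aware stable lsn that may now be published, if any.
    ///
    /// Worked example with hypothetical values: starting from
    /// `stable_lsn == 9`, marking (15, 19) parks the interval because
    /// lsns 10-14 are not yet durable; marking (10, 14) afterwards
    /// merges the two into (10, 19), pops it, and advances
    /// `stable_lsn` to 19.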
    fn mark_fsync(&mut self, interval: (Lsn, Lsn)) -> Option<Lsn> {
        trace!(
            "pushing interval {:?} into fsynced_ranges {:?}",
            interval,
            self.fsynced_ranges
        );
        if let Some((low, high)) = self.fsynced_ranges.last_mut() {
            if *low == interval.1 + 1 {
                *low = interval.0
            } else if *high + 1 == interval.0 {
                *high = interval.1
            } else {
                self.fsynced_ranges.push(interval);
            }
        } else {
            self.fsynced_ranges.push(interval);
        }

        #[cfg(any(test, feature = "event_log", feature = "lock_free_delays"))]
        assert!(
            self.fsynced_ranges.len() < 10000,
            "fsynced_ranges is getting strangely long... {:?}",
            self
        );

        // sort descending so that the lowest interval sits at the
        // end of the vector, where it can be popped cheaply
        self.fsynced_ranges
            .sort_unstable_by_key(|&range| std::cmp::Reverse(range));

        while let Some(&(low, high)) = self.fsynced_ranges.last() {
            assert!(low <= high);
            let cur_stable = self.stable_lsn;
            assert!(
                low > cur_stable,
                "somehow, we marked offset {} stable while \
                interval {}-{} had not yet been applied!",
                cur_stable,
                low,
                high
            );
            if cur_stable + 1 == low {
                debug!("new highest interval: {} - {}", low, high);
                self.fsynced_ranges.pop().unwrap();
                self.stable_lsn = high;
            } else {
                break;
            }
        }

        let mut batch_stable_lsn = None;

        // retire any batches that the newly advanced stable_lsn has
        // made fully durable
        while let Some((low, high)) =
            self.batches.iter().map(|(l, h)| (*l, *h)).next()
        {
            assert!(
                low < high,
                "expected batch low mark {} to be below high mark {}",
                low,
                high
            );

            if high <= self.stable_lsn {
                if let Some(bsl) = batch_stable_lsn {
                    assert!(bsl < high);
                }
                batch_stable_lsn = Some(high);
                self.batches.remove(&low).unwrap();
            } else {
                if low <= self.stable_lsn {
                    // a partially durable batch caps the lsn that may
                    // be reported as stable to batch-aware readers
                    batch_stable_lsn = Some(low - 1);
                }
                break;
            }
        }

        if self.batches.is_empty() {
            Some(self.stable_lsn)
        } else {
            batch_stable_lsn
        }
    }
}

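/// Shared log-writing state: the currently active `IoBuf` (stored as a
/// raw `AtomicPtr` into an `Arc` so it can be swapped atomically during
/// rollover), durability bookkeeping (`intervals`, `stable_lsn`,
/// `interval_updated`), and the `SegmentAccountant` that assigns
/// segments, with a lock-free stack of deferred segment ops to avoid
/// contending on its mutex.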
pub(crate) struct IoBufs {
    pub config: RunningConfig,

    pub iobuf: AtomicPtr<IoBuf>,

    pub intervals: Mutex<StabilityIntervals>,
    pub interval_updated: Condvar,

    pub stable_lsn: AtomicLsn,
    pub max_reserved_lsn: AtomicLsn,
    pub max_header_stable_lsn: Arc<AtomicLsn>,
    pub segment_accountant: Mutex<SegmentAccountant>,
    pub segment_cleaner: SegmentCleaner,
    deferred_segment_ops: stack::Stack<SegmentOp>,
    #[cfg(feature = "io_uring")]
    pub submission_mutex: Mutex<()>,
    #[cfg(feature = "io_uring")]
    pub io_uring: rio::Rio,
}

impl Drop for IoBufs {
    fn drop(&mut self) {
        // reclaim the manually managed Arc behind the iobuf pointer
        let ptr = self.iobuf.swap(std::ptr::null_mut(), SeqCst);
        assert!(!ptr.is_null());
        unsafe {
            drop(Arc::from_raw(ptr));
        }
    }
}

impl IoBufs {
    pub fn start(config: RunningConfig, snapshot: &Snapshot) -> Result<IoBufs> {
        let segment_cleaner = SegmentCleaner::default();

        let mut segment_accountant: SegmentAccountant =
            SegmentAccountant::start(
                config.clone(),
                snapshot,
                segment_cleaner.clone(),
            )?;

        let segment_size = config.segment_size;

        let (recovered_lid, recovered_lsn) =
            snapshot.recovered_coords(config.segment_size);

        let (next_lid, next_lsn) = match (recovered_lid, recovered_lsn) {
            (Some(next_lid), Some(next_lsn)) => {
                debug!(
                    "starting log at recovered active \
                    offset {}, recovered lsn {}",
                    next_lid, next_lsn
                );
                (next_lid, next_lsn)
            }
            (None, None) => {
                debug!("starting log for a totally fresh system");
                let next_lsn = 0;
                let next_lid = segment_accountant.next(next_lsn)?;
                (next_lid, next_lsn)
            }
            (None, Some(next_lsn)) => {
                let next_lid = segment_accountant.next(next_lsn)?;
                debug!(
                    "starting log at clean offset {}, recovered lsn {}",
                    next_lid, next_lsn
                );
                (next_lid, next_lsn)
            }
            (Some(_), None) => unreachable!(),
        };

        assert!(next_lsn >= Lsn::try_from(next_lid).unwrap());

        debug!(
            "starting IoBufs with next_lsn: {} \
            next_lid: {}",
            next_lsn, next_lid
        );

        let stable = next_lsn - 1;

        let base = assert_usize(next_lid % segment_size as LogOffset);

        let mut iobuf = IoBuf {
            buf: Arc::new(UnsafeCell::new(AlignedBuf::new(segment_size))),
            header: CachePadded::new(AtomicU64::new(0)),
            base,
            offset: next_lid,
            lsn: next_lsn,
            capacity: segment_size - base,
            stored_max_stable_lsn: -1,
        };

        if snapshot.active_segment.is_none() {
            iobuf.store_segment_header(0, next_lsn, stable);
        }

        Ok(IoBufs {
            config,

            iobuf: AtomicPtr::new(Arc::into_raw(Arc::new(iobuf)) as *mut IoBuf),

            intervals: Mutex::new(StabilityIntervals::new(stable)),
            interval_updated: Condvar::new(),

            stable_lsn: AtomicLsn::new(stable),
            max_reserved_lsn: AtomicLsn::new(stable),
            max_header_stable_lsn: Arc::new(AtomicLsn::new(next_lsn)),
            segment_accountant: Mutex::new(segment_accountant),
            segment_cleaner,
            deferred_segment_ops: stack::Stack::default(),
            #[cfg(feature = "io_uring")]
            submission_mutex: Mutex::new(()),
            #[cfg(feature = "io_uring")]
            io_uring: rio::new()?,
        })
    }

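    /// Records a page link for the `SegmentAccountant` without taking
    /// its mutex: the op is pushed onto a lock-free stack and applied
    /// in bulk by whichever thread next holds the accountant.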
    pub(in crate::pagecache) fn sa_mark_link(
        &self,
        pid: PageId,
        cache_info: CacheInfo,
        guard: &Guard,
    ) {
        let op = SegmentOp::Link { pid, cache_info };
        self.deferred_segment_ops.push(op, guard);
    }

    pub(in crate::pagecache) fn sa_mark_replace(
        &self,
        pid: PageId,
        lsn: Lsn,
        old_cache_infos: &[CacheInfo],
        new_cache_info: CacheInfo,
        guard: &Guard,
    ) -> Result<()> {
        debug_delay();
        if let Some(mut sa) = self.segment_accountant.try_lock() {
            let start = clock();
            sa.mark_replace(pid, lsn, old_cache_infos, new_cache_info)?;
            for op in self.deferred_segment_ops.take_iter(guard) {
                sa.apply_op(op)?;
            }
            M.accountant_hold.measure(clock() - start);
        } else {
            // the accountant is contended: defer the op so that
            // another thread applies it later
            let op = SegmentOp::Replace {
                pid,
                lsn,
                old_cache_infos: old_cache_infos.to_vec(),
                new_cache_info,
            };
            self.deferred_segment_ops.push(op, guard);
        }
        Ok(())
    }

    pub(in crate::pagecache) fn sa_stabilize(
        &self,
        lsn: Lsn,
        guard: &Guard,
    ) -> Result<()> {
        self.with_sa(|sa| {
            for op in self.deferred_segment_ops.take_iter(guard) {
                sa.apply_op(op)?;
            }
            sa.stabilize(lsn)?;
            Ok(())
        })
    }

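    /// Runs `f` with the `SegmentAccountant` mutex held, measuring
    /// both how long the lock took to acquire and how long it was
    /// held.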
    pub(in crate::pagecache) fn with_sa<B, F>(&self, f: F) -> B
    where
        F: FnOnce(&mut SegmentAccountant) -> B,
    {
        let start = clock();

        debug_delay();
        let mut sa = self.segment_accountant.lock();

        let locked_at = clock();

        M.accountant_lock.measure(locked_at - start);

        let ret = f(&mut sa);

        drop(sa);

        M.accountant_hold.measure(clock() - locked_at);

        ret
    }

    pub(crate) fn iter_from(&self, lsn: Lsn) -> LogIter {
        trace!("iterating from lsn {}", lsn);
        let segments = self.with_sa(|sa| sa.segment_snapshot_iter_from(lsn));

        LogIter {
            config: self.config.clone(),
            max_lsn: Some(self.stable()),
            cur_lsn: None,
            segment_base: None,
            segments,
            last_stage: false,
        }
    }

    pub(in crate::pagecache) fn stable(&self) -> Lsn {
        debug_delay();
        self.stable_lsn.load(Acquire)
    }

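    /// Serializes `header` followed by the message body into `out_buf`,
    /// which the caller must size exactly. Inline messages store the
    /// item itself after the header; blob messages write the item to a
    /// separate blob file and store only the `blob_id` lsn pointer
    /// inline, i.e. `[ MessageHeader | item bytes ]` vs
    /// `[ MessageHeader | blob_id ]`.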
    #[allow(clippy::mut_mut)]
    pub(crate) fn encapsulate<T: Serialize + Debug>(
        &self,
        item: &T,
        header: MessageHeader,
        mut out_buf: &mut [u8],
        blob_id: Option<Lsn>,
    ) -> Result<()> {
        let out_buf_ref: &mut &mut [u8] = &mut out_buf;
        {
            let _measure = Measure::new(&M.serialize);
            header.serialize_into(out_buf_ref);
        }

        if let Some(blob_id) = blob_id {
            // write the blob to its own file, storing only its lsn
            // pointer inline in the log
            io_fail!(self, "blob blob write");
            write_blob(&self.config, header.kind, blob_id, item)?;

            let _measure = Measure::new(&M.serialize);
            blob_id.serialize_into(out_buf_ref);
        } else {
            let _measure = Measure::new(&M.serialize);
            item.serialize_into(out_buf_ref);
        };

        assert_eq!(
            out_buf_ref.len(),
            0,
            "trying to serialize header {:?} \
            and item {:?} but there were \
            buffer leftovers at the end",
            header,
            item
        );

        Ok(())
    }

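    /// Writes a sealed iobuf to disk at its claimed log offset. A
    /// maxed buffer has its unused tail padded with an explicit `Cap`
    /// message (or filled with `Corrupted` marker bytes when even a
    /// message header won't fit), so the entire segment is accounted
    /// for. After the data is written and synced, the covered lsn
    /// interval is marked and segment stabilization is attempted.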
    pub(crate) fn write_to_log(&self, iobuf: &IoBuf) -> Result<()> {
        let _measure = Measure::new(&M.write_to_log);
        let header = iobuf.get_header();
        let log_offset = iobuf.offset;
        let base_lsn = iobuf.lsn;
        let capacity = iobuf.capacity;

        let segment_size = self.config.segment_size;

        assert_eq!(
            Lsn::try_from(log_offset % segment_size as LogOffset).unwrap(),
            base_lsn % segment_size as Lsn
        );

        assert_ne!(
            log_offset,
            LogOffset::max_value(),
            "created reservation for uninitialized slot",
        );

        assert!(header::is_sealed(header));

        let bytes_to_write = header::offset(header);

        trace!(
            "write_to_log log_offset {} lsn {} len {}",
            log_offset,
            base_lsn,
            bytes_to_write
        );

        let maxed = header::is_maxed(header);
        let unused_space = capacity - bytes_to_write;
        let should_pad = maxed && unused_space >= MAX_MSG_HEADER_LEN;

        if should_pad {
            // the segment is being closed with room to spare, so pad
            // the tail with an explicit Cap message
            let pad_len = unused_space - MAX_MSG_HEADER_LEN;
            let data = iobuf.get_mut_range(bytes_to_write, unused_space);

            let segment_number = SegmentNumber(
                u64::try_from(base_lsn).unwrap()
                    / u64::try_from(self.config.segment_size).unwrap(),
            );

            let header = MessageHeader {
                kind: MessageKind::Cap,
                pid: PageId::max_value(),
                segment_number,
                len: u64::try_from(pad_len).unwrap(),
                crc32: 0,
            };

            let header_bytes = header.serialize();

            let padding_bytes = vec![
                MessageKind::Corrupted.into();
                unused_space - header_bytes.len()
            ];

            #[allow(unsafe_code)]
            unsafe {
                std::ptr::copy_nonoverlapping(
                    header_bytes.as_ptr(),
                    data.as_mut_ptr(),
                    header_bytes.len(),
                );
                std::ptr::copy_nonoverlapping(
                    padding_bytes.as_ptr(),
                    data.as_mut_ptr().add(header_bytes.len()),
                    padding_bytes.len(),
                );
            }

            let crc32_arr = u32_to_arr(calculate_message_crc32(
                &header_bytes,
                &padding_bytes[..pad_len],
            ));

            #[allow(unsafe_code)]
            unsafe {
                // the crc32 occupies the first bytes of the serialized
                // header, so it is patched in after the fact
                std::ptr::copy_nonoverlapping(
                    crc32_arr.as_ptr(),
                    data.as_mut_ptr(),
                    std::mem::size_of::<u32>(),
                );
            }
        } else if maxed {
            // there is no room left for even a message header, so fill
            // the tail with a marker byte that readers will reject
            let data = iobuf.get_mut_range(bytes_to_write, unused_space);

            #[allow(unsafe_code)]
            unsafe {
                std::ptr::write_bytes(
                    data.as_mut_ptr(),
                    MessageKind::Corrupted.into(),
                    unused_space,
                );
            }
        }

        let total_len = if maxed { capacity } else { bytes_to_write };

        let data = iobuf.get_mut_range(0, total_len);
        let stored_max_stable_lsn = iobuf.stored_max_stable_lsn;

        io_fail!(self, "buffer write");
        #[cfg(feature = "io_uring")]
        {
            let mut wrote = 0;
            while wrote < total_len {
                let to_write = &data[wrote..];
                let offset = log_offset + wrote as u64;

                let link_mu = self.submission_mutex.lock();

                // Link ordering chains the sync_file_range behind the
                // write in the submission queue
                let wrote_completion = self.io_uring.write_at_ordered(
                    &*self.config.file,
                    &to_write,
                    offset,
                    rio::Ordering::Link,
                );

                let sync_completion = self.io_uring.sync_file_range(
                    &*self.config.file,
                    offset,
                    to_write.len(),
                );

                sync_completion.wait()?;

                drop(link_mu);

                wrote += wrote_completion.wait()?;
            }
        }
        #[cfg(not(feature = "io_uring"))]
        {
            let f = &self.config.file;
            pwrite_all(f, data, log_offset)?;
            if !self.config.temporary {
                #[cfg(target_os = "linux")]
                {
                    use std::os::unix::io::AsRawFd;
                    let ret = unsafe {
                        libc::sync_file_range(
                            f.as_raw_fd(),
                            i64::try_from(log_offset).unwrap(),
                            i64::try_from(total_len).unwrap(),
                            libc::SYNC_FILE_RANGE_WAIT_BEFORE
                                | libc::SYNC_FILE_RANGE_WRITE
                                | libc::SYNC_FILE_RANGE_WAIT_AFTER,
                        )
                    };
                    if ret < 0 {
                        let err = std::io::Error::last_os_error();
                        if let Some(libc::ENOSYS) = err.raw_os_error() {
                            f.sync_all()?;
                        } else {
                            return Err(err.into());
                        }
                    }
                }

                #[cfg(not(target_os = "linux"))]
                f.sync_all()?;
            }
        }
        io_fail!(self, "buffer write post");

        if total_len > 0 {
            let complete_len = if maxed {
                // a maxed segment is accounted for all the way to the
                // start of the next segment, padding included
                let lsn_idx = base_lsn / segment_size as Lsn;
                let next_seg_beginning = (lsn_idx + 1) * segment_size as Lsn;
                assert_usize(next_seg_beginning - base_lsn)
            } else {
                total_len
            };

            debug!(
                "wrote lsns {}-{} to disk at offsets {}-{}, maxed {} complete_len {}",
                base_lsn,
                base_lsn + total_len as Lsn - 1,
                log_offset,
                log_offset + total_len as LogOffset - 1,
                maxed,
                complete_len
            );
            self.mark_interval(base_lsn, complete_len);
        }

        M.written_bytes.measure(total_len as u64);

        // defer bumping max_header_stable_lsn until the pinned epoch
        // is released
        let guard = pin();
        let max_header_stable_lsn = self.max_header_stable_lsn.clone();
        guard.defer(move || {
            trace!("bumping atomic header lsn to {}", stored_max_stable_lsn);
            bump_atomic_lsn(&max_header_stable_lsn, stored_max_stable_lsn)
        });

        guard.flush();

        let current_max_header_stable_lsn =
            self.max_header_stable_lsn.load(Acquire);

        self.sa_stabilize(current_max_header_stable_lsn, &guard)
    }

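    /// Called after an interval of the log has been written and
    /// fsynced. Feeds the interval to `StabilityIntervals`, publishes
    /// any newly advanced `stable_lsn`, and wakes threads blocked in
    /// `make_stable_inner`.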
    fn mark_interval(&self, whence: Lsn, len: usize) {
        debug!("mark_interval({}, {})", whence, len);
        assert!(
            len > 0,
            "mark_interval called with an empty length at {}",
            whence
        );
        let mut intervals = self.intervals.lock();

        let interval = (whence, whence + len as Lsn - 1);

        let updated = intervals.mark_fsync(interval);

        if let Some(new_stable_lsn) = updated {
            trace!("mark_interval new highest lsn {}", new_stable_lsn);
            self.stable_lsn.store(new_stable_lsn, SeqCst);

            #[cfg(feature = "event_log")]
            {
                self.config.event_log.stabilized_lsn(new_stable_lsn + 1);
            }

            drop(intervals);
        }
        let _notified = self.interval_updated.notify_all();
    }

    pub(in crate::pagecache) fn current_iobuf(&self) -> Arc<IoBuf> {
        // bump the strong count without taking ownership of the Arc
        // that lives behind the AtomicPtr
        let arc = unsafe { Arc::from_raw(self.iobuf.load(Acquire)) };
        #[allow(clippy::mem_forget)]
        std::mem::forget(arc.clone());
        arc
    }
}

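/// Seals and asynchronously writes out the current iobuf if it holds
/// any reserved bytes, returning the number of bytes that were pending
/// (0 if it was empty or already sealed).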
pub(crate) fn roll_iobuf(iobufs: &Arc<IoBufs>) -> Result<usize> {
    let iobuf = iobufs.current_iobuf();
    let header = iobuf.get_header();
    if header::is_sealed(header) {
        trace!("skipping roll_iobuf due to already-sealed header");
        return Ok(0);
    }
    if header::offset(header) == 0 {
        trace!("skipping roll_iobuf due to empty segment");
    } else {
        trace!("sealing iobuf from roll_iobuf");
        maybe_seal_and_write_iobuf(iobufs, &iobuf, header, false)?;
    }

    Ok(header::offset(header))
}

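/// Blocks until every log entry at or below `lsn` has been fsynced and
/// `stable_lsn` reflects it, returning how far the stable lsn advanced
/// while waiting.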
pub(in crate::pagecache) fn make_stable(
    iobufs: &Arc<IoBufs>,
    lsn: Lsn,
) -> Result<usize> {
    make_stable_inner(iobufs, lsn, false)
}

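/// Like `make_stable`, but with partial durability: it may return as
/// soon as `lsn` falls inside some already-fsynced interval, even if
/// lower lsns are still in flight, so it guarantees durability of the
/// entry at `lsn` without waiting for the whole prefix to stabilize.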
pub(in crate::pagecache) fn make_durable(
    iobufs: &Arc<IoBufs>,
    lsn: Lsn,
) -> Result<usize> {
    make_stable_inner(iobufs, lsn, true)
}

pub(in crate::pagecache) fn make_stable_inner(
    iobufs: &Arc<IoBufs>,
    lsn: Lsn,
    partial_durability: bool,
) -> Result<usize> {
    let _measure = Measure::new(&M.make_stable);

    let first_stable = iobufs.stable();
    if first_stable >= lsn {
        return Ok(0);
    }

    let mut stable = first_stable;

    while stable < lsn {
        if let Err(e) = iobufs.config.global_error() {
            // take and release the intervals mutex so that any
            // waiters wake up and observe the error
            let intervals = iobufs.intervals.lock();
            drop(intervals);

            let _notified = iobufs.interval_updated.notify_all();
            return Err(e);
        }

        let iobuf = iobufs.current_iobuf();
        let header = iobuf.get_header();
        if header::offset(header) > 0
            && !header::is_sealed(header)
            && iobuf.lsn <= lsn
        {
            // the current buffer holds unwritten data at or below our
            // target lsn, so seal it and retry
            maybe_seal_and_write_iobuf(iobufs, &iobuf, header, false)?;
            stable = iobufs.stable();
            continue;
        }

        let mut waiter = iobufs.intervals.lock();

        if let Err(e) = iobufs.config.global_error() {
            drop(waiter);

            let _notified = iobufs.interval_updated.notify_all();
            return Err(e);
        }

        stable = iobufs.stable();

        if partial_durability {
            if waiter.stable_lsn > lsn {
                return Ok(assert_usize(stable - first_stable));
            }

            for (low, high) in &waiter.fsynced_ranges {
                if *low <= lsn && *high > lsn {
                    return Ok(assert_usize(stable - first_stable));
                }
            }
        }

        if stable < lsn {
            trace!("waiting on cond var for make_stable({})", lsn);

            if cfg!(feature = "event_log") {
                let timeout = iobufs
                    .interval_updated
                    .wait_for(&mut waiter, std::time::Duration::from_secs(30));
                if timeout.timed_out() {
                    fn tn() -> String {
                        std::thread::current()
                            .name()
                            .unwrap_or("unknown")
                            .to_owned()
                    }
                    panic!(
                        "{} failed to make_stable after 30 seconds. \
                        waiting to stabilize lsn {}, current stable {} \
                        intervals: {:?}",
                        tn(),
                        lsn,
                        iobufs.stable(),
                        waiter
                    );
                }
            } else {
                iobufs.interval_updated.wait(&mut waiter);
            }
        } else {
            debug!("make_stable({}) returning", lsn);
            break;
        }
    }

    Ok(assert_usize(stable - first_stable))
}

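/// Stabilizes everything that has been reserved so far, under a read
/// concurrency-control guard.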
pub(in crate::pagecache) fn flush(iobufs: &Arc<IoBufs>) -> Result<usize> {
    let _cc = concurrency_control::read();
    let max_reserved_lsn = iobufs.max_reserved_lsn.load(Acquire);
    make_stable(iobufs, max_reserved_lsn)
}

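/// Attempts to seal the given iobuf so no new writers can reserve
/// space in it, then installs its successor: either the remainder of
/// the same segment buffer, or a fresh segment obtained from the
/// `SegmentAccountant` when the buffer was maxed. If no writers were
/// still active at seal time, the sealed buffer is handed to the
/// threadpool to be written out; otherwise the last active writer is
/// expected to trigger the write when it releases its claim (implied
/// by the `n_writers` check below).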
pub(in crate::pagecache) fn maybe_seal_and_write_iobuf(
    iobufs: &Arc<IoBufs>,
    iobuf: &Arc<IoBuf>,
    header: Header,
    from_reserve: bool,
) -> Result<()> {
    if header::is_sealed(header) {
        // already sealed; the sealing thread is responsible for
        // writing this buffer out
        return Ok(());
    }

    let lid = iobuf.offset;
    let lsn = iobuf.lsn;
    let capacity = iobuf.capacity;
    let segment_size = iobufs.config.segment_size;

    if header::offset(header) > capacity {
        // a competing reservation has already pushed the offset past
        // capacity; that thread will handle sealing
        return Ok(());
    }

    let res_len = header::offset(header);
    let maxed = from_reserve || capacity - res_len < MAX_MSG_HEADER_LEN;
    let sealed = if maxed {
        trace!("setting maxed to true for iobuf with lsn {}", lsn);
        header::mk_maxed(header::mk_sealed(header))
    } else {
        header::mk_sealed(header)
    };

    let worked = iobuf.cas_header(header, sealed).is_ok();
    if !worked {
        return Ok(());
    }

    trace!("sealed iobuf with lsn {}", lsn);

    assert!(
        capacity + SEG_HEADER_LEN >= res_len,
        "res_len of {} higher than buffer capacity {}",
        res_len,
        capacity
    );

    assert_ne!(
        lid,
        LogOffset::max_value(),
        "sealing something that should never have \
        been claimed (iobuf lsn {})\n{:?}",
        lsn,
        iobufs
    );

    let mut next_lsn = lsn;

    let measure_assign_offset = Measure::new(&M.assign_offset);

    let next_offset = if maxed {
        // roll the lsn forward to the next segment boundary
        let lsn_idx = lsn / segment_size as Lsn;
        next_lsn = (lsn_idx + 1) * segment_size as Lsn;

        debug!(
            "rolling to new segment after clearing {}-{}",
            lid,
            lid + res_len as LogOffset,
        );

        match iobufs.with_sa(|sa| sa.next(next_lsn)) {
            Ok(ret) => ret,
            Err(e) => {
                iobufs.config.set_global_error(e.clone());
                let intervals = iobufs.intervals.lock();
                drop(intervals);

                let _notified = iobufs.interval_updated.notify_all();
                return Err(e);
            }
        }
    } else {
        debug!(
            "advancing offset within the current segment from {} to {}",
            lid,
            lid + res_len as LogOffset
        );
        next_lsn += res_len as Lsn;

        lid + res_len as LogOffset
    };

    let next_iobuf = if maxed {
        let mut next_iobuf = IoBuf {
            buf: Arc::new(UnsafeCell::new(AlignedBuf::new(segment_size))),
            header: CachePadded::new(AtomicU64::new(0)),
            base: 0,
            offset: next_offset,
            lsn: next_lsn,
            capacity: segment_size,
            stored_max_stable_lsn: -1,
        };

        next_iobuf.store_segment_header(sealed, next_lsn, iobufs.stable());

        next_iobuf
    } else {
        // the next iobuf shares the current allocation, starting just
        // after the bytes that have been reserved so far
        let new_cap = capacity - res_len;
        assert_ne!(new_cap, 0);
        let last_salt = header::salt(sealed);
        let new_salt = header::bump_salt(last_salt);

        IoBuf {
            buf: iobuf.buf.clone(),
            header: CachePadded::new(AtomicU64::new(new_salt)),
            base: iobuf.base + res_len,
            offset: next_offset,
            lsn: next_lsn,
            capacity: new_cap,
            stored_max_stable_lsn: -1,
        }
    };

    debug_delay();
    let intervals = iobufs.intervals.lock();
    let old_ptr = iobufs
        .iobuf
        .swap(Arc::into_raw(Arc::new(next_iobuf)) as *mut IoBuf, SeqCst);

    let old_arc = unsafe { Arc::from_raw(old_ptr) };

    // defer dropping the old iobuf until concurrent readers of the
    // AtomicPtr are done with it
    pin().defer(move || drop(old_arc));

    drop(intervals);

    let _notified = iobufs.interval_updated.notify_all();

    drop(measure_assign_offset);

    if header::n_writers(sealed) == 0 {
        iobufs.config.global_error()?;
        trace!(
            "asynchronously writing iobuf with lsn {} to log from maybe_seal",
            lsn
        );
        let iobufs = iobufs.clone();
        let iobuf = iobuf.clone();
        let _result = threadpool::spawn(move || {
            if let Err(e) = iobufs.write_to_log(&iobuf) {
                error!(
                    "hit error while writing iobuf with lsn {}: {:?}",
                    lsn, e
                );

                iobufs.config.set_global_error(e);

                let intervals = iobufs.intervals.lock();
                drop(intervals);

                let _notified = iobufs.interval_updated.notify_all();
            }
        })?;

        #[cfg(feature = "event_log")]
        _result.wait();

        Ok(())
    } else {
        Ok(())
    }
}

impl Debug for IoBufs {
    fn fmt(
        &self,
        formatter: &mut fmt::Formatter<'_>,
    ) -> std::result::Result<(), fmt::Error> {
        formatter.write_fmt(format_args!("IoBufs {{ buf: {:?} }}", self.iobuf))
    }
}

impl Debug for IoBuf {
    fn fmt(
        &self,
        formatter: &mut fmt::Formatter<'_>,
    ) -> std::result::Result<(), fmt::Error> {
        let header = self.get_header();
        formatter.write_fmt(format_args!(
            "\n\tIoBuf {{ lid: {}, n_writers: {}, offset: \
            {}, sealed: {} }}",
            self.offset,
            header::n_writers(header),
            header::offset(header),
            header::is_sealed(header)
        ))
    }
}