1#![allow(clippy::mut_mut)]
2use std::{
3 convert::{TryFrom, TryInto},
4 iter::FromIterator,
5 marker::PhantomData,
6 num::NonZeroU64,
7};
8
9use crate::{
10 node::{Index, Leaf},
11 pagecache::{
12 BatchManifest, MessageHeader, PageState, SegmentNumber, Snapshot,
13 },
14 Data, DiskPtr, Error, IVec, Link, Meta, Node, Result,
15};
16
17pub trait Serialize: Sized {
19 fn serialized_size(&self) -> u64;
22
23 fn serialize_into(&self, buf: &mut &mut [u8]);
29
30 fn deserialize(buf: &mut &[u8]) -> Result<Self>;
32
33 fn serialize(&self) -> Vec<u8> {
35 let sz = self.serialized_size();
36 let mut buf = vec![0; usize::try_from(sz).unwrap()];
37 self.serialize_into(&mut buf.as_mut_slice());
38 buf
39 }
40}
41
42#[allow(unsafe_code)]
48fn scoot(buf: &mut &mut [u8], amount: usize) {
49 assert!(buf.len() >= amount);
50 let len = buf.len();
51 let ptr = buf.as_mut_ptr();
52 let new_len = len - amount;
53
54 unsafe {
55 let new_ptr = ptr.add(amount);
56 *buf = std::slice::from_raw_parts_mut(new_ptr, new_len);
57 }
58}
59
60impl Serialize for BatchManifest {
61 fn serialized_size(&self) -> u64 {
62 8
63 }
64
65 fn serialize_into(&self, buf: &mut &mut [u8]) {
66 buf[..8].copy_from_slice(&self.0.to_le_bytes());
67 scoot(buf, 8);
68 }
69
70 fn deserialize(buf: &mut &[u8]) -> Result<Self> {
71 if buf.len() < 8 {
72 return Err(Error::corruption(None));
73 }
74
75 let array = buf[..8].try_into().unwrap();
76 *buf = &buf[8..];
77 Ok(BatchManifest(i64::from_le_bytes(array)))
78 }
79}
80
81impl Serialize for () {
82 fn serialized_size(&self) -> u64 {
83 0
84 }
85
86 fn serialize_into(&self, _: &mut &mut [u8]) {}
87
88 fn deserialize(_: &mut &[u8]) -> Result<()> {
89 Ok(())
90 }
91}
92
93impl Serialize for MessageHeader {
94 fn serialized_size(&self) -> u64 {
95 1 + 4
96 + self.segment_number.serialized_size()
97 + self.pid.serialized_size()
98 + self.len.serialized_size()
99 }
100
101 fn serialize_into(&self, buf: &mut &mut [u8]) {
102 self.crc32.serialize_into(buf);
103 self.kind.into().serialize_into(buf);
104 self.len.serialize_into(buf);
105 self.segment_number.serialize_into(buf);
106 self.pid.serialize_into(buf);
107 }
108
109 fn deserialize(buf: &mut &[u8]) -> Result<MessageHeader> {
110 Ok(MessageHeader {
111 crc32: u32::deserialize(buf)?,
112 kind: u8::deserialize(buf)?.into(),
113 len: u64::deserialize(buf)?,
114 segment_number: SegmentNumber(u64::deserialize(buf)?),
115 pid: u64::deserialize(buf)?,
116 })
117 }
118}
119
120impl Serialize for IVec {
121 fn serialized_size(&self) -> u64 {
122 let len = self.len() as u64;
123 len + len.serialized_size()
124 }
125
126 fn serialize_into(&self, buf: &mut &mut [u8]) {
127 (self.len() as u64).serialize_into(buf);
128 buf[..self.len()].copy_from_slice(self.as_ref());
129 scoot(buf, self.len());
130 }
131
132 fn deserialize(buf: &mut &[u8]) -> Result<IVec> {
133 let k_len = usize::try_from(u64::deserialize(buf)?)
134 .expect("should never store items that rust can't natively index");
135 let ret = &buf[..k_len];
136 *buf = &buf[k_len..];
137 Ok(ret.into())
138 }
139}
140
141impl Serialize for u64 {
142 fn serialized_size(&self) -> u64 {
143 if *self <= 240 {
144 1
145 } else if *self <= 2287 {
146 2
147 } else if *self <= 67823 {
148 3
149 } else if *self <= 0x00FF_FFFF {
150 4
151 } else if *self <= 0xFFFF_FFFF {
152 5
153 } else if *self <= 0x00FF_FFFF_FFFF {
154 6
155 } else if *self <= 0xFFFF_FFFF_FFFF {
156 7
157 } else if *self <= 0x00FF_FFFF_FFFF_FFFF {
158 8
159 } else {
160 9
161 }
162 }
163
164 fn serialize_into(&self, buf: &mut &mut [u8]) {
165 let sz = if *self <= 240 {
166 buf[0] = u8::try_from(*self).unwrap();
167 1
168 } else if *self <= 2287 {
169 buf[0] = u8::try_from((*self - 240) / 256 + 241).unwrap();
170 buf[1] = u8::try_from((*self - 240) % 256).unwrap();
171 2
172 } else if *self <= 67823 {
173 buf[0] = 249;
174 buf[1] = u8::try_from((*self - 2288) / 256).unwrap();
175 buf[2] = u8::try_from((*self - 2288) % 256).unwrap();
176 3
177 } else if *self <= 0x00FF_FFFF {
178 buf[0] = 250;
179 let bytes = self.to_le_bytes();
180 buf[1..4].copy_from_slice(&bytes[..3]);
181 4
182 } else if *self <= 0xFFFF_FFFF {
183 buf[0] = 251;
184 let bytes = self.to_le_bytes();
185 buf[1..5].copy_from_slice(&bytes[..4]);
186 5
187 } else if *self <= 0x00FF_FFFF_FFFF {
188 buf[0] = 252;
189 let bytes = self.to_le_bytes();
190 buf[1..6].copy_from_slice(&bytes[..5]);
191 6
192 } else if *self <= 0xFFFF_FFFF_FFFF {
193 buf[0] = 253;
194 let bytes = self.to_le_bytes();
195 buf[1..7].copy_from_slice(&bytes[..6]);
196 7
197 } else if *self <= 0x00FF_FFFF_FFFF_FFFF {
198 buf[0] = 254;
199 let bytes = self.to_le_bytes();
200 buf[1..8].copy_from_slice(&bytes[..7]);
201 8
202 } else {
203 buf[0] = 255;
204 let bytes = self.to_le_bytes();
205 buf[1..9].copy_from_slice(&bytes[..8]);
206 9
207 };
208
209 scoot(buf, sz);
210 }
211
212 fn deserialize(buf: &mut &[u8]) -> Result<Self> {
213 if buf.is_empty() {
214 return Err(Error::corruption(None));
215 }
216 let (res, scoot) = match buf[0] {
217 0..=240 => (u64::from(buf[0]), 1),
218 241..=248 => {
219 (240 + 256 * (u64::from(buf[0]) - 241) + u64::from(buf[1]), 2)
220 }
221 249 => (2288 + 256 * u64::from(buf[1]) + u64::from(buf[2]), 3),
222 other => {
223 let sz = other as usize - 247;
224 let mut aligned = [0; 8];
225 aligned[..sz].copy_from_slice(&buf[1..=sz]);
226 (u64::from_le_bytes(aligned), sz + 1)
227 }
228 };
229 *buf = &buf[scoot..];
230 Ok(res)
231 }
232}
233
234impl Serialize for i64 {
235 fn serialized_size(&self) -> u64 {
236 8
237 }
238
239 fn serialize_into(&self, buf: &mut &mut [u8]) {
240 buf[..8].copy_from_slice(&self.to_le_bytes());
241 scoot(buf, 8);
242 }
243
244 fn deserialize(buf: &mut &[u8]) -> Result<Self> {
245 if buf.len() < 8 {
246 return Err(Error::corruption(None));
247 }
248
249 let array = buf[..8].try_into().unwrap();
250 *buf = &buf[8..];
251 Ok(i64::from_le_bytes(array))
252 }
253}
254
255impl Serialize for u32 {
256 fn serialized_size(&self) -> u64 {
257 4
258 }
259
260 fn serialize_into(&self, buf: &mut &mut [u8]) {
261 buf[..4].copy_from_slice(&self.to_le_bytes());
262 scoot(buf, 4);
263 }
264
265 fn deserialize(buf: &mut &[u8]) -> Result<Self> {
266 if buf.len() < 4 {
267 return Err(Error::corruption(None));
268 }
269
270 let array = buf[..4].try_into().unwrap();
271 *buf = &buf[4..];
272 Ok(u32::from_le_bytes(array))
273 }
274}
275
276impl Serialize for bool {
277 fn serialized_size(&self) -> u64 {
278 1
279 }
280
281 fn serialize_into(&self, buf: &mut &mut [u8]) {
282 let byte = u8::from(*self);
283 buf[0] = byte;
284 scoot(buf, 1);
285 }
286
287 fn deserialize(buf: &mut &[u8]) -> Result<bool> {
288 if buf.is_empty() {
289 return Err(Error::corruption(None));
290 }
291 let value = buf[0] != 0;
292 *buf = &buf[1..];
293 Ok(value)
294 }
295}
296
297impl Serialize for u8 {
298 fn serialized_size(&self) -> u64 {
299 1
300 }
301
302 fn serialize_into(&self, buf: &mut &mut [u8]) {
303 buf[0] = *self;
304 scoot(buf, 1);
305 }
306
307 fn deserialize(buf: &mut &[u8]) -> Result<u8> {
308 if buf.is_empty() {
309 return Err(Error::corruption(None));
310 }
311 let value = buf[0];
312 *buf = &buf[1..];
313 Ok(value)
314 }
315}
316
317impl Serialize for Meta {
318 fn serialized_size(&self) -> u64 {
319 self.inner
320 .iter()
321 .map(|(k, v)| {
322 (k.len() as u64).serialized_size()
323 + u64::try_from(k.len()).unwrap()
324 + v.serialized_size()
325 })
326 .sum()
327 }
328
329 fn serialize_into(&self, buf: &mut &mut [u8]) {
330 serialize_2tuple_sequence(self.inner.iter(), buf);
331 }
332
333 fn deserialize(buf: &mut &[u8]) -> Result<Self> {
334 Ok(Meta { inner: deserialize_sequence(buf)? })
335 }
336}
337
338impl Serialize for Link {
339 fn serialized_size(&self) -> u64 {
340 match self {
341 Link::Set(key, value) => {
342 1 + (key.len() as u64).serialized_size()
343 + (value.len() as u64).serialized_size()
344 + u64::try_from(key.len()).unwrap()
345 + u64::try_from(value.len()).unwrap()
346 }
347 Link::Del(key) => {
348 1 + (key.len() as u64).serialized_size()
349 + u64::try_from(key.len()).unwrap()
350 }
351 Link::ParentMergeIntention(a) => 1 + a.serialized_size(),
352 Link::ParentMergeConfirm | Link::ChildMergeCap => 1,
353 }
354 }
355
356 fn serialize_into(&self, buf: &mut &mut [u8]) {
357 match self {
358 Link::Set(key, value) => {
359 0_u8.serialize_into(buf);
360 key.serialize_into(buf);
361 value.serialize_into(buf);
362 }
363 Link::Del(key) => {
364 1_u8.serialize_into(buf);
365 key.serialize_into(buf);
366 }
367 Link::ParentMergeIntention(pid) => {
368 2_u8.serialize_into(buf);
369 pid.serialize_into(buf);
370 }
371 Link::ParentMergeConfirm => {
372 3_u8.serialize_into(buf);
373 }
374 Link::ChildMergeCap => {
375 4_u8.serialize_into(buf);
376 }
377 }
378 }
379
380 fn deserialize(buf: &mut &[u8]) -> Result<Self> {
381 if buf.is_empty() {
382 return Err(Error::corruption(None));
383 }
384 let discriminant = buf[0];
385 *buf = &buf[1..];
386 Ok(match discriminant {
387 0 => Link::Set(IVec::deserialize(buf)?, IVec::deserialize(buf)?),
388 1 => Link::Del(IVec::deserialize(buf)?),
389 2 => Link::ParentMergeIntention(u64::deserialize(buf)?),
390 3 => Link::ParentMergeConfirm,
391 4 => Link::ChildMergeCap,
392 _ => return Err(Error::corruption(None)),
393 })
394 }
395}
396
397fn shift_u64_opt(value: &Option<u64>) -> u64 {
398 value.map(|s| s + 1).unwrap_or(0)
399}
400
401impl Serialize for Option<u64> {
402 fn serialized_size(&self) -> u64 {
403 shift_u64_opt(self).serialized_size()
404 }
405 fn serialize_into(&self, buf: &mut &mut [u8]) {
406 shift_u64_opt(self).serialize_into(buf)
407 }
408 fn deserialize(buf: &mut &[u8]) -> Result<Self> {
409 let shifted = u64::deserialize(buf)?;
410 let unshifted = if shifted == 0 { None } else { Some(shifted - 1) };
411 Ok(unshifted)
412 }
413}
414
415impl Serialize for Option<NonZeroU64> {
416 fn serialized_size(&self) -> u64 {
417 (self.map(NonZeroU64::get).unwrap_or(0)).serialized_size()
418 }
419
420 fn serialize_into(&self, buf: &mut &mut [u8]) {
421 (self.map(NonZeroU64::get).unwrap_or(0)).serialize_into(buf)
422 }
423
424 fn deserialize(buf: &mut &[u8]) -> Result<Self> {
425 let underlying = u64::deserialize(buf)?;
426 Ok(if underlying == 0 {
427 None
428 } else {
429 Some(NonZeroU64::new(underlying).unwrap())
430 })
431 }
432}
433
434impl Serialize for Node {
435 fn serialized_size(&self) -> u64 {
436 2 + self.next.serialized_size()
437 + self.merging_child.serialized_size()
438 + self.lo.serialized_size()
439 + self.hi.serialized_size()
440 + self.data.serialized_size()
441 }
442
443 fn serialize_into(&self, buf: &mut &mut [u8]) {
444 self.next.serialize_into(buf);
445 self.merging_child.serialize_into(buf);
446 self.merging.serialize_into(buf);
447 self.prefix_len.serialize_into(buf);
448 self.lo.serialize_into(buf);
449 self.hi.serialize_into(buf);
450 self.data.serialize_into(buf);
451 }
452
453 fn deserialize(buf: &mut &[u8]) -> Result<Self> {
454 Ok(Node {
455 next: Serialize::deserialize(buf)?,
456 merging_child: Serialize::deserialize(buf)?,
457 merging: bool::deserialize(buf)?,
458 prefix_len: u8::deserialize(buf)?,
459 lo: IVec::deserialize(buf)?,
460 hi: IVec::deserialize(buf)?,
461 data: Data::deserialize(buf)?,
462 })
463 }
464}
465
466impl Serialize for Option<i64> {
467 fn serialized_size(&self) -> u64 {
468 shift_i64_opt(self).serialized_size()
469 }
470 fn serialize_into(&self, buf: &mut &mut [u8]) {
471 shift_i64_opt(self).serialize_into(buf)
472 }
473 fn deserialize(buf: &mut &[u8]) -> Result<Self> {
474 Ok(unshift_i64_opt(i64::deserialize(buf)?))
475 }
476}
477
478fn shift_i64_opt(value_opt: &Option<i64>) -> i64 {
479 if let Some(value) = value_opt {
480 if value.signum() == -1 {
481 *value
482 } else {
483 value + 1
484 }
485 } else {
486 0
487 }
488}
489
490fn unshift_i64_opt(value: i64) -> Option<i64> {
491 if value == 0 {
492 None
493 } else if value.signum() == -1 {
494 Some(value)
495 } else {
496 Some(value - 1)
497 }
498}
499
500impl Serialize for Snapshot {
501 fn serialized_size(&self) -> u64 {
502 self.stable_lsn.serialized_size()
503 + self.active_segment.serialized_size()
504 + self
505 .pt
506 .iter()
507 .map(|page_state| page_state.serialized_size())
508 .sum::<u64>()
509 }
510
511 fn serialize_into(&self, buf: &mut &mut [u8]) {
512 self.stable_lsn.serialize_into(buf);
513 self.active_segment.serialize_into(buf);
514 for page_state in &self.pt {
515 page_state.serialize_into(buf);
516 }
517 }
518
519 fn deserialize(buf: &mut &[u8]) -> Result<Self> {
520 Ok(Snapshot {
521 stable_lsn: Serialize::deserialize(buf)?,
522 active_segment: Serialize::deserialize(buf)?,
523 pt: deserialize_sequence(buf)?,
524 })
525 }
526}
527
528impl Serialize for Data {
529 fn serialized_size(&self) -> u64 {
530 match self {
531 Data::Leaf(ref leaf) => {
532 1_u64
533 + (leaf.keys.len() as u64).serialized_size()
534 + leaf
535 .keys
536 .iter()
537 .enumerate()
538 .map(|(idx, k)| {
539 let v = &leaf.values[idx];
540 (k.len() as u64).serialized_size()
541 + (v.len() as u64).serialized_size()
542 + k.len() as u64
543 + v.len() as u64
544 })
545 .sum::<u64>()
546 }
547 Data::Index(ref index) => {
548 1_u64
549 + (index.keys.len() as u64).serialized_size()
550 + index
551 .keys
552 .iter()
553 .enumerate()
554 .map(|(idx, k)| {
555 let v = index.pointers[idx];
556 (k.len() as u64).serialized_size()
557 + v.serialized_size()
558 + k.len() as u64
559 })
560 .sum::<u64>()
561 }
562 }
563 }
564
565 fn serialize_into(&self, buf: &mut &mut [u8]) {
566 match self {
567 Data::Leaf(leaf) => {
568 0_u8.serialize_into(buf);
569 (leaf.keys.len() as u64).serialize_into(buf);
570 for key in &leaf.keys {
571 key.serialize_into(buf);
572 }
573 for value in &leaf.values {
574 value.serialize_into(buf);
575 }
576 }
577 Data::Index(index) => {
578 1_u8.serialize_into(buf);
579 (index.keys.len() as u64).serialize_into(buf);
580 for key in &index.keys {
581 key.serialize_into(buf);
582 }
583 for value in &index.pointers {
584 value.serialize_into(buf);
585 }
586 }
587 }
588 }
589
590 fn deserialize(buf: &mut &[u8]) -> Result<Data> {
591 if buf.is_empty() {
592 return Err(Error::corruption(None));
593 }
594 let discriminant = buf[0];
595 *buf = &buf[1..];
596 let len = usize::try_from(u64::deserialize(buf)?).unwrap();
597 Ok(match discriminant {
598 0 => Data::Leaf(Leaf {
599 keys: deserialize_bounded_sequence(buf, len)?,
600 values: deserialize_bounded_sequence(buf, len)?,
601 }),
602 1 => Data::Index(Index {
603 keys: deserialize_bounded_sequence(buf, len)?,
604 pointers: deserialize_bounded_sequence(buf, len)?,
605 }),
606 _ => return Err(Error::corruption(None)),
607 })
608 }
609}
610
611impl Serialize for DiskPtr {
612 fn serialized_size(&self) -> u64 {
613 match self {
614 DiskPtr::Inline(a) => 1 + a.serialized_size(),
615 DiskPtr::Blob(a, b) => {
616 1 + a.serialized_size() + b.serialized_size()
617 }
618 }
619 }
620
621 fn serialize_into(&self, buf: &mut &mut [u8]) {
622 match self {
623 DiskPtr::Inline(log_offset) => {
624 0_u8.serialize_into(buf);
625 log_offset.serialize_into(buf);
626 }
627 DiskPtr::Blob(log_offset, blob_lsn) => {
628 1_u8.serialize_into(buf);
629 log_offset.serialize_into(buf);
630 blob_lsn.serialize_into(buf);
631 }
632 }
633 }
634
635 fn deserialize(buf: &mut &[u8]) -> Result<DiskPtr> {
636 if buf.len() < 2 {
637 return Err(Error::corruption(None));
638 }
639 let discriminant = buf[0];
640 *buf = &buf[1..];
641 Ok(match discriminant {
642 0 => DiskPtr::Inline(u64::deserialize(buf)?),
643 1 => DiskPtr::Blob(u64::deserialize(buf)?, i64::deserialize(buf)?),
644 _ => return Err(Error::corruption(None)),
645 })
646 }
647}
648
649impl Serialize for PageState {
650 fn serialized_size(&self) -> u64 {
651 match self {
652 PageState::Free(a, disk_ptr) => {
653 1 + a.serialized_size() + disk_ptr.serialized_size()
654 }
655 PageState::Present { base, frags } => {
656 1 + base.serialized_size()
657 + frags
658 .iter()
659 .map(|tuple| tuple.serialized_size())
660 .sum::<u64>()
661 }
662 _ => panic!("tried to serialize {:?}", self),
663 }
664 }
665
666 fn serialize_into(&self, buf: &mut &mut [u8]) {
667 match self {
668 PageState::Free(lsn, disk_ptr) => {
669 0_u8.serialize_into(buf);
670 lsn.serialize_into(buf);
671 disk_ptr.serialize_into(buf);
672 }
673 PageState::Present { base, frags } => {
674 let frags_len: u8 = 1 + u8::try_from(frags.len())
675 .expect("should never have more than 255 frags");
676 frags_len.serialize_into(buf);
677 base.serialize_into(buf);
678 serialize_3tuple_ref_sequence(frags.iter(), buf);
679 }
680 _ => panic!("tried to serialize {:?}", self),
681 }
682 }
683
684 fn deserialize(buf: &mut &[u8]) -> Result<PageState> {
685 if buf.is_empty() {
686 return Err(Error::corruption(None));
687 }
688 let discriminant = buf[0];
689 *buf = &buf[1..];
690 Ok(match discriminant {
691 0 => PageState::Free(
692 i64::deserialize(buf)?,
693 DiskPtr::deserialize(buf)?,
694 ),
695 len => PageState::Present {
696 base: Serialize::deserialize(buf)?,
697 frags: deserialize_bounded_sequence(buf, usize::from(len - 1))?,
698 },
699 })
700 }
701}
702
703impl<A: Serialize, B: Serialize> Serialize for (A, B) {
704 fn serialized_size(&self) -> u64 {
705 self.0.serialized_size() + self.1.serialized_size()
706 }
707
708 fn serialize_into(&self, buf: &mut &mut [u8]) {
709 self.0.serialize_into(buf);
710 self.1.serialize_into(buf);
711 }
712
713 fn deserialize(buf: &mut &[u8]) -> Result<(A, B)> {
714 let a = A::deserialize(buf)?;
715 let b = B::deserialize(buf)?;
716 Ok((a, b))
717 }
718}
719
720impl<A: Serialize, B: Serialize, C: Serialize> Serialize for (A, B, C) {
721 fn serialized_size(&self) -> u64 {
722 self.0.serialized_size()
723 + self.1.serialized_size()
724 + self.2.serialized_size()
725 }
726
727 fn serialize_into(&self, buf: &mut &mut [u8]) {
728 self.0.serialize_into(buf);
729 self.1.serialize_into(buf);
730 self.2.serialize_into(buf);
731 }
732
733 fn deserialize(buf: &mut &[u8]) -> Result<(A, B, C)> {
734 let a = A::deserialize(buf)?;
735 let b = B::deserialize(buf)?;
736 let c = C::deserialize(buf)?;
737 Ok((a, b, c))
738 }
739}
740
741fn serialize_2tuple_sequence<'a, XS, A, B>(xs: XS, buf: &mut &mut [u8])
742where
743 XS: Iterator<Item = (&'a A, &'a B)>,
744 A: Serialize + 'a,
745 B: Serialize + 'a,
746{
747 for item in xs {
748 item.0.serialize_into(buf);
749 item.1.serialize_into(buf);
750 }
751}
752
753fn serialize_3tuple_ref_sequence<'a, XS, A, B, C>(xs: XS, buf: &mut &mut [u8])
754where
755 XS: Iterator<Item = &'a (A, B, C)>,
756 A: Serialize + 'a,
757 B: Serialize + 'a,
758 C: Serialize + 'a,
759{
760 for item in xs {
761 item.0.serialize_into(buf);
762 item.1.serialize_into(buf);
763 item.2.serialize_into(buf);
764 }
765}
766
767struct ConsumeSequence<'a, 'b, T> {
768 buf: &'a mut &'b [u8],
769 _t: PhantomData<T>,
770 done: bool,
771}
772
773fn deserialize_sequence<T: Serialize, R>(buf: &mut &[u8]) -> Result<R>
774where
775 R: FromIterator<T>,
776{
777 let iter = ConsumeSequence { buf, _t: PhantomData, done: false };
778 iter.collect()
779}
780
781fn deserialize_bounded_sequence<T: Serialize, R>(
782 buf: &mut &[u8],
783 bound: usize,
784) -> Result<R>
785where
786 R: FromIterator<T>,
787{
788 let iter = ConsumeSequence { buf, _t: PhantomData, done: false };
789 iter.take(bound).collect()
790}
791
792impl<'a, 'b, T> Iterator for ConsumeSequence<'a, 'b, T>
793where
794 T: Serialize,
795{
796 type Item = Result<T>;
797
798 fn next(&mut self) -> Option<Result<T>> {
799 if self.done || self.buf.is_empty() {
800 return None;
801 }
802 let item_res = T::deserialize(&mut self.buf);
803 if item_res.is_err() {
804 self.done = true;
805 }
806 Some(item_res)
807 }
808}
809
810#[cfg(test)]
811mod qc {
812 use quickcheck::{Arbitrary, Gen};
813 use rand::Rng;
814
815 use super::*;
816 use crate::pagecache::MessageKind;
817
818 impl Arbitrary for MessageHeader {
819 fn arbitrary<G: Gen>(g: &mut G) -> MessageHeader {
820 MessageHeader {
821 crc32: g.gen(),
822 len: g.gen(),
823 kind: MessageKind::arbitrary(g),
824 segment_number: SegmentNumber(SpreadU64::arbitrary(g).0),
825 pid: g.gen(),
826 }
827 }
828 }
829
830 impl Arbitrary for MessageKind {
831 fn arbitrary<G: Gen>(g: &mut G) -> MessageKind {
832 g.gen_range(0, 12).into()
833 }
834 }
835
836 impl Arbitrary for Data {
837 fn arbitrary<G: Gen>(g: &mut G) -> Data {
838 if g.gen() {
839 let keys = Arbitrary::arbitrary(g);
840 let mut values = vec![];
841 for _ in &keys {
842 values.push(Arbitrary::arbitrary(g))
843 }
844 Data::Index(Index { keys, pointers: values })
845 } else {
846 let keys = Arbitrary::arbitrary(g);
847 let mut values = vec![];
848 for _ in &keys {
849 values.push(Arbitrary::arbitrary(g))
850 }
851 Data::Leaf(Leaf { keys, values })
852 }
853 }
854
855 fn shrink(&self) -> Box<dyn Iterator<Item = Data>> {
856 match self {
857 Data::Index(ref index) => {
858 let index = index.clone();
859 Box::new(index.keys.shrink().map(move |keys| {
860 Data::Index(Index {
861 pointers: index
862 .pointers
863 .iter()
864 .take(keys.len())
865 .copied()
866 .collect(),
867 keys,
868 })
869 }))
870 }
871 Data::Leaf(ref leaf) => {
872 let leaf = leaf.clone();
873 Box::new(leaf.keys.shrink().map(move |keys| {
874 Data::Leaf(Leaf {
875 values: leaf
876 .values
877 .iter()
878 .take(keys.len())
879 .cloned()
880 .collect(),
881 keys,
882 })
883 }))
884 }
885 }
886 }
887 }
888
889 impl Arbitrary for Node {
890 fn arbitrary<G: Gen>(g: &mut G) -> Node {
891 let next: Option<NonZeroU64> = Arbitrary::arbitrary(g);
892
893 let merging_child: Option<NonZeroU64> = Arbitrary::arbitrary(g);
894
895 Node {
896 next,
897 merging_child,
898 merging: bool::arbitrary(g),
899 prefix_len: u8::arbitrary(g),
900 lo: IVec::arbitrary(g),
901 hi: IVec::arbitrary(g),
902 data: Data::arbitrary(g),
903 }
904 }
905
906 fn shrink(&self) -> Box<dyn Iterator<Item = Node>> {
907 let data_shrinker = self.data.shrink().map({
908 let node = self.clone();
909 move |data| Node { data, ..node.clone() }
910 });
911
912 let hi_shrinker = self.hi.shrink().map({
913 let node = self.clone();
914 move |hi| Node { hi, ..node.clone() }
915 });
916
917 let lo_shrinker = self.lo.shrink().map({
918 let node = self.clone();
919 move |lo| Node { lo, ..node.clone() }
920 });
921
922 Box::new(data_shrinker.chain(hi_shrinker).chain(lo_shrinker))
923 }
924 }
925
926 impl Arbitrary for Link {
927 fn arbitrary<G: Gen>(g: &mut G) -> Link {
928 let discriminant = g.gen_range(0, 5);
929 match discriminant {
930 0 => Link::Set(IVec::arbitrary(g), IVec::arbitrary(g)),
931 1 => Link::Del(IVec::arbitrary(g)),
932 2 => Link::ParentMergeIntention(u64::arbitrary(g)),
933 3 => Link::ParentMergeConfirm,
934 4 => Link::ChildMergeCap,
935 _ => unreachable!("invalid choice"),
936 }
937 }
938 }
939
940 impl Arbitrary for IVec {
941 fn arbitrary<G: Gen>(g: &mut G) -> IVec {
942 let v: Vec<u8> = Arbitrary::arbitrary(g);
943 v.into()
944 }
945
946 fn shrink(&self) -> Box<dyn Iterator<Item = IVec>> {
947 let v: Vec<u8> = self.to_vec();
948 Box::new(v.shrink().map(IVec::from))
949 }
950 }
951
952 impl Arbitrary for Meta {
953 fn arbitrary<G: Gen>(g: &mut G) -> Meta {
954 Meta { inner: Arbitrary::arbitrary(g) }
955 }
956
957 fn shrink(&self) -> Box<dyn Iterator<Item = Meta>> {
958 Box::new(self.inner.shrink().map(|inner| Meta { inner }))
959 }
960 }
961
962 impl Arbitrary for DiskPtr {
963 fn arbitrary<G: Gen>(g: &mut G) -> DiskPtr {
964 if g.gen() {
965 DiskPtr::Inline(g.gen())
966 } else {
967 DiskPtr::Blob(g.gen(), g.gen())
968 }
969 }
970 }
971
972 impl Arbitrary for PageState {
973 fn arbitrary<G: Gen>(g: &mut G) -> PageState {
974 if g.gen() {
975 let n = g.gen_range(0, 255);
979
980 let base = (g.gen(), DiskPtr::arbitrary(g), g.gen());
981 let frags = (0..n)
982 .map(|_| (g.gen(), DiskPtr::arbitrary(g), g.gen()))
983 .collect();
984 PageState::Present { base, frags }
985 } else {
986 PageState::Free(g.gen(), DiskPtr::arbitrary(g))
987 }
988 }
989 }
990
991 impl Arbitrary for Snapshot {
992 fn arbitrary<G: Gen>(g: &mut G) -> Snapshot {
993 Snapshot {
994 stable_lsn: g.gen(),
995 active_segment: g.gen(),
996 pt: Arbitrary::arbitrary(g),
997 }
998 }
999 }
1000
1001 #[derive(Debug, Clone)]
1002 struct SpreadI64(i64);
1003
1004 impl Arbitrary for SpreadI64 {
1005 fn arbitrary<G: Gen>(g: &mut G) -> SpreadI64 {
1006 let uniform = g.gen::<i64>();
1007 let shift = g.gen_range(0, 64);
1008 SpreadI64(uniform >> shift)
1009 }
1010 }
1011
1012 #[derive(Debug, Clone)]
1013 struct SpreadU64(u64);
1014
1015 impl Arbitrary for SpreadU64 {
1016 fn arbitrary<G: Gen>(g: &mut G) -> SpreadU64 {
1017 let uniform = g.gen::<u64>();
1018 let shift = g.gen_range(0, 64);
1019 SpreadU64(uniform >> shift)
1020 }
1021 }
1022
1023 fn prop_serialize<T>(item: &T) -> bool
1024 where
1025 T: Serialize + PartialEq + Clone + std::fmt::Debug,
1026 {
1027 let mut buf = vec![0; usize::try_from(item.serialized_size()).unwrap()];
1028 let buf_ref = &mut buf.as_mut_slice();
1029 item.serialize_into(buf_ref);
1030 assert_eq!(
1031 buf_ref.len(),
1032 0,
1033 "round-trip failed to consume produced bytes"
1034 );
1035 assert_eq!(buf.len() as u64, item.serialized_size());
1036 let deserialized = T::deserialize(&mut buf.as_slice()).unwrap();
1037 if *item == deserialized {
1038 true
1039 } else {
1040 eprintln!(
1041 "round-trip serialization failed. original:\n\n{:?}\n\n \
1042 deserialized(serialized(original)):\n\n{:?}",
1043 item, deserialized
1044 );
1045 false
1046 }
1047 }
1048
1049 quickcheck::quickcheck! {
1050 #[cfg_attr(miri, ignore)]
1051 fn bool(item: bool) -> bool {
1052 prop_serialize(&item)
1053 }
1054
1055 #[cfg_attr(miri, ignore)]
1056 fn u8(item: u8) -> bool {
1057 prop_serialize(&item)
1058 }
1059
1060 #[cfg_attr(miri, ignore)]
1061 fn i64(item: SpreadI64) -> bool {
1062 prop_serialize(&item.0)
1063 }
1064
1065 #[cfg_attr(miri, ignore)]
1066 fn u64(item: SpreadU64) -> bool {
1067 prop_serialize(&item.0)
1068 }
1069
1070 #[cfg_attr(miri, ignore)]
1071 fn disk_ptr(item: DiskPtr) -> bool {
1072 prop_serialize(&item)
1073 }
1074
1075 #[cfg_attr(miri, ignore)]
1076 fn page_state(item: PageState) -> bool {
1077 prop_serialize(&item)
1078 }
1079
1080 #[cfg_attr(miri, ignore)]
1081 fn meta(item: Meta) -> bool {
1082 prop_serialize(&item)
1083 }
1084
1085 #[cfg_attr(miri, ignore)]
1086 fn snapshot(item: Snapshot) -> bool {
1087 prop_serialize(&item)
1088 }
1089
1090 #[cfg_attr(miri, ignore)]
1091 fn node(item: Node) -> bool {
1092 prop_serialize(&item)
1093 }
1094
1095 #[cfg_attr(miri, ignore)]
1096 fn data(item: Data) -> bool {
1097 prop_serialize(&item)
1098 }
1099
1100 #[cfg_attr(miri, ignore)]
1101 fn link(item: Link) -> bool {
1102 prop_serialize(&item)
1103 }
1104
1105 #[cfg_attr(miri, ignore)]
1106 fn msg_header(item: MessageHeader) -> bool {
1107 prop_serialize(&item)
1108 }
1109 }
1110
1111 #[test]
1112 fn debug_node() {
1113 let node = Node {
1116 next: None,
1117 lo: vec![].into(),
1118 hi: vec![].into(),
1119 merging_child: None,
1120 merging: true,
1121 prefix_len: 0,
1122 data: Data::Index(Index::default()),
1123 };
1124
1125 prop_serialize(&node);
1126 }
1127}