sled/
serialization.rs

1#![allow(clippy::mut_mut)]
2use std::{
3    convert::{TryFrom, TryInto},
4    iter::FromIterator,
5    marker::PhantomData,
6    num::NonZeroU64,
7};
8
9use crate::{
10    node::{Index, Leaf},
11    pagecache::{
12        BatchManifest, MessageHeader, PageState, SegmentNumber, Snapshot,
13    },
14    Data, DiskPtr, Error, IVec, Link, Meta, Node, Result,
15};
16
17/// Items that may be serialized and deserialized
18pub trait Serialize: Sized {
19    /// Returns the buffer size required to hold
20    /// the serialized bytes for this item.
21    fn serialized_size(&self) -> u64;
22
23    /// Serializees the item without allocating.
24    ///
25    /// # Panics
26    ///
27    /// Panics if the buffer is not large enough.
28    fn serialize_into(&self, buf: &mut &mut [u8]);
29
30    /// Attempts to deserialize this type from some bytes.
31    fn deserialize(buf: &mut &[u8]) -> Result<Self>;
32
33    /// Returns owned serialized bytes.
34    fn serialize(&self) -> Vec<u8> {
35        let sz = self.serialized_size();
36        let mut buf = vec![0; usize::try_from(sz).unwrap()];
37        self.serialize_into(&mut buf.as_mut_slice());
38        buf
39    }
40}
41
// Moves a reference to mutable bytes forward by `amount` bytes,
// so that `*buf` afterwards refers to the tail of the slice it
// referred to before.
//
// The slice is temporarily moved out of `buf` (an empty slice is
// left in its place) so the tail can be re-borrowed for the full
// original lifetime without any `unsafe` code.
//
// Panics if `amount` is larger than the remaining length.
fn scoot(buf: &mut &mut [u8], amount: usize) {
    assert!(buf.len() >= amount);
    let whole = std::mem::replace(buf, &mut []);
    *buf = &mut whole[amount..];
}
59
60impl Serialize for BatchManifest {
61    fn serialized_size(&self) -> u64 {
62        8
63    }
64
65    fn serialize_into(&self, buf: &mut &mut [u8]) {
66        buf[..8].copy_from_slice(&self.0.to_le_bytes());
67        scoot(buf, 8);
68    }
69
70    fn deserialize(buf: &mut &[u8]) -> Result<Self> {
71        if buf.len() < 8 {
72            return Err(Error::corruption(None));
73        }
74
75        let array = buf[..8].try_into().unwrap();
76        *buf = &buf[8..];
77        Ok(BatchManifest(i64::from_le_bytes(array)))
78    }
79}
80
81impl Serialize for () {
82    fn serialized_size(&self) -> u64 {
83        0
84    }
85
86    fn serialize_into(&self, _: &mut &mut [u8]) {}
87
88    fn deserialize(_: &mut &[u8]) -> Result<()> {
89        Ok(())
90    }
91}
92
93impl Serialize for MessageHeader {
94    fn serialized_size(&self) -> u64 {
95        1 + 4
96            + self.segment_number.serialized_size()
97            + self.pid.serialized_size()
98            + self.len.serialized_size()
99    }
100
101    fn serialize_into(&self, buf: &mut &mut [u8]) {
102        self.crc32.serialize_into(buf);
103        self.kind.into().serialize_into(buf);
104        self.len.serialize_into(buf);
105        self.segment_number.serialize_into(buf);
106        self.pid.serialize_into(buf);
107    }
108
109    fn deserialize(buf: &mut &[u8]) -> Result<MessageHeader> {
110        Ok(MessageHeader {
111            crc32: u32::deserialize(buf)?,
112            kind: u8::deserialize(buf)?.into(),
113            len: u64::deserialize(buf)?,
114            segment_number: SegmentNumber(u64::deserialize(buf)?),
115            pid: u64::deserialize(buf)?,
116        })
117    }
118}
119
120impl Serialize for IVec {
121    fn serialized_size(&self) -> u64 {
122        let len = self.len() as u64;
123        len + len.serialized_size()
124    }
125
126    fn serialize_into(&self, buf: &mut &mut [u8]) {
127        (self.len() as u64).serialize_into(buf);
128        buf[..self.len()].copy_from_slice(self.as_ref());
129        scoot(buf, self.len());
130    }
131
132    fn deserialize(buf: &mut &[u8]) -> Result<IVec> {
133        let k_len = usize::try_from(u64::deserialize(buf)?)
134            .expect("should never store items that rust can't natively index");
135        let ret = &buf[..k_len];
136        *buf = &buf[k_len..];
137        Ok(ret.into())
138    }
139}
140
141impl Serialize for u64 {
142    fn serialized_size(&self) -> u64 {
143        if *self <= 240 {
144            1
145        } else if *self <= 2287 {
146            2
147        } else if *self <= 67823 {
148            3
149        } else if *self <= 0x00FF_FFFF {
150            4
151        } else if *self <= 0xFFFF_FFFF {
152            5
153        } else if *self <= 0x00FF_FFFF_FFFF {
154            6
155        } else if *self <= 0xFFFF_FFFF_FFFF {
156            7
157        } else if *self <= 0x00FF_FFFF_FFFF_FFFF {
158            8
159        } else {
160            9
161        }
162    }
163
164    fn serialize_into(&self, buf: &mut &mut [u8]) {
165        let sz = if *self <= 240 {
166            buf[0] = u8::try_from(*self).unwrap();
167            1
168        } else if *self <= 2287 {
169            buf[0] = u8::try_from((*self - 240) / 256 + 241).unwrap();
170            buf[1] = u8::try_from((*self - 240) % 256).unwrap();
171            2
172        } else if *self <= 67823 {
173            buf[0] = 249;
174            buf[1] = u8::try_from((*self - 2288) / 256).unwrap();
175            buf[2] = u8::try_from((*self - 2288) % 256).unwrap();
176            3
177        } else if *self <= 0x00FF_FFFF {
178            buf[0] = 250;
179            let bytes = self.to_le_bytes();
180            buf[1..4].copy_from_slice(&bytes[..3]);
181            4
182        } else if *self <= 0xFFFF_FFFF {
183            buf[0] = 251;
184            let bytes = self.to_le_bytes();
185            buf[1..5].copy_from_slice(&bytes[..4]);
186            5
187        } else if *self <= 0x00FF_FFFF_FFFF {
188            buf[0] = 252;
189            let bytes = self.to_le_bytes();
190            buf[1..6].copy_from_slice(&bytes[..5]);
191            6
192        } else if *self <= 0xFFFF_FFFF_FFFF {
193            buf[0] = 253;
194            let bytes = self.to_le_bytes();
195            buf[1..7].copy_from_slice(&bytes[..6]);
196            7
197        } else if *self <= 0x00FF_FFFF_FFFF_FFFF {
198            buf[0] = 254;
199            let bytes = self.to_le_bytes();
200            buf[1..8].copy_from_slice(&bytes[..7]);
201            8
202        } else {
203            buf[0] = 255;
204            let bytes = self.to_le_bytes();
205            buf[1..9].copy_from_slice(&bytes[..8]);
206            9
207        };
208
209        scoot(buf, sz);
210    }
211
212    fn deserialize(buf: &mut &[u8]) -> Result<Self> {
213        if buf.is_empty() {
214            return Err(Error::corruption(None));
215        }
216        let (res, scoot) = match buf[0] {
217            0..=240 => (u64::from(buf[0]), 1),
218            241..=248 => {
219                (240 + 256 * (u64::from(buf[0]) - 241) + u64::from(buf[1]), 2)
220            }
221            249 => (2288 + 256 * u64::from(buf[1]) + u64::from(buf[2]), 3),
222            other => {
223                let sz = other as usize - 247;
224                let mut aligned = [0; 8];
225                aligned[..sz].copy_from_slice(&buf[1..=sz]);
226                (u64::from_le_bytes(aligned), sz + 1)
227            }
228        };
229        *buf = &buf[scoot..];
230        Ok(res)
231    }
232}
233
234impl Serialize for i64 {
235    fn serialized_size(&self) -> u64 {
236        8
237    }
238
239    fn serialize_into(&self, buf: &mut &mut [u8]) {
240        buf[..8].copy_from_slice(&self.to_le_bytes());
241        scoot(buf, 8);
242    }
243
244    fn deserialize(buf: &mut &[u8]) -> Result<Self> {
245        if buf.len() < 8 {
246            return Err(Error::corruption(None));
247        }
248
249        let array = buf[..8].try_into().unwrap();
250        *buf = &buf[8..];
251        Ok(i64::from_le_bytes(array))
252    }
253}
254
255impl Serialize for u32 {
256    fn serialized_size(&self) -> u64 {
257        4
258    }
259
260    fn serialize_into(&self, buf: &mut &mut [u8]) {
261        buf[..4].copy_from_slice(&self.to_le_bytes());
262        scoot(buf, 4);
263    }
264
265    fn deserialize(buf: &mut &[u8]) -> Result<Self> {
266        if buf.len() < 4 {
267            return Err(Error::corruption(None));
268        }
269
270        let array = buf[..4].try_into().unwrap();
271        *buf = &buf[4..];
272        Ok(u32::from_le_bytes(array))
273    }
274}
275
276impl Serialize for bool {
277    fn serialized_size(&self) -> u64 {
278        1
279    }
280
281    fn serialize_into(&self, buf: &mut &mut [u8]) {
282        let byte = u8::from(*self);
283        buf[0] = byte;
284        scoot(buf, 1);
285    }
286
287    fn deserialize(buf: &mut &[u8]) -> Result<bool> {
288        if buf.is_empty() {
289            return Err(Error::corruption(None));
290        }
291        let value = buf[0] != 0;
292        *buf = &buf[1..];
293        Ok(value)
294    }
295}
296
297impl Serialize for u8 {
298    fn serialized_size(&self) -> u64 {
299        1
300    }
301
302    fn serialize_into(&self, buf: &mut &mut [u8]) {
303        buf[0] = *self;
304        scoot(buf, 1);
305    }
306
307    fn deserialize(buf: &mut &[u8]) -> Result<u8> {
308        if buf.is_empty() {
309            return Err(Error::corruption(None));
310        }
311        let value = buf[0];
312        *buf = &buf[1..];
313        Ok(value)
314    }
315}
316
317impl Serialize for Meta {
318    fn serialized_size(&self) -> u64 {
319        self.inner
320            .iter()
321            .map(|(k, v)| {
322                (k.len() as u64).serialized_size()
323                    + u64::try_from(k.len()).unwrap()
324                    + v.serialized_size()
325            })
326            .sum()
327    }
328
329    fn serialize_into(&self, buf: &mut &mut [u8]) {
330        serialize_2tuple_sequence(self.inner.iter(), buf);
331    }
332
333    fn deserialize(buf: &mut &[u8]) -> Result<Self> {
334        Ok(Meta { inner: deserialize_sequence(buf)? })
335    }
336}
337
338impl Serialize for Link {
339    fn serialized_size(&self) -> u64 {
340        match self {
341            Link::Set(key, value) => {
342                1 + (key.len() as u64).serialized_size()
343                    + (value.len() as u64).serialized_size()
344                    + u64::try_from(key.len()).unwrap()
345                    + u64::try_from(value.len()).unwrap()
346            }
347            Link::Del(key) => {
348                1 + (key.len() as u64).serialized_size()
349                    + u64::try_from(key.len()).unwrap()
350            }
351            Link::ParentMergeIntention(a) => 1 + a.serialized_size(),
352            Link::ParentMergeConfirm | Link::ChildMergeCap => 1,
353        }
354    }
355
356    fn serialize_into(&self, buf: &mut &mut [u8]) {
357        match self {
358            Link::Set(key, value) => {
359                0_u8.serialize_into(buf);
360                key.serialize_into(buf);
361                value.serialize_into(buf);
362            }
363            Link::Del(key) => {
364                1_u8.serialize_into(buf);
365                key.serialize_into(buf);
366            }
367            Link::ParentMergeIntention(pid) => {
368                2_u8.serialize_into(buf);
369                pid.serialize_into(buf);
370            }
371            Link::ParentMergeConfirm => {
372                3_u8.serialize_into(buf);
373            }
374            Link::ChildMergeCap => {
375                4_u8.serialize_into(buf);
376            }
377        }
378    }
379
380    fn deserialize(buf: &mut &[u8]) -> Result<Self> {
381        if buf.is_empty() {
382            return Err(Error::corruption(None));
383        }
384        let discriminant = buf[0];
385        *buf = &buf[1..];
386        Ok(match discriminant {
387            0 => Link::Set(IVec::deserialize(buf)?, IVec::deserialize(buf)?),
388            1 => Link::Del(IVec::deserialize(buf)?),
389            2 => Link::ParentMergeIntention(u64::deserialize(buf)?),
390            3 => Link::ParentMergeConfirm,
391            4 => Link::ChildMergeCap,
392            _ => return Err(Error::corruption(None)),
393        })
394    }
395}
396
// Maps an optional `u64` onto a plain `u64` so it can reuse the
// `u64` encoding: `None` becomes 0 and `Some(n)` becomes `n + 1`.
//
// Panics on `Some(u64::MAX)`, which has no representation in this
// scheme. The addition is checked so that the failure is the same
// in every build profile, instead of wrapping around to 0 (the
// encoding of `None`) when overflow checks are disabled.
fn shift_u64_opt(value: &Option<u64>) -> u64 {
    match *value {
        None => 0,
        Some(inner) => inner
            .checked_add(1)
            .expect("cannot shift Some(u64::MAX) without overflowing"),
    }
}
400
401impl Serialize for Option<u64> {
402    fn serialized_size(&self) -> u64 {
403        shift_u64_opt(self).serialized_size()
404    }
405    fn serialize_into(&self, buf: &mut &mut [u8]) {
406        shift_u64_opt(self).serialize_into(buf)
407    }
408    fn deserialize(buf: &mut &[u8]) -> Result<Self> {
409        let shifted = u64::deserialize(buf)?;
410        let unshifted = if shifted == 0 { None } else { Some(shifted - 1) };
411        Ok(unshifted)
412    }
413}
414
415impl Serialize for Option<NonZeroU64> {
416    fn serialized_size(&self) -> u64 {
417        (self.map(NonZeroU64::get).unwrap_or(0)).serialized_size()
418    }
419
420    fn serialize_into(&self, buf: &mut &mut [u8]) {
421        (self.map(NonZeroU64::get).unwrap_or(0)).serialize_into(buf)
422    }
423
424    fn deserialize(buf: &mut &[u8]) -> Result<Self> {
425        let underlying = u64::deserialize(buf)?;
426        Ok(if underlying == 0 {
427            None
428        } else {
429            Some(NonZeroU64::new(underlying).unwrap())
430        })
431    }
432}
433
434impl Serialize for Node {
435    fn serialized_size(&self) -> u64 {
436        2 + self.next.serialized_size()
437            + self.merging_child.serialized_size()
438            + self.lo.serialized_size()
439            + self.hi.serialized_size()
440            + self.data.serialized_size()
441    }
442
443    fn serialize_into(&self, buf: &mut &mut [u8]) {
444        self.next.serialize_into(buf);
445        self.merging_child.serialize_into(buf);
446        self.merging.serialize_into(buf);
447        self.prefix_len.serialize_into(buf);
448        self.lo.serialize_into(buf);
449        self.hi.serialize_into(buf);
450        self.data.serialize_into(buf);
451    }
452
453    fn deserialize(buf: &mut &[u8]) -> Result<Self> {
454        Ok(Node {
455            next: Serialize::deserialize(buf)?,
456            merging_child: Serialize::deserialize(buf)?,
457            merging: bool::deserialize(buf)?,
458            prefix_len: u8::deserialize(buf)?,
459            lo: IVec::deserialize(buf)?,
460            hi: IVec::deserialize(buf)?,
461            data: Data::deserialize(buf)?,
462        })
463    }
464}
465
466impl Serialize for Option<i64> {
467    fn serialized_size(&self) -> u64 {
468        shift_i64_opt(self).serialized_size()
469    }
470    fn serialize_into(&self, buf: &mut &mut [u8]) {
471        shift_i64_opt(self).serialize_into(buf)
472    }
473    fn deserialize(buf: &mut &[u8]) -> Result<Self> {
474        Ok(unshift_i64_opt(i64::deserialize(buf)?))
475    }
476}
477
// Maps an optional `i64` onto a plain `i64`: `None` becomes 0,
// negative values are kept as they are, and non-negative values
// are moved up by one so that 0 is free to mean `None`.
//
// Panics on `Some(i64::MAX)`, which has no representation in this
// scheme. The addition is checked so that the failure is the same
// in every build profile, instead of wrapping around to
// `i64::MIN` when overflow checks are disabled.
fn shift_i64_opt(value_opt: &Option<i64>) -> i64 {
    match *value_opt {
        None => 0,
        Some(value) if value < 0 => value,
        Some(value) => value
            .checked_add(1)
            .expect("cannot shift Some(i64::MAX) without overflowing"),
    }
}
489
// Inverse of `shift_i64_opt`: 0 decodes to `None`, negative
// values decode to themselves, and positive values are moved
// back down by one.
fn unshift_i64_opt(value: i64) -> Option<i64> {
    match value {
        0 => None,
        negative if negative < 0 => Some(negative),
        positive => Some(positive - 1),
    }
}
499
500impl Serialize for Snapshot {
501    fn serialized_size(&self) -> u64 {
502        self.stable_lsn.serialized_size()
503            + self.active_segment.serialized_size()
504            + self
505                .pt
506                .iter()
507                .map(|page_state| page_state.serialized_size())
508                .sum::<u64>()
509    }
510
511    fn serialize_into(&self, buf: &mut &mut [u8]) {
512        self.stable_lsn.serialize_into(buf);
513        self.active_segment.serialize_into(buf);
514        for page_state in &self.pt {
515            page_state.serialize_into(buf);
516        }
517    }
518
519    fn deserialize(buf: &mut &[u8]) -> Result<Self> {
520        Ok(Snapshot {
521            stable_lsn: Serialize::deserialize(buf)?,
522            active_segment: Serialize::deserialize(buf)?,
523            pt: deserialize_sequence(buf)?,
524        })
525    }
526}
527
528impl Serialize for Data {
529    fn serialized_size(&self) -> u64 {
530        match self {
531            Data::Leaf(ref leaf) => {
532                1_u64
533                    + (leaf.keys.len() as u64).serialized_size()
534                    + leaf
535                        .keys
536                        .iter()
537                        .enumerate()
538                        .map(|(idx, k)| {
539                            let v = &leaf.values[idx];
540                            (k.len() as u64).serialized_size()
541                                + (v.len() as u64).serialized_size()
542                                + k.len() as u64
543                                + v.len() as u64
544                        })
545                        .sum::<u64>()
546            }
547            Data::Index(ref index) => {
548                1_u64
549                    + (index.keys.len() as u64).serialized_size()
550                    + index
551                        .keys
552                        .iter()
553                        .enumerate()
554                        .map(|(idx, k)| {
555                            let v = index.pointers[idx];
556                            (k.len() as u64).serialized_size()
557                                + v.serialized_size()
558                                + k.len() as u64
559                        })
560                        .sum::<u64>()
561            }
562        }
563    }
564
565    fn serialize_into(&self, buf: &mut &mut [u8]) {
566        match self {
567            Data::Leaf(leaf) => {
568                0_u8.serialize_into(buf);
569                (leaf.keys.len() as u64).serialize_into(buf);
570                for key in &leaf.keys {
571                    key.serialize_into(buf);
572                }
573                for value in &leaf.values {
574                    value.serialize_into(buf);
575                }
576            }
577            Data::Index(index) => {
578                1_u8.serialize_into(buf);
579                (index.keys.len() as u64).serialize_into(buf);
580                for key in &index.keys {
581                    key.serialize_into(buf);
582                }
583                for value in &index.pointers {
584                    value.serialize_into(buf);
585                }
586            }
587        }
588    }
589
590    fn deserialize(buf: &mut &[u8]) -> Result<Data> {
591        if buf.is_empty() {
592            return Err(Error::corruption(None));
593        }
594        let discriminant = buf[0];
595        *buf = &buf[1..];
596        let len = usize::try_from(u64::deserialize(buf)?).unwrap();
597        Ok(match discriminant {
598            0 => Data::Leaf(Leaf {
599                keys: deserialize_bounded_sequence(buf, len)?,
600                values: deserialize_bounded_sequence(buf, len)?,
601            }),
602            1 => Data::Index(Index {
603                keys: deserialize_bounded_sequence(buf, len)?,
604                pointers: deserialize_bounded_sequence(buf, len)?,
605            }),
606            _ => return Err(Error::corruption(None)),
607        })
608    }
609}
610
611impl Serialize for DiskPtr {
612    fn serialized_size(&self) -> u64 {
613        match self {
614            DiskPtr::Inline(a) => 1 + a.serialized_size(),
615            DiskPtr::Blob(a, b) => {
616                1 + a.serialized_size() + b.serialized_size()
617            }
618        }
619    }
620
621    fn serialize_into(&self, buf: &mut &mut [u8]) {
622        match self {
623            DiskPtr::Inline(log_offset) => {
624                0_u8.serialize_into(buf);
625                log_offset.serialize_into(buf);
626            }
627            DiskPtr::Blob(log_offset, blob_lsn) => {
628                1_u8.serialize_into(buf);
629                log_offset.serialize_into(buf);
630                blob_lsn.serialize_into(buf);
631            }
632        }
633    }
634
635    fn deserialize(buf: &mut &[u8]) -> Result<DiskPtr> {
636        if buf.len() < 2 {
637            return Err(Error::corruption(None));
638        }
639        let discriminant = buf[0];
640        *buf = &buf[1..];
641        Ok(match discriminant {
642            0 => DiskPtr::Inline(u64::deserialize(buf)?),
643            1 => DiskPtr::Blob(u64::deserialize(buf)?, i64::deserialize(buf)?),
644            _ => return Err(Error::corruption(None)),
645        })
646    }
647}
648
649impl Serialize for PageState {
650    fn serialized_size(&self) -> u64 {
651        match self {
652            PageState::Free(a, disk_ptr) => {
653                1 + a.serialized_size() + disk_ptr.serialized_size()
654            }
655            PageState::Present { base, frags } => {
656                1 + base.serialized_size()
657                    + frags
658                        .iter()
659                        .map(|tuple| tuple.serialized_size())
660                        .sum::<u64>()
661            }
662            _ => panic!("tried to serialize {:?}", self),
663        }
664    }
665
666    fn serialize_into(&self, buf: &mut &mut [u8]) {
667        match self {
668            PageState::Free(lsn, disk_ptr) => {
669                0_u8.serialize_into(buf);
670                lsn.serialize_into(buf);
671                disk_ptr.serialize_into(buf);
672            }
673            PageState::Present { base, frags } => {
674                let frags_len: u8 = 1 + u8::try_from(frags.len())
675                    .expect("should never have more than 255 frags");
676                frags_len.serialize_into(buf);
677                base.serialize_into(buf);
678                serialize_3tuple_ref_sequence(frags.iter(), buf);
679            }
680            _ => panic!("tried to serialize {:?}", self),
681        }
682    }
683
684    fn deserialize(buf: &mut &[u8]) -> Result<PageState> {
685        if buf.is_empty() {
686            return Err(Error::corruption(None));
687        }
688        let discriminant = buf[0];
689        *buf = &buf[1..];
690        Ok(match discriminant {
691            0 => PageState::Free(
692                i64::deserialize(buf)?,
693                DiskPtr::deserialize(buf)?,
694            ),
695            len => PageState::Present {
696                base: Serialize::deserialize(buf)?,
697                frags: deserialize_bounded_sequence(buf, usize::from(len - 1))?,
698            },
699        })
700    }
701}
702
703impl<A: Serialize, B: Serialize> Serialize for (A, B) {
704    fn serialized_size(&self) -> u64 {
705        self.0.serialized_size() + self.1.serialized_size()
706    }
707
708    fn serialize_into(&self, buf: &mut &mut [u8]) {
709        self.0.serialize_into(buf);
710        self.1.serialize_into(buf);
711    }
712
713    fn deserialize(buf: &mut &[u8]) -> Result<(A, B)> {
714        let a = A::deserialize(buf)?;
715        let b = B::deserialize(buf)?;
716        Ok((a, b))
717    }
718}
719
720impl<A: Serialize, B: Serialize, C: Serialize> Serialize for (A, B, C) {
721    fn serialized_size(&self) -> u64 {
722        self.0.serialized_size()
723            + self.1.serialized_size()
724            + self.2.serialized_size()
725    }
726
727    fn serialize_into(&self, buf: &mut &mut [u8]) {
728        self.0.serialize_into(buf);
729        self.1.serialize_into(buf);
730        self.2.serialize_into(buf);
731    }
732
733    fn deserialize(buf: &mut &[u8]) -> Result<(A, B, C)> {
734        let a = A::deserialize(buf)?;
735        let b = B::deserialize(buf)?;
736        let c = C::deserialize(buf)?;
737        Ok((a, b, c))
738    }
739}
740
741fn serialize_2tuple_sequence<'a, XS, A, B>(xs: XS, buf: &mut &mut [u8])
742where
743    XS: Iterator<Item = (&'a A, &'a B)>,
744    A: Serialize + 'a,
745    B: Serialize + 'a,
746{
747    for item in xs {
748        item.0.serialize_into(buf);
749        item.1.serialize_into(buf);
750    }
751}
752
753fn serialize_3tuple_ref_sequence<'a, XS, A, B, C>(xs: XS, buf: &mut &mut [u8])
754where
755    XS: Iterator<Item = &'a (A, B, C)>,
756    A: Serialize + 'a,
757    B: Serialize + 'a,
758    C: Serialize + 'a,
759{
760    for item in xs {
761        item.0.serialize_into(buf);
762        item.1.serialize_into(buf);
763        item.2.serialize_into(buf);
764    }
765}
766
// Iterator state for pulling consecutive `T`s out of a byte
// cursor: `buf` is the caller's cursor, `_t` only records which
// type is being decoded, and `done` is set once a decode fails.
struct ConsumeSequence<'cursor, 'bytes, T> {
    buf: &'cursor mut &'bytes [u8],
    _t: PhantomData<T>,
    done: bool,
}
772
773fn deserialize_sequence<T: Serialize, R>(buf: &mut &[u8]) -> Result<R>
774where
775    R: FromIterator<T>,
776{
777    let iter = ConsumeSequence { buf, _t: PhantomData, done: false };
778    iter.collect()
779}
780
781fn deserialize_bounded_sequence<T: Serialize, R>(
782    buf: &mut &[u8],
783    bound: usize,
784) -> Result<R>
785where
786    R: FromIterator<T>,
787{
788    let iter = ConsumeSequence { buf, _t: PhantomData, done: false };
789    iter.take(bound).collect()
790}
791
792impl<'a, 'b, T> Iterator for ConsumeSequence<'a, 'b, T>
793where
794    T: Serialize,
795{
796    type Item = Result<T>;
797
798    fn next(&mut self) -> Option<Result<T>> {
799        if self.done || self.buf.is_empty() {
800            return None;
801        }
802        let item_res = T::deserialize(&mut self.buf);
803        if item_res.is_err() {
804            self.done = true;
805        }
806        Some(item_res)
807    }
808}
809
810#[cfg(test)]
811mod qc {
812    use quickcheck::{Arbitrary, Gen};
813    use rand::Rng;
814
815    use super::*;
816    use crate::pagecache::MessageKind;
817
818    impl Arbitrary for MessageHeader {
819        fn arbitrary<G: Gen>(g: &mut G) -> MessageHeader {
820            MessageHeader {
821                crc32: g.gen(),
822                len: g.gen(),
823                kind: MessageKind::arbitrary(g),
824                segment_number: SegmentNumber(SpreadU64::arbitrary(g).0),
825                pid: g.gen(),
826            }
827        }
828    }
829
830    impl Arbitrary for MessageKind {
831        fn arbitrary<G: Gen>(g: &mut G) -> MessageKind {
832            g.gen_range(0, 12).into()
833        }
834    }
835
836    impl Arbitrary for Data {
837        fn arbitrary<G: Gen>(g: &mut G) -> Data {
838            if g.gen() {
839                let keys = Arbitrary::arbitrary(g);
840                let mut values = vec![];
841                for _ in &keys {
842                    values.push(Arbitrary::arbitrary(g))
843                }
844                Data::Index(Index { keys, pointers: values })
845            } else {
846                let keys = Arbitrary::arbitrary(g);
847                let mut values = vec![];
848                for _ in &keys {
849                    values.push(Arbitrary::arbitrary(g))
850                }
851                Data::Leaf(Leaf { keys, values })
852            }
853        }
854
855        fn shrink(&self) -> Box<dyn Iterator<Item = Data>> {
856            match self {
857                Data::Index(ref index) => {
858                    let index = index.clone();
859                    Box::new(index.keys.shrink().map(move |keys| {
860                        Data::Index(Index {
861                            pointers: index
862                                .pointers
863                                .iter()
864                                .take(keys.len())
865                                .copied()
866                                .collect(),
867                            keys,
868                        })
869                    }))
870                }
871                Data::Leaf(ref leaf) => {
872                    let leaf = leaf.clone();
873                    Box::new(leaf.keys.shrink().map(move |keys| {
874                        Data::Leaf(Leaf {
875                            values: leaf
876                                .values
877                                .iter()
878                                .take(keys.len())
879                                .cloned()
880                                .collect(),
881                            keys,
882                        })
883                    }))
884                }
885            }
886        }
887    }
888
889    impl Arbitrary for Node {
890        fn arbitrary<G: Gen>(g: &mut G) -> Node {
891            let next: Option<NonZeroU64> = Arbitrary::arbitrary(g);
892
893            let merging_child: Option<NonZeroU64> = Arbitrary::arbitrary(g);
894
895            Node {
896                next,
897                merging_child,
898                merging: bool::arbitrary(g),
899                prefix_len: u8::arbitrary(g),
900                lo: IVec::arbitrary(g),
901                hi: IVec::arbitrary(g),
902                data: Data::arbitrary(g),
903            }
904        }
905
906        fn shrink(&self) -> Box<dyn Iterator<Item = Node>> {
907            let data_shrinker = self.data.shrink().map({
908                let node = self.clone();
909                move |data| Node { data, ..node.clone() }
910            });
911
912            let hi_shrinker = self.hi.shrink().map({
913                let node = self.clone();
914                move |hi| Node { hi, ..node.clone() }
915            });
916
917            let lo_shrinker = self.lo.shrink().map({
918                let node = self.clone();
919                move |lo| Node { lo, ..node.clone() }
920            });
921
922            Box::new(data_shrinker.chain(hi_shrinker).chain(lo_shrinker))
923        }
924    }
925
926    impl Arbitrary for Link {
927        fn arbitrary<G: Gen>(g: &mut G) -> Link {
928            let discriminant = g.gen_range(0, 5);
929            match discriminant {
930                0 => Link::Set(IVec::arbitrary(g), IVec::arbitrary(g)),
931                1 => Link::Del(IVec::arbitrary(g)),
932                2 => Link::ParentMergeIntention(u64::arbitrary(g)),
933                3 => Link::ParentMergeConfirm,
934                4 => Link::ChildMergeCap,
935                _ => unreachable!("invalid choice"),
936            }
937        }
938    }
939
940    impl Arbitrary for IVec {
941        fn arbitrary<G: Gen>(g: &mut G) -> IVec {
942            let v: Vec<u8> = Arbitrary::arbitrary(g);
943            v.into()
944        }
945
946        fn shrink(&self) -> Box<dyn Iterator<Item = IVec>> {
947            let v: Vec<u8> = self.to_vec();
948            Box::new(v.shrink().map(IVec::from))
949        }
950    }
951
952    impl Arbitrary for Meta {
953        fn arbitrary<G: Gen>(g: &mut G) -> Meta {
954            Meta { inner: Arbitrary::arbitrary(g) }
955        }
956
957        fn shrink(&self) -> Box<dyn Iterator<Item = Meta>> {
958            Box::new(self.inner.shrink().map(|inner| Meta { inner }))
959        }
960    }
961
962    impl Arbitrary for DiskPtr {
963        fn arbitrary<G: Gen>(g: &mut G) -> DiskPtr {
964            if g.gen() {
965                DiskPtr::Inline(g.gen())
966            } else {
967                DiskPtr::Blob(g.gen(), g.gen())
968            }
969        }
970    }
971
972    impl Arbitrary for PageState {
973        fn arbitrary<G: Gen>(g: &mut G) -> PageState {
974            if g.gen() {
975                // don't generate 255 because we add 1 to this
976                // number in PageState::serialize_into to account
977                // for the base fragment
978                let n = g.gen_range(0, 255);
979
980                let base = (g.gen(), DiskPtr::arbitrary(g), g.gen());
981                let frags = (0..n)
982                    .map(|_| (g.gen(), DiskPtr::arbitrary(g), g.gen()))
983                    .collect();
984                PageState::Present { base, frags }
985            } else {
986                PageState::Free(g.gen(), DiskPtr::arbitrary(g))
987            }
988        }
989    }
990
991    impl Arbitrary for Snapshot {
992        fn arbitrary<G: Gen>(g: &mut G) -> Snapshot {
993            Snapshot {
994                stable_lsn: g.gen(),
995                active_segment: g.gen(),
996                pt: Arbitrary::arbitrary(g),
997            }
998        }
999    }
1000
1001    #[derive(Debug, Clone)]
1002    struct SpreadI64(i64);
1003
1004    impl Arbitrary for SpreadI64 {
1005        fn arbitrary<G: Gen>(g: &mut G) -> SpreadI64 {
1006            let uniform = g.gen::<i64>();
1007            let shift = g.gen_range(0, 64);
1008            SpreadI64(uniform >> shift)
1009        }
1010    }
1011
1012    #[derive(Debug, Clone)]
1013    struct SpreadU64(u64);
1014
1015    impl Arbitrary for SpreadU64 {
1016        fn arbitrary<G: Gen>(g: &mut G) -> SpreadU64 {
1017            let uniform = g.gen::<u64>();
1018            let shift = g.gen_range(0, 64);
1019            SpreadU64(uniform >> shift)
1020        }
1021    }
1022
1023    fn prop_serialize<T>(item: &T) -> bool
1024    where
1025        T: Serialize + PartialEq + Clone + std::fmt::Debug,
1026    {
1027        let mut buf = vec![0; usize::try_from(item.serialized_size()).unwrap()];
1028        let buf_ref = &mut buf.as_mut_slice();
1029        item.serialize_into(buf_ref);
1030        assert_eq!(
1031            buf_ref.len(),
1032            0,
1033            "round-trip failed to consume produced bytes"
1034        );
1035        assert_eq!(buf.len() as u64, item.serialized_size());
1036        let deserialized = T::deserialize(&mut buf.as_slice()).unwrap();
1037        if *item == deserialized {
1038            true
1039        } else {
1040            eprintln!(
1041                "round-trip serialization failed. original:\n\n{:?}\n\n \
1042                 deserialized(serialized(original)):\n\n{:?}",
1043                item, deserialized
1044            );
1045            false
1046        }
1047    }
1048
1049    quickcheck::quickcheck! {
1050        #[cfg_attr(miri, ignore)]
1051        fn bool(item: bool) -> bool {
1052            prop_serialize(&item)
1053        }
1054
1055        #[cfg_attr(miri, ignore)]
1056        fn u8(item: u8) -> bool {
1057            prop_serialize(&item)
1058        }
1059
1060        #[cfg_attr(miri, ignore)]
1061        fn i64(item: SpreadI64) -> bool {
1062            prop_serialize(&item.0)
1063        }
1064
1065        #[cfg_attr(miri, ignore)]
1066        fn u64(item: SpreadU64) -> bool {
1067            prop_serialize(&item.0)
1068        }
1069
1070        #[cfg_attr(miri, ignore)]
1071        fn disk_ptr(item: DiskPtr) -> bool {
1072            prop_serialize(&item)
1073        }
1074
1075        #[cfg_attr(miri, ignore)]
1076        fn page_state(item: PageState) -> bool {
1077            prop_serialize(&item)
1078        }
1079
1080        #[cfg_attr(miri, ignore)]
1081        fn meta(item: Meta) -> bool {
1082            prop_serialize(&item)
1083        }
1084
1085        #[cfg_attr(miri, ignore)]
1086        fn snapshot(item: Snapshot) -> bool {
1087            prop_serialize(&item)
1088        }
1089
1090        #[cfg_attr(miri, ignore)]
1091        fn node(item: Node) -> bool {
1092            prop_serialize(&item)
1093        }
1094
1095        #[cfg_attr(miri, ignore)]
1096        fn data(item: Data) -> bool {
1097            prop_serialize(&item)
1098        }
1099
1100        #[cfg_attr(miri, ignore)]
1101        fn link(item: Link) -> bool {
1102            prop_serialize(&item)
1103        }
1104
1105        #[cfg_attr(miri, ignore)]
1106        fn msg_header(item: MessageHeader) -> bool {
1107            prop_serialize(&item)
1108        }
1109    }
1110
1111    #[test]
1112    fn debug_node() {
1113        // color_backtrace::install();
1114
1115        let node = Node {
1116            next: None,
1117            lo: vec![].into(),
1118            hi: vec![].into(),
1119            merging_child: None,
1120            merging: true,
1121            prefix_len: 0,
1122            data: Data::Index(Index::default()),
1123        };
1124
1125        prop_serialize(&node);
1126    }
1127}