sled/
iter.rs

1use std::ops::{Bound, Deref};
2
3use crate::{Measure, M};
4
5use super::*;
6
/// Maximum number of traversal iterations an iterator step may perform
/// before it panics. Under test builds or with the `lock_free_delays`
/// feature the limit is the largest `usize`, i.e. effectively unbounded.
#[cfg(any(test, feature = "lock_free_delays"))]
const MAX_LOOPS: usize = usize::MAX;

/// Maximum number of traversal iterations an iterator step may perform
/// before it panics.
#[cfg(not(any(test, feature = "lock_free_delays")))]
const MAX_LOOPS: usize = 1_000_000;
12
/// Builds a key that sorts strictly before `s`, for use as a seek target
/// when stepping to whatever precedes `s`.
///
/// * An empty key has nothing before it, so `None` is returned.
/// * If the last byte is `0`, dropping it gives the result.
/// * Otherwise the last byte is decremented and four `0xFF` bytes are
///   appended. This is a key below `s`, not necessarily the greatest one,
///   hence "possible".
fn possible_predecessor(s: &[u8]) -> Option<Vec<u8>> {
    let (&last, prefix) = s.split_last()?;
    let mut candidate = prefix.to_vec();
    if last != 0 {
        candidate.push(last - 1);
        candidate.extend_from_slice(&[255; 4]);
    }
    Some(candidate)
}
25
/// Unwraps a `Result` inside a function that returns `Option<Result<_, _>>`.
///
/// On `Ok` the macro evaluates to the contained value; on `Err` it makes the
/// enclosing function return `Some(Err(..))` immediately.
macro_rules! iter_try {
    ($e:expr) => {
        match $e {
            Err(error) => return Some(Err(error)),
            Ok(value) => value,
        }
    };
}
34
/// An iterator over keys and values in a `Tree`.
///
/// The iterator can be driven from either end. Each yielded key tightens
/// the bound on the side it came from, so the two ends never yield the
/// same entry twice.
pub struct Iter {
    /// The tree being scanned.
    pub(super) tree: Tree,
    /// Upper bound on keys still to be yielded; set to `Excluded(key)`
    /// after each key produced by backward iteration.
    pub(super) hi: Bound<IVec>,
    /// Lower bound on keys still to be yielded; set to `Excluded(key)`
    /// after each key produced by forward iteration.
    pub(super) lo: Bound<IVec>,
    /// Page id and copy of the node that the most recent step found its
    /// key in, kept so the next step in the same direction can skip the
    /// initial tree lookup.
    pub(super) cached_node: Option<(PageId, Node)>,
    /// Direction of the step that filled `cached_node`: `true` for
    /// forward, `false` for backward. The cache is only reused when the
    /// next step goes the same way.
    pub(super) going_forward: bool,
}
43
44impl Iter {
45    /// Iterate over the keys of this Tree
46    pub fn keys(
47        self,
48    ) -> impl DoubleEndedIterator<Item = Result<IVec>> + Send + Sync {
49        self.map(|r| r.map(|(k, _v)| k))
50    }
51
52    /// Iterate over the values of this Tree
53    pub fn values(
54        self,
55    ) -> impl DoubleEndedIterator<Item = Result<IVec>> + Send + Sync {
56        self.map(|r| r.map(|(_k, v)| v))
57    }
58
59    fn bounds_collapsed(&self) -> bool {
60        match (&self.lo, &self.hi) {
61            (Bound::Included(ref start), Bound::Included(ref end))
62            | (Bound::Included(ref start), Bound::Excluded(ref end))
63            | (Bound::Excluded(ref start), Bound::Included(ref end))
64            | (Bound::Excluded(ref start), Bound::Excluded(ref end)) => {
65                start > end
66            }
67            _ => false,
68        }
69    }
70
71    fn low_key(&self) -> &[u8] {
72        match self.lo {
73            Bound::Unbounded => &[],
74            Bound::Excluded(ref lo) | Bound::Included(ref lo) => lo.as_ref(),
75        }
76    }
77
78    fn high_key(&self) -> &[u8] {
79        const MAX_KEY: &[u8] = &[255; 1024 * 1024];
80        match self.hi {
81            Bound::Unbounded => MAX_KEY,
82            Bound::Excluded(ref hi) | Bound::Included(ref hi) => hi.as_ref(),
83        }
84    }
85
86    pub(crate) fn next_inner(&mut self) -> Option<<Self as Iterator>::Item> {
87        let guard = pin();
88        let (mut pid, mut node) = if let (true, Some((pid, node))) =
89            (self.going_forward, self.cached_node.take())
90        {
91            (pid, node)
92        } else {
93            let view =
94                iter_try!(self.tree.view_for_key(self.low_key(), &guard));
95            (view.pid, view.deref().clone())
96        };
97
98        for _ in 0..MAX_LOOPS {
99            if self.bounds_collapsed() {
100                return None;
101            }
102
103            if !node.contains_upper_bound(&self.lo) {
104                // node too low (maybe merged, maybe exhausted?)
105                let view =
106                    iter_try!(self.tree.view_for_key(self.low_key(), &guard));
107
108                pid = view.pid;
109                node = view.deref().clone();
110                continue;
111            } else if !node.contains_lower_bound(&self.lo, true) {
112                // node too high (maybe split, maybe exhausted?)
113                let seek_key = possible_predecessor(&node.lo)?;
114                let view = iter_try!(self.tree.view_for_key(seek_key, &guard));
115                pid = view.pid;
116                node = view.deref().clone();
117                continue;
118            }
119
120            if let Some((key, value)) = node.successor(&self.lo) {
121                self.lo = Bound::Excluded(key.clone());
122                self.cached_node = Some((pid, node));
123                self.going_forward = true;
124
125                match self.hi {
126                    Bound::Unbounded => return Some(Ok((key, value))),
127                    Bound::Included(ref h) if *h >= key => {
128                        return Some(Ok((key, value)));
129                    }
130                    Bound::Excluded(ref h) if *h > key => {
131                        return Some(Ok((key, value)));
132                    }
133                    _ => return None,
134                }
135            } else {
136                if node.hi.is_empty() {
137                    return None;
138                }
139                self.lo = Bound::Included(node.hi.clone());
140                continue;
141            }
142        }
143        panic!(
144            "fucked up tree traversal next({:?}) on {:?}",
145            self.lo, self.tree
146        );
147    }
148}
149
150impl Iterator for Iter {
151    type Item = Result<(IVec, IVec)>;
152
153    fn next(&mut self) -> Option<Self::Item> {
154        let _measure = Measure::new(&M.tree_scan);
155        let _cc = concurrency_control::read();
156        self.next_inner()
157    }
158
159    fn last(mut self) -> Option<Self::Item> {
160        self.next_back()
161    }
162}
163
164impl DoubleEndedIterator for Iter {
165    fn next_back(&mut self) -> Option<Self::Item> {
166        let _measure = Measure::new(&M.tree_reverse_scan);
167        let guard = pin();
168        let _cc = concurrency_control::read();
169
170        let (mut pid, mut node) = if let (false, Some((pid, node))) =
171            (self.going_forward, self.cached_node.take())
172        {
173            (pid, node)
174        } else {
175            let view =
176                iter_try!(self.tree.view_for_key(self.high_key(), &guard));
177            (view.pid, view.deref().clone())
178        };
179
180        for _ in 0..MAX_LOOPS {
181            if self.bounds_collapsed() {
182                return None;
183            }
184
185            if !node.contains_upper_bound(&self.hi) {
186                // node too low (maybe merged, maybe exhausted?)
187                let view =
188                    iter_try!(self.tree.view_for_key(self.high_key(), &guard));
189
190                pid = view.pid;
191                node = view.deref().clone();
192                continue;
193            } else if !node.contains_lower_bound(&self.hi, false) {
194                // node too high (maybe split, maybe exhausted?)
195                let seek_key = possible_predecessor(&node.lo)?;
196                let view = iter_try!(self.tree.view_for_key(seek_key, &guard));
197                pid = view.pid;
198                node = view.deref().clone();
199                continue;
200            }
201
202            if let Some((key, value)) = node.predecessor(&self.hi) {
203                self.hi = Bound::Excluded(key.clone());
204                self.cached_node = Some((pid, node));
205                self.going_forward = false;
206
207                match self.lo {
208                    Bound::Unbounded => return Some(Ok((key, value))),
209                    Bound::Included(ref l) if *l <= key => {
210                        return Some(Ok((key, value)));
211                    }
212                    Bound::Excluded(ref l) if *l < key => {
213                        return Some(Ok((key, value)));
214                    }
215                    _ => return None,
216                }
217            } else {
218                if node.lo.is_empty() {
219                    return None;
220                }
221                self.hi = Bound::Excluded(node.lo.clone());
222                continue;
223            }
224        }
225        panic!(
226            "fucked up tree traversal next_back({:?}) on {:?}",
227            self.hi, self.tree
228        );
229    }
230}
231
/// Checks `possible_predecessor` on the empty key, keys ending in a zero
/// byte, and keys ending in a non-zero byte.
#[test]
fn test_possible_predecessor() {
    let check = |input: &[u8], expected: Option<Vec<u8>>| {
        assert_eq!(possible_predecessor(input), expected);
    };

    // The empty key has no predecessor.
    check(b"", None);

    // A trailing zero byte is simply dropped.
    check(&[0], Some(vec![]));
    check(&[0, 0], Some(vec![0]));
    check(&[1, 0], Some(vec![1]));

    // Any other trailing byte is decremented and padded with four 0xFF.
    check(&[0, 1], Some(vec![0, 0, 255, 255, 255, 255]));
    check(&[0, 2], Some(vec![0, 1, 255, 255, 255, 255]));
    check(&[1, 1], Some(vec![1, 0, 255, 255, 255, 255]));
    check(&[155], Some(vec![154, 255, 255, 255, 255]));
}