1use std::ops::{Bound, Deref};
2
3use crate::{Measure, M};
4
5use super::*;
6
7#[cfg(any(test, feature = "lock_free_delays"))]
8const MAX_LOOPS: usize = usize::max_value();
9
10#[cfg(not(any(test, feature = "lock_free_delays")))]
11const MAX_LOOPS: usize = 1_000_000;
12
13fn possible_predecessor(s: &[u8]) -> Option<Vec<u8>> {
14 let mut ret = s.to_vec();
15 match ret.pop() {
16 None => None,
17 Some(i) if i == 0 => Some(ret),
18 Some(i) => {
19 ret.push(i - 1);
20 ret.extend_from_slice(&[255; 4]);
21 Some(ret)
22 }
23 }
24}
25
26macro_rules! iter_try {
27 ($e:expr) => {
28 match $e {
29 Ok(item) => item,
30 Err(e) => return Some(Err(e)),
31 }
32 };
33}
34
35pub struct Iter {
37 pub(super) tree: Tree,
38 pub(super) hi: Bound<IVec>,
39 pub(super) lo: Bound<IVec>,
40 pub(super) cached_node: Option<(PageId, Node)>,
41 pub(super) going_forward: bool,
42}
43
44impl Iter {
45 pub fn keys(
47 self,
48 ) -> impl DoubleEndedIterator<Item = Result<IVec>> + Send + Sync {
49 self.map(|r| r.map(|(k, _v)| k))
50 }
51
52 pub fn values(
54 self,
55 ) -> impl DoubleEndedIterator<Item = Result<IVec>> + Send + Sync {
56 self.map(|r| r.map(|(_k, v)| v))
57 }
58
59 fn bounds_collapsed(&self) -> bool {
60 match (&self.lo, &self.hi) {
61 (Bound::Included(ref start), Bound::Included(ref end))
62 | (Bound::Included(ref start), Bound::Excluded(ref end))
63 | (Bound::Excluded(ref start), Bound::Included(ref end))
64 | (Bound::Excluded(ref start), Bound::Excluded(ref end)) => {
65 start > end
66 }
67 _ => false,
68 }
69 }
70
71 fn low_key(&self) -> &[u8] {
72 match self.lo {
73 Bound::Unbounded => &[],
74 Bound::Excluded(ref lo) | Bound::Included(ref lo) => lo.as_ref(),
75 }
76 }
77
78 fn high_key(&self) -> &[u8] {
79 const MAX_KEY: &[u8] = &[255; 1024 * 1024];
80 match self.hi {
81 Bound::Unbounded => MAX_KEY,
82 Bound::Excluded(ref hi) | Bound::Included(ref hi) => hi.as_ref(),
83 }
84 }
85
86 pub(crate) fn next_inner(&mut self) -> Option<<Self as Iterator>::Item> {
87 let guard = pin();
88 let (mut pid, mut node) = if let (true, Some((pid, node))) =
89 (self.going_forward, self.cached_node.take())
90 {
91 (pid, node)
92 } else {
93 let view =
94 iter_try!(self.tree.view_for_key(self.low_key(), &guard));
95 (view.pid, view.deref().clone())
96 };
97
98 for _ in 0..MAX_LOOPS {
99 if self.bounds_collapsed() {
100 return None;
101 }
102
103 if !node.contains_upper_bound(&self.lo) {
104 let view =
106 iter_try!(self.tree.view_for_key(self.low_key(), &guard));
107
108 pid = view.pid;
109 node = view.deref().clone();
110 continue;
111 } else if !node.contains_lower_bound(&self.lo, true) {
112 let seek_key = possible_predecessor(&node.lo)?;
114 let view = iter_try!(self.tree.view_for_key(seek_key, &guard));
115 pid = view.pid;
116 node = view.deref().clone();
117 continue;
118 }
119
120 if let Some((key, value)) = node.successor(&self.lo) {
121 self.lo = Bound::Excluded(key.clone());
122 self.cached_node = Some((pid, node));
123 self.going_forward = true;
124
125 match self.hi {
126 Bound::Unbounded => return Some(Ok((key, value))),
127 Bound::Included(ref h) if *h >= key => {
128 return Some(Ok((key, value)));
129 }
130 Bound::Excluded(ref h) if *h > key => {
131 return Some(Ok((key, value)));
132 }
133 _ => return None,
134 }
135 } else {
136 if node.hi.is_empty() {
137 return None;
138 }
139 self.lo = Bound::Included(node.hi.clone());
140 continue;
141 }
142 }
143 panic!(
144 "fucked up tree traversal next({:?}) on {:?}",
145 self.lo, self.tree
146 );
147 }
148}
149
150impl Iterator for Iter {
151 type Item = Result<(IVec, IVec)>;
152
153 fn next(&mut self) -> Option<Self::Item> {
154 let _measure = Measure::new(&M.tree_scan);
155 let _cc = concurrency_control::read();
156 self.next_inner()
157 }
158
159 fn last(mut self) -> Option<Self::Item> {
160 self.next_back()
161 }
162}
163
164impl DoubleEndedIterator for Iter {
165 fn next_back(&mut self) -> Option<Self::Item> {
166 let _measure = Measure::new(&M.tree_reverse_scan);
167 let guard = pin();
168 let _cc = concurrency_control::read();
169
170 let (mut pid, mut node) = if let (false, Some((pid, node))) =
171 (self.going_forward, self.cached_node.take())
172 {
173 (pid, node)
174 } else {
175 let view =
176 iter_try!(self.tree.view_for_key(self.high_key(), &guard));
177 (view.pid, view.deref().clone())
178 };
179
180 for _ in 0..MAX_LOOPS {
181 if self.bounds_collapsed() {
182 return None;
183 }
184
185 if !node.contains_upper_bound(&self.hi) {
186 let view =
188 iter_try!(self.tree.view_for_key(self.high_key(), &guard));
189
190 pid = view.pid;
191 node = view.deref().clone();
192 continue;
193 } else if !node.contains_lower_bound(&self.hi, false) {
194 let seek_key = possible_predecessor(&node.lo)?;
196 let view = iter_try!(self.tree.view_for_key(seek_key, &guard));
197 pid = view.pid;
198 node = view.deref().clone();
199 continue;
200 }
201
202 if let Some((key, value)) = node.predecessor(&self.hi) {
203 self.hi = Bound::Excluded(key.clone());
204 self.cached_node = Some((pid, node));
205 self.going_forward = false;
206
207 match self.lo {
208 Bound::Unbounded => return Some(Ok((key, value))),
209 Bound::Included(ref l) if *l <= key => {
210 return Some(Ok((key, value)));
211 }
212 Bound::Excluded(ref l) if *l < key => {
213 return Some(Ok((key, value)));
214 }
215 _ => return None,
216 }
217 } else {
218 if node.lo.is_empty() {
219 return None;
220 }
221 self.hi = Bound::Excluded(node.lo.clone());
222 continue;
223 }
224 }
225 panic!(
226 "fucked up tree traversal next_back({:?}) on {:?}",
227 self.hi, self.tree
228 );
229 }
230}
231
232#[test]
233fn test_possible_predecessor() {
234 assert_eq!(possible_predecessor(b""), None);
235 assert_eq!(possible_predecessor(&[0]), Some(vec![]));
236 assert_eq!(possible_predecessor(&[0, 0]), Some(vec![0]));
237 assert_eq!(
238 possible_predecessor(&[0, 1]),
239 Some(vec![0, 0, 255, 255, 255, 255])
240 );
241 assert_eq!(
242 possible_predecessor(&[0, 2]),
243 Some(vec![0, 1, 255, 255, 255, 255])
244 );
245 assert_eq!(possible_predecessor(&[1, 0]), Some(vec![1]));
246 assert_eq!(
247 possible_predecessor(&[1, 1]),
248 Some(vec![1, 0, 255, 255, 255, 255])
249 );
250 assert_eq!(
251 possible_predecessor(&[155]),
252 Some(vec![154, 255, 255, 255, 255])
253 );
254}