1use std::fmt::{self, Debug};
23use std::ops::Sub;
24
25use hashbrown::hash_table::Entry::{Occupied, Vacant};
26use hashbrown::HashTable;
27
28pub trait JoinHashMapType: Send + Sync {
97 fn extend_zero(&mut self, len: usize);
98
99 fn update_from_iter<'a>(
100 &mut self,
101 iter: Box<dyn Iterator<Item = (usize, &'a u64)> + Send + 'a>,
102 deleted_offset: usize,
103 );
104
105 fn get_matched_indices<'a>(
106 &self,
107 iter: Box<dyn Iterator<Item = (usize, &'a u64)> + 'a>,
108 deleted_offset: Option<usize>,
109 ) -> (Vec<u32>, Vec<u64>);
110
111 fn get_matched_indices_with_limit_offset(
112 &self,
113 hash_values: &[u64],
114 limit: usize,
115 offset: JoinHashMapOffset,
116 ) -> (Vec<u32>, Vec<u64>, Option<JoinHashMapOffset>);
117
118 fn is_empty(&self) -> bool;
120}
121
122pub struct JoinHashMapU32 {
123 map: HashTable<(u64, u32)>,
125 next: Vec<u32>,
127}
128
129impl JoinHashMapU32 {
130 #[cfg(test)]
131 pub(crate) fn new(map: HashTable<(u64, u32)>, next: Vec<u32>) -> Self {
132 Self { map, next }
133 }
134
135 pub fn with_capacity(cap: usize) -> Self {
136 Self {
137 map: HashTable::with_capacity(cap),
138 next: vec![0; cap],
139 }
140 }
141}
142
143impl Debug for JoinHashMapU32 {
144 fn fmt(&self, _f: &mut fmt::Formatter) -> fmt::Result {
145 Ok(())
146 }
147}
148
149impl JoinHashMapType for JoinHashMapU32 {
150 fn extend_zero(&mut self, _: usize) {}
151
152 fn update_from_iter<'a>(
153 &mut self,
154 iter: Box<dyn Iterator<Item = (usize, &'a u64)> + Send + 'a>,
155 deleted_offset: usize,
156 ) {
157 update_from_iter::<u32>(&mut self.map, &mut self.next, iter, deleted_offset);
158 }
159
160 fn get_matched_indices<'a>(
161 &self,
162 iter: Box<dyn Iterator<Item = (usize, &'a u64)> + 'a>,
163 deleted_offset: Option<usize>,
164 ) -> (Vec<u32>, Vec<u64>) {
165 get_matched_indices::<u32>(&self.map, &self.next, iter, deleted_offset)
166 }
167
168 fn get_matched_indices_with_limit_offset(
169 &self,
170 hash_values: &[u64],
171 limit: usize,
172 offset: JoinHashMapOffset,
173 ) -> (Vec<u32>, Vec<u64>, Option<JoinHashMapOffset>) {
174 get_matched_indices_with_limit_offset::<u32>(
175 &self.map,
176 &self.next,
177 hash_values,
178 limit,
179 offset,
180 )
181 }
182
183 fn is_empty(&self) -> bool {
184 self.map.is_empty()
185 }
186}
187
188pub struct JoinHashMapU64 {
189 map: HashTable<(u64, u64)>,
191 next: Vec<u64>,
193}
194
195impl JoinHashMapU64 {
196 #[cfg(test)]
197 pub(crate) fn new(map: HashTable<(u64, u64)>, next: Vec<u64>) -> Self {
198 Self { map, next }
199 }
200
201 pub fn with_capacity(cap: usize) -> Self {
202 Self {
203 map: HashTable::with_capacity(cap),
204 next: vec![0; cap],
205 }
206 }
207}
208
209impl Debug for JoinHashMapU64 {
210 fn fmt(&self, _f: &mut fmt::Formatter) -> fmt::Result {
211 Ok(())
212 }
213}
214
215impl JoinHashMapType for JoinHashMapU64 {
216 fn extend_zero(&mut self, _: usize) {}
217
218 fn update_from_iter<'a>(
219 &mut self,
220 iter: Box<dyn Iterator<Item = (usize, &'a u64)> + Send + 'a>,
221 deleted_offset: usize,
222 ) {
223 update_from_iter::<u64>(&mut self.map, &mut self.next, iter, deleted_offset);
224 }
225
226 fn get_matched_indices<'a>(
227 &self,
228 iter: Box<dyn Iterator<Item = (usize, &'a u64)> + 'a>,
229 deleted_offset: Option<usize>,
230 ) -> (Vec<u32>, Vec<u64>) {
231 get_matched_indices::<u64>(&self.map, &self.next, iter, deleted_offset)
232 }
233
234 fn get_matched_indices_with_limit_offset(
235 &self,
236 hash_values: &[u64],
237 limit: usize,
238 offset: JoinHashMapOffset,
239 ) -> (Vec<u32>, Vec<u64>, Option<JoinHashMapOffset>) {
240 get_matched_indices_with_limit_offset::<u64>(
241 &self.map,
242 &self.next,
243 hash_values,
244 limit,
245 offset,
246 )
247 }
248
249 fn is_empty(&self) -> bool {
250 self.map.is_empty()
251 }
252}
253
254pub(crate) type JoinHashMapOffset = (usize, Option<u64>);
256
257macro_rules! chain_traverse {
260 (
261 $input_indices:ident, $match_indices:ident,
262 $hash_values:ident, $next_chain:ident,
263 $input_idx:ident, $chain_idx:ident, $remaining_output:ident, $one:ident, $zero:ident
264 ) => {{
265 let mut match_row_idx = $chain_idx - $one;
267 loop {
268 $match_indices.push(match_row_idx.into());
269 $input_indices.push($input_idx as u32);
270 $remaining_output -= 1;
271
272 let next = $next_chain[match_row_idx.into() as usize];
273
274 if $remaining_output == 0 {
275 let next_offset = if $input_idx == $hash_values.len() - 1 && next == $zero
277 {
278 None
279 } else {
280 Some(($input_idx, Some(next.into())))
281 };
282 return ($input_indices, $match_indices, next_offset);
283 }
284 if next == $zero {
285 break;
286 }
287 match_row_idx = next - $one;
288 }
289 }};
290}
291
292pub fn update_from_iter<'a, T>(
293 map: &mut HashTable<(u64, T)>,
294 next: &mut [T],
295 iter: Box<dyn Iterator<Item = (usize, &'a u64)> + Send + 'a>,
296 deleted_offset: usize,
297) where
298 T: Copy + TryFrom<usize> + PartialOrd,
299 <T as TryFrom<usize>>::Error: Debug,
300{
301 for (row, &hash_value) in iter {
302 let entry = map.entry(
303 hash_value,
304 |&(hash, _)| hash_value == hash,
305 |&(hash, _)| hash,
306 );
307
308 match entry {
309 Occupied(mut occupied_entry) => {
310 let (_, index) = occupied_entry.get_mut();
312 let prev_index = *index;
313 *index = T::try_from(row + 1).unwrap();
315 next[row - deleted_offset] = prev_index;
317 }
318 Vacant(vacant_entry) => {
319 vacant_entry.insert((hash_value, T::try_from(row + 1).unwrap()));
320 }
321 }
322 }
323}
324
325pub fn get_matched_indices<'a, T>(
326 map: &HashTable<(u64, T)>,
327 next: &[T],
328 iter: Box<dyn Iterator<Item = (usize, &'a u64)> + 'a>,
329 deleted_offset: Option<usize>,
330) -> (Vec<u32>, Vec<u64>)
331where
332 T: Copy + TryFrom<usize> + PartialOrd + Into<u64> + Sub<Output = T>,
333 <T as TryFrom<usize>>::Error: Debug,
334{
335 let mut input_indices = vec![];
336 let mut match_indices = vec![];
337 let zero = T::try_from(0).unwrap();
338 let one = T::try_from(1).unwrap();
339
340 for (row_idx, hash_value) in iter {
341 if let Some((_, index)) = map.find(*hash_value, |(hash, _)| *hash_value == *hash)
343 {
344 let mut i = *index - one;
345 loop {
346 let match_row_idx = if let Some(offset) = deleted_offset {
347 let offset = T::try_from(offset).unwrap();
348 if i < offset {
350 break;
352 }
353 i - offset
354 } else {
355 i
356 };
357 match_indices.push(match_row_idx.into());
358 input_indices.push(row_idx as u32);
359 let next_chain = next[match_row_idx.into() as usize];
361 if next_chain == zero {
362 break;
364 }
365 i = next_chain - one;
366 }
367 }
368 }
369
370 (input_indices, match_indices)
371}
372
373pub fn get_matched_indices_with_limit_offset<T>(
374 map: &HashTable<(u64, T)>,
375 next_chain: &[T],
376 hash_values: &[u64],
377 limit: usize,
378 offset: JoinHashMapOffset,
379) -> (Vec<u32>, Vec<u64>, Option<JoinHashMapOffset>)
380where
381 T: Copy + TryFrom<usize> + PartialOrd + Into<u64> + Sub<Output = T>,
382 <T as TryFrom<usize>>::Error: Debug,
383{
384 let mut input_indices = Vec::with_capacity(limit);
385 let mut match_indices = Vec::with_capacity(limit);
386 let zero = T::try_from(0).unwrap();
387 let one = T::try_from(1).unwrap();
388
389 if map.len() == next_chain.len() {
392 let start = offset.0;
393 let end = (start + limit).min(hash_values.len());
394 for (i, &hash) in hash_values[start..end].iter().enumerate() {
395 if let Some((_, idx)) = map.find(hash, |(h, _)| hash == *h) {
396 input_indices.push(start as u32 + i as u32);
397 match_indices.push((*idx - one).into());
398 }
399 }
400 let next_off = if end == hash_values.len() {
401 None
402 } else {
403 Some((end, None))
404 };
405 return (input_indices, match_indices, next_off);
406 }
407
408 let mut remaining_output = limit;
409
410 let to_skip = match offset {
412 (idx, None) => idx,
414 (idx, Some(0)) => idx + 1,
417 (idx, Some(next_idx)) => {
420 let next_idx: T = T::try_from(next_idx as usize).unwrap();
421 chain_traverse!(
422 input_indices,
423 match_indices,
424 hash_values,
425 next_chain,
426 idx,
427 next_idx,
428 remaining_output,
429 one,
430 zero
431 );
432 idx + 1
433 }
434 };
435
436 let mut row_idx = to_skip;
437 for &hash in &hash_values[to_skip..] {
438 if let Some((_, idx)) = map.find(hash, |(h, _)| hash == *h) {
439 let idx: T = *idx;
440 chain_traverse!(
441 input_indices,
442 match_indices,
443 hash_values,
444 next_chain,
445 row_idx,
446 idx,
447 remaining_output,
448 one,
449 zero
450 );
451 }
452 row_idx += 1;
453 }
454 (input_indices, match_indices, None)
455}