datafusion_physical_plan/joins/
join_hash_map.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! This file contains the implementation of the `JoinHashMap` struct, which
19//! is used to store the mapping between hash values based on the build side
20//! ["on" values] to a list of indices with this key's value.
21
22use std::fmt::{self, Debug};
23use std::ops::Sub;
24
25use hashbrown::hash_table::Entry::{Occupied, Vacant};
26use hashbrown::HashTable;
27
28/// Maps a `u64` hash value based on the build side ["on" values] to a list of indices with this key's value.
29///
30/// By allocating a `HashMap` with capacity for *at least* the number of rows for entries at the build side,
31/// we make sure that we don't have to re-hash the hashmap, which needs access to the key (the hash in this case) value.
32///
33/// E.g. 1 -> [3, 6, 8] indicates that the column values map to rows 3, 6 and 8 for hash value 1
34/// As the key is a hash value, we need to check possible hash collisions in the probe stage
/// During this stage a probe row may land on the same hash value as a build row even though
/// the actual column values differ. Those cases are checked in the `equal_rows_arr` method.
37///
38/// The indices (values) are stored in a separate chained list stored as `Vec<u32>` or `Vec<u64>`.
39///
40/// The first value (+1) is stored in the hashmap, whereas the next value is stored in array at the position value.
41///
42/// The chain can be followed until the value "0" has been reached, meaning the end of the list.
43/// Also see chapter 5.3 of [Balancing vectorized query execution with bandwidth-optimized storage](https://dare.uva.nl/search?identifier=5ccbb60a-38b8-4eeb-858a-e7735dd37487)
44///
45/// # Example
46///
47/// ``` text
48/// See the example below:
49///
50/// Insert (10,1)            <-- insert hash value 10 with row index 1
51/// map:
52/// ----------
53/// | 10 | 2 |
54/// ----------
55/// next:
56/// ---------------------
57/// | 0 | 0 | 0 | 0 | 0 |
58/// ---------------------
59/// Insert (20,2)
60/// map:
61/// ----------
62/// | 10 | 2 |
63/// | 20 | 3 |
64/// ----------
65/// next:
66/// ---------------------
67/// | 0 | 0 | 0 | 0 | 0 |
68/// ---------------------
69/// Insert (10,3)           <-- collision! row index 3 has a hash value of 10 as well
70/// map:
71/// ----------
72/// | 10 | 4 |
73/// | 20 | 3 |
74/// ----------
75/// next:
76/// ---------------------
77/// | 0 | 0 | 0 | 2 | 0 |  <--- hash value 10 maps to 4,2 (which means indices values 3,1)
78/// ---------------------
79/// Insert (10,4)          <-- another collision! row index 4 ALSO has a hash value of 10
80/// map:
81/// ---------
82/// | 10 | 5 |
83/// | 20 | 3 |
84/// ---------
85/// next:
86/// ---------------------
87/// | 0 | 0 | 0 | 2 | 4 | <--- hash value 10 maps to 5,4,2 (which means indices values 4,3,1)
88/// ---------------------
89/// ```
90///
91/// Here we have an option between creating a `JoinHashMapType` using `u32` or `u64` indices
92/// based on how many rows were being used for indices.
93///
/// At runtime we choose between using `JoinHashMapU32` and `JoinHashMapU64`, which both implement
/// `JoinHashMapType`.
pub trait JoinHashMapType: Send + Sync {
    /// NOTE(review): judging by the name, this presumably appends `len` zeroed slots to
    /// the chain storage — confirm against implementations outside this file. Both
    /// implementations in this file (`JoinHashMapU32`, `JoinHashMapU64`) are no-ops.
    fn extend_zero(&mut self, len: usize);

    /// Inserts every `(row index, hash value)` pair yielded by `iter`.
    ///
    /// In the implementations in this file, the map entry for a hash is set to
    /// `row + 1`; when the hash was already present, the previous entry is written to
    /// the chain slot `row - deleted_offset`.
    fn update_from_iter<'a>(
        &mut self,
        iter: Box<dyn Iterator<Item = (usize, &'a u64)> + Send + 'a>,
        deleted_offset: usize,
    );

    /// Looks up every `(row index, hash value)` pair yielded by `iter` and returns two
    /// parallel vectors with one element per match: the row index of the looked-up
    /// item (as `u32`) and the matching stored row index (as `u64`).
    ///
    /// In the implementations in this file, when `deleted_offset` is `Some(offset)` a
    /// chain walk stops at the first stored index below `offset`, and `offset` is
    /// subtracted from every index that is returned.
    fn get_matched_indices<'a>(
        &self,
        iter: Box<dyn Iterator<Item = (usize, &'a u64)> + 'a>,
        deleted_offset: Option<usize>,
    ) -> (Vec<u32>, Vec<u64>);

    /// Bounded, resumable variant of the lookup over `hash_values`.
    ///
    /// Starts from `offset` and returns the same two parallel index vectors plus the
    /// offset to pass to the next call, or `None` once all of `hash_values` has been
    /// processed. In the implementations in this file at most `limit` matches are
    /// produced per call.
    fn get_matched_indices_with_limit_offset(
        &self,
        hash_values: &[u64],
        limit: usize,
        offset: JoinHashMapOffset,
    ) -> (Vec<u32>, Vec<u64>, Option<JoinHashMapOffset>);

    /// Returns `true` if the join hash map contains no entries.
    fn is_empty(&self) -> bool;
}
121
/// Join hash map whose row indices are stored as `u32`.
pub struct JoinHashMapU32 {
    /// Maps a hash value to the most recently inserted row for that hash, stored as
    /// `row + 1` by `update_from_iter`.
    map: HashTable<(u64, u32)>,
    /// Chained list of earlier rows sharing a hash: each slot holds the previous
    /// chain entry (`row + 1`), and 0 marks the end of a chain.
    next: Vec<u32>,
}
128
129impl JoinHashMapU32 {
130    #[cfg(test)]
131    pub(crate) fn new(map: HashTable<(u64, u32)>, next: Vec<u32>) -> Self {
132        Self { map, next }
133    }
134
135    pub fn with_capacity(cap: usize) -> Self {
136        Self {
137            map: HashTable::with_capacity(cap),
138            next: vec![0; cap],
139        }
140    }
141}
142
143impl Debug for JoinHashMapU32 {
144    fn fmt(&self, _f: &mut fmt::Formatter) -> fmt::Result {
145        Ok(())
146    }
147}
148
149impl JoinHashMapType for JoinHashMapU32 {
150    fn extend_zero(&mut self, _: usize) {}
151
152    fn update_from_iter<'a>(
153        &mut self,
154        iter: Box<dyn Iterator<Item = (usize, &'a u64)> + Send + 'a>,
155        deleted_offset: usize,
156    ) {
157        update_from_iter::<u32>(&mut self.map, &mut self.next, iter, deleted_offset);
158    }
159
160    fn get_matched_indices<'a>(
161        &self,
162        iter: Box<dyn Iterator<Item = (usize, &'a u64)> + 'a>,
163        deleted_offset: Option<usize>,
164    ) -> (Vec<u32>, Vec<u64>) {
165        get_matched_indices::<u32>(&self.map, &self.next, iter, deleted_offset)
166    }
167
168    fn get_matched_indices_with_limit_offset(
169        &self,
170        hash_values: &[u64],
171        limit: usize,
172        offset: JoinHashMapOffset,
173    ) -> (Vec<u32>, Vec<u64>, Option<JoinHashMapOffset>) {
174        get_matched_indices_with_limit_offset::<u32>(
175            &self.map,
176            &self.next,
177            hash_values,
178            limit,
179            offset,
180        )
181    }
182
183    fn is_empty(&self) -> bool {
184        self.map.is_empty()
185    }
186}
187
/// Join hash map whose row indices are stored as `u64`.
pub struct JoinHashMapU64 {
    /// Maps a hash value to the most recently inserted row for that hash, stored as
    /// `row + 1` by `update_from_iter`.
    map: HashTable<(u64, u64)>,
    /// Chained list of earlier rows sharing a hash: each slot holds the previous
    /// chain entry (`row + 1`), and 0 marks the end of a chain.
    next: Vec<u64>,
}
194
195impl JoinHashMapU64 {
196    #[cfg(test)]
197    pub(crate) fn new(map: HashTable<(u64, u64)>, next: Vec<u64>) -> Self {
198        Self { map, next }
199    }
200
201    pub fn with_capacity(cap: usize) -> Self {
202        Self {
203            map: HashTable::with_capacity(cap),
204            next: vec![0; cap],
205        }
206    }
207}
208
209impl Debug for JoinHashMapU64 {
210    fn fmt(&self, _f: &mut fmt::Formatter) -> fmt::Result {
211        Ok(())
212    }
213}
214
215impl JoinHashMapType for JoinHashMapU64 {
216    fn extend_zero(&mut self, _: usize) {}
217
218    fn update_from_iter<'a>(
219        &mut self,
220        iter: Box<dyn Iterator<Item = (usize, &'a u64)> + Send + 'a>,
221        deleted_offset: usize,
222    ) {
223        update_from_iter::<u64>(&mut self.map, &mut self.next, iter, deleted_offset);
224    }
225
226    fn get_matched_indices<'a>(
227        &self,
228        iter: Box<dyn Iterator<Item = (usize, &'a u64)> + 'a>,
229        deleted_offset: Option<usize>,
230    ) -> (Vec<u32>, Vec<u64>) {
231        get_matched_indices::<u64>(&self.map, &self.next, iter, deleted_offset)
232    }
233
234    fn get_matched_indices_with_limit_offset(
235        &self,
236        hash_values: &[u64],
237        limit: usize,
238        offset: JoinHashMapOffset,
239    ) -> (Vec<u32>, Vec<u64>, Option<JoinHashMapOffset>) {
240        get_matched_indices_with_limit_offset::<u64>(
241            &self.map,
242            &self.next,
243            hash_values,
244            limit,
245            offset,
246        )
247    }
248
249    fn is_empty(&self) -> bool {
250        self.map.is_empty()
251    }
252}
253
/// Resume position used by `get_matched_indices_with_limit_offset`.
///
/// * `.0` is an index into the `hash_values` slice being looked up.
/// * `.1` is `None` when that index has not been looked up yet. Otherwise it is
///   `Some(next)`, where `next` is the chain entry (`row + 1`) to continue from;
///   `Some(0)` means the chain for that index is already finished.
pub(crate) type JoinHashMapOffset = (usize, Option<u64>);
256
// Walks a single hash chain, appending one (input index, matched row) pair per link.
//
// `$chain_idx` is a chain entry, i.e. `row + 1`; `$one` and `$zero` are the constants
// 1 and 0 in the chain's index type and must be locals of the calling function.
//
// When `$remaining_output` reaches zero the macro returns from the *enclosing function*
// with the collected indices and the offset to resume from (`None` if the chain just
// ended on the last element of `$hash_values`). Otherwise it falls through once the
// chain is exhausted.
macro_rules! chain_traverse {
    (
        $input_indices:ident, $match_indices:ident,
        $hash_values:ident, $next_chain:ident,
        $input_idx:ident, $chain_idx:ident, $remaining_output:ident, $one:ident, $zero:ident
    ) => {{
        // Chain entries are offset by one, so subtract `$one` to obtain the row.
        let mut build_row = $chain_idx - $one;
        loop {
            let slot: u64 = build_row.into();
            $match_indices.push(slot);
            $input_indices.push($input_idx as u32);
            $remaining_output -= 1;

            // Entry that follows the row just emitted; `$zero` terminates the chain.
            let successor = $next_chain[slot as usize];

            if $remaining_output == 0 {
                // Output budget used up: report where the next call has to continue.
                let on_last_input = $input_idx == $hash_values.len() - 1;
                let resume_at = if on_last_input && successor == $zero {
                    None
                } else {
                    Some(($input_idx, Some(successor.into())))
                };
                return ($input_indices, $match_indices, resume_at);
            }

            if successor == $zero {
                break;
            }
            build_row = successor - $one;
        }
    }};
}
291
292pub fn update_from_iter<'a, T>(
293    map: &mut HashTable<(u64, T)>,
294    next: &mut [T],
295    iter: Box<dyn Iterator<Item = (usize, &'a u64)> + Send + 'a>,
296    deleted_offset: usize,
297) where
298    T: Copy + TryFrom<usize> + PartialOrd,
299    <T as TryFrom<usize>>::Error: Debug,
300{
301    for (row, &hash_value) in iter {
302        let entry = map.entry(
303            hash_value,
304            |&(hash, _)| hash_value == hash,
305            |&(hash, _)| hash,
306        );
307
308        match entry {
309            Occupied(mut occupied_entry) => {
310                // Already exists: add index to next array
311                let (_, index) = occupied_entry.get_mut();
312                let prev_index = *index;
313                // Store new value inside hashmap
314                *index = T::try_from(row + 1).unwrap();
315                // Update chained Vec at `row` with previous value
316                next[row - deleted_offset] = prev_index;
317            }
318            Vacant(vacant_entry) => {
319                vacant_entry.insert((hash_value, T::try_from(row + 1).unwrap()));
320            }
321        }
322    }
323}
324
325pub fn get_matched_indices<'a, T>(
326    map: &HashTable<(u64, T)>,
327    next: &[T],
328    iter: Box<dyn Iterator<Item = (usize, &'a u64)> + 'a>,
329    deleted_offset: Option<usize>,
330) -> (Vec<u32>, Vec<u64>)
331where
332    T: Copy + TryFrom<usize> + PartialOrd + Into<u64> + Sub<Output = T>,
333    <T as TryFrom<usize>>::Error: Debug,
334{
335    let mut input_indices = vec![];
336    let mut match_indices = vec![];
337    let zero = T::try_from(0).unwrap();
338    let one = T::try_from(1).unwrap();
339
340    for (row_idx, hash_value) in iter {
341        // Get the hash and find it in the index
342        if let Some((_, index)) = map.find(*hash_value, |(hash, _)| *hash_value == *hash)
343        {
344            let mut i = *index - one;
345            loop {
346                let match_row_idx = if let Some(offset) = deleted_offset {
347                    let offset = T::try_from(offset).unwrap();
348                    // This arguments means that we prune the next index way before here.
349                    if i < offset {
350                        // End of the list due to pruning
351                        break;
352                    }
353                    i - offset
354                } else {
355                    i
356                };
357                match_indices.push(match_row_idx.into());
358                input_indices.push(row_idx as u32);
359                // Follow the chain to get the next index value
360                let next_chain = next[match_row_idx.into() as usize];
361                if next_chain == zero {
362                    // end of list
363                    break;
364                }
365                i = next_chain - one;
366            }
367        }
368    }
369
370    (input_indices, match_indices)
371}
372
373pub fn get_matched_indices_with_limit_offset<T>(
374    map: &HashTable<(u64, T)>,
375    next_chain: &[T],
376    hash_values: &[u64],
377    limit: usize,
378    offset: JoinHashMapOffset,
379) -> (Vec<u32>, Vec<u64>, Option<JoinHashMapOffset>)
380where
381    T: Copy + TryFrom<usize> + PartialOrd + Into<u64> + Sub<Output = T>,
382    <T as TryFrom<usize>>::Error: Debug,
383{
384    let mut input_indices = Vec::with_capacity(limit);
385    let mut match_indices = Vec::with_capacity(limit);
386    let zero = T::try_from(0).unwrap();
387    let one = T::try_from(1).unwrap();
388
389    // Check if hashmap consists of unique values
390    // If so, we can skip the chain traversal
391    if map.len() == next_chain.len() {
392        let start = offset.0;
393        let end = (start + limit).min(hash_values.len());
394        for (i, &hash) in hash_values[start..end].iter().enumerate() {
395            if let Some((_, idx)) = map.find(hash, |(h, _)| hash == *h) {
396                input_indices.push(start as u32 + i as u32);
397                match_indices.push((*idx - one).into());
398            }
399        }
400        let next_off = if end == hash_values.len() {
401            None
402        } else {
403            Some((end, None))
404        };
405        return (input_indices, match_indices, next_off);
406    }
407
408    let mut remaining_output = limit;
409
410    // Calculate initial `hash_values` index before iterating
411    let to_skip = match offset {
412        // None `initial_next_idx` indicates that `initial_idx` processing has'n been started
413        (idx, None) => idx,
414        // Zero `initial_next_idx` indicates that `initial_idx` has been processed during
415        // previous iteration, and it should be skipped
416        (idx, Some(0)) => idx + 1,
417        // Otherwise, process remaining `initial_idx` matches by traversing `next_chain`,
418        // to start with the next index
419        (idx, Some(next_idx)) => {
420            let next_idx: T = T::try_from(next_idx as usize).unwrap();
421            chain_traverse!(
422                input_indices,
423                match_indices,
424                hash_values,
425                next_chain,
426                idx,
427                next_idx,
428                remaining_output,
429                one,
430                zero
431            );
432            idx + 1
433        }
434    };
435
436    let mut row_idx = to_skip;
437    for &hash in &hash_values[to_skip..] {
438        if let Some((_, idx)) = map.find(hash, |(h, _)| hash == *h) {
439            let idx: T = *idx;
440            chain_traverse!(
441                input_indices,
442                match_indices,
443                hash_values,
444                next_chain,
445                row_idx,
446                idx,
447                remaining_output,
448                one,
449                zero
450            );
451        }
452        row_idx += 1;
453    }
454    (input_indices, match_indices, None)
455}