pub mod expr;
pub mod memory;
pub mod proxy;
pub mod string_utils;

use crate::error::{
    _exec_datafusion_err, _exec_err, _internal_datafusion_err, _internal_err,
};
use crate::{Result, ScalarValue};
use arrow::array::{
    cast::AsArray, Array, ArrayRef, FixedSizeListArray, LargeListArray, ListArray,
    OffsetSizeTrait,
};
use arrow::array::{ArrowPrimitiveType, PrimitiveArray};
use arrow::buffer::OffsetBuffer;
use arrow::compute::{partition, SortColumn, SortOptions};
use arrow::datatypes::{
    ArrowNativeType, DataType, Field, Int32Type, Int64Type, SchemaRef,
};
#[cfg(feature = "sql")]
use sqlparser::{ast::Ident, dialect::GenericDialect, parser::Parser};
use std::borrow::{Borrow, Cow};
use std::cmp::{min, Ordering};
use std::collections::HashSet;
use std::num::NonZero;
use std::ops::Range;
use std::sync::Arc;
use std::thread::available_parallelism;

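/// Applies an optional projection to a [`SchemaRef`], returning the projected
/// schema (or a clone of the original schema if no projection is given).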
pub fn project_schema(
    schema: &SchemaRef,
    projection: Option<&Vec<usize>>,
) -> Result<SchemaRef> {
    let schema = match projection {
        Some(columns) => Arc::new(schema.project(columns)?),
        None => Arc::clone(schema),
    };
    Ok(schema)
}

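/// Extracts the row at `idx` from `columns` into the reusable buffer `buf`,
/// clearing any previous contents of the buffer first.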
pub fn extract_row_at_idx_to_buf(
    columns: &[ArrayRef],
    idx: usize,
    buf: &mut Vec<ScalarValue>,
) -> Result<()> {
    buf.clear();

    let iter = columns
        .iter()
        .map(|arr| ScalarValue::try_from_array(arr, idx));
    for v in iter {
        buf.push(v?);
    }

    Ok(())
}
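
/// Returns the row at `idx`, taken across all `columns`, as a vector of
/// [`ScalarValue`]s.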
pub fn get_row_at_idx(columns: &[ArrayRef], idx: usize) -> Result<Vec<ScalarValue>> {
    columns
        .iter()
        .map(|arr| ScalarValue::try_from_array(arr, idx))
        .collect()
}

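/// Compares rows `x` and `y` lexicographically under the per-column
/// `sort_options`, honoring `descending` and `nulls_first`; returns the first
/// non-equal ordering, or `Ordering::Equal` if every column compares equal.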
pub fn compare_rows(
    x: &[ScalarValue],
    y: &[ScalarValue],
    sort_options: &[SortOptions],
) -> Result<Ordering> {
    let zip_it = x.iter().zip(y.iter()).zip(sort_options.iter());
    for ((lhs, rhs), sort_options) in zip_it {
        let result = match (lhs.is_null(), rhs.is_null(), sort_options.nulls_first) {
            (true, false, false) | (false, true, true) => Ordering::Greater,
            (true, false, true) | (false, true, false) => Ordering::Less,
            (false, false, _) => {
                if sort_options.descending {
                    rhs.try_cmp(lhs)?
                } else {
                    lhs.try_cmp(rhs)?
                }
            }
            (true, true, _) => continue,
        };
        if result != Ordering::Equal {
            return Ok(result);
        }
    }
    Ok(Ordering::Equal)
}

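/// Binary-searches for `target` among the rows of `item_columns`, which must
/// already be sorted according to `sort_options`. When `SIDE` is `true` this
/// behaves like a left bisect (first insertion point), otherwise like a right
/// bisect (last insertion point).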
pub fn bisect<const SIDE: bool>(
    item_columns: &[ArrayRef],
    target: &[ScalarValue],
    sort_options: &[SortOptions],
) -> Result<usize> {
    let low: usize = 0;
    let high: usize = item_columns
        .first()
        .ok_or_else(|| _internal_datafusion_err!("Column array shouldn't be empty"))?
        .len();
    let compare_fn = |current: &[ScalarValue], target: &[ScalarValue]| {
        let cmp = compare_rows(current, target, sort_options)?;
        Ok(if SIDE { cmp.is_lt() } else { cmp.is_le() })
    };
    find_bisect_point(item_columns, target, compare_fn, low, high)
}

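/// Binary-searches indices `[low, high)` of `item_columns` and returns the
/// first index at which `compare_fn(row, target)` returns `false` (assuming
/// `compare_fn` is `true` for a prefix of the range and `false` afterwards).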
pub fn find_bisect_point<F>(
    item_columns: &[ArrayRef],
    target: &[ScalarValue],
    compare_fn: F,
    mut low: usize,
    mut high: usize,
) -> Result<usize>
where
    F: Fn(&[ScalarValue], &[ScalarValue]) -> Result<bool>,
{
    while low < high {
        let mid = ((high - low) / 2) + low;
        let val = get_row_at_idx(item_columns, mid)?;
        if compare_fn(&val, target)? {
            low = mid + 1;
        } else {
            high = mid;
        }
    }
    Ok(low)
}

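/// Searches for `target` among the rows of `item_columns` with a linear scan
/// from the start; `SIDE` has the same meaning as in [`bisect`].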
pub fn linear_search<const SIDE: bool>(
    item_columns: &[ArrayRef],
    target: &[ScalarValue],
    sort_options: &[SortOptions],
) -> Result<usize> {
    let low: usize = 0;
    let high: usize = item_columns
        .first()
        .ok_or_else(|| _internal_datafusion_err!("Column array shouldn't be empty"))?
        .len();
    let compare_fn = |current: &[ScalarValue], target: &[ScalarValue]| {
        let cmp = compare_rows(current, target, sort_options)?;
        Ok(if SIDE { cmp.is_lt() } else { cmp.is_le() })
    };
    search_in_slice(item_columns, target, compare_fn, low, high)
}

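/// Linearly scans indices `[low, high)` of `item_columns` and returns the
/// first index at which `compare_fn(row, target)` returns `false`, or `high`
/// if there is no such index.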
pub fn search_in_slice<F>(
    item_columns: &[ArrayRef],
    target: &[ScalarValue],
    compare_fn: F,
    mut low: usize,
    high: usize,
) -> Result<usize>
where
    F: Fn(&[ScalarValue], &[ScalarValue]) -> Result<bool>,
{
    while low < high {
        let val = get_row_at_idx(item_columns, low)?;
        if !compare_fn(&val, target)? {
            break;
        }
        low += 1;
    }
    Ok(low)
}

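/// Given already-sorted `partition_columns`, returns the ranges of consecutive
/// rows that share the same values in all partition columns. With no partition
/// columns, the single range `0..num_rows` is returned.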
pub fn evaluate_partition_ranges(
    num_rows: usize,
    partition_columns: &[SortColumn],
) -> Result<Vec<Range<usize>>> {
    Ok(if partition_columns.is_empty() {
        vec![Range {
            start: 0,
            end: num_rows,
        }]
    } else {
        let cols: Vec<_> = partition_columns
            .iter()
            .map(|x| Arc::clone(&x.values))
            .collect();
        partition(&cols)?.ranges()
    })
}

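/// Wraps an identifier in double quotes when it needs quoting (see
/// [`needs_quotes`]), escaping embedded double quotes by doubling them.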
pub fn quote_identifier(s: &str) -> Cow<'_, str> {
    if needs_quotes(s) {
        Cow::Owned(format!("\"{}\"", s.replace('"', "\"\"")))
    } else {
        Cow::Borrowed(s)
    }
}

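/// Returns `true` if `s` must be quoted: it does not start with an ASCII
/// lowercase letter or underscore, or it contains characters other than ASCII
/// lowercase letters, digits, and underscores.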
fn needs_quotes(s: &str) -> bool {
    let mut chars = s.chars();

    if let Some(first_char) = chars.next() {
        if !(first_char.is_ascii_lowercase() || first_char == '_') {
            return true;
        }
    }

    !chars.all(|c| c.is_ascii_lowercase() || c.is_ascii_digit() || c == '_')
}

#[cfg(feature = "sql")]
pub(crate) fn parse_identifiers(s: &str) -> Result<Vec<Ident>> {
    let dialect = GenericDialect;
    let mut parser = Parser::new(&dialect).try_with_sql(s)?;
    let idents = parser.parse_multipart_identifier()?;
    Ok(idents)
}

#[cfg(feature = "sql")]
pub(crate) fn parse_identifiers_normalized(s: &str, ignore_case: bool) -> Vec<String> {
    parse_identifiers(s)
        .unwrap_or_default()
        .into_iter()
        .map(|id| match id.quote_style {
            Some(_) => id.value,
            None if ignore_case => id.value,
            _ => id.value.to_ascii_lowercase(),
        })
        .collect::<Vec<_>>()
}

#[cfg(not(feature = "sql"))]
pub(crate) fn parse_identifiers(s: &str) -> Result<Vec<String>> {
    let mut result = Vec::new();
    let mut current = String::new();
    let mut in_quotes = false;

    for ch in s.chars() {
        match ch {
            '"' => {
                in_quotes = !in_quotes;
                current.push(ch);
            }
            '.' if !in_quotes => {
                result.push(current.clone());
                current.clear();
            }
            _ => {
                current.push(ch);
            }
        }
    }

    if !current.is_empty() {
        result.push(current);
    }

    Ok(result)
}

#[cfg(not(feature = "sql"))]
pub(crate) fn parse_identifiers_normalized(s: &str, ignore_case: bool) -> Vec<String> {
    parse_identifiers(s)
        .unwrap_or_default()
        .into_iter()
        .map(|id| {
            let is_double_quoted = if id.len() > 2 {
                let mut chars = id.chars();
                chars.next() == Some('"') && chars.last() == Some('"')
            } else {
                false
            };
            if is_double_quoted {
                id[1..id.len() - 1].to_string().replace("\"\"", "\"")
            } else if ignore_case {
                id
            } else {
                id.to_ascii_lowercase()
            }
        })
        .collect::<Vec<_>>()
}

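/// Returns the elements of `items` at the given `indices`, erroring if any
/// index is out of bounds.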
pub fn get_at_indices<T: Clone, I: Borrow<usize>>(
    items: &[T],
    indices: impl IntoIterator<Item = I>,
) -> Result<Vec<T>> {
    indices
        .into_iter()
        .map(|idx| items.get(*idx.borrow()).cloned())
        .collect::<Option<Vec<T>>>()
        .ok_or_else(|| {
            _exec_datafusion_err!("Expects indices to be in the range of searched vector")
        })
}

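/// Returns the length of the longest prefix of `sequence` that is exactly
/// `0, 1, 2, ...`; e.g. `[0, 1, 3]` has a longest consecutive prefix of 2.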
pub fn longest_consecutive_prefix<T: Borrow<usize>>(
    sequence: impl IntoIterator<Item = T>,
) -> usize {
    let mut count = 0;
    for item in sequence {
        if !count.eq(item.borrow()) {
            break;
        }
        count += 1;
    }
    count
}

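/// Builder that wraps an existing array as the single row of a [`ListArray`],
/// [`LargeListArray`], or [`FixedSizeListArray`] (or the corresponding
/// [`ScalarValue`]), optionally controlling the element field's name and
/// nullability.
///
/// Illustrative usage (a sketch, not a doctest; `values` stands for any
/// `ArrayRef`):
///
/// ```ignore
/// let list: ListArray = SingleRowListArrayBuilder::new(values)
///     .with_nullable(false)
///     .build_list_array();
/// ```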
#[derive(Debug, Clone)]
pub struct SingleRowListArrayBuilder {
    arr: ArrayRef,
    nullable: bool,
    field_name: Option<String>,
}

impl SingleRowListArrayBuilder {
    pub fn new(arr: ArrayRef) -> Self {
        Self {
            arr,
            nullable: true,
            field_name: None,
        }
    }

    pub fn with_nullable(mut self, nullable: bool) -> Self {
        self.nullable = nullable;
        self
    }

    pub fn with_field_name(mut self, field_name: Option<String>) -> Self {
        self.field_name = field_name;
        self
    }

    pub fn with_field(self, field: &Field) -> Self {
        self.with_field_name(Some(field.name().to_owned()))
            .with_nullable(field.is_nullable())
    }

    pub fn build_list_array(self) -> ListArray {
        let (field, arr) = self.into_field_and_arr();
        let offsets = OffsetBuffer::from_lengths([arr.len()]);
        ListArray::new(field, offsets, arr, None)
    }

    pub fn build_list_scalar(self) -> ScalarValue {
        ScalarValue::List(Arc::new(self.build_list_array()))
    }

    pub fn build_large_list_array(self) -> LargeListArray {
        let (field, arr) = self.into_field_and_arr();
        let offsets = OffsetBuffer::from_lengths([arr.len()]);
        LargeListArray::new(field, offsets, arr, None)
    }

    pub fn build_large_list_scalar(self) -> ScalarValue {
        ScalarValue::LargeList(Arc::new(self.build_large_list_array()))
    }

    pub fn build_fixed_size_list_array(self, list_size: usize) -> FixedSizeListArray {
        let (field, arr) = self.into_field_and_arr();
        FixedSizeListArray::new(field, list_size as i32, arr, None)
    }

    pub fn build_fixed_size_list_scalar(self, list_size: usize) -> ScalarValue {
        ScalarValue::FixedSizeList(Arc::new(self.build_fixed_size_list_array(list_size)))
    }

    fn into_field_and_arr(self) -> (Arc<Field>, ArrayRef) {
        let Self {
            arr,
            nullable,
            field_name,
        } = self;
        let data_type = arr.data_type().to_owned();
        let field = match field_name {
            Some(name) => Field::new(name, data_type, nullable),
            None => Field::new_list_field(data_type, nullable),
        };
        (Arc::new(field), arr)
    }
}

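/// Concatenates the given arrays into a single [`ListArray`] in which each
/// input array becomes one list entry (row); the values are assumed to share
/// the first array's data type. Errors on empty input.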
pub fn arrays_into_list_array(
    arr: impl IntoIterator<Item = ArrayRef>,
) -> Result<ListArray> {
    let arr = arr.into_iter().collect::<Vec<_>>();
    if arr.is_empty() {
        return _internal_err!("Cannot wrap empty array into list array");
    }

    let lens = arr.iter().map(|x| x.len()).collect::<Vec<_>>();
    let data_type = arr[0].data_type().to_owned();
    let values = arr.iter().map(|x| x.as_ref()).collect::<Vec<_>>();
    Ok(ListArray::new(
        Arc::new(Field::new_list_field(data_type, true)),
        OffsetBuffer::from_lengths(lens),
        arrow::compute::concat(values.as_slice())?,
        None,
    ))
}

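/// Splits a list array into a vector of its per-row element arrays, skipping
/// null rows.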
pub fn list_to_arrays<O: OffsetSizeTrait>(a: &ArrayRef) -> Vec<ArrayRef> {
    a.as_list::<O>().iter().flatten().collect::<Vec<_>>()
}

pub fn fixed_size_list_to_arrays(a: &ArrayRef) -> Vec<ArrayRef> {
    a.as_fixed_size_list().iter().flatten().collect::<Vec<_>>()
}

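/// Returns the innermost (non-list) type of a possibly nested list type, e.g.
/// `List(List(Int64))` -> `Int64`, `Int64` -> `Int64`.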
pub fn base_type(data_type: &DataType) -> DataType {
    match data_type {
        DataType::List(field)
        | DataType::LargeList(field)
        | DataType::FixedSizeList(field, _) => base_type(field.data_type()),
        _ => data_type.to_owned(),
    }
}

#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
pub enum ListCoercion {
    FixedSizedListToList,
}

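/// Rebuilds `data_type`, preserving its (possibly nested) list structure while
/// replacing the innermost type with `base_type`. When `array_coercion` is
/// [`ListCoercion::FixedSizedListToList`], fixed-size list levels are rewritten
/// as variable-size lists.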
pub fn coerced_type_with_base_type_only(
    data_type: &DataType,
    base_type: &DataType,
    array_coercion: Option<&ListCoercion>,
) -> DataType {
    match (data_type, array_coercion) {
        (DataType::List(field), _)
        | (DataType::FixedSizeList(field, _), Some(ListCoercion::FixedSizedListToList)) =>
        {
            let field_type = coerced_type_with_base_type_only(
                field.data_type(),
                base_type,
                array_coercion,
            );

            DataType::List(Arc::new(Field::new(
                field.name(),
                field_type,
                field.is_nullable(),
            )))
        }
        (DataType::FixedSizeList(field, len), _) => {
            let field_type = coerced_type_with_base_type_only(
                field.data_type(),
                base_type,
                array_coercion,
            );

            DataType::FixedSizeList(
                Arc::new(Field::new(field.name(), field_type, field.is_nullable())),
                *len,
            )
        }
        (DataType::LargeList(field), _) => {
            let field_type = coerced_type_with_base_type_only(
                field.data_type(),
                base_type,
                array_coercion,
            );

            DataType::LargeList(Arc::new(Field::new(
                field.name(),
                field_type,
                field.is_nullable(),
            )))
        }

        _ => base_type.clone(),
    }
}

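/// Rewrites every `List` and `FixedSizeList` level of a (possibly nested) list
/// type as a variable-size `List`, leaving `LargeList` levels and non-list
/// types unchanged.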
pub fn coerced_fixed_size_list_to_list(data_type: &DataType) -> DataType {
    match data_type {
        DataType::List(field) | DataType::FixedSizeList(field, _) => {
            let field_type = coerced_fixed_size_list_to_list(field.data_type());

            DataType::List(Arc::new(Field::new(
                field.name(),
                field_type,
                field.is_nullable(),
            )))
        }
        DataType::LargeList(field) => {
            let field_type = coerced_fixed_size_list_to_list(field.data_type());

            DataType::LargeList(Arc::new(Field::new(
                field.name(),
                field_type,
                field.is_nullable(),
            )))
        }

        _ => data_type.clone(),
    }
}

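/// Returns the number of list nesting levels of `data_type`, e.g. `Int64` has
/// 0 dimensions and `List(List(Int64))` has 2.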
pub fn list_ndims(data_type: &DataType) -> u64 {
    match data_type {
        DataType::List(field)
        | DataType::LargeList(field)
        | DataType::FixedSizeList(field, _) => 1 + list_ndims(field.data_type()),
        _ => 0,
    }
}

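/// String similarity metrics: the Levenshtein edit distance and a normalized
/// variant.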
pub mod datafusion_strsim {
    use std::cmp::min;
    use std::str::Chars;

    struct StringWrapper<'a>(&'a str);

    impl<'b> IntoIterator for &StringWrapper<'b> {
        type Item = char;
        type IntoIter = Chars<'b>;

        fn into_iter(self) -> Self::IntoIter {
            self.0.chars()
        }
    }

    fn generic_levenshtein<'a, 'b, Iter1, Iter2, Elem1, Elem2>(
        a: &'a Iter1,
        b: &'b Iter2,
    ) -> usize
    where
        &'a Iter1: IntoIterator<Item = Elem1>,
        &'b Iter2: IntoIterator<Item = Elem2>,
        Elem1: PartialEq<Elem2>,
    {
        let b_len = b.into_iter().count();

        if a.into_iter().next().is_none() {
            return b_len;
        }

        let mut cache: Vec<usize> = (1..b_len + 1).collect();

        let mut result = 0;

        for (i, a_elem) in a.into_iter().enumerate() {
            result = i + 1;
            let mut distance_b = i;

            for (j, b_elem) in b.into_iter().enumerate() {
                let cost = if a_elem == b_elem { 0usize } else { 1usize };
                let distance_a = distance_b + cost;
                distance_b = cache[j];
                result = min(result + 1, min(distance_a, distance_b + 1));
                cache[j] = result;
            }
        }

        result
    }

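    /// Returns the Levenshtein distance between `a` and `b`: the minimum
    /// number of single-character insertions, deletions, and substitutions
    /// needed to turn one string into the other (e.g. "kitten" vs "sitting"
    /// is 3).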
    pub fn levenshtein(a: &str, b: &str) -> usize {
        generic_levenshtein(&StringWrapper(a), &StringWrapper(b))
    }

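    /// Returns the Levenshtein distance between `a` and `b` normalized to
    /// `[0.0, 1.0]`, where `1.0` means identical strings (two empty strings
    /// count as identical) and values near `0.0` mean the strings share
    /// almost nothing.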
    pub fn normalized_levenshtein(a: &str, b: &str) -> f64 {
        if a.is_empty() && b.is_empty() {
            return 1.0;
        }
        1.0 - (levenshtein(a, b) as f64)
            / (a.chars().count().max(b.chars().count()) as f64)
    }
}

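/// Merges `first` and `second`, removes duplicates, and returns the result
/// sorted in ascending order.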
pub fn merge_and_order_indices<T: Borrow<usize>, S: Borrow<usize>>(
    first: impl IntoIterator<Item = T>,
    second: impl IntoIterator<Item = S>,
) -> Vec<usize> {
    let mut result: Vec<_> = first
        .into_iter()
        .map(|e| *e.borrow())
        .chain(second.into_iter().map(|e| *e.borrow()))
        .collect::<HashSet<_>>()
        .into_iter()
        .collect();
    result.sort();
    result
}

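/// Returns the elements of `first` that do not appear in `second`, preserving
/// the order of `first`.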
pub fn set_difference<T: Borrow<usize>, S: Borrow<usize>>(
    first: impl IntoIterator<Item = T>,
    second: impl IntoIterator<Item = S>,
) -> Vec<usize> {
    let set: HashSet<_> = second.into_iter().map(|e| *e.borrow()).collect();
    first
        .into_iter()
        .map(|e| *e.borrow())
        .filter(|e| !set.contains(e))
        .collect()
}

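/// Finds the position of each element of `targets` inside `items`, erroring if
/// any target is missing.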
pub fn find_indices<T: PartialEq, S: Borrow<T>>(
    items: &[T],
    targets: impl IntoIterator<Item = S>,
) -> Result<Vec<usize>> {
    targets
        .into_iter()
        .map(|target| items.iter().position(|e| target.borrow().eq(e)))
        .collect::<Option<_>>()
        .ok_or_else(|| _exec_datafusion_err!("Target not found"))
}

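/// Transposes a vector of (equal-length) rows into a vector of columns, e.g.
/// `[[1, 2, 3], [4, 5, 6]]` becomes `[[1, 4], [2, 5], [3, 6]]`.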
pub fn transpose<T>(original: Vec<Vec<T>>) -> Vec<Vec<T>> {
    match original.as_slice() {
        [] => vec![],
        [first, ..] => {
            let mut result = (0..first.len()).map(|_| vec![]).collect::<Vec<_>>();
            for row in original {
                for (item, transposed_row) in row.into_iter().zip(&mut result) {
                    transposed_row.push(item);
                }
            }
            result
        }
    }
}

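/// Combines a parent `skip`/`fetch` applied on top of a child `skip`/`fetch`
/// into a single equivalent pair: the skips add up, and the resulting fetch is
/// the tighter of the parent fetch and whatever remains of the child fetch
/// after the parent skip.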
pub fn combine_limit(
    parent_skip: usize,
    parent_fetch: Option<usize>,
    child_skip: usize,
    child_fetch: Option<usize>,
) -> (usize, Option<usize>) {
    let combined_skip = child_skip.saturating_add(parent_skip);

    let combined_fetch = match (parent_fetch, child_fetch) {
        (Some(parent_fetch), Some(child_fetch)) => {
            Some(min(parent_fetch, child_fetch.saturating_sub(parent_skip)))
        }
        (Some(parent_fetch), None) => Some(parent_fetch),
        (None, Some(child_fetch)) => Some(child_fetch.saturating_sub(parent_skip)),
        (None, None) => None,
    };

    (combined_skip, combined_fetch)
}

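/// Returns the number of threads available to the process via
/// [`std::thread::available_parallelism`], falling back to `1` if that
/// information is unavailable.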
pub fn get_available_parallelism() -> usize {
    available_parallelism()
        .unwrap_or(NonZero::new(1).expect("literal value `1` shouldn't be zero"))
        .get()
}

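/// Converts a variable-length argument list into a fixed-size array of `N`
/// arguments, producing an error that names `function_name` when the argument
/// count does not match.
///
/// Illustrative usage (a sketch, not a doctest; the function name and argument
/// names are placeholders):
///
/// ```ignore
/// let [base, exponent] = take_function_args("power", args)?;
/// ```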
pub fn take_function_args<const N: usize, T>(
    function_name: &str,
    args: impl IntoIterator<Item = T>,
) -> Result<[T; N]> {
    let args = args.into_iter().collect::<Vec<_>>();
    args.try_into().map_err(|v: Vec<T>| {
        _exec_datafusion_err!(
            "{} function requires {} {}, got {}",
            function_name,
            N,
            if N == 1 { "argument" } else { "arguments" },
            v.len()
        )
    })
}

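/// For a list array described by `offsets`, returns one entry per flattened
/// value giving the index of the list (row) that value belongs to, e.g.
/// offsets `[0, 2, 5]` yield `[0, 0, 1, 1, 1]`.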
pub fn make_list_array_indices<T: ArrowPrimitiveType>(
    offsets: &OffsetBuffer<T::Native>,
) -> PrimitiveArray<T> {
    let mut indices = Vec::with_capacity(
        offsets.last().unwrap().as_usize() - offsets.first().unwrap().as_usize(),
    );

    for (i, (&start, &end)) in std::iter::zip(&offsets[..], &offsets[1..]).enumerate() {
        indices.extend(std::iter::repeat_n(
            T::Native::usize_as(i),
            end.as_usize() - start.as_usize(),
        ));
    }

    PrimitiveArray::new(indices.into(), None)
}

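/// For a list array described by `offsets`, returns one entry per flattened
/// value giving its position within its own list, e.g. offsets `[0, 2, 5]`
/// yield `[0, 1, 0, 1, 2]`.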
pub fn make_list_element_indices<T: ArrowPrimitiveType>(
    offsets: &OffsetBuffer<T::Native>,
) -> PrimitiveArray<T> {
    let mut indices = vec![
        T::default_value();
        offsets.last().unwrap().as_usize()
            - offsets.first().unwrap().as_usize()
    ];

    for (&start, &end) in std::iter::zip(&offsets[..], &offsets[1..]) {
        for i in 0..end.as_usize() - start.as_usize() {
            indices[start.as_usize() + i] = T::Native::usize_as(i);
        }
    }

    PrimitiveArray::new(indices.into(), None)
}

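/// For a fixed-size list array with `array_len` lists of `list_size` elements
/// each, returns one entry per flattened value giving the index of the list it
/// belongs to.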
pub fn make_fsl_array_indices(
    list_size: i32,
    array_len: usize,
) -> PrimitiveArray<Int32Type> {
    let mut indices = vec![0; list_size as usize * array_len];

    for i in 0..array_len {
        for j in 0..list_size as usize {
            // Flattened position of element `j` of list `i`.
            indices[i * list_size as usize + j] = i as i32;
        }
    }

    PrimitiveArray::new(indices.into(), None)
}

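/// For a fixed-size list array with `array_len` lists of `list_size` elements
/// each, returns one entry per flattened value giving its position within its
/// own list.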
pub fn make_fsl_element_indices(
    list_size: i32,
    array_len: usize,
) -> PrimitiveArray<Int32Type> {
    let mut indices = vec![0; list_size as usize * array_len];

    for i in 0..array_len {
        for j in 0..list_size as usize {
            // Flattened position of element `j` of list `i`.
            indices[i * list_size as usize + j] = j as i32;
        }
    }

    PrimitiveArray::new(indices.into(), None)
}

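/// Returns the flattened values of a `List`, `LargeList`, or `FixedSizeList`
/// array, erroring for any other array type.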
pub fn list_values(array: &dyn Array) -> Result<&ArrayRef> {
    match array.data_type() {
        DataType::List(_) => Ok(array.as_list::<i32>().values()),
        DataType::LargeList(_) => Ok(array.as_list::<i64>().values()),
        DataType::FixedSizeList(_, _) => Ok(array.as_fixed_size_list().values()),
        other => _exec_err!("expected list, got {other}"),
    }
}

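/// Returns, for each flattened value of a list array, the index of the list
/// (row) it belongs to; see [`make_list_array_indices`] and
/// [`make_fsl_array_indices`].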
pub fn list_indices(array: &dyn Array) -> Result<ArrayRef> {
    match array.data_type() {
        DataType::List(_) => Ok(Arc::new(make_list_array_indices::<Int32Type>(
            array.as_list().offsets(),
        ))),
        DataType::LargeList(_) => Ok(Arc::new(make_list_array_indices::<Int64Type>(
            array.as_list().offsets(),
        ))),
        DataType::FixedSizeList(_, _) => {
            let fixed_size_list = array.as_fixed_size_list();

            Ok(Arc::new(make_fsl_array_indices(
                fixed_size_list.value_length(),
                fixed_size_list.len(),
            )))
        }
        other => _exec_err!("expected list, got {other}"),
    }
}

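/// Returns, for each flattened value of a list array, its position within its
/// own list; see [`make_list_element_indices`] and
/// [`make_fsl_element_indices`].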
pub fn elements_indices(array: &dyn Array) -> Result<ArrayRef> {
    match array.data_type() {
        DataType::List(_) => Ok(Arc::new(make_list_element_indices::<Int32Type>(
            array.as_list::<i32>().offsets(),
        ))),
        DataType::LargeList(_) => Ok(Arc::new(make_list_element_indices::<Int64Type>(
            array.as_list::<i64>().offsets(),
        ))),
        DataType::FixedSizeList(_, _) => {
            let fixed_size_list = array.as_fixed_size_list();

            Ok(Arc::new(make_fsl_element_indices(
                fixed_size_list.value_length(),
                fixed_size_list.len(),
            )))
        }
        other => _exec_err!("expected list, got {other}"),
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::ScalarValue::Null;
    use arrow::array::Float64Array;
    #[cfg(feature = "sql")]
    use sqlparser::{ast::Ident, tokenizer::Span};

    #[test]
    fn test_bisect_linear_left_and_right() -> Result<()> {
        let arrays: Vec<ArrayRef> = vec![
            Arc::new(Float64Array::from(vec![5.0, 7.0, 8.0, 9., 10.])),
            Arc::new(Float64Array::from(vec![2.0, 3.0, 3.0, 4.0, 5.0])),
            Arc::new(Float64Array::from(vec![5.0, 7.0, 8.0, 10., 11.0])),
            Arc::new(Float64Array::from(vec![15.0, 13.0, 8.0, 5., 0.0])),
        ];
        let search_tuple: Vec<ScalarValue> = vec![
            ScalarValue::Float64(Some(8.0)),
            ScalarValue::Float64(Some(3.0)),
            ScalarValue::Float64(Some(8.0)),
            ScalarValue::Float64(Some(8.0)),
        ];
        let ords = [
            SortOptions {
                descending: false,
                nulls_first: true,
            },
            SortOptions {
                descending: false,
                nulls_first: true,
            },
            SortOptions {
                descending: false,
                nulls_first: true,
            },
            SortOptions {
                descending: true,
                nulls_first: true,
            },
        ];
        let res = bisect::<true>(&arrays, &search_tuple, &ords)?;
        assert_eq!(res, 2);
        let res = bisect::<false>(&arrays, &search_tuple, &ords)?;
        assert_eq!(res, 3);
        let res = linear_search::<true>(&arrays, &search_tuple, &ords)?;
        assert_eq!(res, 2);
        let res = linear_search::<false>(&arrays, &search_tuple, &ords)?;
        assert_eq!(res, 3);
        Ok(())
    }

    #[test]
    fn vector_ord() {
        assert!(vec![1, 0, 0, 0, 0, 0, 0, 1] < vec![1, 0, 0, 0, 0, 0, 0, 2]);
        assert!(vec![1, 0, 0, 0, 0, 0, 1, 1] > vec![1, 0, 0, 0, 0, 0, 0, 2]);
        assert!(
            vec![
                ScalarValue::Int32(Some(2)),
                Null,
                ScalarValue::Int32(Some(0)),
            ] < vec![
                ScalarValue::Int32(Some(2)),
                Null,
                ScalarValue::Int32(Some(1)),
            ]
        );
        assert!(
            vec![
                ScalarValue::Int32(Some(2)),
                ScalarValue::Int32(None),
                ScalarValue::Int32(Some(0)),
            ] < vec![
                ScalarValue::Int32(Some(2)),
                ScalarValue::Int32(None),
                ScalarValue::Int32(Some(1)),
            ]
        );
    }

    #[test]
    fn ord_same_type() {
        assert!(ScalarValue::Int32(Some(2)) < ScalarValue::Int32(Some(3)));
    }

    #[test]
    fn test_bisect_linear_left_and_right_diff_sort() -> Result<()> {
        let arrays: Vec<ArrayRef> =
            vec![Arc::new(Float64Array::from(vec![4.0, 3.0, 2.0, 1.0, 0.0]))];
        let search_tuple: Vec<ScalarValue> = vec![ScalarValue::Float64(Some(4.0))];
        let ords = [SortOptions {
            descending: true,
            nulls_first: true,
        }];
        let res = bisect::<true>(&arrays, &search_tuple, &ords)?;
        assert_eq!(res, 0);
        let res = linear_search::<true>(&arrays, &search_tuple, &ords)?;
        assert_eq!(res, 0);

        let arrays: Vec<ArrayRef> =
            vec![Arc::new(Float64Array::from(vec![4.0, 3.0, 2.0, 1.0, 0.0]))];
        let search_tuple: Vec<ScalarValue> = vec![ScalarValue::Float64(Some(4.0))];
        let ords = [SortOptions {
            descending: true,
            nulls_first: true,
        }];
        let res = bisect::<false>(&arrays, &search_tuple, &ords)?;
        assert_eq!(res, 1);
        let res = linear_search::<false>(&arrays, &search_tuple, &ords)?;
        assert_eq!(res, 1);

        let arrays: Vec<ArrayRef> =
            vec![Arc::new(Float64Array::from(vec![5.0, 7.0, 8.0, 9., 10.]))];
        let search_tuple: Vec<ScalarValue> = vec![ScalarValue::Float64(Some(7.0))];
        let ords = [SortOptions {
            descending: false,
            nulls_first: true,
        }];
        let res = bisect::<true>(&arrays, &search_tuple, &ords)?;
        assert_eq!(res, 1);
        let res = linear_search::<true>(&arrays, &search_tuple, &ords)?;
        assert_eq!(res, 1);

        let arrays: Vec<ArrayRef> =
            vec![Arc::new(Float64Array::from(vec![5.0, 7.0, 8.0, 9., 10.]))];
        let search_tuple: Vec<ScalarValue> = vec![ScalarValue::Float64(Some(7.0))];
        let ords = [SortOptions {
            descending: false,
            nulls_first: true,
        }];
        let res = bisect::<false>(&arrays, &search_tuple, &ords)?;
        assert_eq!(res, 2);
        let res = linear_search::<false>(&arrays, &search_tuple, &ords)?;
        assert_eq!(res, 2);

        let arrays: Vec<ArrayRef> = vec![
            Arc::new(Float64Array::from(vec![5.0, 7.0, 8.0, 8.0, 9., 10.])),
            Arc::new(Float64Array::from(vec![10.0, 9.0, 8.0, 7.5, 7., 6.])),
        ];
        let search_tuple: Vec<ScalarValue> = vec![
            ScalarValue::Float64(Some(8.0)),
            ScalarValue::Float64(Some(8.0)),
        ];
        let ords = [
            SortOptions {
                descending: false,
                nulls_first: true,
            },
            SortOptions {
                descending: true,
                nulls_first: true,
            },
        ];
        let res = bisect::<false>(&arrays, &search_tuple, &ords)?;
        assert_eq!(res, 3);
        let res = linear_search::<false>(&arrays, &search_tuple, &ords)?;
        assert_eq!(res, 3);

        let res = bisect::<true>(&arrays, &search_tuple, &ords)?;
        assert_eq!(res, 2);
        let res = linear_search::<true>(&arrays, &search_tuple, &ords)?;
        assert_eq!(res, 2);
        Ok(())
    }

    #[test]
    fn test_evaluate_partition_ranges() -> Result<()> {
        let arrays: Vec<ArrayRef> = vec![
            Arc::new(Float64Array::from(vec![1.0, 1.0, 1.0, 2.0, 2.0, 2.0])),
            Arc::new(Float64Array::from(vec![4.0, 4.0, 3.0, 2.0, 1.0, 1.0])),
        ];
        let n_row = arrays[0].len();
        let options: Vec<SortOptions> = vec![
            SortOptions {
                descending: false,
                nulls_first: false,
            },
            SortOptions {
                descending: true,
                nulls_first: false,
            },
        ];
        let sort_columns = arrays
            .into_iter()
            .zip(options)
            .map(|(values, options)| SortColumn {
                values,
                options: Some(options),
            })
            .collect::<Vec<_>>();
        let ranges = evaluate_partition_ranges(n_row, &sort_columns)?;
        assert_eq!(ranges.len(), 4);
        assert_eq!(ranges[0], Range { start: 0, end: 2 });
        assert_eq!(ranges[1], Range { start: 2, end: 3 });
        assert_eq!(ranges[2], Range { start: 3, end: 4 });
        assert_eq!(ranges[3], Range { start: 4, end: 6 });
        Ok(())
    }

    #[cfg(feature = "sql")]
    #[test]
    fn test_quote_identifier() -> Result<()> {
        let cases = vec![
            ("foo", r#"foo"#),
            ("_foo", r#"_foo"#),
            ("foo_bar", r#"foo_bar"#),
            ("foo-bar", r#""foo-bar""#),
            ("foo.bar", r#""foo.bar""#),
            ("Foo", r#""Foo""#),
            ("Foo.Bar", r#""Foo.Bar""#),
            ("test1", r#"test1"#),
            ("1test", r#""1test""#),
        ];

        for (identifier, quoted_identifier) in cases {
            println!("input: \n{identifier}\nquoted_identifier:\n{quoted_identifier}");

            assert_eq!(quote_identifier(identifier), quoted_identifier);

            let quote_style = if quoted_identifier.starts_with('"') {
                Some('"')
            } else {
                None
            };

            let expected_parsed = vec![Ident {
                value: identifier.to_string(),
                quote_style,
                span: Span::empty(),
            }];

            assert_eq!(
                parse_identifiers(quoted_identifier).unwrap(),
                expected_parsed
            );
        }

        Ok(())
    }

    #[test]
    fn test_get_at_indices() -> Result<()> {
        let in_vec = vec![1, 2, 3, 4, 5, 6, 7];
        assert_eq!(get_at_indices(&in_vec, [0, 2])?, vec![1, 3]);
        assert_eq!(get_at_indices(&in_vec, [4, 2])?, vec![5, 3]);
        assert!(get_at_indices(&in_vec, [7]).is_err());
        Ok(())
    }

    #[test]
    fn test_longest_consecutive_prefix() {
        assert_eq!(longest_consecutive_prefix([0, 3, 4]), 1);
        assert_eq!(longest_consecutive_prefix([0, 1, 3, 4]), 2);
        assert_eq!(longest_consecutive_prefix([0, 1, 2, 3, 4]), 5);
        assert_eq!(longest_consecutive_prefix([1, 2, 3, 4]), 0);
    }

    #[test]
    fn test_merge_and_order_indices() {
        assert_eq!(
            merge_and_order_indices([0, 3, 4], [1, 3, 5]),
            vec![0, 1, 3, 4, 5]
        );
        assert_eq!(
            merge_and_order_indices([3, 0, 4], [5, 1, 3]),
            vec![0, 1, 3, 4, 5]
        );
    }

    #[test]
    fn test_set_difference() {
        assert_eq!(set_difference([0, 3, 4], [1, 2]), vec![0, 3, 4]);
        assert_eq!(set_difference([0, 3, 4], [1, 2, 4]), vec![0, 3]);
        assert_eq!(set_difference([3, 4, 0], [1, 2, 4]), vec![3, 0]);
        assert_eq!(set_difference([0, 3, 4], [4, 1, 2]), vec![0, 3]);
        assert_eq!(set_difference([3, 4, 0], [4, 1, 2]), vec![3, 0]);
    }

    #[test]
    fn test_find_indices() -> Result<()> {
        assert_eq!(find_indices(&[0, 3, 4], [0, 3, 4])?, vec![0, 1, 2]);
        assert_eq!(find_indices(&[0, 3, 4], [0, 4, 3])?, vec![0, 2, 1]);
        assert_eq!(find_indices(&[3, 0, 4], [0, 3])?, vec![1, 0]);
        assert!(find_indices(&[0, 3], [0, 3, 4]).is_err());
        assert!(find_indices(&[0, 3, 4], [0, 2]).is_err());
        Ok(())
    }

    #[test]
    fn test_transpose() -> Result<()> {
        let in_data = vec![vec![1, 2, 3], vec![4, 5, 6]];
        let transposed = transpose(in_data);
        let expected = vec![vec![1, 4], vec![2, 5], vec![3, 6]];
        assert_eq!(expected, transposed);
        Ok(())
    }
}