datafusion_functions/datetime/
date_bin.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::any::Any;
19use std::sync::Arc;
20
21use arrow::array::temporal_conversions::NANOSECONDS;
22use arrow::array::types::{
23    ArrowTimestampType, IntervalDayTimeType, IntervalMonthDayNanoType,
24    TimestampMicrosecondType, TimestampMillisecondType, TimestampNanosecondType,
25    TimestampSecondType,
26};
27use arrow::array::{ArrayRef, PrimitiveArray};
28use arrow::datatypes::DataType::{Null, Timestamp, Utf8};
29use arrow::datatypes::IntervalUnit::{DayTime, MonthDayNano};
30use arrow::datatypes::TimeUnit::{Microsecond, Millisecond, Nanosecond, Second};
31use arrow::datatypes::{DataType, TimeUnit};
32
33use datafusion_common::cast::as_primitive_array;
34use datafusion_common::{exec_err, not_impl_err, plan_err, Result, ScalarValue};
35use datafusion_expr::sort_properties::{ExprProperties, SortProperties};
36use datafusion_expr::TypeSignature::Exact;
37use datafusion_expr::{
38    ColumnarValue, Documentation, ScalarUDFImpl, Signature, Volatility, TIMEZONE_WILDCARD,
39};
40use datafusion_macros::user_doc;
41
42use chrono::{DateTime, Datelike, Duration, Months, TimeDelta, Utc};
43
#[user_doc(
    doc_section(label = "Time and Date Functions"),
    description = r#"
Calculates time intervals and returns the start of the interval nearest to the specified timestamp. Use `date_bin` to downsample time series data by grouping rows into time-based "bins" or "windows" and applying an aggregate or selector function to each window.

For example, if you "bin" or "window" data into 15 minute intervals, an input timestamp of `2023-01-01T18:18:18Z` will be updated to the start time of the 15 minute bin it is in: `2023-01-01T18:15:00Z`.
"#,
    syntax_example = "date_bin(interval, expression, origin-timestamp)",
    sql_example = r#"```sql
-- Bin the timestamp into 1 day intervals
> SELECT date_bin(interval '1 day', time) as bin
FROM VALUES ('2023-01-01T18:18:18Z'), ('2023-01-03T19:00:03Z')  t(time);
+---------------------+
| bin                 |
+---------------------+
| 2023-01-01T00:00:00 |
| 2023-01-03T00:00:00 |
+---------------------+
2 row(s) fetched.

-- Bin the timestamp into 1 day intervals starting at 3AM on  2023-01-01
> SELECT date_bin(interval '1 day', time,  '2023-01-01T03:00:00') as bin
FROM VALUES ('2023-01-01T18:18:18Z'), ('2023-01-03T19:00:03Z')  t(time);
+---------------------+
| bin                 |
+---------------------+
| 2023-01-01T03:00:00 |
| 2023-01-03T03:00:00 |
+---------------------+
2 row(s) fetched.
```"#,
    argument(name = "interval", description = "Bin interval."),
    argument(
        name = "expression",
        description = "Time expression to operate on. Can be a constant, column, or function."
    ),
    argument(
        name = "origin-timestamp",
        description = r#"Optional. Starting point used to determine bin boundaries. If not specified defaults 1970-01-01T00:00:00Z (the UNIX epoch in UTC). The following intervals are supported:

    - nanoseconds
    - microseconds
    - milliseconds
    - seconds
    - minutes
    - hours
    - days
    - weeks
    - months
    - years
    - century
"#
    )
)]
/// Scalar UDF implementing `date_bin`.
///
/// The only state is the [`Signature`] listing every accepted combination of
/// argument types; it is built once in [`DateBinFunc::new`].
#[derive(Debug, PartialEq, Eq, Hash)]
pub struct DateBinFunc {
    // Accepted argument type combinations (two- and three-argument forms).
    signature: Signature,
}
102
103impl Default for DateBinFunc {
104    fn default() -> Self {
105        Self::new()
106    }
107}
108
109impl DateBinFunc {
110    pub fn new() -> Self {
111        let base_sig = |array_type: TimeUnit| {
112            vec![
113                Exact(vec![
114                    DataType::Interval(MonthDayNano),
115                    Timestamp(array_type, None),
116                    Timestamp(Nanosecond, None),
117                ]),
118                Exact(vec![
119                    DataType::Interval(MonthDayNano),
120                    Timestamp(array_type, Some(TIMEZONE_WILDCARD.into())),
121                    Timestamp(Nanosecond, Some(TIMEZONE_WILDCARD.into())),
122                ]),
123                Exact(vec![
124                    DataType::Interval(DayTime),
125                    Timestamp(array_type, None),
126                    Timestamp(Nanosecond, None),
127                ]),
128                Exact(vec![
129                    DataType::Interval(DayTime),
130                    Timestamp(array_type, Some(TIMEZONE_WILDCARD.into())),
131                    Timestamp(Nanosecond, Some(TIMEZONE_WILDCARD.into())),
132                ]),
133                Exact(vec![
134                    DataType::Interval(MonthDayNano),
135                    Timestamp(array_type, None),
136                ]),
137                Exact(vec![
138                    DataType::Interval(MonthDayNano),
139                    Timestamp(array_type, Some(TIMEZONE_WILDCARD.into())),
140                ]),
141                Exact(vec![
142                    DataType::Interval(DayTime),
143                    Timestamp(array_type, None),
144                ]),
145                Exact(vec![
146                    DataType::Interval(DayTime),
147                    Timestamp(array_type, Some(TIMEZONE_WILDCARD.into())),
148                ]),
149            ]
150        };
151
152        let full_sig = [Nanosecond, Microsecond, Millisecond, Second]
153            .into_iter()
154            .map(base_sig)
155            .collect::<Vec<_>>()
156            .concat();
157
158        Self {
159            signature: Signature::one_of(full_sig, Volatility::Immutable),
160        }
161    }
162}
163
164impl ScalarUDFImpl for DateBinFunc {
165    fn as_any(&self) -> &dyn Any {
166        self
167    }
168
169    fn name(&self) -> &str {
170        "date_bin"
171    }
172
173    fn signature(&self) -> &Signature {
174        &self.signature
175    }
176
177    fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
178        match &arg_types[1] {
179            Timestamp(Nanosecond, None) | Utf8 | Null => Ok(Timestamp(Nanosecond, None)),
180            Timestamp(Nanosecond, tz_opt) => Ok(Timestamp(Nanosecond, tz_opt.clone())),
181            Timestamp(Microsecond, tz_opt) => Ok(Timestamp(Microsecond, tz_opt.clone())),
182            Timestamp(Millisecond, tz_opt) => Ok(Timestamp(Millisecond, tz_opt.clone())),
183            Timestamp(Second, tz_opt) => Ok(Timestamp(Second, tz_opt.clone())),
184            _ => plan_err!(
185                "The date_bin function can only accept timestamp as the second arg."
186            ),
187        }
188    }
189
190    fn invoke_with_args(
191        &self,
192        args: datafusion_expr::ScalarFunctionArgs,
193    ) -> Result<ColumnarValue> {
194        let args = &args.args;
195        if args.len() == 2 {
196            // Default to unix EPOCH
197            let origin = ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(
198                Some(0),
199                Some("+00:00".into()),
200            ));
201            date_bin_impl(&args[0], &args[1], &origin)
202        } else if args.len() == 3 {
203            date_bin_impl(&args[0], &args[1], &args[2])
204        } else {
205            exec_err!("DATE_BIN expected two or three arguments")
206        }
207    }
208
209    fn output_ordering(&self, input: &[ExprProperties]) -> Result<SortProperties> {
210        // The DATE_BIN function preserves the order of its second argument.
211        let step = &input[0];
212        let date_value = &input[1];
213        let reference = input.get(2);
214
215        if step.sort_properties.eq(&SortProperties::Singleton)
216            && reference
217                .map(|r| r.sort_properties.eq(&SortProperties::Singleton))
218                .unwrap_or(true)
219        {
220            Ok(date_value.sort_properties)
221        } else {
222            Ok(SortProperties::Unordered)
223        }
224    }
225    fn documentation(&self) -> Option<&Documentation> {
226        self.doc()
227    }
228}
229
/// Stride of a `date_bin` call, expressed in the unit in which the binning is done.
enum Interval {
    /// Fixed-width stride, as a number of nanoseconds.
    Nanoseconds(i64),
    /// Calendar stride, as a number of months.
    Months(i64),
}
234
235impl Interval {
236    /// Returns (`stride_nanos`, `fn`) where
237    ///
238    /// 1. `stride_nanos` is a width, in nanoseconds
239    /// 2. `fn` is a function that takes (stride_nanos, source, origin)
240    ///
241    /// `source` is the timestamp being binned
242    ///
243    /// `origin`  is the time, in nanoseconds, where windows are measured from
244    fn bin_fn(&self) -> (i64, fn(i64, i64, i64) -> i64) {
245        match self {
246            Interval::Nanoseconds(nanos) => (*nanos, date_bin_nanos_interval),
247            Interval::Months(months) => (*months, date_bin_months_interval),
248        }
249    }
250}
251
252// return time in nanoseconds that the source timestamp falls into based on the stride and origin
253fn date_bin_nanos_interval(stride_nanos: i64, source: i64, origin: i64) -> i64 {
254    let time_diff = source - origin;
255
256    // distance from origin to bin
257    let time_delta = compute_distance(time_diff, stride_nanos);
258
259    origin + time_delta
260}
261
/// Returns the distance from the origin to the start of the bin containing a point
/// that lies `time_diff` away from the origin, for bins of width `stride`.
fn compute_distance(time_diff: i64, stride: i64) -> i64 {
    let remainder = time_diff % stride;
    // `%` truncates toward zero, so this is the multiple of `stride` nearest to zero.
    let truncated = time_diff - remainder;

    // A point before the origin that is not on a bin boundary belongs to the
    // previous bin, one stride further back.
    let in_previous_bin = time_diff < 0 && stride > 1 && remainder != 0;
    if in_previous_bin {
        truncated - stride
    } else {
        truncated
    }
}
273
274// return time in nanoseconds that the source timestamp falls into based on the stride and origin
275fn date_bin_months_interval(stride_months: i64, source: i64, origin: i64) -> i64 {
276    // convert source and origin to DateTime<Utc>
277    let source_date = to_utc_date_time(source);
278    let origin_date = to_utc_date_time(origin);
279
280    // calculate the number of months between the source and origin
281    let month_diff = (source_date.year() - origin_date.year()) * 12
282        + source_date.month() as i32
283        - origin_date.month() as i32;
284
285    // distance from origin to bin
286    let month_delta = compute_distance(month_diff as i64, stride_months);
287
288    let mut bin_time = if month_delta < 0 {
289        origin_date - Months::new(month_delta.unsigned_abs() as u32)
290    } else {
291        origin_date + Months::new(month_delta as u32)
292    };
293
294    // If origin is not midnight of first date of the month, the bin_time may be larger than the source
295    // In this case, we need to move back to previous bin
296    if bin_time > source_date {
297        let month_delta = month_delta - stride_months;
298        bin_time = if month_delta < 0 {
299            origin_date - Months::new(month_delta.unsigned_abs() as u32)
300        } else {
301            origin_date + Months::new(month_delta as u32)
302        };
303    }
304
305    bin_time.timestamp_nanos_opt().unwrap()
306}
307
308fn to_utc_date_time(nanos: i64) -> DateTime<Utc> {
309    let secs = nanos / 1_000_000_000;
310    let nsec = (nanos % 1_000_000_000) as u32;
311    DateTime::from_timestamp(secs, nsec).unwrap()
312}
313
314// Supported intervals:
315//  1. IntervalDayTime: this means that the stride is in days, hours, minutes, seconds and milliseconds
316//     We will assume month interval won't be converted into this type
317//     TODO (my next PR): without `INTERVAL` keyword, the stride was converted into ScalarValue::IntervalDayTime somewhere
318//             for month interval. I need to find that and make it ScalarValue::IntervalMonthDayNano instead
319// 2. IntervalMonthDayNano
320fn date_bin_impl(
321    stride: &ColumnarValue,
322    array: &ColumnarValue,
323    origin: &ColumnarValue,
324) -> Result<ColumnarValue> {
325    let stride = match stride {
326        ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(v))) => {
327            let (days, ms) = IntervalDayTimeType::to_parts(*v);
328            let nanos = (TimeDelta::try_days(days as i64).unwrap()
329                + TimeDelta::try_milliseconds(ms as i64).unwrap())
330            .num_nanoseconds();
331
332            match nanos {
333                Some(v) => Interval::Nanoseconds(v),
334                _ => return exec_err!("DATE_BIN stride argument is too large"),
335            }
336        }
337        ColumnarValue::Scalar(ScalarValue::IntervalMonthDayNano(Some(v))) => {
338            let (months, days, nanos) = IntervalMonthDayNanoType::to_parts(*v);
339
340            // If interval is months, its origin must be midnight of first date of the month
341            if months != 0 {
342                // Return error if days or nanos is not zero
343                if days != 0 || nanos != 0 {
344                    return not_impl_err!(
345                        "DATE_BIN stride does not support combination of month, day and nanosecond intervals"
346                    );
347                } else {
348                    Interval::Months(months as i64)
349                }
350            } else {
351                let nanos = (TimeDelta::try_days(days as i64).unwrap()
352                    + Duration::nanoseconds(nanos))
353                .num_nanoseconds();
354                match nanos {
355                    Some(v) => Interval::Nanoseconds(v),
356                    _ => return exec_err!("DATE_BIN stride argument is too large"),
357                }
358            }
359        }
360        ColumnarValue::Scalar(v) => {
361            return exec_err!(
362                "DATE_BIN expects stride argument to be an INTERVAL but got {}",
363                v.data_type()
364            );
365        }
366        ColumnarValue::Array(_) => {
367            return not_impl_err!(
368            "DATE_BIN only supports literal values for the stride argument, not arrays"
369        );
370        }
371    };
372
373    let origin = match origin {
374        ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(v), _)) => *v,
375        ColumnarValue::Scalar(v) => {
376            return exec_err!(
377                "DATE_BIN expects origin argument to be a TIMESTAMP with nanosecond precision but got {}",
378                v.data_type()
379            );
380        }
381        ColumnarValue::Array(_) => {
382            return not_impl_err!(
383            "DATE_BIN only supports literal values for the origin argument, not arrays"
384        );
385        }
386    };
387
388    let (stride, stride_fn) = stride.bin_fn();
389
390    // Return error if stride is 0
391    if stride == 0 {
392        return exec_err!("DATE_BIN stride must be non-zero");
393    }
394
395    fn stride_map_fn<T: ArrowTimestampType>(
396        origin: i64,
397        stride: i64,
398        stride_fn: fn(i64, i64, i64) -> i64,
399    ) -> impl Fn(i64) -> i64 {
400        let scale = match T::UNIT {
401            Nanosecond => 1,
402            Microsecond => NANOSECONDS / 1_000_000,
403            Millisecond => NANOSECONDS / 1_000,
404            Second => NANOSECONDS,
405        };
406        move |x: i64| stride_fn(stride, x * scale, origin) / scale
407    }
408
409    Ok(match array {
410        ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(v, tz_opt)) => {
411            let apply_stride_fn =
412                stride_map_fn::<TimestampNanosecondType>(origin, stride, stride_fn);
413            ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(
414                v.map(apply_stride_fn),
415                tz_opt.clone(),
416            ))
417        }
418        ColumnarValue::Scalar(ScalarValue::TimestampMicrosecond(v, tz_opt)) => {
419            let apply_stride_fn =
420                stride_map_fn::<TimestampMicrosecondType>(origin, stride, stride_fn);
421            ColumnarValue::Scalar(ScalarValue::TimestampMicrosecond(
422                v.map(apply_stride_fn),
423                tz_opt.clone(),
424            ))
425        }
426        ColumnarValue::Scalar(ScalarValue::TimestampMillisecond(v, tz_opt)) => {
427            let apply_stride_fn =
428                stride_map_fn::<TimestampMillisecondType>(origin, stride, stride_fn);
429            ColumnarValue::Scalar(ScalarValue::TimestampMillisecond(
430                v.map(apply_stride_fn),
431                tz_opt.clone(),
432            ))
433        }
434        ColumnarValue::Scalar(ScalarValue::TimestampSecond(v, tz_opt)) => {
435            let apply_stride_fn =
436                stride_map_fn::<TimestampSecondType>(origin, stride, stride_fn);
437            ColumnarValue::Scalar(ScalarValue::TimestampSecond(
438                v.map(apply_stride_fn),
439                tz_opt.clone(),
440            ))
441        }
442
443        ColumnarValue::Array(array) => {
444            fn transform_array_with_stride<T>(
445                origin: i64,
446                stride: i64,
447                stride_fn: fn(i64, i64, i64) -> i64,
448                array: &ArrayRef,
449                tz_opt: &Option<Arc<str>>,
450            ) -> Result<ColumnarValue>
451            where
452                T: ArrowTimestampType,
453            {
454                let array = as_primitive_array::<T>(array)?;
455                let apply_stride_fn = stride_map_fn::<T>(origin, stride, stride_fn);
456                let array: PrimitiveArray<T> = array
457                    .unary(apply_stride_fn)
458                    .with_timezone_opt(tz_opt.clone());
459
460                Ok(ColumnarValue::Array(Arc::new(array)))
461            }
462
463            match array.data_type() {
464                Timestamp(Nanosecond, tz_opt) => {
465                    transform_array_with_stride::<TimestampNanosecondType>(
466                        origin, stride, stride_fn, array, tz_opt,
467                    )?
468                }
469                Timestamp(Microsecond, tz_opt) => {
470                    transform_array_with_stride::<TimestampMicrosecondType>(
471                        origin, stride, stride_fn, array, tz_opt,
472                    )?
473                }
474                Timestamp(Millisecond, tz_opt) => {
475                    transform_array_with_stride::<TimestampMillisecondType>(
476                        origin, stride, stride_fn, array, tz_opt,
477                    )?
478                }
479                Timestamp(Second, tz_opt) => {
480                    transform_array_with_stride::<TimestampSecondType>(
481                        origin, stride, stride_fn, array, tz_opt,
482                    )?
483                }
484                _ => {
485                    return exec_err!(
486                        "DATE_BIN expects source argument to be a TIMESTAMP but got {}",
487                        array.data_type()
488                    );
489                }
490            }
491        }
492        _ => {
493            return exec_err!(
494                "DATE_BIN expects source argument to be a TIMESTAMP scalar or array"
495            );
496        }
497    })
498}
499
500#[cfg(test)]
501mod tests {
502    use std::sync::Arc;
503
504    use crate::datetime::date_bin::{date_bin_nanos_interval, DateBinFunc};
505    use arrow::array::types::TimestampNanosecondType;
506    use arrow::array::{Array, IntervalDayTimeArray, TimestampNanosecondArray};
507    use arrow::compute::kernels::cast_utils::string_to_timestamp_nanos;
508    use arrow::datatypes::{DataType, Field, FieldRef, TimeUnit};
509
510    use arrow_buffer::{IntervalDayTime, IntervalMonthDayNano};
511    use datafusion_common::{DataFusionError, ScalarValue};
512    use datafusion_expr::{ColumnarValue, ScalarUDFImpl};
513
514    use chrono::TimeDelta;
515    use datafusion_common::config::ConfigOptions;
516
517    fn invoke_date_bin_with_args(
518        args: Vec<ColumnarValue>,
519        number_rows: usize,
520        return_field: &FieldRef,
521    ) -> Result<ColumnarValue, DataFusionError> {
522        let arg_fields = args
523            .iter()
524            .map(|arg| Field::new("a", arg.data_type(), true).into())
525            .collect::<Vec<_>>();
526
527        let args = datafusion_expr::ScalarFunctionArgs {
528            args,
529            arg_fields,
530            number_rows,
531            return_field: Arc::clone(return_field),
532            config_options: Arc::new(ConfigOptions::default()),
533            lambdas: None,
534        };
535        DateBinFunc::new().invoke_with_args(args)
536    }
537
538    #[test]
539    fn test_date_bin() {
540        let return_field = &Arc::new(Field::new(
541            "f",
542            DataType::Timestamp(TimeUnit::Nanosecond, None),
543            true,
544        ));
545
546        let mut args = vec![
547            ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(IntervalDayTime {
548                days: 0,
549                milliseconds: 1,
550            }))),
551            ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
552            ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
553        ];
554        let res = invoke_date_bin_with_args(args, 1, return_field);
555        assert!(res.is_ok());
556
557        let timestamps = Arc::new((1..6).map(Some).collect::<TimestampNanosecondArray>());
558        let batch_len = timestamps.len();
559        args = vec![
560            ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(IntervalDayTime {
561                days: 0,
562                milliseconds: 1,
563            }))),
564            ColumnarValue::Array(timestamps),
565            ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
566        ];
567        let res = invoke_date_bin_with_args(args, batch_len, return_field);
568        assert!(res.is_ok());
569
570        args = vec![
571            ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(IntervalDayTime {
572                days: 0,
573                milliseconds: 1,
574            }))),
575            ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
576        ];
577        let res = invoke_date_bin_with_args(args, 1, return_field);
578        assert!(res.is_ok());
579
580        // stride supports month-day-nano
581        args = vec![
582            ColumnarValue::Scalar(ScalarValue::IntervalMonthDayNano(Some(
583                IntervalMonthDayNano {
584                    months: 0,
585                    days: 0,
586                    nanoseconds: 1,
587                },
588            ))),
589            ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
590            ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
591        ];
592        let res = invoke_date_bin_with_args(args, 1, return_field);
593        assert!(res.is_ok());
594
595        //
596        // Fallible test cases
597        //
598
599        // invalid number of arguments
600        args = vec![ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(
601            IntervalDayTime {
602                days: 0,
603                milliseconds: 1,
604            },
605        )))];
606        let res = invoke_date_bin_with_args(args, 1, return_field);
607        assert_eq!(
608            res.err().unwrap().strip_backtrace(),
609            "Execution error: DATE_BIN expected two or three arguments"
610        );
611
612        // stride: invalid type
613        args = vec![
614            ColumnarValue::Scalar(ScalarValue::IntervalYearMonth(Some(1))),
615            ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
616            ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
617        ];
618        let res = invoke_date_bin_with_args(args, 1, return_field);
619        assert_eq!(
620            res.err().unwrap().strip_backtrace(),
621            "Execution error: DATE_BIN expects stride argument to be an INTERVAL but got Interval(YearMonth)"
622        );
623
624        // stride: invalid value
625
626        args = vec![
627            ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(IntervalDayTime {
628                days: 0,
629                milliseconds: 0,
630            }))),
631            ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
632            ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
633        ];
634
635        let res = invoke_date_bin_with_args(args, 1, return_field);
636        assert_eq!(
637            res.err().unwrap().strip_backtrace(),
638            "Execution error: DATE_BIN stride must be non-zero"
639        );
640
641        // stride: overflow of day-time interval
642        args = vec![
643            ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(
644                IntervalDayTime::MAX,
645            ))),
646            ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
647            ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
648        ];
649        let res = invoke_date_bin_with_args(args, 1, return_field);
650        assert_eq!(
651            res.err().unwrap().strip_backtrace(),
652            "Execution error: DATE_BIN stride argument is too large"
653        );
654
655        // stride: overflow of month-day-nano interval
656        args = vec![
657            ColumnarValue::Scalar(ScalarValue::new_interval_mdn(0, i32::MAX, 1)),
658            ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
659            ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
660        ];
661        let res = invoke_date_bin_with_args(args, 1, return_field);
662        assert_eq!(
663            res.err().unwrap().strip_backtrace(),
664            "Execution error: DATE_BIN stride argument is too large"
665        );
666
667        // stride: month intervals
668        args = vec![
669            ColumnarValue::Scalar(ScalarValue::new_interval_mdn(1, 1, 1)),
670            ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
671            ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
672        ];
673        let res = invoke_date_bin_with_args(args, 1, return_field);
674        assert_eq!(
675            res.err().unwrap().strip_backtrace(),
676            "This feature is not implemented: DATE_BIN stride does not support combination of month, day and nanosecond intervals"
677        );
678
679        // origin: invalid type
680        args = vec![
681            ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(IntervalDayTime {
682                days: 0,
683                milliseconds: 1,
684            }))),
685            ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
686            ColumnarValue::Scalar(ScalarValue::TimestampMicrosecond(Some(1), None)),
687        ];
688        let res = invoke_date_bin_with_args(args, 1, return_field);
689        assert_eq!(
690            res.err().unwrap().strip_backtrace(),
691            "Execution error: DATE_BIN expects origin argument to be a TIMESTAMP with nanosecond precision but got Timestamp(µs)"
692        );
693
694        args = vec![
695            ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(IntervalDayTime {
696                days: 0,
697                milliseconds: 1,
698            }))),
699            ColumnarValue::Scalar(ScalarValue::TimestampMicrosecond(Some(1), None)),
700            ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
701        ];
702        let res = invoke_date_bin_with_args(args, 1, return_field);
703        assert!(res.is_ok());
704
705        // unsupported array type for stride
706        let intervals = Arc::new(
707            (1..6)
708                .map(|x| {
709                    Some(IntervalDayTime {
710                        days: 0,
711                        milliseconds: x,
712                    })
713                })
714                .collect::<IntervalDayTimeArray>(),
715        );
716        args = vec![
717            ColumnarValue::Array(intervals),
718            ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
719            ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
720        ];
721        let res = invoke_date_bin_with_args(args, 1, return_field);
722        assert_eq!(
723            res.err().unwrap().strip_backtrace(),
724            "This feature is not implemented: DATE_BIN only supports literal values for the stride argument, not arrays"
725        );
726
727        // unsupported array type for origin
728        let timestamps = Arc::new((1..6).map(Some).collect::<TimestampNanosecondArray>());
729        let batch_len = timestamps.len();
730        args = vec![
731            ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(IntervalDayTime {
732                days: 0,
733                milliseconds: 1,
734            }))),
735            ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
736            ColumnarValue::Array(timestamps),
737        ];
738        let res = invoke_date_bin_with_args(args, batch_len, return_field);
739        assert_eq!(
740            res.err().unwrap().strip_backtrace(),
741            "This feature is not implemented: DATE_BIN only supports literal values for the origin argument, not arrays"
742        );
743    }
744
745    #[test]
746    fn test_date_bin_timezones() {
747        let cases = [
748            (
749                vec![
750                    "2020-09-08T00:00:00Z",
751                    "2020-09-08T01:00:00Z",
752                    "2020-09-08T02:00:00Z",
753                    "2020-09-08T03:00:00Z",
754                    "2020-09-08T04:00:00Z",
755                ],
756                Some("+00".into()),
757                "1970-01-01T00:00:00Z",
758                vec![
759                    "2020-09-08T00:00:00Z",
760                    "2020-09-08T00:00:00Z",
761                    "2020-09-08T00:00:00Z",
762                    "2020-09-08T00:00:00Z",
763                    "2020-09-08T00:00:00Z",
764                ],
765            ),
766            (
767                vec![
768                    "2020-09-08T00:00:00Z",
769                    "2020-09-08T01:00:00Z",
770                    "2020-09-08T02:00:00Z",
771                    "2020-09-08T03:00:00Z",
772                    "2020-09-08T04:00:00Z",
773                ],
774                None,
775                "1970-01-01T00:00:00Z",
776                vec![
777                    "2020-09-08T00:00:00Z",
778                    "2020-09-08T00:00:00Z",
779                    "2020-09-08T00:00:00Z",
780                    "2020-09-08T00:00:00Z",
781                    "2020-09-08T00:00:00Z",
782                ],
783            ),
784            (
785                vec![
786                    "2020-09-08T00:00:00Z",
787                    "2020-09-08T01:00:00Z",
788                    "2020-09-08T02:00:00Z",
789                    "2020-09-08T03:00:00Z",
790                    "2020-09-08T04:00:00Z",
791                ],
792                Some("-02".into()),
793                "1970-01-01T00:00:00Z",
794                vec![
795                    "2020-09-08T00:00:00Z",
796                    "2020-09-08T00:00:00Z",
797                    "2020-09-08T00:00:00Z",
798                    "2020-09-08T00:00:00Z",
799                    "2020-09-08T00:00:00Z",
800                ],
801            ),
802            (
803                vec![
804                    "2020-09-08T00:00:00+05",
805                    "2020-09-08T01:00:00+05",
806                    "2020-09-08T02:00:00+05",
807                    "2020-09-08T03:00:00+05",
808                    "2020-09-08T04:00:00+05",
809                ],
810                Some("+05".into()),
811                "1970-01-01T00:00:00+05",
812                vec![
813                    "2020-09-08T00:00:00+05",
814                    "2020-09-08T00:00:00+05",
815                    "2020-09-08T00:00:00+05",
816                    "2020-09-08T00:00:00+05",
817                    "2020-09-08T00:00:00+05",
818                ],
819            ),
820            (
821                vec![
822                    "2020-09-08T00:00:00+08",
823                    "2020-09-08T01:00:00+08",
824                    "2020-09-08T02:00:00+08",
825                    "2020-09-08T03:00:00+08",
826                    "2020-09-08T04:00:00+08",
827                ],
828                Some("+08".into()),
829                "1970-01-01T00:00:00+08",
830                vec![
831                    "2020-09-08T00:00:00+08",
832                    "2020-09-08T00:00:00+08",
833                    "2020-09-08T00:00:00+08",
834                    "2020-09-08T00:00:00+08",
835                    "2020-09-08T00:00:00+08",
836                ],
837            ),
838        ];
839
840        cases
841            .iter()
842            .for_each(|(original, tz_opt, origin, expected)| {
843                let input = original
844                    .iter()
845                    .map(|s| Some(string_to_timestamp_nanos(s).unwrap()))
846                    .collect::<TimestampNanosecondArray>()
847                    .with_timezone_opt(tz_opt.clone());
848                let right = expected
849                    .iter()
850                    .map(|s| Some(string_to_timestamp_nanos(s).unwrap()))
851                    .collect::<TimestampNanosecondArray>()
852                    .with_timezone_opt(tz_opt.clone());
853                let batch_len = input.len();
854                let args = vec![
855                    ColumnarValue::Scalar(ScalarValue::new_interval_dt(1, 0)),
856                    ColumnarValue::Array(Arc::new(input)),
857                    ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(
858                        Some(string_to_timestamp_nanos(origin).unwrap()),
859                        tz_opt.clone(),
860                    )),
861                ];
862                let return_field = &Arc::new(Field::new(
863                    "f",
864                    DataType::Timestamp(TimeUnit::Nanosecond, tz_opt.clone()),
865                    true,
866                ));
867                let result =
868                    invoke_date_bin_with_args(args, batch_len, return_field).unwrap();
869
870                if let ColumnarValue::Array(result) = result {
871                    assert_eq!(
872                        result.data_type(),
873                        &DataType::Timestamp(TimeUnit::Nanosecond, tz_opt.clone())
874                    );
875                    let left = arrow::array::cast::as_primitive_array::<
876                        TimestampNanosecondType,
877                    >(&result);
878                    assert_eq!(left, &right);
879                } else {
880                    panic!("unexpected column type");
881                }
882            });
883    }
884
885    #[test]
886    fn test_date_bin_single() {
887        let cases = [
888            (
889                (
890                    TimeDelta::try_minutes(15),
891                    "2004-04-09T02:03:04.123456789Z",
892                    "2001-01-01T00:00:00",
893                ),
894                "2004-04-09T02:00:00Z",
895            ),
896            (
897                (
898                    TimeDelta::try_minutes(15),
899                    "2004-04-09T02:03:04.123456789Z",
900                    "2001-01-01T00:02:30",
901                ),
902                "2004-04-09T02:02:30Z",
903            ),
904            (
905                (
906                    TimeDelta::try_minutes(15),
907                    "2004-04-09T02:03:04.123456789Z",
908                    "2005-01-01T00:02:30",
909                ),
910                "2004-04-09T02:02:30Z",
911            ),
912            (
913                (
914                    TimeDelta::try_hours(1),
915                    "2004-04-09T02:03:04.123456789Z",
916                    "2001-01-01T00:00:00",
917                ),
918                "2004-04-09T02:00:00Z",
919            ),
920            (
921                (
922                    TimeDelta::try_seconds(10),
923                    "2004-04-09T02:03:11.123456789Z",
924                    "2001-01-01T00:00:00",
925                ),
926                "2004-04-09T02:03:10Z",
927            ),
928        ];
929
930        cases
931            .iter()
932            .for_each(|((stride, source, origin), expected)| {
933                let stride = stride.unwrap();
934                let stride1 = stride.num_nanoseconds().unwrap();
935                let source1 = string_to_timestamp_nanos(source).unwrap();
936                let origin1 = string_to_timestamp_nanos(origin).unwrap();
937
938                let expected1 = string_to_timestamp_nanos(expected).unwrap();
939                let result = date_bin_nanos_interval(stride1, source1, origin1);
940                assert_eq!(result, expected1, "{source} = {expected}");
941            })
942    }
943
944    #[test]
945    fn test_date_bin_before_epoch() {
946        let cases = [
947            (
948                (TimeDelta::try_minutes(15), "1969-12-31T23:44:59.999999999"),
949                "1969-12-31T23:30:00",
950            ),
951            (
952                (TimeDelta::try_minutes(15), "1969-12-31T23:45:00"),
953                "1969-12-31T23:45:00",
954            ),
955            (
956                (TimeDelta::try_minutes(15), "1969-12-31T23:45:00.000000001"),
957                "1969-12-31T23:45:00",
958            ),
959        ];
960
961        cases.iter().for_each(|((stride, source), expected)| {
962            let stride = stride.unwrap();
963            let stride1 = stride.num_nanoseconds().unwrap();
964            let source1 = string_to_timestamp_nanos(source).unwrap();
965
966            let expected1 = string_to_timestamp_nanos(expected).unwrap();
967            let result = date_bin_nanos_interval(stride1, source1, 0);
968            assert_eq!(result, expected1, "{source} = {expected}");
969        })
970    }
971}