1use std::any::Any;
19use std::sync::Arc;
20
21use arrow::array::temporal_conversions::NANOSECONDS;
22use arrow::array::types::{
23 ArrowTimestampType, IntervalDayTimeType, IntervalMonthDayNanoType,
24 TimestampMicrosecondType, TimestampMillisecondType, TimestampNanosecondType,
25 TimestampSecondType,
26};
27use arrow::array::{ArrayRef, PrimitiveArray};
28use arrow::datatypes::DataType::{Null, Timestamp, Utf8};
29use arrow::datatypes::IntervalUnit::{DayTime, MonthDayNano};
30use arrow::datatypes::TimeUnit::{Microsecond, Millisecond, Nanosecond, Second};
31use arrow::datatypes::{DataType, TimeUnit};
32
33use datafusion_common::cast::as_primitive_array;
34use datafusion_common::{exec_err, not_impl_err, plan_err, Result, ScalarValue};
35use datafusion_expr::sort_properties::{ExprProperties, SortProperties};
36use datafusion_expr::TypeSignature::Exact;
37use datafusion_expr::{
38 ColumnarValue, Documentation, ScalarUDFImpl, Signature, Volatility, TIMEZONE_WILDCARD,
39};
40use datafusion_macros::user_doc;
41
42use chrono::{DateTime, Datelike, Duration, Months, TimeDelta, Utc};
43
44#[user_doc(
45 doc_section(label = "Time and Date Functions"),
46 description = r#"
47Calculates time intervals and returns the start of the interval nearest to the specified timestamp. Use `date_bin` to downsample time series data by grouping rows into time-based "bins" or "windows" and applying an aggregate or selector function to each window.
48
49For example, if you "bin" or "window" data into 15 minute intervals, an input timestamp of `2023-01-01T18:18:18Z` will be updated to the start time of the 15 minute bin it is in: `2023-01-01T18:15:00Z`.
50"#,
51 syntax_example = "date_bin(interval, expression, origin-timestamp)",
52 sql_example = r#"```sql
53-- Bin the timestamp into 1 day intervals
54> SELECT date_bin(interval '1 day', time) as bin
55FROM VALUES ('2023-01-01T18:18:18Z'), ('2023-01-03T19:00:03Z') t(time);
56+---------------------+
57| bin |
58+---------------------+
59| 2023-01-01T00:00:00 |
60| 2023-01-03T00:00:00 |
61+---------------------+
622 row(s) fetched.
63
64-- Bin the timestamp into 1 day intervals starting at 3AM on 2023-01-01
65> SELECT date_bin(interval '1 day', time, '2023-01-01T03:00:00') as bin
66FROM VALUES ('2023-01-01T18:18:18Z'), ('2023-01-03T19:00:03Z') t(time);
67+---------------------+
68| bin |
69+---------------------+
70| 2023-01-01T03:00:00 |
71| 2023-01-03T03:00:00 |
72+---------------------+
732 row(s) fetched.
74```"#,
75 argument(name = "interval", description = "Bin interval."),
76 argument(
77 name = "expression",
78 description = "Time expression to operate on. Can be a constant, column, or function."
79 ),
80 argument(
81 name = "origin-timestamp",
82 description = r#"Optional. Starting point used to determine bin boundaries. If not specified defaults 1970-01-01T00:00:00Z (the UNIX epoch in UTC). The following intervals are supported:
83
84 - nanoseconds
85 - microseconds
86 - milliseconds
87 - seconds
88 - minutes
89 - hours
90 - days
91 - weeks
92 - months
93 - years
94 - century
95"#
96 )
97)]
98#[derive(Debug, PartialEq, Eq, Hash)]
99pub struct DateBinFunc {
100 signature: Signature,
101}
102
103impl Default for DateBinFunc {
104 fn default() -> Self {
105 Self::new()
106 }
107}
108
109impl DateBinFunc {
110 pub fn new() -> Self {
111 let base_sig = |array_type: TimeUnit| {
112 vec![
113 Exact(vec![
114 DataType::Interval(MonthDayNano),
115 Timestamp(array_type, None),
116 Timestamp(Nanosecond, None),
117 ]),
118 Exact(vec![
119 DataType::Interval(MonthDayNano),
120 Timestamp(array_type, Some(TIMEZONE_WILDCARD.into())),
121 Timestamp(Nanosecond, Some(TIMEZONE_WILDCARD.into())),
122 ]),
123 Exact(vec![
124 DataType::Interval(DayTime),
125 Timestamp(array_type, None),
126 Timestamp(Nanosecond, None),
127 ]),
128 Exact(vec![
129 DataType::Interval(DayTime),
130 Timestamp(array_type, Some(TIMEZONE_WILDCARD.into())),
131 Timestamp(Nanosecond, Some(TIMEZONE_WILDCARD.into())),
132 ]),
133 Exact(vec![
134 DataType::Interval(MonthDayNano),
135 Timestamp(array_type, None),
136 ]),
137 Exact(vec![
138 DataType::Interval(MonthDayNano),
139 Timestamp(array_type, Some(TIMEZONE_WILDCARD.into())),
140 ]),
141 Exact(vec![
142 DataType::Interval(DayTime),
143 Timestamp(array_type, None),
144 ]),
145 Exact(vec![
146 DataType::Interval(DayTime),
147 Timestamp(array_type, Some(TIMEZONE_WILDCARD.into())),
148 ]),
149 ]
150 };
151
152 let full_sig = [Nanosecond, Microsecond, Millisecond, Second]
153 .into_iter()
154 .map(base_sig)
155 .collect::<Vec<_>>()
156 .concat();
157
158 Self {
159 signature: Signature::one_of(full_sig, Volatility::Immutable),
160 }
161 }
162}
163
164impl ScalarUDFImpl for DateBinFunc {
165 fn as_any(&self) -> &dyn Any {
166 self
167 }
168
169 fn name(&self) -> &str {
170 "date_bin"
171 }
172
173 fn signature(&self) -> &Signature {
174 &self.signature
175 }
176
177 fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
178 match &arg_types[1] {
179 Timestamp(Nanosecond, None) | Utf8 | Null => Ok(Timestamp(Nanosecond, None)),
180 Timestamp(Nanosecond, tz_opt) => Ok(Timestamp(Nanosecond, tz_opt.clone())),
181 Timestamp(Microsecond, tz_opt) => Ok(Timestamp(Microsecond, tz_opt.clone())),
182 Timestamp(Millisecond, tz_opt) => Ok(Timestamp(Millisecond, tz_opt.clone())),
183 Timestamp(Second, tz_opt) => Ok(Timestamp(Second, tz_opt.clone())),
184 _ => plan_err!(
185 "The date_bin function can only accept timestamp as the second arg."
186 ),
187 }
188 }
189
190 fn invoke_with_args(
191 &self,
192 args: datafusion_expr::ScalarFunctionArgs,
193 ) -> Result<ColumnarValue> {
194 let args = &args.args;
195 if args.len() == 2 {
196 let origin = ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(
198 Some(0),
199 Some("+00:00".into()),
200 ));
201 date_bin_impl(&args[0], &args[1], &origin)
202 } else if args.len() == 3 {
203 date_bin_impl(&args[0], &args[1], &args[2])
204 } else {
205 exec_err!("DATE_BIN expected two or three arguments")
206 }
207 }
208
209 fn output_ordering(&self, input: &[ExprProperties]) -> Result<SortProperties> {
210 let step = &input[0];
212 let date_value = &input[1];
213 let reference = input.get(2);
214
215 if step.sort_properties.eq(&SortProperties::Singleton)
216 && reference
217 .map(|r| r.sort_properties.eq(&SortProperties::Singleton))
218 .unwrap_or(true)
219 {
220 Ok(date_value.sort_properties)
221 } else {
222 Ok(SortProperties::Unordered)
223 }
224 }
225 fn documentation(&self) -> Option<&Documentation> {
226 self.doc()
227 }
228}
229
230enum Interval {
231 Nanoseconds(i64),
232 Months(i64),
233}
234
235impl Interval {
236 fn bin_fn(&self) -> (i64, fn(i64, i64, i64) -> i64) {
245 match self {
246 Interval::Nanoseconds(nanos) => (*nanos, date_bin_nanos_interval),
247 Interval::Months(months) => (*months, date_bin_months_interval),
248 }
249 }
250}
251
252fn date_bin_nanos_interval(stride_nanos: i64, source: i64, origin: i64) -> i64 {
254 let time_diff = source - origin;
255
256 let time_delta = compute_distance(time_diff, stride_nanos);
258
259 origin + time_delta
260}
261
/// Rounds `time_diff` to a multiple of `stride`.
///
/// When `stride > 1`, the rounding is toward negative infinity, so a negative
/// `time_diff` lands on the bin boundary at or before it. Otherwise it follows `%`
/// semantics and truncates toward zero. `stride` must be non-zero.
fn compute_distance(time_diff: i64, stride: i64) -> i64 {
    let remainder = time_diff % stride;
    let truncated = time_diff - remainder;

    // Truncation moved a negative value toward zero (i.e. later in time); step back one
    // stride so the result is the start of the bin rather than its end.
    let round_down = time_diff < 0 && stride > 1 && remainder != 0;
    if round_down {
        truncated - stride
    } else {
        truncated
    }
}
273
274fn date_bin_months_interval(stride_months: i64, source: i64, origin: i64) -> i64 {
276 let source_date = to_utc_date_time(source);
278 let origin_date = to_utc_date_time(origin);
279
280 let month_diff = (source_date.year() - origin_date.year()) * 12
282 + source_date.month() as i32
283 - origin_date.month() as i32;
284
285 let month_delta = compute_distance(month_diff as i64, stride_months);
287
288 let mut bin_time = if month_delta < 0 {
289 origin_date - Months::new(month_delta.unsigned_abs() as u32)
290 } else {
291 origin_date + Months::new(month_delta as u32)
292 };
293
294 if bin_time > source_date {
297 let month_delta = month_delta - stride_months;
298 bin_time = if month_delta < 0 {
299 origin_date - Months::new(month_delta.unsigned_abs() as u32)
300 } else {
301 origin_date + Months::new(month_delta as u32)
302 };
303 }
304
305 bin_time.timestamp_nanos_opt().unwrap()
306}
307
308fn to_utc_date_time(nanos: i64) -> DateTime<Utc> {
309 let secs = nanos / 1_000_000_000;
310 let nsec = (nanos % 1_000_000_000) as u32;
311 DateTime::from_timestamp(secs, nsec).unwrap()
312}
313
314fn date_bin_impl(
321 stride: &ColumnarValue,
322 array: &ColumnarValue,
323 origin: &ColumnarValue,
324) -> Result<ColumnarValue> {
325 let stride = match stride {
326 ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(v))) => {
327 let (days, ms) = IntervalDayTimeType::to_parts(*v);
328 let nanos = (TimeDelta::try_days(days as i64).unwrap()
329 + TimeDelta::try_milliseconds(ms as i64).unwrap())
330 .num_nanoseconds();
331
332 match nanos {
333 Some(v) => Interval::Nanoseconds(v),
334 _ => return exec_err!("DATE_BIN stride argument is too large"),
335 }
336 }
337 ColumnarValue::Scalar(ScalarValue::IntervalMonthDayNano(Some(v))) => {
338 let (months, days, nanos) = IntervalMonthDayNanoType::to_parts(*v);
339
340 if months != 0 {
342 if days != 0 || nanos != 0 {
344 return not_impl_err!(
345 "DATE_BIN stride does not support combination of month, day and nanosecond intervals"
346 );
347 } else {
348 Interval::Months(months as i64)
349 }
350 } else {
351 let nanos = (TimeDelta::try_days(days as i64).unwrap()
352 + Duration::nanoseconds(nanos))
353 .num_nanoseconds();
354 match nanos {
355 Some(v) => Interval::Nanoseconds(v),
356 _ => return exec_err!("DATE_BIN stride argument is too large"),
357 }
358 }
359 }
360 ColumnarValue::Scalar(v) => {
361 return exec_err!(
362 "DATE_BIN expects stride argument to be an INTERVAL but got {}",
363 v.data_type()
364 );
365 }
366 ColumnarValue::Array(_) => {
367 return not_impl_err!(
368 "DATE_BIN only supports literal values for the stride argument, not arrays"
369 );
370 }
371 };
372
373 let origin = match origin {
374 ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(v), _)) => *v,
375 ColumnarValue::Scalar(v) => {
376 return exec_err!(
377 "DATE_BIN expects origin argument to be a TIMESTAMP with nanosecond precision but got {}",
378 v.data_type()
379 );
380 }
381 ColumnarValue::Array(_) => {
382 return not_impl_err!(
383 "DATE_BIN only supports literal values for the origin argument, not arrays"
384 );
385 }
386 };
387
388 let (stride, stride_fn) = stride.bin_fn();
389
390 if stride == 0 {
392 return exec_err!("DATE_BIN stride must be non-zero");
393 }
394
395 fn stride_map_fn<T: ArrowTimestampType>(
396 origin: i64,
397 stride: i64,
398 stride_fn: fn(i64, i64, i64) -> i64,
399 ) -> impl Fn(i64) -> i64 {
400 let scale = match T::UNIT {
401 Nanosecond => 1,
402 Microsecond => NANOSECONDS / 1_000_000,
403 Millisecond => NANOSECONDS / 1_000,
404 Second => NANOSECONDS,
405 };
406 move |x: i64| stride_fn(stride, x * scale, origin) / scale
407 }
408
409 Ok(match array {
410 ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(v, tz_opt)) => {
411 let apply_stride_fn =
412 stride_map_fn::<TimestampNanosecondType>(origin, stride, stride_fn);
413 ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(
414 v.map(apply_stride_fn),
415 tz_opt.clone(),
416 ))
417 }
418 ColumnarValue::Scalar(ScalarValue::TimestampMicrosecond(v, tz_opt)) => {
419 let apply_stride_fn =
420 stride_map_fn::<TimestampMicrosecondType>(origin, stride, stride_fn);
421 ColumnarValue::Scalar(ScalarValue::TimestampMicrosecond(
422 v.map(apply_stride_fn),
423 tz_opt.clone(),
424 ))
425 }
426 ColumnarValue::Scalar(ScalarValue::TimestampMillisecond(v, tz_opt)) => {
427 let apply_stride_fn =
428 stride_map_fn::<TimestampMillisecondType>(origin, stride, stride_fn);
429 ColumnarValue::Scalar(ScalarValue::TimestampMillisecond(
430 v.map(apply_stride_fn),
431 tz_opt.clone(),
432 ))
433 }
434 ColumnarValue::Scalar(ScalarValue::TimestampSecond(v, tz_opt)) => {
435 let apply_stride_fn =
436 stride_map_fn::<TimestampSecondType>(origin, stride, stride_fn);
437 ColumnarValue::Scalar(ScalarValue::TimestampSecond(
438 v.map(apply_stride_fn),
439 tz_opt.clone(),
440 ))
441 }
442
443 ColumnarValue::Array(array) => {
444 fn transform_array_with_stride<T>(
445 origin: i64,
446 stride: i64,
447 stride_fn: fn(i64, i64, i64) -> i64,
448 array: &ArrayRef,
449 tz_opt: &Option<Arc<str>>,
450 ) -> Result<ColumnarValue>
451 where
452 T: ArrowTimestampType,
453 {
454 let array = as_primitive_array::<T>(array)?;
455 let apply_stride_fn = stride_map_fn::<T>(origin, stride, stride_fn);
456 let array: PrimitiveArray<T> = array
457 .unary(apply_stride_fn)
458 .with_timezone_opt(tz_opt.clone());
459
460 Ok(ColumnarValue::Array(Arc::new(array)))
461 }
462
463 match array.data_type() {
464 Timestamp(Nanosecond, tz_opt) => {
465 transform_array_with_stride::<TimestampNanosecondType>(
466 origin, stride, stride_fn, array, tz_opt,
467 )?
468 }
469 Timestamp(Microsecond, tz_opt) => {
470 transform_array_with_stride::<TimestampMicrosecondType>(
471 origin, stride, stride_fn, array, tz_opt,
472 )?
473 }
474 Timestamp(Millisecond, tz_opt) => {
475 transform_array_with_stride::<TimestampMillisecondType>(
476 origin, stride, stride_fn, array, tz_opt,
477 )?
478 }
479 Timestamp(Second, tz_opt) => {
480 transform_array_with_stride::<TimestampSecondType>(
481 origin, stride, stride_fn, array, tz_opt,
482 )?
483 }
484 _ => {
485 return exec_err!(
486 "DATE_BIN expects source argument to be a TIMESTAMP but got {}",
487 array.data_type()
488 );
489 }
490 }
491 }
492 _ => {
493 return exec_err!(
494 "DATE_BIN expects source argument to be a TIMESTAMP scalar or array"
495 );
496 }
497 })
498}
499
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use crate::datetime::date_bin::{date_bin_nanos_interval, DateBinFunc};
    use arrow::array::types::TimestampNanosecondType;
    use arrow::array::{Array, IntervalDayTimeArray, TimestampNanosecondArray};
    use arrow::compute::kernels::cast_utils::string_to_timestamp_nanos;
    use arrow::datatypes::{DataType, Field, FieldRef, TimeUnit};

    use arrow_buffer::{IntervalDayTime, IntervalMonthDayNano};
    use datafusion_common::{DataFusionError, ScalarValue};
    use datafusion_expr::{ColumnarValue, ScalarUDFImpl};

    use chrono::TimeDelta;
    use datafusion_common::config::ConfigOptions;

    /// Invokes `date_bin`, describing each argument as a nullable field named `a`.
    fn invoke_date_bin_with_args(
        args: Vec<ColumnarValue>,
        number_rows: usize,
        return_field: &FieldRef,
    ) -> Result<ColumnarValue, DataFusionError> {
        let arg_fields: Vec<FieldRef> = args
            .iter()
            .map(|arg| Arc::new(Field::new("a", arg.data_type(), true)))
            .collect();

        DateBinFunc::new().invoke_with_args(datafusion_expr::ScalarFunctionArgs {
            args,
            arg_fields,
            number_rows,
            return_field: Arc::clone(return_field),
            config_options: Arc::new(ConfigOptions::default()),
            lambdas: None,
        })
    }

    /// A day-time interval literal of `milliseconds` ms and zero days.
    fn ms_interval(milliseconds: i32) -> ColumnarValue {
        ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(IntervalDayTime {
            days: 0,
            milliseconds,
        })))
    }

    /// A timezone-less nanosecond timestamp literal.
    fn ns_timestamp(value: i64) -> ColumnarValue {
        ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(value), None))
    }

    /// Invokes `date_bin` and asserts that it fails with exactly `expected`.
    fn assert_date_bin_error(
        args: Vec<ColumnarValue>,
        number_rows: usize,
        return_field: &FieldRef,
        expected: &str,
    ) {
        match invoke_date_bin_with_args(args, number_rows, return_field) {
            Ok(_) => panic!("date_bin unexpectedly succeeded; expected error: {expected}"),
            Err(err) => assert_eq!(err.strip_backtrace(), expected),
        }
    }

    #[test]
    fn test_date_bin() {
        let return_field: &FieldRef = &Arc::new(Field::new(
            "f",
            DataType::Timestamp(TimeUnit::Nanosecond, None),
            true,
        ));

        // Scalar source with an explicit origin.
        let res = invoke_date_bin_with_args(
            vec![ms_interval(1), ns_timestamp(1), ns_timestamp(1)],
            1,
            return_field,
        );
        assert!(res.is_ok());

        // Array source.
        let timestamps = Arc::new((1..6).map(Some).collect::<TimestampNanosecondArray>());
        let batch_len = timestamps.len();
        let res = invoke_date_bin_with_args(
            vec![ms_interval(1), ColumnarValue::Array(timestamps), ns_timestamp(1)],
            batch_len,
            return_field,
        );
        assert!(res.is_ok());

        // Origin omitted.
        let res =
            invoke_date_bin_with_args(vec![ms_interval(1), ns_timestamp(1)], 1, return_field);
        assert!(res.is_ok());

        // MonthDayNano stride with only a nanosecond component.
        let one_nano = ColumnarValue::Scalar(ScalarValue::IntervalMonthDayNano(Some(
            IntervalMonthDayNano {
                months: 0,
                days: 0,
                nanoseconds: 1,
            },
        )));
        let res = invoke_date_bin_with_args(
            vec![one_nano, ns_timestamp(1), ns_timestamp(1)],
            1,
            return_field,
        );
        assert!(res.is_ok());

        // Too few arguments.
        assert_date_bin_error(
            vec![ms_interval(1)],
            1,
            return_field,
            "Execution error: DATE_BIN expected two or three arguments",
        );

        // Unsupported stride type.
        assert_date_bin_error(
            vec![
                ColumnarValue::Scalar(ScalarValue::IntervalYearMonth(Some(1))),
                ns_timestamp(1),
                ns_timestamp(1),
            ],
            1,
            return_field,
            "Execution error: DATE_BIN expects stride argument to be an INTERVAL but got Interval(YearMonth)",
        );

        // Zero stride.
        assert_date_bin_error(
            vec![ms_interval(0), ns_timestamp(1), ns_timestamp(1)],
            1,
            return_field,
            "Execution error: DATE_BIN stride must be non-zero",
        );

        // Strides that do not fit in i64 nanoseconds.
        assert_date_bin_error(
            vec![
                ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(
                    IntervalDayTime::MAX,
                ))),
                ns_timestamp(1),
                ns_timestamp(1),
            ],
            1,
            return_field,
            "Execution error: DATE_BIN stride argument is too large",
        );
        assert_date_bin_error(
            vec![
                ColumnarValue::Scalar(ScalarValue::new_interval_mdn(0, i32::MAX, 1)),
                ns_timestamp(1),
                ns_timestamp(1),
            ],
            1,
            return_field,
            "Execution error: DATE_BIN stride argument is too large",
        );

        // Months mixed with days / nanoseconds.
        assert_date_bin_error(
            vec![
                ColumnarValue::Scalar(ScalarValue::new_interval_mdn(1, 1, 1)),
                ns_timestamp(1),
                ns_timestamp(1),
            ],
            1,
            return_field,
            "This feature is not implemented: DATE_BIN stride does not support combination of month, day and nanosecond intervals",
        );

        // The origin must have nanosecond precision...
        assert_date_bin_error(
            vec![
                ms_interval(1),
                ns_timestamp(1),
                ColumnarValue::Scalar(ScalarValue::TimestampMicrosecond(Some(1), None)),
            ],
            1,
            return_field,
            "Execution error: DATE_BIN expects origin argument to be a TIMESTAMP with nanosecond precision but got Timestamp(µs)",
        );

        // ...but the source may use any precision.
        let res = invoke_date_bin_with_args(
            vec![
                ms_interval(1),
                ColumnarValue::Scalar(ScalarValue::TimestampMicrosecond(Some(1), None)),
                ns_timestamp(1),
            ],
            1,
            return_field,
        );
        assert!(res.is_ok());

        // Stride and origin must be literals, not arrays.
        let intervals = Arc::new(
            (1..6)
                .map(|ms| {
                    Some(IntervalDayTime {
                        days: 0,
                        milliseconds: ms,
                    })
                })
                .collect::<IntervalDayTimeArray>(),
        );
        assert_date_bin_error(
            vec![
                ColumnarValue::Array(intervals),
                ns_timestamp(1),
                ns_timestamp(1),
            ],
            1,
            return_field,
            "This feature is not implemented: DATE_BIN only supports literal values for the stride argument, not arrays",
        );

        let timestamps = Arc::new((1..6).map(Some).collect::<TimestampNanosecondArray>());
        let batch_len = timestamps.len();
        assert_date_bin_error(
            vec![ms_interval(1), ns_timestamp(1), ColumnarValue::Array(timestamps)],
            batch_len,
            return_field,
            "This feature is not implemented: DATE_BIN only supports literal values for the origin argument, not arrays",
        );
    }

    #[test]
    fn test_date_bin_timezones() {
        // (timezone attached to input/origin/output, offset suffix used to spell timestamps).
        // Every case bins hours 00..=04 of 2020-09-08 into one-day buckets starting at the
        // origin's midnight, so all five values map to 2020-09-08T00:00:00<suffix>.
        let cases: [(Option<&str>, &str); 5] = [
            (Some("+00"), "Z"),
            (None, "Z"),
            (Some("-02"), "Z"),
            (Some("+05"), "+05"),
            (Some("+08"), "+08"),
        ];

        for (tz, suffix) in cases {
            let tz_opt: Option<Arc<str>> = tz.map(Arc::<str>::from);
            let parse = |s: String| string_to_timestamp_nanos(&s).unwrap();

            let input = (0..5)
                .map(|hour| Some(parse(format!("2020-09-08T{hour:02}:00:00{suffix}"))))
                .collect::<TimestampNanosecondArray>()
                .with_timezone_opt(tz_opt.clone());
            let right = (0..5)
                .map(|_| Some(parse(format!("2020-09-08T00:00:00{suffix}"))))
                .collect::<TimestampNanosecondArray>()
                .with_timezone_opt(tz_opt.clone());

            let batch_len = input.len();
            let args = vec![
                ColumnarValue::Scalar(ScalarValue::new_interval_dt(1, 0)),
                ColumnarValue::Array(Arc::new(input)),
                ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(
                    Some(parse(format!("1970-01-01T00:00:00{suffix}"))),
                    tz_opt.clone(),
                )),
            ];
            let return_field: FieldRef = Arc::new(Field::new(
                "f",
                DataType::Timestamp(TimeUnit::Nanosecond, tz_opt.clone()),
                true,
            ));

            let result = invoke_date_bin_with_args(args, batch_len, &return_field).unwrap();
            let ColumnarValue::Array(result) = result else {
                panic!("unexpected column type");
            };

            assert_eq!(
                result.data_type(),
                &DataType::Timestamp(TimeUnit::Nanosecond, tz_opt.clone())
            );
            let left =
                arrow::array::cast::as_primitive_array::<TimestampNanosecondType>(&result);
            assert_eq!(left, &right);
        }
    }

    #[test]
    fn test_date_bin_single() {
        // (stride, source, origin, expected bin start)
        let cases = [
            (
                TimeDelta::try_minutes(15),
                "2004-04-09T02:03:04.123456789Z",
                "2001-01-01T00:00:00",
                "2004-04-09T02:00:00Z",
            ),
            (
                TimeDelta::try_minutes(15),
                "2004-04-09T02:03:04.123456789Z",
                "2001-01-01T00:02:30",
                "2004-04-09T02:02:30Z",
            ),
            (
                TimeDelta::try_minutes(15),
                "2004-04-09T02:03:04.123456789Z",
                "2005-01-01T00:02:30",
                "2004-04-09T02:02:30Z",
            ),
            (
                TimeDelta::try_hours(1),
                "2004-04-09T02:03:04.123456789Z",
                "2001-01-01T00:00:00",
                "2004-04-09T02:00:00Z",
            ),
            (
                TimeDelta::try_seconds(10),
                "2004-04-09T02:03:11.123456789Z",
                "2001-01-01T00:00:00",
                "2004-04-09T02:03:10Z",
            ),
        ];

        for (stride, source, origin, expected) in cases {
            let stride_nanos = stride.unwrap().num_nanoseconds().unwrap();
            let source_nanos = string_to_timestamp_nanos(source).unwrap();
            let origin_nanos = string_to_timestamp_nanos(origin).unwrap();
            let expected_nanos = string_to_timestamp_nanos(expected).unwrap();

            let result = date_bin_nanos_interval(stride_nanos, source_nanos, origin_nanos);
            assert_eq!(result, expected_nanos, "{source} = {expected}");
        }
    }

    #[test]
    fn test_date_bin_before_epoch() {
        let stride_nanos = TimeDelta::try_minutes(15)
            .unwrap()
            .num_nanoseconds()
            .unwrap();
        // (source, expected bin start) with the origin at the epoch.
        let cases = [
            ("1969-12-31T23:44:59.999999999", "1969-12-31T23:30:00"),
            ("1969-12-31T23:45:00", "1969-12-31T23:45:00"),
            ("1969-12-31T23:45:00.000000001", "1969-12-31T23:45:00"),
        ];

        for (source, expected) in cases {
            let source_nanos = string_to_timestamp_nanos(source).unwrap();
            let expected_nanos = string_to_timestamp_nanos(expected).unwrap();

            let result = date_bin_nanos_interval(stride_nanos, source_nanos, 0);
            assert_eq!(result, expected_nanos, "{source} = {expected}");
        }
    }
}