datafusion_optimizer/optimize_projections/
required_indices.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! [`RequiredIndices`] helper for OptimizeProjection
19
20use crate::optimize_projections::outer_columns;
21use datafusion_common::tree_node::TreeNodeRecursion;
22use datafusion_common::{Column, DFSchemaRef, Result};
23use datafusion_expr::{Expr, LogicalPlan};
24
25/// Represents columns in a schema which are required (used) by a plan node
26///
27/// Also carries a flag indicating if putting a projection above children is
28/// beneficial for the parent. For example `LogicalPlan::Filter` benefits from
29/// small tables. Hence for filter child this flag would be `true`. Defaults to
30/// `false`
31///
32/// # Invariant
33///
34/// Indices are always in order and without duplicates. For example, if these
35/// indices were added `[3, 2, 4, 3, 6, 1]`,  the instance would be represented
36/// by  `[1, 2, 3, 4, 6]`.
37#[derive(Debug, Clone, Default)]
38pub(super) struct RequiredIndices {
39    /// The indices of the required columns in the
40    indices: Vec<usize>,
41    /// If putting a projection above children is beneficial for the parent.
42    /// Defaults to false.
43    projection_beneficial: bool,
44}
45
46impl RequiredIndices {
47    /// Create a new, empty instance
48    pub fn new() -> Self {
49        Self::default()
50    }
51
52    /// Create a new instance that requires all columns from the specified plan
53    pub fn new_for_all_exprs(plan: &LogicalPlan) -> Self {
54        Self {
55            indices: (0..plan.schema().fields().len()).collect(),
56            projection_beneficial: false,
57        }
58    }
59
60    /// Create a new instance with the specified indices as required
61    pub fn new_from_indices(indices: Vec<usize>) -> Self {
62        Self {
63            indices,
64            projection_beneficial: false,
65        }
66        .compact()
67    }
68
69    /// Convert the instance to its inner indices
70    pub fn into_inner(self) -> Vec<usize> {
71        self.indices
72    }
73
74    /// Set the projection beneficial flag
75    pub fn with_projection_beneficial(mut self) -> Self {
76        self.projection_beneficial = true;
77        self
78    }
79
80    /// Return the value of projection beneficial flag
81    pub fn projection_beneficial(&self) -> bool {
82        self.projection_beneficial
83    }
84
85    /// Return a reference to the underlying indices
86    pub fn indices(&self) -> &[usize] {
87        &self.indices
88    }
89
90    /// Add required indices for all `exprs` used in plan
91    pub fn with_plan_exprs(
92        mut self,
93        plan: &LogicalPlan,
94        schema: &DFSchemaRef,
95    ) -> Result<Self> {
96        // Add indices of the child fields referred to by the expressions in the
97        // parent
98        plan.apply_expressions(|e| {
99            self.add_expr(schema, e);
100            Ok(TreeNodeRecursion::Continue)
101        })?;
102        Ok(self.compact())
103    }
104
105    /// Adds the indices of the fields referred to by the given expression
106    /// `expr` within the given schema (`input_schema`).
107    ///
108    /// Self is NOT compacted (and thus this method is not pub)
109    ///
110    /// # Parameters
111    ///
112    /// * `input_schema`: The input schema to analyze for index requirements.
113    /// * `expr`: An expression for which we want to find necessary field indices.
114    fn add_expr(&mut self, input_schema: &DFSchemaRef, expr: &Expr) {
115        // TODO could remove these clones (and visit the expression directly)
116        let mut cols = expr.column_refs();
117        // Get outer-referenced (subquery) columns:
118        outer_columns(expr, &mut cols);
119        self.indices.reserve(cols.len());
120        for col in cols {
121            if let Some(idx) = input_schema.maybe_index_of_column(col) {
122                self.indices.push(idx);
123            }
124        }
125    }
126
127    /// Adds the indices of the fields referred to by the given expressions
128    /// `within the given schema.
129    ///
130    /// # Parameters
131    ///
132    /// * `input_schema`: The input schema to analyze for index requirements.
133    /// * `exprs`: the expressions for which we want to find field indices.
134    pub fn with_exprs<'a>(
135        self,
136        schema: &DFSchemaRef,
137        exprs: impl IntoIterator<Item = &'a Expr>,
138    ) -> Self {
139        exprs
140            .into_iter()
141            .fold(self, |mut acc, expr| {
142                acc.add_expr(schema, expr);
143                acc
144            })
145            .compact()
146    }
147
148    /// Adds all `indices` into this instance.
149    pub fn append(mut self, indices: &[usize]) -> Self {
150        self.indices.extend_from_slice(indices);
151        self.compact()
152    }
153
154    /// Splits this instance into a tuple with two instances:
155    /// * The first `n` indices
156    /// * The remaining indices, adjusted down by n
157    pub fn split_off(self, n: usize) -> (Self, Self) {
158        let (l, r) = self.partition(|idx| idx < n);
159        (l, r.map_indices(|idx| idx - n))
160    }
161
162    /// Partitions the indices in this instance into two groups based on the
163    /// given predicate function `f`.
164    fn partition<F>(&self, f: F) -> (Self, Self)
165    where
166        F: Fn(usize) -> bool,
167    {
168        let (l, r): (Vec<usize>, Vec<usize>) =
169            self.indices.iter().partition(|&&idx| f(idx));
170        let projection_beneficial = self.projection_beneficial;
171
172        (
173            Self {
174                indices: l,
175                projection_beneficial,
176            },
177            Self {
178                indices: r,
179                projection_beneficial,
180            },
181        )
182    }
183
184    /// Map the indices in this instance to a new set of indices based on the
185    /// given function `f`, returning the mapped indices
186    ///
187    /// Not `pub` as it might not preserve the invariant of compacted indices
188    fn map_indices<F>(mut self, f: F) -> Self
189    where
190        F: Fn(usize) -> usize,
191    {
192        self.indices.iter_mut().for_each(|idx| *idx = f(*idx));
193        self
194    }
195
196    /// Apply the given function `f` to each index in this instance, returning
197    /// the mapped indices
198    pub fn into_mapped_indices<F>(self, f: F) -> Vec<usize>
199    where
200        F: Fn(usize) -> usize,
201    {
202        self.map_indices(f).into_inner()
203    }
204
205    /// Returns the `Expr`s from `exprs` that are at the indices in this instance
206    pub fn get_at_indices(&self, exprs: &[Expr]) -> Vec<Expr> {
207        self.indices.iter().map(|&idx| exprs[idx].clone()).collect()
208    }
209
210    /// Generates the required expressions (columns) that reside at `indices` of
211    /// the given `input_schema`.
212    pub fn get_required_exprs(&self, input_schema: &DFSchemaRef) -> Vec<Expr> {
213        self.indices
214            .iter()
215            .map(|&idx| Expr::from(Column::from(input_schema.qualified_field(idx))))
216            .collect()
217    }
218
219    /// Compacts the indices of this instance so they are sorted
220    /// (ascending) and deduplicated.
221    fn compact(mut self) -> Self {
222        self.indices.sort_unstable();
223        self.indices.dedup();
224        self
225    }
226}