datafusion_optimizer/optimize_projections/required_indices.rs
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! [`RequiredIndices`] helper for OptimizeProjection
19
20use crate::optimize_projections::outer_columns;
21use datafusion_common::tree_node::TreeNodeRecursion;
22use datafusion_common::{Column, DFSchemaRef, Result};
23use datafusion_expr::{Expr, LogicalPlan};
24
25/// Represents columns in a schema which are required (used) by a plan node
26///
27/// Also carries a flag indicating if putting a projection above children is
28/// beneficial for the parent. For example `LogicalPlan::Filter` benefits from
29/// small tables. Hence for filter child this flag would be `true`. Defaults to
30/// `false`
31///
32/// # Invariant
33///
34/// Indices are always in order and without duplicates. For example, if these
35/// indices were added `[3, 2, 4, 3, 6, 1]`, the instance would be represented
36/// by `[1, 2, 3, 4, 6]`.
37#[derive(Debug, Clone, Default)]
38pub(super) struct RequiredIndices {
39 /// The indices of the required columns in the
40 indices: Vec<usize>,
41 /// If putting a projection above children is beneficial for the parent.
42 /// Defaults to false.
43 projection_beneficial: bool,
44}
45
46impl RequiredIndices {
47 /// Create a new, empty instance
48 pub fn new() -> Self {
49 Self::default()
50 }
51
52 /// Create a new instance that requires all columns from the specified plan
53 pub fn new_for_all_exprs(plan: &LogicalPlan) -> Self {
54 Self {
55 indices: (0..plan.schema().fields().len()).collect(),
56 projection_beneficial: false,
57 }
58 }
59
60 /// Create a new instance with the specified indices as required
61 pub fn new_from_indices(indices: Vec<usize>) -> Self {
62 Self {
63 indices,
64 projection_beneficial: false,
65 }
66 .compact()
67 }
68
69 /// Convert the instance to its inner indices
70 pub fn into_inner(self) -> Vec<usize> {
71 self.indices
72 }
73
74 /// Set the projection beneficial flag
75 pub fn with_projection_beneficial(mut self) -> Self {
76 self.projection_beneficial = true;
77 self
78 }
79
80 /// Return the value of projection beneficial flag
81 pub fn projection_beneficial(&self) -> bool {
82 self.projection_beneficial
83 }
84
85 /// Return a reference to the underlying indices
86 pub fn indices(&self) -> &[usize] {
87 &self.indices
88 }
89
90 /// Add required indices for all `exprs` used in plan
91 pub fn with_plan_exprs(
92 mut self,
93 plan: &LogicalPlan,
94 schema: &DFSchemaRef,
95 ) -> Result<Self> {
96 // Add indices of the child fields referred to by the expressions in the
97 // parent
98 plan.apply_expressions(|e| {
99 self.add_expr(schema, e);
100 Ok(TreeNodeRecursion::Continue)
101 })?;
102 Ok(self.compact())
103 }
104
105 /// Adds the indices of the fields referred to by the given expression
106 /// `expr` within the given schema (`input_schema`).
107 ///
108 /// Self is NOT compacted (and thus this method is not pub)
109 ///
110 /// # Parameters
111 ///
112 /// * `input_schema`: The input schema to analyze for index requirements.
113 /// * `expr`: An expression for which we want to find necessary field indices.
114 fn add_expr(&mut self, input_schema: &DFSchemaRef, expr: &Expr) {
115 // TODO could remove these clones (and visit the expression directly)
116 let mut cols = expr.column_refs();
117 // Get outer-referenced (subquery) columns:
118 outer_columns(expr, &mut cols);
119 self.indices.reserve(cols.len());
120 for col in cols {
121 if let Some(idx) = input_schema.maybe_index_of_column(col) {
122 self.indices.push(idx);
123 }
124 }
125 }
126
127 /// Adds the indices of the fields referred to by the given expressions
128 /// `within the given schema.
129 ///
130 /// # Parameters
131 ///
132 /// * `input_schema`: The input schema to analyze for index requirements.
133 /// * `exprs`: the expressions for which we want to find field indices.
134 pub fn with_exprs<'a>(
135 self,
136 schema: &DFSchemaRef,
137 exprs: impl IntoIterator<Item = &'a Expr>,
138 ) -> Self {
139 exprs
140 .into_iter()
141 .fold(self, |mut acc, expr| {
142 acc.add_expr(schema, expr);
143 acc
144 })
145 .compact()
146 }
147
148 /// Adds all `indices` into this instance.
149 pub fn append(mut self, indices: &[usize]) -> Self {
150 self.indices.extend_from_slice(indices);
151 self.compact()
152 }
153
154 /// Splits this instance into a tuple with two instances:
155 /// * The first `n` indices
156 /// * The remaining indices, adjusted down by n
157 pub fn split_off(self, n: usize) -> (Self, Self) {
158 let (l, r) = self.partition(|idx| idx < n);
159 (l, r.map_indices(|idx| idx - n))
160 }
161
162 /// Partitions the indices in this instance into two groups based on the
163 /// given predicate function `f`.
164 fn partition<F>(&self, f: F) -> (Self, Self)
165 where
166 F: Fn(usize) -> bool,
167 {
168 let (l, r): (Vec<usize>, Vec<usize>) =
169 self.indices.iter().partition(|&&idx| f(idx));
170 let projection_beneficial = self.projection_beneficial;
171
172 (
173 Self {
174 indices: l,
175 projection_beneficial,
176 },
177 Self {
178 indices: r,
179 projection_beneficial,
180 },
181 )
182 }
183
184 /// Map the indices in this instance to a new set of indices based on the
185 /// given function `f`, returning the mapped indices
186 ///
187 /// Not `pub` as it might not preserve the invariant of compacted indices
188 fn map_indices<F>(mut self, f: F) -> Self
189 where
190 F: Fn(usize) -> usize,
191 {
192 self.indices.iter_mut().for_each(|idx| *idx = f(*idx));
193 self
194 }
195
196 /// Apply the given function `f` to each index in this instance, returning
197 /// the mapped indices
198 pub fn into_mapped_indices<F>(self, f: F) -> Vec<usize>
199 where
200 F: Fn(usize) -> usize,
201 {
202 self.map_indices(f).into_inner()
203 }
204
205 /// Returns the `Expr`s from `exprs` that are at the indices in this instance
206 pub fn get_at_indices(&self, exprs: &[Expr]) -> Vec<Expr> {
207 self.indices.iter().map(|&idx| exprs[idx].clone()).collect()
208 }
209
210 /// Generates the required expressions (columns) that reside at `indices` of
211 /// the given `input_schema`.
212 pub fn get_required_exprs(&self, input_schema: &DFSchemaRef) -> Vec<Expr> {
213 self.indices
214 .iter()
215 .map(|&idx| Expr::from(Column::from(input_schema.qualified_field(idx))))
216 .collect()
217 }
218
219 /// Compacts the indices of this instance so they are sorted
220 /// (ascending) and deduplicated.
221 fn compact(mut self) -> Self {
222 self.indices.sort_unstable();
223 self.indices.dedup();
224 self
225 }
226}