datafusion/test_util/parquet.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! Helpers for writing parquet files and reading them back

use std::fs::File;
use std::path::PathBuf;
use std::sync::Arc;

use crate::arrow::{datatypes::SchemaRef, record_batch::RecordBatch};
use crate::common::ToDFSchema;
use crate::config::ConfigOptions;
use crate::datasource::listing::{ListingTableUrl, PartitionedFile};
use crate::datasource::object_store::ObjectStoreUrl;
use crate::datasource::physical_plan::ParquetSource;
use crate::error::Result;
use crate::logical_expr::execution_props::ExecutionProps;
use crate::logical_expr::simplify::SimplifyContext;
use crate::optimizer::simplify_expressions::ExprSimplifier;
use crate::physical_expr::create_physical_expr;
use crate::physical_plan::filter::FilterExec;
use crate::physical_plan::metrics::MetricsSet;
use crate::physical_plan::ExecutionPlan;
use crate::prelude::{Expr, SessionConfig, SessionContext};

use datafusion_datasource::file::FileSource;
use datafusion_datasource::file_scan_config::FileScanConfigBuilder;
use datafusion_datasource::source::DataSourceExec;
use datafusion_datasource::TableSchema;
use object_store::path::Path;
use object_store::ObjectMeta;
use parquet::arrow::ArrowWriter;
use parquet::file::properties::WriterProperties;
/// A parquet file created for testing.
pub struct TestParquetFile {
    path: PathBuf,
    schema: SchemaRef,
    object_store_url: ObjectStoreUrl,
    object_meta: ObjectMeta,
}

/// Options for how to create the parquet scan
#[derive(Debug, Clone, Copy)]
pub struct ParquetScanOptions {
    /// Enable pushdown filters
    pub pushdown_filters: bool,
    /// Enable reordering filters
    pub reorder_filters: bool,
    /// Enable the page index
    pub enable_page_index: bool,
}

impl ParquetScanOptions {
    /// Returns a [`SessionConfig`] with the given options
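    ///
    /// A minimal sketch of how these options flow into a session; the flag
    /// values shown are arbitrary:
    ///
    /// ```ignore
    /// let options = ParquetScanOptions {
    ///     pushdown_filters: true,
    ///     reorder_filters: true,
    ///     enable_page_index: false,
    /// };
    /// let ctx = SessionContext::new_with_config(options.config());
    /// ```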
    pub fn config(&self) -> SessionConfig {
        let mut config = ConfigOptions::new();
        config.execution.parquet.pushdown_filters = self.pushdown_filters;
        config.execution.parquet.reorder_filters = self.reorder_filters;
        config.execution.parquet.enable_page_index = self.enable_page_index;
        config.into()
    }
}

impl TestParquetFile {
    /// Creates a new parquet file at the specified location with the
    /// given properties
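    ///
    /// A usage sketch; the path and the `batch` variable are hypothetical,
    /// and all batches must share the schema of the first one:
    ///
    /// ```ignore
    /// let props = WriterProperties::builder().build();
    /// let parquet_file =
    ///     TestParquetFile::try_new(PathBuf::from("/tmp/data.parquet"), props, vec![batch])?;
    /// ```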
    pub fn try_new(
        path: PathBuf,
        props: WriterProperties,
        batches: impl IntoIterator<Item = RecordBatch>,
    ) -> Result<Self> {
        let file = File::create(&path)?;

        let mut batches = batches.into_iter();
        let first_batch = batches.next().expect("need at least one record batch");
        let schema = first_batch.schema();

        let mut writer = ArrowWriter::try_new(file, Arc::clone(&schema), Some(props))?;

        writer.write(&first_batch)?;
        let mut num_rows = first_batch.num_rows();

        for batch in batches {
            writer.write(&batch)?;
            num_rows += batch.num_rows();
        }
        writer.close()?;

        println!("Generated test dataset with {num_rows} rows");

        let size = std::fs::metadata(&path)?.len();

        let mut canonical_path = path.canonicalize()?;

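        // On Windows, `canonicalize` returns an extended-length path (for
        // example `\\?\C:\...`); normalize the separators and strip that
        // prefix so the path can be parsed as a URL below.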
        if cfg!(target_os = "windows") {
            canonical_path = canonical_path
                .to_str()
                .unwrap()
                .replace("\\", "/")
                .strip_prefix("//?/")
                .unwrap()
                .into();
        }

        let object_store_url =
            ListingTableUrl::parse(canonical_path.to_str().unwrap_or_default())?
                .object_store();

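        // Describe the newly written file to the object store layer; the
        // parquet reader uses `size` to locate the file footer.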
        let object_meta = ObjectMeta {
            location: Path::parse(canonical_path.to_str().unwrap_or_default())?,
            last_modified: Default::default(),
            size,
            e_tag: None,
            version: None,
        };

        Ok(Self {
            path,
            schema,
            object_store_url,
            object_meta,
        })
    }
}

impl TestParquetFile {
    /// Return a `DataSourceExec` with the specified options.
    ///
    /// If `maybe_filter` is `Some`, the `DataSourceExec` is filtered by the
    /// given expression, and this method returns the same plan that DataFusion
    /// itself would produce: a pushed-down predicate followed by a filter:
    ///
    /// ```text
    /// (FilterExec)
    ///   (DataSourceExec)
    /// ```
    ///
    /// Otherwise, if `maybe_filter` is `None`, returns just a `DataSourceExec`.
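    ///
    /// A usage sketch; `test_file` is a hypothetical [`TestParquetFile`] and
    /// the filter assumes a column named `a`:
    ///
    /// ```ignore
    /// let options = ParquetScanOptions {
    ///     pushdown_filters: true,
    ///     reorder_filters: true,
    ///     enable_page_index: false,
    /// };
    /// let ctx = SessionContext::new_with_config(options.config());
    /// let plan = test_file.create_scan(&ctx, Some(col("a").eq(lit(1)))).await?;
    /// ```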
    pub async fn create_scan(
        &self,
        ctx: &SessionContext,
        maybe_filter: Option<Expr>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let parquet_options = ctx.copied_table_options().parquet;
        let source = Arc::new(ParquetSource::new(parquet_options.clone()));
        let scan_config_builder = FileScanConfigBuilder::new(
            self.object_store_url.clone(),
            Arc::clone(&self.schema),
            source,
        )
        .with_file(PartitionedFile {
            object_meta: self.object_meta.clone(),
            partition_values: vec![],
            range: None,
            statistics: None,
            extensions: None,
            metadata_size_hint: None,
        });

        let df_schema = Arc::clone(&self.schema).to_dfschema_ref()?;

        // Coerce the filter's types to match the schema before planning it
        let props = ExecutionProps::new();
        let context = SimplifyContext::new(&props).with_schema(Arc::clone(&df_schema));
        if let Some(filter) = maybe_filter {
            let simplifier = ExprSimplifier::new(context);
            let filter = simplifier.coerce(filter, &df_schema)?;
            let physical_filter_expr =
                create_physical_expr(&filter, &df_schema, &props)?;

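            // Rebuild the parquet source with the predicate attached so it
            // can be pushed down into the scan; the `FilterExec` added below
            // re-applies the same predicate, mirroring the plan DataFusion
            // itself produces.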
            let source = Arc::new(
                ParquetSource::new(parquet_options)
                    .with_predicate(Arc::clone(&physical_filter_expr)),
            )
            .with_schema(TableSchema::from_file_schema(Arc::clone(&self.schema)));
            let config = scan_config_builder.with_source(source).build();
            let parquet_exec = DataSourceExec::from_data_source(config);

            let exec = Arc::new(FilterExec::try_new(physical_filter_expr, parquet_exec)?);
            Ok(exec)
        } else {
            let config = scan_config_builder.build();
            Ok(DataSourceExec::from_data_source(config))
        }
    }

    /// Retrieve metrics from the parquet exec returned from `create_scan`
    ///
    /// Recursively searches the plan for a `DataSourceExec` backed by a
    /// `ParquetSource` and returns the metrics of the first one it finds.
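    ///
    /// A usage sketch, assuming `plan` came from [`Self::create_scan`]:
    ///
    /// ```ignore
    /// if let Some(metrics) = TestParquetFile::parquet_metrics(&plan) {
    ///     println!("scan produced {:?} rows", metrics.output_rows());
    /// }
    /// ```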
    pub fn parquet_metrics(plan: &Arc<dyn ExecutionPlan>) -> Option<MetricsSet> {
        if let Some(data_source_exec) = plan.as_any().downcast_ref::<DataSourceExec>() {
            if data_source_exec
                .downcast_to_file_source::<ParquetSource>()
                .is_some()
            {
                return data_source_exec.metrics();
            }
        }

        for child in plan.children() {
            if let Some(metrics) = Self::parquet_metrics(child) {
                return Some(metrics);
            }
        }
        None
    }

    /// The schema of this parquet file
    pub fn schema(&self) -> SchemaRef {
        Arc::clone(&self.schema)
    }

    /// The path to the parquet file
    pub fn path(&self) -> &std::path::Path {
        self.path.as_path()
    }
}