// datafusion/test_util/parquet.rs

use std::fs::File;
use std::path::PathBuf;
use std::sync::Arc;

use crate::arrow::{datatypes::SchemaRef, record_batch::RecordBatch};
use crate::common::ToDFSchema;
use crate::config::ConfigOptions;
use crate::datasource::listing::{ListingTableUrl, PartitionedFile};
use crate::datasource::object_store::ObjectStoreUrl;
use crate::datasource::physical_plan::ParquetSource;
use crate::error::Result;
use crate::logical_expr::execution_props::ExecutionProps;
use crate::logical_expr::simplify::SimplifyContext;
use crate::optimizer::simplify_expressions::ExprSimplifier;
use crate::physical_expr::create_physical_expr;
use crate::physical_plan::filter::FilterExec;
use crate::physical_plan::metrics::MetricsSet;
use crate::physical_plan::ExecutionPlan;
use crate::prelude::{Expr, SessionConfig, SessionContext};

use datafusion_datasource::file::FileSource;
use datafusion_datasource::file_scan_config::FileScanConfigBuilder;
use datafusion_datasource::source::DataSourceExec;
use datafusion_datasource::TableSchema;
use object_store::path::Path;
use object_store::ObjectMeta;
use parquet::arrow::ArrowWriter;
use parquet::file::properties::WriterProperties;
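
/// Writes an in-memory set of [`RecordBatch`]es to a single parquet file and
/// remembers the schema and object store metadata needed to scan it back.
///
/// A minimal usage sketch (`batch` stands in for a caller-provided
/// [`RecordBatch`]):
///
/// ```ignore
/// let props = WriterProperties::builder().build();
/// let file =
///     TestParquetFile::try_new(PathBuf::from("/tmp/data.parquet"), props, vec![batch])?;
/// println!("wrote {:?}", file.path());
/// ```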
pub struct TestParquetFile {
    path: PathBuf,
    schema: SchemaRef,
    object_store_url: ObjectStoreUrl,
    object_meta: ObjectMeta,
}
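
/// Controls which parquet reader optimizations are enabled for a scan.
///
/// - `pushdown_filters`: evaluate filter expressions while decoding parquet
///   data
/// - `reorder_filters`: allow the reader to reorder pushed-down filters to
///   minimize evaluation cost
/// - `enable_page_index`: use the parquet page index, when present, to prune
///   pages before decoding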
#[derive(Debug, Clone, Copy)]
pub struct ParquetScanOptions {
    pub pushdown_filters: bool,
    pub reorder_filters: bool,
    pub enable_page_index: bool,
}

impl ParquetScanOptions {
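    /// Returns a [`SessionConfig`] with the corresponding
    /// `datafusion.execution.parquet.*` options applied.
    ///
    /// A usage sketch, assuming the standard [`SessionContext::new_with_config`]
    /// constructor:
    ///
    /// ```ignore
    /// let options = ParquetScanOptions {
    ///     pushdown_filters: true,
    ///     reorder_filters: true,
    ///     enable_page_index: true,
    /// };
    /// let ctx = SessionContext::new_with_config(options.config());
    /// ```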
    pub fn config(&self) -> SessionConfig {
        let mut config = ConfigOptions::new();
        config.execution.parquet.pushdown_filters = self.pushdown_filters;
        config.execution.parquet.reorder_filters = self.reorder_filters;
        config.execution.parquet.enable_page_index = self.enable_page_index;
        config.into()
    }
}

impl TestParquetFile {
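    /// Writes `batches` to a new parquet file at `path` using the given
    /// [`WriterProperties`], recording the file size and canonicalized
    /// location for later scans.
    ///
    /// Panics if `batches` is empty; every batch is expected to share the
    /// schema of the first one.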
    pub fn try_new(
        path: PathBuf,
        props: WriterProperties,
        batches: impl IntoIterator<Item = RecordBatch>,
    ) -> Result<Self> {
        let file = File::create(&path)?;

        let mut batches = batches.into_iter();
        let first_batch = batches.next().expect("need at least one record batch");
        let schema = first_batch.schema();

        let mut writer = ArrowWriter::try_new(file, Arc::clone(&schema), Some(props))?;

        writer.write(&first_batch)?;
        let mut num_rows = first_batch.num_rows();

        for batch in batches {
            writer.write(&batch)?;
            num_rows += batch.num_rows();
        }
        writer.close()?;

        println!("Generated test dataset with {num_rows} rows");

        let size = std::fs::metadata(&path)?.len();

        let mut canonical_path = path.canonicalize()?;
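
        // On Windows, `canonicalize` returns an extended-length path like
        // `\\?\C:\...`; normalize the separators and strip the `//?/` prefix
        // so the result parses as a listing table URL below.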
        if cfg!(target_os = "windows") {
            canonical_path = canonical_path
                .to_str()
                .unwrap()
                .replace("\\", "/")
                .strip_prefix("//?/")
                .unwrap()
                .into();
        };

        let object_store_url =
            ListingTableUrl::parse(canonical_path.to_str().unwrap_or_default())?
                .object_store();

        let object_meta = ObjectMeta {
            location: Path::parse(canonical_path.to_str().unwrap_or_default())?,
            last_modified: Default::default(),
            size,
            e_tag: None,
            version: None,
        };

        Ok(Self {
            path,
            schema,
            object_store_url,
            object_meta,
        })
    }
}

impl TestParquetFile {
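    /// Returns an [`ExecutionPlan`] that scans this file with the parquet
    /// options from `ctx`.
    ///
    /// If `maybe_filter` is given, the filter is simplified, coerced to the
    /// file schema, attached to the [`ParquetSource`] as a predicate, and the
    /// scan is wrapped in a [`FilterExec`]; otherwise a bare
    /// [`DataSourceExec`] is returned.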
    pub async fn create_scan(
        &self,
        ctx: &SessionContext,
        maybe_filter: Option<Expr>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let parquet_options = ctx.copied_table_options().parquet;
        let source = Arc::new(ParquetSource::new(parquet_options.clone()));
        let scan_config_builder = FileScanConfigBuilder::new(
            self.object_store_url.clone(),
            Arc::clone(&self.schema),
            source,
        )
        .with_file(PartitionedFile {
            object_meta: self.object_meta.clone(),
            partition_values: vec![],
            range: None,
            statistics: None,
            extensions: None,
            metadata_size_hint: None,
        });

        let df_schema = Arc::clone(&self.schema).to_dfschema_ref()?;

        let props = ExecutionProps::new();
        let context = SimplifyContext::new(&props).with_schema(Arc::clone(&df_schema));
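        // If a filter was supplied, simplify and coerce it against the file
        // schema, attach it to the ParquetSource as a predicate, and wrap the
        // scan in a FilterExec so rows the reader does not prune are still
        // filtered.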
        if let Some(filter) = maybe_filter {
            let simplifier = ExprSimplifier::new(context);
            let filter = simplifier.coerce(filter, &df_schema).unwrap();
            let physical_filter_expr =
                create_physical_expr(&filter, &df_schema, &ExecutionProps::default())?;

            let source = Arc::new(
                ParquetSource::new(parquet_options)
                    .with_predicate(Arc::clone(&physical_filter_expr)),
            )
            .with_schema(TableSchema::from_file_schema(Arc::clone(&self.schema)));
            let config = scan_config_builder.with_source(source).build();
            let parquet_exec = DataSourceExec::from_data_source(config);

            let exec = Arc::new(FilterExec::try_new(physical_filter_expr, parquet_exec)?);
            Ok(exec)
        } else {
            let config = scan_config_builder.build();
            Ok(DataSourceExec::from_data_source(config))
        }
    }
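
    /// Recursively walks `plan` and returns the [`MetricsSet`] of the first
    /// [`DataSourceExec`] backed by a [`ParquetSource`], or `None` if the plan
    /// contains no parquet scan.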
    pub fn parquet_metrics(plan: &Arc<dyn ExecutionPlan>) -> Option<MetricsSet> {
        if let Some(data_source_exec) = plan.as_any().downcast_ref::<DataSourceExec>() {
            if data_source_exec
                .downcast_to_file_source::<ParquetSource>()
                .is_some()
            {
                return data_source_exec.metrics();
            }
        }

        for child in plan.children() {
            if let Some(metrics) = Self::parquet_metrics(child) {
                return Some(metrics);
            }
        }
        None
    }
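
    /// Returns the schema of the file's record batches.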
    pub fn schema(&self) -> SchemaRef {
        Arc::clone(&self.schema)
    }
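
    /// Returns the path of the parquet file on disk.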
    pub fn path(&self) -> &std::path::Path {
        self.path.as_path()
    }
}