// datafusion_datasource_parquet/writer.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

use std::sync::Arc;

use datafusion_common::DataFusionError;
use datafusion_common_runtime::JoinSet;
use datafusion_datasource::ListingTableUrl;
use datafusion_execution::TaskContext;
use datafusion_physical_plan::{ExecutionPlan, ExecutionPlanProperties};
use futures::StreamExt;
use object_store::buffered::BufWriter;
use object_store::path::Path;
use parquet::arrow::AsyncArrowWriter;
use parquet::file::properties::WriterProperties;

30/// Executes a query and writes the results to a partitioned Parquet file.
31pub async fn plan_to_parquet(
32    task_ctx: Arc<TaskContext>,
33    plan: Arc<dyn ExecutionPlan>,
34    path: impl AsRef<str>,
35    writer_properties: Option<WriterProperties>,
36) -> datafusion_common::Result<()> {
37    let path = path.as_ref();
38    let parsed = ListingTableUrl::parse(path)?;
39    let object_store_url = parsed.object_store();
40    let store = task_ctx.runtime_env().object_store(&object_store_url)?;
41    let mut join_set = JoinSet::new();
42    for i in 0..plan.output_partitioning().partition_count() {
43        let plan: Arc<dyn ExecutionPlan> = Arc::clone(&plan);
44        let filename = format!("{}/part-{i}.parquet", parsed.prefix());
45        let file = Path::parse(filename)?;
46        let propclone = writer_properties.clone();
47
48        let storeref = Arc::clone(&store);
49        let buf_writer = BufWriter::with_capacity(
50            storeref,
51            file.clone(),
52            task_ctx
53                .session_config()
54                .options()
55                .execution
56                .objectstore_writer_buffer_size,
57        );
58        let mut stream = plan.execute(i, Arc::clone(&task_ctx))?;
59        join_set.spawn(async move {
60            let mut writer =
61                AsyncArrowWriter::try_new(buf_writer, plan.schema(), propclone)?;
62            while let Some(next_batch) = stream.next().await {
63                let batch = next_batch?;
64                writer.write(&batch).await?;
65            }
66            writer
67                .close()
68                .await
69                .map_err(DataFusionError::from)
70                .map(|_| ())
71        });
72    }
73
74    while let Some(result) = join_set.join_next().await {
75        match result {
76            Ok(res) => res?,
77            Err(e) => {
78                if e.is_panic() {
79                    std::panic::resume_unwind(e.into_panic());
80                } else {
81                    unreachable!();
82                }
83            }
84        }
85    }
86
87    Ok(())
88}