datafusion_datasource_parquet/
writer.rs1use datafusion_common::DataFusionError;
19use datafusion_common_runtime::JoinSet;
20use datafusion_datasource::ListingTableUrl;
21use datafusion_execution::TaskContext;
22use datafusion_physical_plan::{ExecutionPlan, ExecutionPlanProperties};
23use futures::StreamExt;
24use object_store::buffered::BufWriter;
25use object_store::path::Path;
26use parquet::arrow::AsyncArrowWriter;
27use parquet::file::properties::WriterProperties;
28use std::sync::Arc;
29
30pub async fn plan_to_parquet(
32 task_ctx: Arc<TaskContext>,
33 plan: Arc<dyn ExecutionPlan>,
34 path: impl AsRef<str>,
35 writer_properties: Option<WriterProperties>,
36) -> datafusion_common::Result<()> {
37 let path = path.as_ref();
38 let parsed = ListingTableUrl::parse(path)?;
39 let object_store_url = parsed.object_store();
40 let store = task_ctx.runtime_env().object_store(&object_store_url)?;
41 let mut join_set = JoinSet::new();
42 for i in 0..plan.output_partitioning().partition_count() {
43 let plan: Arc<dyn ExecutionPlan> = Arc::clone(&plan);
44 let filename = format!("{}/part-{i}.parquet", parsed.prefix());
45 let file = Path::parse(filename)?;
46 let propclone = writer_properties.clone();
47
48 let storeref = Arc::clone(&store);
49 let buf_writer = BufWriter::with_capacity(
50 storeref,
51 file.clone(),
52 task_ctx
53 .session_config()
54 .options()
55 .execution
56 .objectstore_writer_buffer_size,
57 );
58 let mut stream = plan.execute(i, Arc::clone(&task_ctx))?;
59 join_set.spawn(async move {
60 let mut writer =
61 AsyncArrowWriter::try_new(buf_writer, plan.schema(), propclone)?;
62 while let Some(next_batch) = stream.next().await {
63 let batch = next_batch?;
64 writer.write(&batch).await?;
65 }
66 writer
67 .close()
68 .await
69 .map_err(DataFusionError::from)
70 .map(|_| ())
71 });
72 }
73
74 while let Some(result) = join_set.join_next().await {
75 match result {
76 Ok(res) => res?,
77 Err(e) => {
78 if e.is_panic() {
79 std::panic::resume_unwind(e.into_panic());
80 } else {
81 unreachable!();
82 }
83 }
84 }
85 }
86
87 Ok(())
88}