datafusion/execution/context/csv.rs
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use datafusion_common::TableReference;
19use datafusion_datasource_csv::source::plan_to_csv;
20use std::sync::Arc;
21
22use super::super::options::{CsvReadOptions, ReadOptions};
23use super::{DataFilePaths, DataFrame, ExecutionPlan, Result, SessionContext};
24
25impl SessionContext {
26 /// Creates a [`DataFrame`] for reading a CSV data source.
27 ///
28 /// For more control such as reading multiple files, you can use
29 /// [`read_table`](Self::read_table) with a [`super::ListingTable`].
30 ///
31 /// Example usage is given below:
32 ///
33 /// ```
34 /// use datafusion::prelude::*;
35 /// # use datafusion::error::Result;
36 /// # #[tokio::main]
37 /// # async fn main() -> Result<()> {
38 /// let ctx = SessionContext::new();
39 /// // You can read a single file using `read_csv`
40 /// let df = ctx
41 /// .read_csv("tests/data/example.csv", CsvReadOptions::new())
42 /// .await?;
43 /// // you can also read multiple files:
44 /// let df = ctx
45 /// .read_csv(
46 /// vec!["tests/data/example.csv", "tests/data/example.csv"],
47 /// CsvReadOptions::new(),
48 /// )
49 /// .await?;
50 /// # Ok(())
51 /// # }
52 /// ```
53 pub async fn read_csv<P: DataFilePaths>(
54 &self,
55 table_paths: P,
56 options: CsvReadOptions<'_>,
57 ) -> Result<DataFrame> {
58 self._read_type(table_paths, options).await
59 }
60
61 /// Registers a CSV file as a table which can referenced from SQL
62 /// statements executed against this context.
63 pub async fn register_csv(
64 &self,
65 table_ref: impl Into<TableReference>,
66 table_path: impl AsRef<str>,
67 options: CsvReadOptions<'_>,
68 ) -> Result<()> {
69 let listing_options = options
70 .to_listing_options(&self.copied_config(), self.copied_table_options());
71
72 self.register_type_check(table_path.as_ref(), &listing_options.file_extension)?;
73
74 self.register_listing_table(
75 table_ref,
76 table_path,
77 listing_options,
78 options.schema.map(|s| Arc::new(s.to_owned())),
79 None,
80 )
81 .await?;
82
83 Ok(())
84 }
85
86 /// Executes a query and writes the results to a partitioned CSV file.
87 pub async fn write_csv(
88 &self,
89 plan: Arc<dyn ExecutionPlan>,
90 path: impl AsRef<str>,
91 ) -> Result<()> {
92 plan_to_csv(self.task_ctx(), plan, path).await
93 }
94}
95
#[cfg(test)]
mod tests {
    use super::*;
    use crate::test_util::{plan_and_collect, populate_csv_partitions};
    use datafusion_common::test_util::batches_to_string;
    use insta::assert_snapshot;

    use tempfile::TempDir;

    /// Registers a directory of CSV partition files that use a non-`.csv`
    /// extension via `register_csv`, then checks an aggregate query over them.
    #[tokio::test]
    async fn query_csv_with_custom_partition_extension() -> Result<()> {
        let tmp_dir = TempDir::new()?;

        // The main stipulation of this test: use a file extension that isn't .csv.
        let file_extension = ".tst";

        let ctx = SessionContext::new();
        let schema = populate_csv_partitions(&tmp_dir, 2, file_extension)?;
        ctx.register_csv(
            "test",
            tmp_dir
                .path()
                .to_str()
                .expect("temporary directory path should be valid UTF-8"),
            CsvReadOptions::new()
                .schema(&schema)
                .file_extension(file_extension),
        )
        .await?;
        let results =
            plan_and_collect(&ctx, "SELECT sum(c1), sum(c2), count(*) FROM test").await?;

        // The aggregate query is expected to yield exactly one batch.
        assert_eq!(results.len(), 1);
        // The expected table keeps the per-column padding emitted by
        // `batches_to_string`; the data row must be aligned with the header.
        assert_snapshot!(batches_to_string(&results), @r"
        +--------------+--------------+----------+
        | sum(test.c1) | sum(test.c2) | count(*) |
        +--------------+--------------+----------+
        | 10           | 110          | 20       |
        +--------------+--------------+----------+
        ");

        Ok(())
    }
}