datafusion/execution/context/
csv.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use datafusion_common::TableReference;
19use datafusion_datasource_csv::source::plan_to_csv;
20use std::sync::Arc;
21
22use super::super::options::{CsvReadOptions, ReadOptions};
23use super::{DataFilePaths, DataFrame, ExecutionPlan, Result, SessionContext};
24
25impl SessionContext {
26    /// Creates a [`DataFrame`] for reading a CSV data source.
27    ///
28    /// For more control such as reading multiple files, you can use
29    /// [`read_table`](Self::read_table) with a [`super::ListingTable`].
30    ///
31    /// Example usage is given below:
32    ///
33    /// ```
34    /// use datafusion::prelude::*;
35    /// # use datafusion::error::Result;
36    /// # #[tokio::main]
37    /// # async fn main() -> Result<()> {
38    /// let ctx = SessionContext::new();
39    /// // You can read a single file using `read_csv`
40    /// let df = ctx
41    ///     .read_csv("tests/data/example.csv", CsvReadOptions::new())
42    ///     .await?;
43    /// // you can also read multiple files:
44    /// let df = ctx
45    ///     .read_csv(
46    ///         vec!["tests/data/example.csv", "tests/data/example.csv"],
47    ///         CsvReadOptions::new(),
48    ///     )
49    ///     .await?;
50    /// # Ok(())
51    /// # }
52    /// ```
53    pub async fn read_csv<P: DataFilePaths>(
54        &self,
55        table_paths: P,
56        options: CsvReadOptions<'_>,
57    ) -> Result<DataFrame> {
58        self._read_type(table_paths, options).await
59    }
60
61    /// Registers a CSV file as a table which can referenced from SQL
62    /// statements executed against this context.
63    pub async fn register_csv(
64        &self,
65        table_ref: impl Into<TableReference>,
66        table_path: impl AsRef<str>,
67        options: CsvReadOptions<'_>,
68    ) -> Result<()> {
69        let listing_options = options
70            .to_listing_options(&self.copied_config(), self.copied_table_options());
71
72        self.register_type_check(table_path.as_ref(), &listing_options.file_extension)?;
73
74        self.register_listing_table(
75            table_ref,
76            table_path,
77            listing_options,
78            options.schema.map(|s| Arc::new(s.to_owned())),
79            None,
80        )
81        .await?;
82
83        Ok(())
84    }
85
86    /// Executes a query and writes the results to a partitioned CSV file.
87    pub async fn write_csv(
88        &self,
89        plan: Arc<dyn ExecutionPlan>,
90        path: impl AsRef<str>,
91    ) -> Result<()> {
92        plan_to_csv(self.task_ctx(), plan, path).await
93    }
94}
95
#[cfg(test)]
mod tests {
    use super::*;
    use crate::test_util::{plan_and_collect, populate_csv_partitions};
    use datafusion_common::test_util::batches_to_string;
    use insta::assert_snapshot;

    use tempfile::TempDir;

    /// Registers a directory of CSV partitions whose files use a non-`.csv`
    /// extension and checks that an aggregate query over them still works.
    #[tokio::test]
    async fn query_csv_with_custom_partition_extension() -> Result<()> {
        let partition_dir = TempDir::new()?;

        // The point of this test: the partition files do NOT end in `.csv`.
        let extension = ".tst";

        let session = SessionContext::new();
        let csv_schema = populate_csv_partitions(&partition_dir, 2, extension)?;

        // Register the directory, telling the reader which extension to expect.
        let dir_path = partition_dir.path().to_str().unwrap();
        let read_options = CsvReadOptions::new()
            .schema(&csv_schema)
            .file_extension(extension);
        session.register_csv("test", dir_path, read_options).await?;

        let sql = "SELECT sum(c1), sum(c2), count(*) FROM test";
        let batches = plan_and_collect(&session, sql).await?;

        // A global aggregate should come back as exactly one batch.
        assert_eq!(batches.len(), 1);
        assert_snapshot!(batches_to_string(&batches), @r"
        +--------------+--------------+----------+
        | sum(test.c1) | sum(test.c2) | count(*) |
        +--------------+--------------+----------+
        | 10           | 110          | 20       |
        +--------------+--------------+----------+
        ");

        Ok(())
    }
}
136}