datafusion/test/
object_store.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! Object store helpers and wrapper implementations used for testing
19
20use crate::{
21    execution::{context::SessionState, session_state::SessionStateBuilder},
22    object_store::{
23        memory::InMemory, path::Path, Error, GetOptions, GetResult, ListResult,
24        MultipartUpload, ObjectMeta, ObjectStore, PutMultipartOptions, PutOptions,
25        PutPayload, PutResult,
26    },
27    prelude::SessionContext,
28};
29use futures::{stream::BoxStream, FutureExt};
30use std::{
31    fmt::{Debug, Display, Formatter},
32    sync::Arc,
33};
34use tokio::{
35    sync::Barrier,
36    time::{timeout, Duration},
37};
38use url::Url;
39
40/// Registers a test object store with the provided `ctx`
41pub fn register_test_store(ctx: &SessionContext, files: &[(&str, u64)]) {
42    let url = Url::parse("test://").unwrap();
43    ctx.register_object_store(&url, make_test_store_and_state(files).0);
44}
45
46/// Create a test object store with the provided files
47pub fn make_test_store_and_state(files: &[(&str, u64)]) -> (Arc<InMemory>, SessionState) {
48    let memory = InMemory::new();
49
50    for (name, size) in files {
51        memory
52            .put(&Path::from(*name), vec![0; *size as usize].into())
53            .now_or_never()
54            .unwrap()
55            .unwrap();
56    }
57
58    (
59        Arc::new(memory),
60        SessionStateBuilder::new().with_default_features().build(),
61    )
62}
63
64/// Helper method to fetch the file size and date at given path and create a `ObjectMeta`
65pub fn local_unpartitioned_file(path: impl AsRef<std::path::Path>) -> ObjectMeta {
66    let location = Path::from_filesystem_path(path.as_ref()).unwrap();
67    let metadata = std::fs::metadata(path).expect("Local file metadata");
68    ObjectMeta {
69        location,
70        last_modified: metadata.modified().map(chrono::DateTime::from).unwrap(),
71        size: metadata.len(),
72        e_tag: None,
73        version: None,
74    }
75}
76
77/// Blocks the object_store `head` call until `concurrency` number of calls are pending.
78pub fn ensure_head_concurrency(
79    object_store: Arc<dyn ObjectStore>,
80    concurrency: usize,
81) -> Arc<dyn ObjectStore> {
82    Arc::new(BlockingObjectStore::new(object_store, concurrency))
83}
84
85/// An object store that “blocks” in its `head` call until an expected number of concurrent calls are reached.
86#[derive(Debug)]
87struct BlockingObjectStore {
88    inner: Arc<dyn ObjectStore>,
89    barrier: Arc<Barrier>,
90}
91
92impl BlockingObjectStore {
93    const NAME: &'static str = "BlockingObjectStore";
94    fn new(inner: Arc<dyn ObjectStore>, expected_concurrency: usize) -> Self {
95        Self {
96            inner,
97            barrier: Arc::new(Barrier::new(expected_concurrency)),
98        }
99    }
100}
101
102impl Display for BlockingObjectStore {
103    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
104        Display::fmt(&self.inner, f)
105    }
106}
107
108/// All trait methods are forwarded to the inner object store, except for
109/// the `head` method which waits until the expected number of concurrent calls is reached.
110#[async_trait::async_trait]
111impl ObjectStore for BlockingObjectStore {
112    async fn put_opts(
113        &self,
114        location: &Path,
115        payload: PutPayload,
116        opts: PutOptions,
117    ) -> object_store::Result<PutResult> {
118        self.inner.put_opts(location, payload, opts).await
119    }
120    async fn put_multipart_opts(
121        &self,
122        location: &Path,
123        opts: PutMultipartOptions,
124    ) -> object_store::Result<Box<dyn MultipartUpload>> {
125        self.inner.put_multipart_opts(location, opts).await
126    }
127
128    async fn get_opts(
129        &self,
130        location: &Path,
131        options: GetOptions,
132    ) -> object_store::Result<GetResult> {
133        self.inner.get_opts(location, options).await
134    }
135
136    async fn head(&self, location: &Path) -> object_store::Result<ObjectMeta> {
137        println!(
138            "{} received head call for {location}",
139            BlockingObjectStore::NAME
140        );
141        // Wait until the expected number of concurrent calls is reached, but timeout after 1 second to avoid hanging failing tests.
142        let wait_result = timeout(Duration::from_secs(1), self.barrier.wait()).await;
143        match wait_result {
144            Ok(_) => println!(
145                "{} barrier reached for {location}",
146                BlockingObjectStore::NAME
147            ),
148            Err(_) => {
149                let error_message = format!(
150                    "{} barrier wait timed out for {location}",
151                    BlockingObjectStore::NAME
152                );
153                log::error!("{error_message}");
154                return Err(Error::Generic {
155                    store: BlockingObjectStore::NAME,
156                    source: error_message.into(),
157                });
158            }
159        }
160        // Forward the call to the inner object store.
161        self.inner.head(location).await
162    }
163
164    async fn delete(&self, location: &Path) -> object_store::Result<()> {
165        self.inner.delete(location).await
166    }
167
168    fn list(
169        &self,
170        prefix: Option<&Path>,
171    ) -> BoxStream<'static, object_store::Result<ObjectMeta>> {
172        self.inner.list(prefix)
173    }
174
175    async fn list_with_delimiter(
176        &self,
177        prefix: Option<&Path>,
178    ) -> object_store::Result<ListResult> {
179        self.inner.list_with_delimiter(prefix).await
180    }
181
182    async fn copy(&self, from: &Path, to: &Path) -> object_store::Result<()> {
183        self.inner.copy(from, to).await
184    }
185
186    async fn copy_if_not_exists(
187        &self,
188        from: &Path,
189        to: &Path,
190    ) -> object_store::Result<()> {
191        self.inner.copy_if_not_exists(from, to).await
192    }
193}