datafusion/test/
object_store.rs1use crate::{
21 execution::{context::SessionState, session_state::SessionStateBuilder},
22 object_store::{
23 memory::InMemory, path::Path, Error, GetOptions, GetResult, ListResult,
24 MultipartUpload, ObjectMeta, ObjectStore, PutMultipartOptions, PutOptions,
25 PutPayload, PutResult,
26 },
27 prelude::SessionContext,
28};
29use futures::{stream::BoxStream, FutureExt};
30use std::{
31 fmt::{Debug, Display, Formatter},
32 sync::Arc,
33};
34use tokio::{
35 sync::Barrier,
36 time::{timeout, Duration},
37};
38use url::Url;
39
40pub fn register_test_store(ctx: &SessionContext, files: &[(&str, u64)]) {
42 let url = Url::parse("test://").unwrap();
43 ctx.register_object_store(&url, make_test_store_and_state(files).0);
44}
45
46pub fn make_test_store_and_state(files: &[(&str, u64)]) -> (Arc<InMemory>, SessionState) {
48 let memory = InMemory::new();
49
50 for (name, size) in files {
51 memory
52 .put(&Path::from(*name), vec![0; *size as usize].into())
53 .now_or_never()
54 .unwrap()
55 .unwrap();
56 }
57
58 (
59 Arc::new(memory),
60 SessionStateBuilder::new().with_default_features().build(),
61 )
62}
63
64pub fn local_unpartitioned_file(path: impl AsRef<std::path::Path>) -> ObjectMeta {
66 let location = Path::from_filesystem_path(path.as_ref()).unwrap();
67 let metadata = std::fs::metadata(path).expect("Local file metadata");
68 ObjectMeta {
69 location,
70 last_modified: metadata.modified().map(chrono::DateTime::from).unwrap(),
71 size: metadata.len(),
72 e_tag: None,
73 version: None,
74 }
75}
76
77pub fn ensure_head_concurrency(
79 object_store: Arc<dyn ObjectStore>,
80 concurrency: usize,
81) -> Arc<dyn ObjectStore> {
82 Arc::new(BlockingObjectStore::new(object_store, concurrency))
83}
84
85#[derive(Debug)]
87struct BlockingObjectStore {
88 inner: Arc<dyn ObjectStore>,
89 barrier: Arc<Barrier>,
90}
91
92impl BlockingObjectStore {
93 const NAME: &'static str = "BlockingObjectStore";
94 fn new(inner: Arc<dyn ObjectStore>, expected_concurrency: usize) -> Self {
95 Self {
96 inner,
97 barrier: Arc::new(Barrier::new(expected_concurrency)),
98 }
99 }
100}
101
102impl Display for BlockingObjectStore {
103 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
104 Display::fmt(&self.inner, f)
105 }
106}
107
108#[async_trait::async_trait]
111impl ObjectStore for BlockingObjectStore {
112 async fn put_opts(
113 &self,
114 location: &Path,
115 payload: PutPayload,
116 opts: PutOptions,
117 ) -> object_store::Result<PutResult> {
118 self.inner.put_opts(location, payload, opts).await
119 }
120 async fn put_multipart_opts(
121 &self,
122 location: &Path,
123 opts: PutMultipartOptions,
124 ) -> object_store::Result<Box<dyn MultipartUpload>> {
125 self.inner.put_multipart_opts(location, opts).await
126 }
127
128 async fn get_opts(
129 &self,
130 location: &Path,
131 options: GetOptions,
132 ) -> object_store::Result<GetResult> {
133 self.inner.get_opts(location, options).await
134 }
135
136 async fn head(&self, location: &Path) -> object_store::Result<ObjectMeta> {
137 println!(
138 "{} received head call for {location}",
139 BlockingObjectStore::NAME
140 );
141 let wait_result = timeout(Duration::from_secs(1), self.barrier.wait()).await;
143 match wait_result {
144 Ok(_) => println!(
145 "{} barrier reached for {location}",
146 BlockingObjectStore::NAME
147 ),
148 Err(_) => {
149 let error_message = format!(
150 "{} barrier wait timed out for {location}",
151 BlockingObjectStore::NAME
152 );
153 log::error!("{error_message}");
154 return Err(Error::Generic {
155 store: BlockingObjectStore::NAME,
156 source: error_message.into(),
157 });
158 }
159 }
160 self.inner.head(location).await
162 }
163
164 async fn delete(&self, location: &Path) -> object_store::Result<()> {
165 self.inner.delete(location).await
166 }
167
168 fn list(
169 &self,
170 prefix: Option<&Path>,
171 ) -> BoxStream<'static, object_store::Result<ObjectMeta>> {
172 self.inner.list(prefix)
173 }
174
175 async fn list_with_delimiter(
176 &self,
177 prefix: Option<&Path>,
178 ) -> object_store::Result<ListResult> {
179 self.inner.list_with_delimiter(prefix).await
180 }
181
182 async fn copy(&self, from: &Path, to: &Path) -> object_store::Result<()> {
183 self.inner.copy(from, to).await
184 }
185
186 async fn copy_if_not_exists(
187 &self,
188 from: &Path,
189 to: &Path,
190 ) -> object_store::Result<()> {
191 self.inner.copy_if_not_exists(from, to).await
192 }
193}