This is an automated email from the ASF dual-hosted git repository.

xushiyan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/hudi-rs.git


The following commit(s) were added to refs/heads/main by this push:
     new da217e5  refactor: use `object_store` API for Timeline (#27)
da217e5 is described below

commit da217e5e2923cd244cc463f0071afbe7ebf245a0
Author: Shiyan Xu <[email protected]>
AuthorDate: Sun Jun 23 19:00:23 2024 -0500

    refactor: use `object_store` API for Timeline (#27)
---
 Cargo.toml                                       |   2 +-
 crates/core/Cargo.toml                           |   1 -
 crates/{fs => core}/fixtures/a.parquet           | Bin
 crates/core/src/lib.rs                           |   1 +
 crates/core/src/storage/file_metadata.rs         |  22 ++++++
 crates/core/src/storage/mod.rs                   |  58 ++++++++++++--
 crates/core/src/table/fs_view.rs                 |   2 +-
 crates/core/src/table/mod.rs                     |  35 +++++++--
 crates/{fs/src/lib.rs => core/src/test_utils.rs} |  24 +++---
 crates/core/src/timeline/mod.rs                  |  93 ++++++++++++-----------
 crates/datafusion/Cargo.toml                     |   1 -
 crates/fs/Cargo.toml                             |  66 ----------------
 crates/fs/src/file_systems.rs                    |  62 ---------------
 crates/fs/src/test_utils.rs                      |  31 --------
 14 files changed, 162 insertions(+), 236 deletions(-)

diff --git a/Cargo.toml b/Cargo.toml
index fa5f51b..28340c9 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -41,7 +41,7 @@ arrow-ord = { version = "50" }
 arrow-row = { version = "50" }
 arrow-schema = { version = "50" }
 arrow-select = { version = "50" }
-object_store = { version = "0.10.1" }
+object_store = { version = "0.9.1" }
 parquet = { version = "50" }
 
 # datafusion
diff --git a/crates/core/Cargo.toml b/crates/core/Cargo.toml
index 04da3f2..bdf05bb 100644
--- a/crates/core/Cargo.toml
+++ b/crates/core/Cargo.toml
@@ -23,7 +23,6 @@ license.workspace = true
 rust-version.workspace = true
 
 [dependencies]
-hudi-fs = { path = "../fs"}
 # arrow
 arrow = { workspace = true }
 arrow-arith = { workspace = true }
diff --git a/crates/fs/fixtures/a.parquet b/crates/core/fixtures/a.parquet
similarity index 100%
rename from crates/fs/fixtures/a.parquet
rename to crates/core/fixtures/a.parquet
diff --git a/crates/core/src/lib.rs b/crates/core/src/lib.rs
index aa7d66a..0d56755 100644
--- a/crates/core/src/lib.rs
+++ b/crates/core/src/lib.rs
@@ -23,6 +23,7 @@ mod file_group;
 pub mod table;
 pub type HudiTable = Table;
 mod storage;
+pub mod test_utils;
 mod timeline;
 
 pub fn crate_version() -> &'static str {
diff --git a/crates/core/src/storage/file_metadata.rs b/crates/core/src/storage/file_metadata.rs
index ee80a03..a7b8f28 100644
--- a/crates/core/src/storage/file_metadata.rs
+++ b/crates/core/src/storage/file_metadata.rs
@@ -17,6 +17,10 @@
  * under the License.
  */
 
+use anyhow::anyhow;
+use anyhow::Result;
+use std::path::Path;
+
 #[derive(Clone, Debug, Default, Eq, PartialEq)]
 pub struct FileMetadata {
     pub path: String,
@@ -35,3 +39,21 @@ impl FileMetadata {
         }
     }
 }
+
+pub fn split_filename(filename: &str) -> Result<(String, String)> {
+    let path = Path::new(filename);
+
+    let stem = path
+        .file_stem()
+        .and_then(|s| s.to_str())
+        .ok_or_else(|| anyhow!("No file stem found"))?
+        .to_string();
+
+    let extension = path
+        .extension()
+        .and_then(|e| e.to_str())
+        .unwrap_or_default()
+        .to_string();
+
+    Ok((stem, extension))
+}
diff --git a/crates/core/src/storage/mod.rs b/crates/core/src/storage/mod.rs
index 97df419..3de77a4 100644
--- a/crates/core/src/storage/mod.rs
+++ b/crates/core/src/storage/mod.rs
@@ -19,10 +19,15 @@
 
 use std::collections::HashMap;
 use std::path::PathBuf;
+use std::sync::Arc;
 
 use async_recursion::async_recursion;
+use bytes::Bytes;
 use object_store::path::Path as ObjPath;
-use object_store::{parse_url_opts, DynObjectStore, ObjectStore};
+use object_store::{parse_url_opts, ObjectStore};
+use parquet::arrow::async_reader::ParquetObjectReader;
+use parquet::arrow::ParquetRecordBatchStreamBuilder;
+use parquet::file::metadata::ParquetMetaData;
 use url::Url;
 
 use crate::storage::file_metadata::FileMetadata;
@@ -32,7 +37,7 @@ pub(crate) mod file_metadata;
 #[allow(dead_code)]
 pub struct Storage {
     base_url: Url,
-    object_store: Box<DynObjectStore>,
+    object_store: Arc<dyn ObjectStore>,
     options: HashMap<String, String>,
 }
 
@@ -43,22 +48,42 @@ impl Storage {
         let object_store = parse_url_opts(&base_url, &options).unwrap().0;
         Box::from(Storage {
             base_url,
-            object_store,
+            object_store: Arc::new(object_store),
             options,
         })
     }
 
-    pub async fn get_file_metadata(&self, path: &str) -> FileMetadata {
-        let p = ObjPath::from(path);
-        let meta = self.object_store.head(&p).await.unwrap();
+    pub async fn get_file_metadata(&self, relative_path: &str) -> FileMetadata {
+        let mut obj_url = self.base_url.clone();
+        obj_url.path_segments_mut().unwrap().push(relative_path);
+        let obj_path = ObjPath::from_url_path(obj_url.path()).unwrap();
+        let meta = self.object_store.head(&obj_path).await.unwrap();
         FileMetadata {
             path: meta.location.to_string(),
-            name: p.filename().unwrap().to_string(),
+            name: obj_path.filename().unwrap().to_string(),
             size: meta.size,
             num_records: None,
         }
     }
 
+    pub async fn get_parquet_file_metadata(&self, relative_path: &str) -> ParquetMetaData {
+        let mut obj_url = self.base_url.clone();
+        obj_url.path_segments_mut().unwrap().push(relative_path);
+        let obj_path = ObjPath::from_url_path(obj_url.path()).unwrap();
+        let meta = self.object_store.head(&obj_path).await.unwrap();
+        let reader = ParquetObjectReader::new(self.object_store.clone(), meta);
+        let builder = ParquetRecordBatchStreamBuilder::new(reader).await.unwrap();
+        builder.metadata().as_ref().to_owned()
+    }
+
+    pub async fn get_file_data(&self, relative_path: &str) -> Bytes {
+        let mut obj_url = self.base_url.clone();
+        obj_url.path_segments_mut().unwrap().push(relative_path);
+        let obj_path = ObjPath::from_url_path(obj_url.path()).unwrap();
+        let result = self.object_store.get(&obj_path).await.unwrap();
+        result.bytes().await.unwrap()
+    }
+
     pub async fn list_dirs(&self, subdir: Option<&str>) -> Vec<String> {
         self.list_dirs_as_paths(subdir)
             .await
@@ -125,11 +150,11 @@ pub async fn get_leaf_dirs(storage: &Storage, subdir: Option<&str>) -> Vec<Strin
 
 #[cfg(test)]
 mod tests {
-    use object_store::path::Path as ObjPath;
     use std::collections::{HashMap, HashSet};
     use std::fs::canonicalize;
     use std::path::Path;
 
+    use object_store::path::Path as ObjPath;
     use url::Url;
 
     use crate::storage::{get_leaf_dirs, Storage};
@@ -214,4 +239,21 @@ mod tests {
             vec![".hoodie", "part1", "part2/part22", "part3/part32/part33"]
         );
     }
+
+    #[tokio::test]
+    async fn get_file_metadata() {
+        let base_url =
+            Url::from_directory_path(canonicalize(Path::new("fixtures")).unwrap()).unwrap();
+        let storage = Storage::new(base_url.path(), HashMap::new());
+        let file_metadata = storage.get_file_metadata("a.parquet").await;
+        assert_eq!(file_metadata.name, "a.parquet");
+        assert_eq!(
+            file_metadata.path,
+            ObjPath::from_url_path(base_url.join("a.parquet").unwrap().path())
+                .unwrap()
+                .to_string()
+        );
+        assert_eq!(file_metadata.size, 866);
+        assert_eq!(file_metadata.num_records, None);
+    }
 }
diff --git a/crates/core/src/table/fs_view.rs b/crates/core/src/table/fs_view.rs
index ae8812b..2f3b981 100644
--- a/crates/core/src/table/fs_view.rs
+++ b/crates/core/src/table/fs_view.rs
@@ -135,7 +135,7 @@ mod tests {
     use std::collections::HashSet;
     use std::path::Path;
 
-    use hudi_fs::test_utils::extract_test_table;
+    use crate::test_utils::extract_test_table;
 
     use crate::table::fs_view::FileSystemView;
 
diff --git a/crates/core/src/table/mod.rs b/crates/core/src/table/mod.rs
index 50f5289..9191aa4 100644
--- a/crates/core/src/table/mod.rs
+++ b/crates/core/src/table/mod.rs
@@ -82,17 +82,36 @@ impl Table {
     }
 
     pub fn get_timeline(&self) -> Result<Timeline> {
-        Timeline::new(self.base_path.as_path())
+        let rt = tokio::runtime::Builder::new_current_thread()
+            .enable_all()
+            .build()
+            .unwrap();
+        let f = async { Timeline::new(self.base_path.to_str().unwrap()).await };
+        rt.block_on(f)
     }
 
     pub fn schema(&self) -> SchemaRef {
-        match Timeline::new(self.base_path.as_path()) {
-            Ok(timeline) => match timeline.get_latest_schema() {
-                Ok(schema) => SchemaRef::from(schema),
-                Err(e) => {
-                    panic!("Failed to resolve table schema: {}", e)
+        let rt = tokio::runtime::Builder::new_current_thread()
+            .enable_all()
+            .build()
+            .unwrap();
+        let f = async { Timeline::new(self.base_path.to_str().unwrap()).await };
+        let timeline = rt.block_on(f);
+        match timeline {
+            Ok(timeline) => {
+                let rt = tokio::runtime::Builder::new_current_thread()
+                    .enable_all()
+                    .build()
+                    .unwrap();
+                let wrapper = async { timeline.get_latest_schema().await };
+                let result = rt.block_on(wrapper);
+                match result {
+                    Ok(schema) => SchemaRef::from(schema),
+                    Err(e) => {
+                        panic!("Failed to resolve table schema: {}", e)
+                    }
                 }
-            },
+            }
             Err(e) => {
                 panic!("Failed to resolve table schema: {}", e)
             }
@@ -210,7 +229,7 @@ mod tests {
     use crate::table::config::TableType::CopyOnWrite;
     use crate::table::metadata::ProvidesTableMetadata;
     use crate::table::Table;
-    use hudi_fs::test_utils::extract_test_table;
+    use crate::test_utils::extract_test_table;
 
     #[test]
     fn hudi_table_get_latest_file_paths() {
diff --git a/crates/fs/src/lib.rs b/crates/core/src/test_utils.rs
similarity index 77%
rename from crates/fs/src/lib.rs
rename to crates/core/src/test_utils.rs
index 9721a00..94e3c4b 100644
--- a/crates/fs/src/lib.rs
+++ b/crates/core/src/test_utils.rs
@@ -17,10 +17,18 @@
  * under the License.
  */
 
-use std::ffi::OsStr;
+use std::fs;
+use std::io::Cursor;
+use std::path::{Path, PathBuf};
 
-pub mod file_systems;
-pub mod test_utils;
+use tempfile::tempdir;
+
+pub fn extract_test_table(fixture_path: &Path) -> PathBuf {
+    let target_dir = tempdir().unwrap().path().to_path_buf();
+    let archive = fs::read(fixture_path).unwrap();
+    zip_extract::extract(Cursor::new(archive), &target_dir, true).unwrap();
+    target_dir
+}
 
 #[macro_export]
 macro_rules! assert_approx_eq {
@@ -37,13 +45,3 @@ macro_rules! assert_approx_eq {
         );
     }};
 }
-
-pub fn file_name_without_ext(file_name: Option<&OsStr>) -> String {
-    return file_name
-        .and_then(|e| e.to_str())
-        .unwrap()
-        .rsplit_once('.')
-        .unwrap()
-        .0
-        .to_owned();
-}
diff --git a/crates/core/src/timeline/mod.rs b/crates/core/src/timeline/mod.rs
index 04e66c8..3ac0a70 100644
--- a/crates/core/src/timeline/mod.rs
+++ b/crates/core/src/timeline/mod.rs
@@ -18,17 +18,16 @@
  */
 
 use std::collections::HashMap;
-use std::fs;
-use std::fs::File;
-use std::io::Read;
-use std::path::{Path, PathBuf};
+use std::fmt::Debug;
+use std::path::PathBuf;
 
 use anyhow::{anyhow, Result};
 use arrow_schema::SchemaRef;
-use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
-use serde_json::Value;
+use parquet::arrow::parquet_to_arrow_schema;
+use serde_json::{Map, Value};
 
-use hudi_fs::file_name_without_ext;
+use crate::storage::file_metadata::split_filename;
+use crate::storage::Storage;
 
 #[allow(dead_code)]
 #[derive(Debug, Clone, PartialEq)]
@@ -61,29 +60,28 @@ impl Instant {
 
 #[derive(Debug, Clone)]
 pub struct Timeline {
-    pub base_path: PathBuf,
+    pub base_path: String,
     pub instants: Vec<Instant>,
 }
 
 impl Timeline {
-    pub fn new(base_path: &Path) -> Result<Self> {
-        let instants = Self::load_completed_commit_instants(base_path)?;
+    pub async fn new(base_path: &str) -> Result<Self> {
+        let instants = Self::load_completed_commit_instants(base_path).await?;
         Ok(Self {
-            base_path: base_path.to_path_buf(),
+            base_path: base_path.to_string(),
             instants,
         })
     }
 
-    fn load_completed_commit_instants(base_path: &Path) -> Result<Vec<Instant>> {
+    async fn load_completed_commit_instants(base_path: &str) -> Result<Vec<Instant>> {
+        let storage = Storage::new(base_path, HashMap::new());
         let mut completed_commits = Vec::new();
-        let mut timeline_path = base_path.to_path_buf();
-        timeline_path.push(".hoodie");
-        for entry in fs::read_dir(timeline_path)? {
-            let p = entry?.path();
-            if p.is_file() && p.extension().and_then(|e| e.to_str()) == Some("commit") {
+        for file_metadata in storage.list_files(Some(".hoodie")).await {
+            let (file_stem, file_ext) = split_filename(file_metadata.name.as_str())?;
+            if file_ext == "commit" {
                 completed_commits.push(Instant {
                     state: State::Completed,
-                    timestamp: file_name_without_ext(p.file_name()),
+                    timestamp: file_stem,
                     action: "commit".to_owned(),
                 })
             }
@@ -93,34 +91,40 @@ impl Timeline {
         Ok(completed_commits)
     }
 
-    pub fn get_latest_commit_metadata(&self) -> Result<HashMap<String, Value>> {
+    async fn get_latest_commit_metadata(&self) -> Result<Map<String, Value>> {
         match self.instants.iter().next_back() {
             Some(instant) => {
-                let mut latest_instant_file_path = self.base_path.to_path_buf();
-                latest_instant_file_path.push(".hoodie");
-                latest_instant_file_path.push(instant.file_name());
-                let mut f = File::open(latest_instant_file_path)?;
-                let mut content = String::new();
-                f.read_to_string(&mut content)?;
-                let commit_metadata = serde_json::from_str(&content)?;
+                let mut commit_file_path = PathBuf::from(".hoodie");
+                commit_file_path.push(instant.file_name());
+                let storage = Storage::new(&self.base_path, HashMap::new());
+                let bytes = storage
+                    .get_file_data(commit_file_path.to_str().unwrap())
+                    .await;
+                let json: Value = serde_json::from_slice(&bytes)?;
+                let commit_metadata = json
+                    .as_object()
+                    .ok_or_else(|| anyhow!("Expected JSON object"))?
+                    .clone();
                 Ok(commit_metadata)
             }
-            None => Ok(HashMap::new()),
+            None => Ok(Map::new()),
         }
     }
 
-    pub fn get_latest_schema(&self) -> Result<SchemaRef> {
-        let commit_metadata = self.get_latest_commit_metadata()?;
+    pub async fn get_latest_schema(&self) -> Result<SchemaRef> {
+        let commit_metadata = self.get_latest_commit_metadata().await.unwrap();
        if let Some(partition_to_write_stats) = commit_metadata["partitionToWriteStats"].as_object()
         {
             if let Some((_, value)) = partition_to_write_stats.iter().next() {
                if let Some(first_value) = value.as_array().and_then(|arr| arr.first()) {
                     if let Some(path) = first_value["path"].as_str() {
-                        let mut base_file_path = PathBuf::from(&self.base_path);
-                        base_file_path.push(path);
-                        let file = File::open(base_file_path)?;
-                        let builder = ParquetRecordBatchReaderBuilder::try_new(file)?;
-                        return Ok(builder.schema().to_owned());
+                        let storage = Storage::new(&self.base_path, HashMap::new());
+                        let parquet_meta = storage.get_parquet_file_metadata(path).await;
+                        let arrow_schema = parquet_to_arrow_schema(
+                            parquet_meta.file_metadata().schema_descr(),
+                            None,
+                        )?;
+                        return Ok(SchemaRef::from(arrow_schema));
                     }
                 }
             }
@@ -131,25 +135,26 @@ impl Timeline {
 
 #[cfg(test)]
 mod tests {
+    use std::fs::canonicalize;
     use std::path::Path;
 
-    use hudi_fs::test_utils::extract_test_table;
-
+    use crate::test_utils::extract_test_table;
     use crate::timeline::{Instant, State, Timeline};
 
-    #[test]
-    fn read_latest_schema() {
+    #[tokio::test]
+    async fn read_latest_schema() {
         let fixture_path = Path::new("fixtures/table/0.x_cow_partitioned.zip");
         let target_table_path = extract_test_table(fixture_path);
-        let timeline = Timeline::new(target_table_path.as_path()).unwrap();
-        let table_schema = timeline.get_latest_schema().unwrap();
+        let base_path = canonicalize(target_table_path).unwrap();
+        let timeline = Timeline::new(base_path.to_str().unwrap()).await.unwrap();
+        let table_schema = timeline.get_latest_schema().await.unwrap();
         assert_eq!(table_schema.fields.len(), 11)
     }
 
-    #[test]
-    fn init_commits_timeline() {
-        let fixture_path = Path::new("fixtures/timeline/commits_stub");
-        let timeline = Timeline::new(fixture_path).unwrap();
+    #[tokio::test]
+    async fn init_commits_timeline() {
+        let base_path = canonicalize(Path::new("fixtures/timeline/commits_stub")).unwrap();
+        let timeline = Timeline::new(base_path.to_str().unwrap()).await.unwrap();
         assert_eq!(
             timeline.instants,
             vec![
diff --git a/crates/datafusion/Cargo.toml b/crates/datafusion/Cargo.toml
index 0a042ab..e1a4560 100644
--- a/crates/datafusion/Cargo.toml
+++ b/crates/datafusion/Cargo.toml
@@ -24,7 +24,6 @@ rust-version.workspace = true
 
 [dependencies]
 hudi-core = { path = "../core"}
-hudi-fs = { path = "../fs" }
 # arrow
 arrow = { workspace = true }
 arrow-arith = { workspace = true }
diff --git a/crates/fs/Cargo.toml b/crates/fs/Cargo.toml
deleted file mode 100644
index 84d4950..0000000
--- a/crates/fs/Cargo.toml
+++ /dev/null
@@ -1,66 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-[package]
-name = "hudi-fs"
-version = "0.1.0"
-edition.workspace = true
-license.workspace = true
-rust-version.workspace = true
-
-[dependencies]
-# arrow
-arrow = { workspace = true }
-arrow-arith = { workspace = true }
-arrow-array = { workspace = true , features = ["chrono-tz"]}
-arrow-buffer = { workspace = true }
-arrow-cast = { workspace = true }
-arrow-ipc = { workspace = true }
-arrow-json = { workspace = true }
-arrow-ord = { workspace = true }
-arrow-row = { workspace = true }
-arrow-schema = { workspace = true, features = ["serde"] }
-arrow-select = { workspace = true }
-parquet = { workspace = true, features = [
-    "async",
-    "object_store",
-] }
-pin-project-lite = "^0.2.7"
-
-# datafusion
-datafusion = { workspace = true, optional = true }
-datafusion-expr = { workspace = true, optional = true }
-datafusion-common = { workspace = true, optional = true }
-datafusion-proto = { workspace = true, optional = true }
-datafusion-sql = { workspace = true, optional = true }
-datafusion-physical-expr = { workspace = true, optional = true }
-
-# serde
-serde = { workspace = true, features = ["derive"] }
-serde_json = { workspace = true }
-
-# "stdlib"
-bytes = { workspace = true }
-chrono = { workspace = true, default-features = false, features = ["clock"] }
-hashbrown = "0.14.3"
-regex = { workspace = true }
-uuid = { workspace = true, features = ["serde", "v4"] }
-url = { workspace = true }
-
-# test
-tempfile = "3.10.1"
-zip-extract = "0.1.3"
diff --git a/crates/fs/src/file_systems.rs b/crates/fs/src/file_systems.rs
deleted file mode 100644
index 7160b7d..0000000
--- a/crates/fs/src/file_systems.rs
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-use std::error::Error;
-use std::{fs::File, path::Path};
-
-use parquet::file::reader::{FileReader, SerializedFileReader};
-
-#[derive(Clone, Debug)]
-pub struct FileMetadata {
-    pub path: String,
-    pub name: String,
-    pub size: u64,
-    pub num_records: i64,
-}
-
-impl FileMetadata {
-    pub fn from_path(p: &Path) -> Result<Self, Box<dyn Error>> {
-        let file = File::open(p)?;
-        let reader = SerializedFileReader::new(file).unwrap();
-        let num_records = reader.metadata().file_metadata().num_rows();
-        Ok(Self {
-            path: p.to_str().unwrap().to_string(),
-            name: p.file_name().unwrap().to_os_string().into_string().unwrap(),
-            size: p.metadata().unwrap().len(),
-            num_records,
-        })
-    }
-}
-
-#[cfg(test)]
-mod tests {
-    use crate::assert_approx_eq;
-    use crate::file_systems::FileMetadata;
-    use std::path::Path;
-
-    #[test]
-    fn read_file_metadata() {
-        let fixture_path = Path::new("fixtures/a.parquet");
-        let fm = FileMetadata::from_path(fixture_path).unwrap();
-        assert_eq!(fm.path, "fixtures/a.parquet");
-        assert_eq!(fm.name, "a.parquet");
-        assert_approx_eq!(fm.size, 866, 20);
-        assert_eq!(fm.num_records, 5);
-    }
-}
diff --git a/crates/fs/src/test_utils.rs b/crates/fs/src/test_utils.rs
deleted file mode 100644
index 88ba7d3..0000000
--- a/crates/fs/src/test_utils.rs
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-use std::fs;
-use std::io::Cursor;
-use std::path::{Path, PathBuf};
-
-use tempfile::tempdir;
-
-pub fn extract_test_table(fixture_path: &Path) -> PathBuf {
-    let target_dir = tempdir().unwrap().path().to_path_buf();
-    let archive = fs::read(fixture_path).unwrap();
-    zip_extract::extract(Cursor::new(archive), &target_dir, true).unwrap();
-    target_dir
-}

Reply via email to