This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-rust.git
The following commit(s) were added to refs/heads/main by this push:
new f2e0dc7 feat(io): refactor FileIO to reuse storage operators and improve path handling (#106)
f2e0dc7 is described below
commit f2e0dc78e4ca05ff568463736e81dbac333667bd
Author: Zach <[email protected]>
AuthorDate: Thu Apr 2 11:46:23 2026 +0800
feat(io): refactor FileIO to reuse storage operators and improve path handling (#106)
---
crates/paimon/src/io/file_io.rs | 101 ++++++++++++++-----
crates/paimon/src/io/storage.rs | 211 +++++++++++++++++++++++++++++-----------
2 files changed, 230 insertions(+), 82 deletions(-)
diff --git a/crates/paimon/src/io/file_io.rs b/crates/paimon/src/io/file_io.rs
index 84c134a..aba327f 100644
--- a/crates/paimon/src/io/file_io.rs
+++ b/crates/paimon/src/io/file_io.rs
@@ -19,10 +19,11 @@ use crate::error::*;
use std::collections::HashMap;
use std::ops::Range;
use std::sync::Arc;
+use std::time::SystemTime;
use bytes::Bytes;
+use chrono::{DateTime, Utc};
use opendal::raw::normalize_root;
-use opendal::raw::Timestamp;
use opendal::Operator;
use snafu::ResultExt;
use url::Url;
@@ -105,7 +106,9 @@ impl FileIO {
Ok(FileStatus {
size: meta.content_length(),
is_dir: meta.is_dir(),
- last_modified: meta.last_modified(),
+ last_modified: meta
+ .last_modified()
+ .map(|v| DateTime::<Utc>::from(SystemTime::from(v))),
path: path.to_string(),
})
}
@@ -129,20 +132,18 @@ impl FileIO {
let mut statuses = Vec::new();
let list_path_normalized = list_path.trim_start_matches('/');
for entry in entries {
- // opendal list_with includes the root directory itself as the
first entry.
- // The root entry's path equals list_path (with or without leading
slash).
- // Skip it so callers only see the direct children.
let entry_path = entry.path();
- let entry_path_normalized = entry_path.trim_start_matches('/');
- if entry_path_normalized == list_path_normalized {
+ if entry_path.trim_start_matches('/') == list_path_normalized {
continue;
}
let meta = entry.metadata();
statuses.push(FileStatus {
size: meta.content_length(),
is_dir: meta.is_dir(),
- path: format!("{base_path}{}", entry.path()),
- last_modified: meta.last_modified(),
+ path: format!("{base_path}{entry_path}"),
+ last_modified: meta
+ .last_modified()
+ .map(|v| DateTime::<Utc>::from(SystemTime::from(v))),
});
}
@@ -298,7 +299,7 @@ pub struct FileStatus {
pub size: u64,
pub is_dir: bool,
pub path: String,
- pub last_modified: Option<Timestamp>,
+ pub last_modified: Option<DateTime<Utc>>,
}
#[derive(Debug)]
@@ -324,7 +325,9 @@ impl InputFile {
size: meta.content_length(),
is_dir: meta.is_dir(),
path: self.path.clone(),
- last_modified: meta.last_modified(),
+ last_modified: meta
+ .last_modified()
+ .map(|v| DateTime::<Utc>::from(SystemTime::from(v))),
})
}
@@ -387,17 +390,11 @@ mod file_action_test {
use bytes::Bytes;
fn setup_memory_file_io() -> FileIO {
- let storage = Storage::Memory;
- FileIO {
- storage: Arc::new(storage),
- }
+ FileIOBuilder::new("memory").build().unwrap()
}
fn setup_fs_file_io() -> FileIO {
- let storage = Storage::LocalFs;
- FileIO {
- storage: Arc::new(storage),
- }
+ FileIOBuilder::new("file").build().unwrap()
}
async fn common_test_get_status(file_io: &FileIO, path: &str) {
@@ -500,6 +497,62 @@ mod file_action_test {
common_test_delete_file(&file_io,
"memory:/test_file_delete_mem").await;
}
+ #[tokio::test]
+ async fn test_empty_path_should_return_error_for_exists_fs() {
+ let file_io = setup_fs_file_io();
+ let result = file_io.exists("").await;
+ assert!(matches!(result, Err(Error::ConfigInvalid { .. })));
+ }
+
+ #[tokio::test]
+ async fn test_empty_path_should_return_error_for_exists_memory() {
+ let file_io = setup_memory_file_io();
+ let result = file_io.exists("").await;
+ assert!(matches!(result, Err(Error::ConfigInvalid { .. })));
+ }
+
+ #[tokio::test]
+ async fn test_memory_operator_reuse_across_file_io_calls() {
+ let file_io = setup_memory_file_io();
+ let path = "memory:/tmp/reuse_case";
+ let dir = "memory:/tmp/";
+
+ file_io
+ .new_output(path)
+ .unwrap()
+ .write(Bytes::from("data"))
+ .await
+ .unwrap();
+
+ assert!(file_io.exists(path).await.unwrap());
+ assert_eq!(file_io.get_status(path).await.unwrap().size, 4);
+ assert!(file_io
+ .list_status(dir)
+ .await
+ .unwrap()
+ .iter()
+ .any(|status| status.path == path));
+
+ file_io.delete_dir(dir).await.unwrap();
+ }
+
+ #[tokio::test]
+ async fn test_memory_operator_not_shared_between_file_io_instances() {
+ let file_io_1 = setup_memory_file_io();
+ let file_io_2 = setup_memory_file_io();
+ let path = "memory:/tmp/reuse_isolation_case";
+
+ file_io_1
+ .new_output(path)
+ .unwrap()
+ .write(Bytes::from("data"))
+ .await
+ .unwrap();
+
+ assert!(file_io_1.exists(path).await.unwrap());
+ assert!(!file_io_2.exists(path).await.unwrap());
+ }
+
#[tokio::test]
async fn test_get_status_fs() {
let file_io = setup_fs_file_io();
@@ -548,17 +601,11 @@ mod input_output_test {
use bytes::Bytes;
fn setup_memory_file_io() -> FileIO {
- let storage = Storage::Memory;
- FileIO {
- storage: Arc::new(storage),
- }
+ FileIOBuilder::new("memory").build().unwrap()
}
fn setup_fs_file_io() -> FileIO {
- let storage = Storage::LocalFs;
- FileIO {
- storage: Arc::new(storage),
- }
+ FileIOBuilder::new("file").build().unwrap()
}
async fn common_test_output_file_write_and_read(file_io: &FileIO, path:
&str) {
diff --git a/crates/paimon/src/io/storage.rs b/crates/paimon/src/io/storage.rs
index 77739a1..31eab86 100644
--- a/crates/paimon/src/io/storage.rs
+++ b/crates/paimon/src/io/storage.rs
@@ -15,11 +15,17 @@
// specific language governing permissions and limitations
// under the License.
+use std::collections::HashMap;
+#[cfg(any(feature = "storage-oss", feature = "storage-s3"))]
+use std::sync::{Mutex, MutexGuard};
+
#[cfg(feature = "storage-oss")]
use opendal::services::OssConfig;
#[cfg(feature = "storage-s3")]
use opendal::services::S3Config;
use opendal::{Operator, Scheme};
+#[cfg(any(feature = "storage-oss", feature = "storage-s3"))]
+use url::Url;
use crate::error;
@@ -29,37 +35,49 @@ use super::FileIOBuilder;
#[derive(Debug)]
pub enum Storage {
#[cfg(feature = "storage-memory")]
- Memory,
+ Memory { op: Operator },
#[cfg(feature = "storage-fs")]
- LocalFs,
+ LocalFs { op: Operator },
#[cfg(feature = "storage-oss")]
- Oss { config: Box<OssConfig> },
+ Oss {
+ config: Box<OssConfig>,
+ operators: Mutex<HashMap<String, Operator>>,
+ },
#[cfg(feature = "storage-s3")]
- S3 { config: Box<S3Config> },
+ S3 {
+ config: Box<S3Config>,
+ operators: Mutex<HashMap<String, Operator>>,
+ },
}
impl Storage {
pub(crate) fn build(file_io_builder: FileIOBuilder) -> crate::Result<Self>
{
- let (scheme_str, _props) = file_io_builder.into_parts();
+ let (scheme_str, props) = file_io_builder.into_parts();
let scheme = Self::parse_scheme(&scheme_str)?;
match scheme {
#[cfg(feature = "storage-memory")]
- Scheme::Memory => Ok(Self::Memory),
+ Scheme::Memory => Ok(Self::Memory {
+ op: super::memory_config_build()?,
+ }),
#[cfg(feature = "storage-fs")]
- Scheme::Fs => Ok(Self::LocalFs),
+ Scheme::Fs => Ok(Self::LocalFs {
+ op: super::fs_config_build()?,
+ }),
#[cfg(feature = "storage-oss")]
Scheme::Oss => {
- let config = super::oss_config_parse(_props)?;
+ let config = super::oss_config_parse(props)?;
Ok(Self::Oss {
config: Box::new(config),
+ operators: Mutex::new(HashMap::new()),
})
}
#[cfg(feature = "storage-s3")]
Scheme::S3 => {
- let config = super::s3_config_parse(_props)?;
+ let config = super::s3_config_parse(props)?;
Ok(Self::S3 {
config: Box::new(config),
+ operators: Mutex::new(HashMap::new()),
})
}
_ => Err(error::Error::IoUnsupported {
@@ -71,58 +89,141 @@ impl Storage {
pub(crate) fn create<'a>(&self, path: &'a str) -> crate::Result<(Operator,
&'a str)> {
match self {
#[cfg(feature = "storage-memory")]
- Storage::Memory => {
- let op = super::memory_config_build()?;
-
- if let Some(stripped) = path.strip_prefix("memory:/") {
- Ok((op, stripped))
- } else {
- Ok((op, &path[1..]))
- }
- }
+ Storage::Memory { op } => Ok((op.clone(),
Self::memory_relative_path(path)?)),
#[cfg(feature = "storage-fs")]
- Storage::LocalFs => {
- let op = super::fs_config_build()?;
-
- if let Some(stripped) = path.strip_prefix("file:/") {
- Ok((op, stripped))
- } else {
- Ok((op, &path[1..]))
- }
- }
+ Storage::LocalFs { op } => Ok((op.clone(),
Self::fs_relative_path(path)?)),
#[cfg(feature = "storage-oss")]
- Storage::Oss { config } => {
- let op = super::oss_config_build(config, path)?;
- let prefix = format!("oss://{}/", op.info().name());
- if let Some(stripped) = path.strip_prefix(&prefix) {
- Ok((op, stripped))
- } else {
- Err(error::Error::ConfigInvalid {
- message: format!("Invalid OSS url: {path}, should
start with {prefix}"),
- })
- }
+ Storage::Oss { config, operators } => {
+ let (bucket, relative_path) =
Self::oss_bucket_and_relative_path(path)?;
+ let op = Self::cached_oss_operator(config, operators, path,
&bucket)?;
+ Ok((op, relative_path))
}
#[cfg(feature = "storage-s3")]
- Storage::S3 { config } => {
- let op = super::s3_config_build(config, path)?;
- // Support both s3:// and s3a:// URL prefixes.
- let info = op.info();
- let bucket = info.name();
- let s3_prefix = format!("s3://{}/", bucket);
- let s3a_prefix = format!("s3a://{}/", bucket);
- if let Some(stripped) = path.strip_prefix(&s3_prefix) {
- Ok((op, stripped))
- } else if let Some(stripped) = path.strip_prefix(&s3a_prefix) {
- Ok((op, stripped))
- } else {
- Err(error::Error::ConfigInvalid {
- message: format!(
- "Invalid S3 url: {path}, should start with
{s3_prefix} or {s3a_prefix}"
- ),
- })
- }
+ Storage::S3 { config, operators } => {
+ let (bucket, relative_path) =
Self::s3_bucket_and_relative_path(path)?;
+ let op = Self::cached_s3_operator(config, operators, path,
&bucket)?;
+ Ok((op, relative_path))
+ }
+ }
+ }
+
+ #[cfg(feature = "storage-memory")]
+ fn memory_relative_path(path: &str) -> crate::Result<&str> {
+ if let Some(stripped) = path.strip_prefix("memory:/") {
+ Ok(stripped)
+ } else {
+ path.get(1..).ok_or_else(|| error::Error::ConfigInvalid {
+ message: format!("Invalid memory path: {path}"),
+ })
+ }
+ }
+
+ #[cfg(feature = "storage-fs")]
+ fn fs_relative_path(path: &str) -> crate::Result<&str> {
+ if let Some(stripped) = path.strip_prefix("file:/") {
+ Ok(stripped)
+ } else {
+ path.get(1..).ok_or_else(|| error::Error::ConfigInvalid {
+ message: format!("Invalid file path: {path}"),
+ })
+ }
+ }
+
+ #[cfg(feature = "storage-oss")]
+ fn oss_bucket_and_relative_path(path: &str) -> crate::Result<(String,
&str)> {
+ let url = Url::parse(path).map_err(|_| error::Error::ConfigInvalid {
+ message: format!("Invalid OSS url: {path}"),
+ })?;
+ let bucket = url
+ .host_str()
+ .ok_or_else(|| error::Error::ConfigInvalid {
+ message: format!("Invalid OSS url: {path}, missing bucket"),
+ })?
+ .to_string();
+ let prefix = format!("oss://{bucket}/");
+ let relative_path =
+ path.strip_prefix(&prefix)
+ .ok_or_else(|| error::Error::ConfigInvalid {
+ message: format!("Invalid OSS url: {path}, should start
with {prefix}"),
+ })?;
+ Ok((bucket, relative_path))
+ }
+
+ #[cfg(feature = "storage-s3")]
+ fn s3_bucket_and_relative_path<'a>(path: &'a str) ->
crate::Result<(String, &'a str)> {
+ let url = Url::parse(path).map_err(|_| error::Error::ConfigInvalid {
+ message: format!("Invalid S3 url: {path}"),
+ })?;
+ let bucket = url
+ .host_str()
+ .ok_or_else(|| error::Error::ConfigInvalid {
+ message: format!("Invalid S3 url: {path}, missing bucket"),
+ })?
+ .to_string();
+ let scheme = url.scheme();
+ let prefix = match scheme {
+ "s3" | "s3a" => format!("{scheme}://{bucket}/"),
+ _ => {
+ return Err(error::Error::ConfigInvalid {
+ message: format!(
+ "Invalid S3 url: {path}, should start with
s3://{bucket}/ or s3a://{bucket}/"
+ ),
+ });
}
+ };
+ let relative_path =
+ path.strip_prefix(&prefix)
+ .ok_or_else(|| error::Error::ConfigInvalid {
+ message: format!(
+ "Invalid S3 url: {path}, should start with s3://{bucket}/
or s3a://{bucket}/"
+ ),
+ })?;
+ Ok((bucket, relative_path))
+ }
+
+ #[cfg(any(feature = "storage-oss", feature = "storage-s3"))]
+ fn lock_operator_cache<'a>(
+ operators: &'a Mutex<HashMap<String, Operator>>,
+ storage_name: &str,
+ ) -> crate::Result<MutexGuard<'a, HashMap<String, Operator>>> {
+ operators.lock().map_err(|_| error::Error::UnexpectedError {
+ message: format!("Failed to lock {storage_name} operator cache"),
+ source: None,
+ })
+ }
+
+ #[cfg(feature = "storage-oss")]
+ fn cached_oss_operator(
+ config: &OssConfig,
+ operators: &Mutex<HashMap<String, Operator>>,
+ path: &str,
+ bucket: &str,
+ ) -> crate::Result<Operator> {
+ let mut operators = Self::lock_operator_cache(operators, "OSS")?;
+ if let Some(op) = operators.get(bucket) {
+ return Ok(op.clone());
+ }
+
+ let op = super::oss_config_build(config, path)?;
+ operators.insert(bucket.to_string(), op.clone());
+ Ok(op)
+ }
+
+ #[cfg(feature = "storage-s3")]
+ fn cached_s3_operator(
+ config: &S3Config,
+ operators: &Mutex<HashMap<String, Operator>>,
+ path: &str,
+ bucket: &str,
+ ) -> crate::Result<Operator> {
+ let mut operators = Self::lock_operator_cache(operators, "S3")?;
+ if let Some(op) = operators.get(bucket) {
+ return Ok(op.clone());
}
+
+ let op = super::s3_config_build(config, path)?;
+ operators.insert(bucket.to_string(), op.clone());
+ Ok(op)
}
fn parse_scheme(scheme: &str) -> crate::Result<Scheme> {