This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new 3ec678d  feat(io): support S3 file IO (#142) (#143)
3ec678d is described below

commit 3ec678dec1f8bc4fd933d5754c01da1a4e2c2f96
Author: Zach <[email protected]>
AuthorDate: Fri Mar 20 22:04:38 2026 +0800

    feat(io): support S3 file IO (#142) (#143)
---
 crates/paimon/Cargo.toml           |   3 +-
 crates/paimon/src/io/mod.rs        |   5 +
 crates/paimon/src/io/storage.rs    |  32 +++
 crates/paimon/src/io/storage_s3.rs | 415 +++++++++++++++++++++++++++++++++++++
 4 files changed, 454 insertions(+), 1 deletion(-)

diff --git a/crates/paimon/Cargo.toml b/crates/paimon/Cargo.toml
index 4c49a43..d2768b4 100644
--- a/crates/paimon/Cargo.toml
+++ b/crates/paimon/Cargo.toml
@@ -28,11 +28,12 @@ version.workspace = true
 
 [features]
 default = ["storage-memory", "storage-fs"]
-storage-all = ["storage-memory", "storage-fs", "storage-oss"]
+storage-all = ["storage-memory", "storage-fs", "storage-oss", "storage-s3"]
 
 storage-memory = ["opendal/services-memory"]
 storage-fs = ["opendal/services-fs"]
 storage-oss = ["opendal/services-oss"]
+storage-s3 = ["opendal/services-s3"]
 
 [dependencies]
 url = "2.5.2"
diff --git a/crates/paimon/src/io/mod.rs b/crates/paimon/src/io/mod.rs
index 4e69be4..226aa72 100644
--- a/crates/paimon/src/io/mod.rs
+++ b/crates/paimon/src/io/mod.rs
@@ -35,3 +35,8 @@ use storage_memory::*;
 mod storage_oss;
 #[cfg(feature = "storage-oss")]
 use storage_oss::*;
+
+#[cfg(feature = "storage-s3")]
+mod storage_s3;
+#[cfg(feature = "storage-s3")]
+use storage_s3::*;
diff --git a/crates/paimon/src/io/storage.rs b/crates/paimon/src/io/storage.rs
index 1abfbc0..77739a1 100644
--- a/crates/paimon/src/io/storage.rs
+++ b/crates/paimon/src/io/storage.rs
@@ -17,6 +17,8 @@
 
 #[cfg(feature = "storage-oss")]
 use opendal::services::OssConfig;
+#[cfg(feature = "storage-s3")]
+use opendal::services::S3Config;
 use opendal::{Operator, Scheme};
 
 use crate::error;
@@ -32,6 +34,8 @@ pub enum Storage {
     LocalFs,
     #[cfg(feature = "storage-oss")]
     Oss { config: Box<OssConfig> },
+    #[cfg(feature = "storage-s3")]
+    S3 { config: Box<S3Config> },
 }
 
 impl Storage {
@@ -51,6 +55,13 @@ impl Storage {
                     config: Box::new(config),
                 })
             }
+            #[cfg(feature = "storage-s3")]
+            Scheme::S3 => {
+                let config = super::s3_config_parse(_props)?;
+                Ok(Self::S3 {
+                    config: Box::new(config),
+                })
+            }
             _ => Err(error::Error::IoUnsupported {
                 message: "Unsupported storage feature".to_string(),
             }),
@@ -91,6 +102,26 @@ impl Storage {
                     })
                 }
             }
+            #[cfg(feature = "storage-s3")]
+            Storage::S3 { config } => {
+                let op = super::s3_config_build(config, path)?;
+                // Support both s3:// and s3a:// URL prefixes.
+                let info = op.info();
+                let bucket = info.name();
+                let s3_prefix = format!("s3://{}/", bucket);
+                let s3a_prefix = format!("s3a://{}/", bucket);
+                if let Some(stripped) = path.strip_prefix(&s3_prefix) {
+                    Ok((op, stripped))
+                } else if let Some(stripped) = path.strip_prefix(&s3a_prefix) {
+                    Ok((op, stripped))
+                } else {
+                    Err(error::Error::ConfigInvalid {
+                        message: format!(
+                            "Invalid S3 url: {path}, should start with {s3_prefix} or {s3a_prefix}"
+                        ),
+                    })
+                }
+            }
         }
     }
 
@@ -98,6 +129,7 @@ impl Storage {
         match scheme {
             "memory" => Ok(Scheme::Memory),
             "file" | "" => Ok(Scheme::Fs),
+            "s3" | "s3a" => Ok(Scheme::S3),
             s => Ok(s.parse::<Scheme>()?),
         }
     }
diff --git a/crates/paimon/src/io/storage_s3.rs b/crates/paimon/src/io/storage_s3.rs
new file mode 100644
index 0000000..57b77c7
--- /dev/null
+++ b/crates/paimon/src/io/storage_s3.rs
@@ -0,0 +1,415 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::HashMap;
+
+use opendal::services::S3Config;
+use opendal::{Configurator, Operator};
+use url::Url;
+
+use crate::error::Error;
+use crate::Result;
+
+/// Configuration key for S3 endpoint.
+///
+/// Compatible with paimon-java's `s3.endpoint` / `fs.s3a.endpoint`.
+const S3_ENDPOINT: &str = "s3.endpoint";
+
+/// Configuration key for S3 access key ID.
+///
+/// Compatible with paimon-java's `s3.access-key` / `fs.s3a.access.key`.
+const S3_ACCESS_KEY: &str = "s3.access-key";
+
+/// Configuration key for S3 secret key.
+///
+/// Compatible with paimon-java's `s3.secret-key` / `fs.s3a.secret.key`.
+const S3_SECRET_KEY: &str = "s3.secret-key";
+
+/// Configuration key for S3 path-style access.
+///
+/// When set to `true`, uses path-style URLs (`https://s3.endpoint/bucket`)
+/// instead of virtual-hosted style (`https://bucket.s3.endpoint`).
+///
+/// Compatible with paimon-java's `s3.path-style-access` / `fs.s3a.path.style.access`.
+const S3_PATH_STYLE_ACCESS: &str = "s3.path-style-access";
+
+/// Configuration key for S3 region.
+///
+/// Compatible with paimon-java's `s3.region`.
+const S3_REGION: &str = "s3.region";
+
+/// Paimon-java config key prefixes that map to S3 configuration.
+///
+/// Java's `S3FileIO` normalizes all of these prefixes to `fs.s3a.*` before
+/// passing them to Hadoop. Here we extract the suffix after matching prefix
+/// and map to our canonical `s3.*` keys.
+///
+/// Reference: `S3FileIO.CONFIG_PREFIXES` in Java Paimon.
+const JAVA_CONFIG_PREFIXES: &[&str] = &["fs.s3a.", "s3a.", "s3."];
+
+/// Mirrored config keys — Java Paimon maps these interchangeably.
+/// Both directions are applied so users can use either form.
+///
+/// Reference: `S3FileIO.MIRRORED_CONFIG_KEYS` in Java Paimon.
+const MIRRORED_KEYS: &[(&str, &str)] = &[
+    ("s3.access-key", "s3.access.key"),
+    ("s3.secret-key", "s3.secret.key"),
+    ("s3.path-style-access", "s3.path.style.access"),
+];
+
+/// Parse paimon catalog options into an [`S3Config`].
+///
+/// Extracts S3-related configuration keys from the provided properties map.
+/// Supports multiple key prefixes for Java compatibility (`s3.`, `s3a.`, `fs.s3a.`).
+///
+/// By default, virtual-hosted style addressing is enabled (matching AWS
+/// and Java Paimon behavior). Set `s3.path-style-access=true` to switch
+/// to path-style for S3-compatible stores like MinIO.
+pub(crate) fn s3_config_parse(props: HashMap<String, String>) -> Result<S3Config> {
+    let normalized = normalize_config(props);
+
+    let mut cfg = S3Config::default();
+
+    // Default to virtual-hosted style, matching AWS and Java Paimon.
+    // Only disable when path-style-access is explicitly set to true.
+    cfg.enable_virtual_host_style = true;
+
+    // Core connection settings.
+    cfg.endpoint = normalized.get(S3_ENDPOINT).cloned();
+    cfg.access_key_id = normalized.get(S3_ACCESS_KEY).cloned();
+    cfg.secret_access_key = normalized.get(S3_SECRET_KEY).cloned();
+    cfg.region = normalized.get(S3_REGION).cloned();
+
+    if let Some(v) = normalized.get(S3_PATH_STYLE_ACCESS) {
+        if v.eq_ignore_ascii_case("true") {
+            cfg.enable_virtual_host_style = false;
+        }
+    }
+
+    // Session / assume-role credentials.
+    cfg.session_token = normalized.get("s3.session.token").cloned();
+    cfg.role_arn = normalized.get("s3.assumed.role.arn").cloned();
+    cfg.external_id = normalized.get("s3.assumed.role.externalId").cloned();
+    cfg.role_session_name = normalized.get("s3.assumed.role.session.name").cloned();
+
+    // Anonymous access.
+    if let Some(v) = normalized.get("s3.anonymous") {
+        if v.eq_ignore_ascii_case("true") {
+            cfg.allow_anonymous = true;
+        }
+    }
+
+    // Server-side encryption.
+    cfg.server_side_encryption = normalized.get("s3.sse.type").cloned();
+    cfg.server_side_encryption_aws_kms_key_id = normalized.get("s3.sse.key").cloned();
+    cfg.server_side_encryption_customer_algorithm = normalized.get("s3.sse-c.algorithm").cloned();
+    cfg.server_side_encryption_customer_key = normalized.get("s3.sse-c.key").cloned();
+    cfg.server_side_encryption_customer_key_md5 = normalized.get("s3.sse-c.key.md5").cloned();
+
+    // Storage class.
+    cfg.default_storage_class = normalized.get("s3.storage.class").cloned();
+
+    Ok(cfg)
+}
+
+/// Build an [`Operator`] for the given S3 path.
+///
+/// Parses the bucket name from the `s3://bucket/key` or `s3a://bucket/key`
+/// URL and combines it with the provided [`S3Config`] to construct an
+/// OpenDAL operator.
+pub(crate) fn s3_config_build(cfg: &S3Config, path: &str) -> Result<Operator> {
+    let url = Url::parse(path).map_err(|_| Error::ConfigInvalid {
+        message: format!("Invalid S3 url: {path}"),
+    })?;
+
+    let bucket = url.host_str().ok_or_else(|| Error::ConfigInvalid {
+        message: format!("Invalid S3 url: {path}, missing bucket"),
+    })?;
+
+    let builder = cfg.clone().into_builder().bucket(bucket);
+    Ok(Operator::new(builder)?.finish())
+}
+
+/// Normalize Java-compatible config keys to canonical `s3.*` form.
+///
+/// 1. Strips known prefixes (`fs.s3a.`, `s3a.`, `s3.`) and remaps to `s3.*`.
+/// 2. Applies mirrored key mappings for cross-compatibility.
+/// 3. Earlier prefixes in the list take lower priority (later ones overwrite).
+fn normalize_config(props: HashMap<String, String>) -> HashMap<String, String> {
+    let mut result = HashMap::new();
+
+    // First pass: normalize prefixes. Process in priority order —
+    // `fs.s3a.` (lowest) → `s3a.` → `s3.` (highest, canonical).
+    for prefix in JAVA_CONFIG_PREFIXES {
+        for (key, value) in &props {
+            if let Some(suffix) = key.strip_prefix(prefix) {
+                let canonical = format!("s3.{suffix}");
+                result.insert(canonical, value.clone());
+            }
+        }
+    }
+
+    // Second pass: apply mirrored keys bidirectionally (only if target not already set).
+    let mirrored_additions: Vec<(String, String)> = MIRRORED_KEYS
+        .iter()
+        .flat_map(|(a, b)| {
+            let mut pairs = Vec::new();
+            // a → b
+            if !result.contains_key(*b) {
+                if let Some(v) = result.get(*a) {
+                    pairs.push((b.to_string(), v.clone()));
+                }
+            }
+            // b → a
+            if !result.contains_key(*a) {
+                if let Some(v) = result.get(*b) {
+                    pairs.push((a.to_string(), v.clone()));
+                }
+            }
+            pairs
+        })
+        .collect();
+
+    for (k, v) in mirrored_additions {
+        result.insert(k, v);
+    }
+
+    result
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    fn make_props(pairs: &[(&str, &str)]) -> HashMap<String, String> {
+        pairs
+            .iter()
+            .map(|(k, v)| (k.to_string(), v.to_string()))
+            .collect()
+    }
+
+    #[test]
+    fn test_s3_config_parse_canonical_keys() {
+        let props = make_props(&[
+            ("s3.endpoint", "https://s3.us-east-1.amazonaws.com"),
+            ("s3.access-key", "AKID"),
+            ("s3.secret-key", "SECRET"),
+            ("s3.region", "us-east-1"),
+        ]);
+
+        let cfg = s3_config_parse(props).unwrap();
+        assert_eq!(
+            cfg.endpoint.as_deref(),
+            Some("https://s3.us-east-1.amazonaws.com")
+        );
+        assert_eq!(cfg.access_key_id.as_deref(), Some("AKID"));
+        assert_eq!(cfg.secret_access_key.as_deref(), Some("SECRET"));
+        assert_eq!(cfg.region.as_deref(), Some("us-east-1"));
+    }
+
+    #[test]
+    fn test_s3_config_parse_hadoop_prefix() {
+        // Keys with `fs.s3a.` prefix should be normalized to `s3.*`.
+        let props = make_props(&[
+            ("fs.s3a.endpoint", "https://s3.eu-west-1.amazonaws.com"),
+            ("fs.s3a.access.key", "AKID2"),
+            ("fs.s3a.secret.key", "SECRET2"),
+        ]);
+
+        let cfg = s3_config_parse(props).unwrap();
+        assert_eq!(
+            cfg.endpoint.as_deref(),
+            Some("https://s3.eu-west-1.amazonaws.com")
+        );
+        // `fs.s3a.access.key` → `s3.access.key`, then mirrored → `s3.access-key`
+        assert_eq!(cfg.access_key_id.as_deref(), Some("AKID2"));
+        assert_eq!(cfg.secret_access_key.as_deref(), Some("SECRET2"));
+    }
+
+    #[test]
+    fn test_s3_config_parse_s3a_prefix() {
+        let props = make_props(&[
+            ("s3a.endpoint", "https://s3.ap-southeast-1.amazonaws.com"),
+            ("s3a.access-key", "AKID3"),
+            ("s3a.secret-key", "SECRET3"),
+        ]);
+
+        let cfg = s3_config_parse(props).unwrap();
+        assert_eq!(
+            cfg.endpoint.as_deref(),
+            Some("https://s3.ap-southeast-1.amazonaws.com")
+        );
+        assert_eq!(cfg.access_key_id.as_deref(), Some("AKID3"));
+        assert_eq!(cfg.secret_access_key.as_deref(), Some("SECRET3"));
+    }
+
+    #[test]
+    fn test_s3_config_default_virtual_hosted_style() {
+        // Default should be virtual-hosted style (matching AWS behavior).
+        let props = make_props(&[("s3.endpoint", "https://s3.amazonaws.com")]);
+
+        let cfg = s3_config_parse(props).unwrap();
+        assert!(cfg.enable_virtual_host_style);
+    }
+
+    #[test]
+    fn test_s3_config_parse_path_style_access_true() {
+        let props = make_props(&[
+            ("s3.endpoint", "https://minio.local:9000"),
+            ("s3.path-style-access", "true"),
+        ]);
+
+        let cfg = s3_config_parse(props).unwrap();
+        assert!(!cfg.enable_virtual_host_style);
+    }
+
+    #[test]
+    fn test_s3_config_parse_path_style_access_false() {
+        // Explicit false should keep virtual-hosted style enabled.
+        let props = make_props(&[
+            ("s3.endpoint", "https://s3.amazonaws.com"),
+            ("s3.path-style-access", "false"),
+        ]);
+
+        let cfg = s3_config_parse(props).unwrap();
+        assert!(cfg.enable_virtual_host_style);
+    }
+
+    #[test]
+    fn test_s3_config_parse_no_credentials() {
+        // Only endpoint, no credentials — valid for IAM role / env var auth.
+        let props = make_props(&[("s3.endpoint", "https://s3.amazonaws.com")]);
+
+        let cfg = s3_config_parse(props).unwrap();
+        assert_eq!(cfg.endpoint.as_deref(), Some("https://s3.amazonaws.com"));
+        assert!(cfg.access_key_id.is_none());
+        assert!(cfg.secret_access_key.is_none());
+    }
+
+    #[test]
+    fn test_s3_config_parse_empty_props() {
+        let cfg = s3_config_parse(HashMap::new()).unwrap();
+        assert!(cfg.endpoint.is_none());
+        assert!(cfg.access_key_id.is_none());
+        // Even with empty props, virtual-hosted style should be the default.
+        assert!(cfg.enable_virtual_host_style);
+    }
+
+    #[test]
+    fn test_s3_config_parse_session_and_role() {
+        let props = make_props(&[
+            ("s3.endpoint", "https://s3.amazonaws.com"),
+            ("s3.session.token", "TOKEN"),
+            ("s3.assumed.role.arn", "arn:aws:iam::123456:role/test"),
+            ("s3.assumed.role.externalId", "ext-id"),
+        ]);
+
+        let cfg = s3_config_parse(props).unwrap();
+        assert_eq!(cfg.session_token.as_deref(), Some("TOKEN"));
+        assert_eq!(
+            cfg.role_arn.as_deref(),
+            Some("arn:aws:iam::123456:role/test")
+        );
+        assert_eq!(cfg.external_id.as_deref(), Some("ext-id"));
+    }
+
+    #[test]
+    fn test_s3_config_parse_sse() {
+        let props = make_props(&[("s3.sse.type", "aws:kms"), ("s3.sse.key", "my-kms-key-id")]);
+
+        let cfg = s3_config_parse(props).unwrap();
+        assert_eq!(cfg.server_side_encryption.as_deref(), Some("aws:kms"));
+        assert_eq!(
+            cfg.server_side_encryption_aws_kms_key_id.as_deref(),
+            Some("my-kms-key-id")
+        );
+    }
+
+    #[test]
+    fn test_s3_config_unrelated_keys_ignored() {
+        // Keys that don't match any S3 prefix should not affect config.
+        let props = make_props(&[
+            ("fs.oss.endpoint", "https://oss.aliyuncs.com"),
+            ("hive.metastore.uris", "thrift://localhost:9083"),
+            ("s3.endpoint", "https://s3.amazonaws.com"),
+        ]);
+
+        let cfg = s3_config_parse(props).unwrap();
+        assert_eq!(cfg.endpoint.as_deref(), Some("https://s3.amazonaws.com"));
+    }
+
+    #[test]
+    fn test_s3_config_build_extracts_bucket() {
+        let mut cfg = S3Config::default();
+        cfg.endpoint = Some("https://s3.us-east-1.amazonaws.com".to_string());
+        cfg.region = Some("us-east-1".to_string());
+
+        let op = s3_config_build(&cfg, "s3://my-bucket/some/path").unwrap();
+        assert_eq!(op.info().name(), "my-bucket");
+    }
+
+    #[test]
+    fn test_s3_config_build_s3a_scheme() {
+        let mut cfg = S3Config::default();
+        cfg.endpoint = Some("https://s3.us-east-1.amazonaws.com".to_string());
+        cfg.region = Some("us-east-1".to_string());
+
+        let op = s3_config_build(&cfg, "s3a://my-bucket/some/path").unwrap();
+        assert_eq!(op.info().name(), "my-bucket");
+    }
+
+    #[test]
+    fn test_s3_config_build_invalid_url() {
+        let cfg = S3Config::default();
+        let result = s3_config_build(&cfg, "not-a-valid-url");
+        assert!(result.is_err());
+    }
+
+    #[test]
+    fn test_s3_config_build_missing_bucket() {
+        let cfg = S3Config::default();
+        let result = s3_config_build(&cfg, "s3:///path/without/bucket");
+        assert!(result.is_err());
+    }
+
+    #[test]
+    fn test_mirrored_keys() {
+        // `s3.access.key` (dot form) should be mirrored from `s3.access-key` (dash form)
+        let props = make_props(&[("s3.access-key", "AKID")]);
+        let normalized = normalize_config(props);
+        assert_eq!(
+            normalized.get("s3.access.key").map(|s| s.as_str()),
+            Some("AKID")
+        );
+        assert_eq!(
+            normalized.get("s3.access-key").map(|s| s.as_str()),
+            Some("AKID")
+        );
+    }
+
+    #[test]
+    fn test_canonical_overrides_hadoop_prefix() {
+        // `s3.endpoint` should take priority over `fs.s3a.endpoint`.
+        let props = make_props(&[
+            ("fs.s3a.endpoint", "https://old.endpoint.com"),
+            ("s3.endpoint", "https://new.endpoint.com"),
+        ]);
+
+        let cfg = s3_config_parse(props).unwrap();
+        assert_eq!(cfg.endpoint.as_deref(), Some("https://new.endpoint.com"));
+    }
+}

Reply via email to