This is an automated email from the ASF dual-hosted git repository.

hgruszecki pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git


The following commit(s) were added to refs/heads/master by this push:
     new af9273081 fix(connectors): support default credential provider chain 
for iceberg sink (#3045)
af9273081 is described below

commit af9273081b6f0771a48e4b58ec08f1d513bb271c
Author: omnitrix <[email protected]>
AuthorDate: Sat Apr 11 05:16:34 2026 -0700

    fix(connectors): support default credential provider chain for iceberg sink 
(#3045)
---
 core/connectors/sinks/iceberg_sink/src/lib.rs      |  4 +-
 core/connectors/sinks/iceberg_sink/src/props.rs    | 93 ++++++++++++++++++++--
 core/connectors/sinks/iceberg_sink/src/sink.rs     | 41 ++++++----
 .../tests/connectors/fixtures/iceberg/container.rs | 43 ++++++++++
 .../tests/connectors/fixtures/iceberg/mod.rs       |  4 +-
 core/integration/tests/connectors/fixtures/mod.rs  |  4 +-
 .../iceberg/default_credentials_config/config.toml | 45 +++++++++++
 .../tests/connectors/iceberg/iceberg_sink.rs       | 54 ++++++++++++-
 .../iceberg/sink_default_credentials.toml          | 20 +++++
 9 files changed, 279 insertions(+), 29 deletions(-)

diff --git a/core/connectors/sinks/iceberg_sink/src/lib.rs 
b/core/connectors/sinks/iceberg_sink/src/lib.rs
index 4d3954086..33c091728 100644
--- a/core/connectors/sinks/iceberg_sink/src/lib.rs
+++ b/core/connectors/sinks/iceberg_sink/src/lib.rs
@@ -60,8 +60,8 @@ pub struct IcebergSinkConfig {
     pub dynamic_routing: bool,
     pub dynamic_route_field: String,
     pub store_url: String,
-    pub store_access_key_id: String,
-    pub store_secret_access_key: String,
+    pub store_access_key_id: Option<String>,
+    pub store_secret_access_key: Option<String>,
     pub store_region: String,
     pub store_class: IcebergSinkStoreClass,
 }
diff --git a/core/connectors/sinks/iceberg_sink/src/props.rs 
b/core/connectors/sinks/iceberg_sink/src/props.rs
index aa01d6d00..b2107de15 100644
--- a/core/connectors/sinks/iceberg_sink/src/props.rs
+++ b/core/connectors/sinks/iceberg_sink/src/props.rs
@@ -30,14 +30,91 @@ pub fn init_props(config: &IcebergSinkConfig) -> 
Result<HashMap<String, String>,
 fn get_props_s3(config: &IcebergSinkConfig) -> Result<HashMap<String, String>, 
Error> {
     let mut props: HashMap<String, String> = HashMap::new();
     props.insert("s3.region".to_string(), config.store_region.clone());
-    props.insert(
-        "s3.access-key-id".to_string(),
-        config.store_access_key_id.clone(),
-    );
-    props.insert(
-        "s3.secret-access-key".to_string(),
-        config.store_secret_access_key.clone(),
-    );
     props.insert("s3.endpoint".to_string(), config.store_url.clone());
+    match (&config.store_access_key_id, &config.store_secret_access_key) {
+        (Some(access_key_id), Some(secret_access_key)) => {
+            props.insert("s3.access-key-id".to_string(), 
access_key_id.clone());
+            props.insert(
+                "s3.secret-access-key".to_string(),
+                secret_access_key.clone(),
+            );
+        }
+        (None, None) => {}
+        _ => {
+            return Err(Error::InvalidConfigValue("Partially configured 
credentials. You must provide both store_access_key_id and 
store_secret_access_key, or omit both.".to_owned()));
+        }
+    }
     Ok(props)
 }
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::{IcebergSinkConfig, IcebergSinkStoreClass, IcebergSinkTypes};
+
+    fn base_config() -> IcebergSinkConfig {
+        IcebergSinkConfig {
+            tables: vec![],
+            catalog_type: IcebergSinkTypes::REST,
+            warehouse: "warehouse".to_string(),
+            uri: "http://localhost:8181".to_string(),
+            dynamic_routing: false,
+            dynamic_route_field: "".to_string(),
+            store_url: "http://localhost:9000".to_string(),
+            store_access_key_id: None,
+            store_secret_access_key: None,
+            store_region: "us-east-1".to_string(),
+            store_class: IcebergSinkStoreClass::S3,
+        }
+    }
+
+    #[test]
+    fn test_get_props_s3_no_credentials() {
+        let config = base_config();
+        let props = get_props_s3(&config).expect("Should succeed without 
credentials");
+        assert_eq!(props.get("s3.region").unwrap(), "us-east-1");
+        assert_eq!(props.get("s3.endpoint").unwrap(), "http://localhost:9000");
+        assert!(!props.contains_key("s3.access-key-id"));
+        assert!(!props.contains_key("s3.secret-access-key"));
+    }
+
+    #[test]
+    fn test_get_props_s3_full_credentials() {
+        let config = IcebergSinkConfig {
+            store_access_key_id: Some("admin".to_string()),
+            store_secret_access_key: Some("password".to_string()),
+            ..base_config()
+        };
+        let props = get_props_s3(&config).expect("Should succeed with full 
credentials");
+        assert_eq!(props.get("s3.region").unwrap(), "us-east-1");
+        assert_eq!(props.get("s3.endpoint").unwrap(), "http://localhost:9000");
+        assert_eq!(props.get("s3.access-key-id").unwrap(), "admin");
+        assert_eq!(props.get("s3.secret-access-key").unwrap(), "password");
+    }
+
+    #[test]
+    fn test_get_props_s3_partial_access_key() {
+        let config = IcebergSinkConfig {
+            store_access_key_id: Some("admin".to_string()),
+            store_secret_access_key: None,
+            ..base_config()
+        };
+        assert!(
+            get_props_s3(&config).is_err(),
+            "Partial credentials (only access_key_id) should be rejected"
+        );
+    }
+
+    #[test]
+    fn test_get_props_s3_partial_secret_key() {
+        let config = IcebergSinkConfig {
+            store_access_key_id: None,
+            store_secret_access_key: Some("password".to_string()),
+            ..base_config()
+        };
+        assert!(
+            get_props_s3(&config).is_err(),
+            "Partial credentials (only secret_access_key) should be rejected"
+        );
+    }
+}
diff --git a/core/connectors/sinks/iceberg_sink/src/sink.rs 
b/core/connectors/sinks/iceberg_sink/src/sink.rs
index 1616e68aa..2cc128226 100644
--- a/core/connectors/sinks/iceberg_sink/src/sink.rs
+++ b/core/connectors/sinks/iceberg_sink/src/sink.rs
@@ -30,22 +30,31 @@ use tracing::{debug, error, info};
 #[async_trait]
 impl Sink for IcebergSink {
     async fn open(&mut self) -> Result<(), Error> {
-        let redacted_store_key = self
-            .config
-            .store_access_key_id
-            .chars()
-            .take(3)
-            .collect::<String>();
-        let redacted_store_secret = self
-            .config
-            .store_secret_access_key
-            .chars()
-            .take(3)
-            .collect::<String>();
-        info!(
-            "Opened Iceberg sink connector with ID: {} for URL: {}, store 
access key ID: {redacted_store_key}***  store secret: 
{redacted_store_secret}***",
-            self.id, self.config.uri
-        );
+        match (
+            &self.config.store_access_key_id,
+            &self.config.store_secret_access_key,
+        ) {
+            (Some(store_access_key_id), Some(store_secret_access_key)) => {
+                let redacted_store_key = 
store_access_key_id.chars().take(3).collect::<String>();
+                let redacted_store_secret =
+                    
store_secret_access_key.chars().take(3).collect::<String>();
+                info!(
+                    "Opened Iceberg sink connector with ID: {} for URL: {}, 
store access key ID: {redacted_store_key}***  store secret: 
{redacted_store_secret}***",
+                    self.id, self.config.uri
+                );
+            }
+            (None, None) => {
+                info!(
+                    "Opened Iceberg sink connector with ID: {} for URL: {}. No 
explicit credentials provided, falling back to default credential provider 
chain",
+                    self.id, self.config.uri
+                );
+            }
+            _ => {
+                return Err(Error::InvalidConfigValue(
+                    "Partially configured credentials. You must provide both 
store_access_key_id and store_secret_access_key, or omit both.".to_owned(),
+                ));
+            }
+        }
 
         info!(
             "Configuring Iceberg catalog with the following config:\n-region: 
{}\n-url: {}\n-store class: {}\n-catalog type: {}\n",
diff --git a/core/integration/tests/connectors/fixtures/iceberg/container.rs 
b/core/integration/tests/connectors/fixtures/iceberg/container.rs
index e0fb27215..4c0f4088b 100644
--- a/core/integration/tests/connectors/fixtures/iceberg/container.rs
+++ b/core/integration/tests/connectors/fixtures/iceberg/container.rs
@@ -52,6 +52,9 @@ pub const ENV_SINK_STORE_SECRET: &str =
 pub const ENV_SINK_STORE_REGION: &str = 
"IGGY_CONNECTORS_SINK_ICEBERG_PLUGIN_CONFIG_STORE_REGION";
 pub const ENV_SINK_PATH: &str = "IGGY_CONNECTORS_SINK_ICEBERG_PATH";
 
+pub const ENV_AWS_ACCESS_KEY_ID: &str = "AWS_ACCESS_KEY_ID";
+pub const ENV_AWS_SECRET_ACCESS_KEY: &str = "AWS_SECRET_ACCESS_KEY";
+
 pub struct MinioContainer {
     #[allow(dead_code)]
     container: ContainerAsync<GenericImage>,
@@ -532,3 +535,43 @@ impl TestFixture for IcebergPreCreatedFixture {
         self.inner.connectors_runtime_envs()
     }
 }
+
+pub struct IcebergEnvAuthFixture {
+    inner: IcebergPreCreatedFixture,
+}
+
+impl IcebergOps for IcebergEnvAuthFixture {
+    fn catalog_url(&self) -> &str {
+        self.inner.catalog_url()
+    }
+
+    fn http_client(&self) -> &HttpClient {
+        self.inner.http_client()
+    }
+}
+
+#[async_trait]
+impl TestFixture for IcebergEnvAuthFixture {
+    async fn setup() -> Result<Self, TestBinaryError> {
+        let inner = IcebergPreCreatedFixture::setup().await?;
+        Ok(Self { inner })
+    }
+
+    fn connectors_runtime_envs(&self) -> HashMap<String, String> {
+        let mut envs = self.inner.connectors_runtime_envs();
+        // Remove the explicit credentials injected by the underlying fixture.
+        // This forces the Iceberg Sink to use the default credential provider 
chain instead of explicit config.
+        envs.remove(ENV_SINK_STORE_ACCESS_KEY);
+        envs.remove(ENV_SINK_STORE_SECRET);
+        // Inject standard AWS env vars to test the default credential 
provider chain.
+        envs.insert(
+            ENV_AWS_ACCESS_KEY_ID.to_string(),
+            MINIO_ACCESS_KEY.to_string(),
+        );
+        envs.insert(
+            ENV_AWS_SECRET_ACCESS_KEY.to_string(),
+            MINIO_SECRET_KEY.to_string(),
+        );
+        envs
+    }
+}
diff --git a/core/integration/tests/connectors/fixtures/iceberg/mod.rs 
b/core/integration/tests/connectors/fixtures/iceberg/mod.rs
index 30c047f35..4724d841d 100644
--- a/core/integration/tests/connectors/fixtures/iceberg/mod.rs
+++ b/core/integration/tests/connectors/fixtures/iceberg/mod.rs
@@ -19,4 +19,6 @@
 
 mod container;
 
-pub use container::{DEFAULT_NAMESPACE, DEFAULT_TABLE, IcebergOps, 
IcebergPreCreatedFixture};
+pub use container::{
+    DEFAULT_NAMESPACE, DEFAULT_TABLE, IcebergEnvAuthFixture, IcebergOps, 
IcebergPreCreatedFixture,
+};
diff --git a/core/integration/tests/connectors/fixtures/mod.rs 
b/core/integration/tests/connectors/fixtures/mod.rs
index 8e0009bb3..2aebf97fd 100644
--- a/core/integration/tests/connectors/fixtures/mod.rs
+++ b/core/integration/tests/connectors/fixtures/mod.rs
@@ -31,7 +31,9 @@ pub use http::{
     HttpSinkIndividualFixture, HttpSinkJsonArrayFixture, 
HttpSinkMultiTopicFixture,
     HttpSinkNdjsonFixture, HttpSinkNoMetadataFixture, HttpSinkRawFixture,
 };
-pub use iceberg::{DEFAULT_NAMESPACE, DEFAULT_TABLE, IcebergOps, 
IcebergPreCreatedFixture};
+pub use iceberg::{
+    DEFAULT_NAMESPACE, DEFAULT_TABLE, IcebergEnvAuthFixture, IcebergOps, 
IcebergPreCreatedFixture,
+};
 pub use influxdb::{
     InfluxDbSinkBase64Fixture, InfluxDbSinkFixture, 
InfluxDbSinkNoMetadataFixture,
     InfluxDbSinkNsPrecisionFixture, InfluxDbSinkTextFixture, 
InfluxDbSourceFixture,
diff --git 
a/core/integration/tests/connectors/iceberg/default_credentials_config/config.toml
 
b/core/integration/tests/connectors/iceberg/default_credentials_config/config.toml
new file mode 100644
index 000000000..5c6ffc6b7
--- /dev/null
+++ 
b/core/integration/tests/connectors/iceberg/default_credentials_config/config.toml
@@ -0,0 +1,45 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+type = "sink"
+key = "iceberg"
+enabled = true
+version = 0
+name = "Iceberg sink"
+path = "../../target/debug/libiggy_connector_iceberg_sink"
+verbose = true
+
+[[streams]]
+stream = "test_stream"
+topics = ["test_topic"]
+schema = "json"
+batch_length = 100
+poll_interval = "5ms"
+consumer_group = "iceberg_sink_connector"
+
+# Notice: This configuration deliberately omits 'store_access_key_id' and 
'store_secret_access_key'.
+# It is used exclusively by integration tests to test the default credential 
provider chain fallback behavior.
+[plugin_config]
+tables = ["test.messages"]
+catalog_type = "rest"
+warehouse = "warehouse"
+uri = "http://localhost:8181"
+dynamic_routing = false
+dynamic_route_field = ""
+store_url = "http://localhost:9000"
+store_region = "us-east-1"
+store_class = "s3"
diff --git a/core/integration/tests/connectors/iceberg/iceberg_sink.rs 
b/core/integration/tests/connectors/iceberg/iceberg_sink.rs
index 777214116..b40688fdc 100644
--- a/core/integration/tests/connectors/iceberg/iceberg_sink.rs
+++ b/core/integration/tests/connectors/iceberg/iceberg_sink.rs
@@ -19,7 +19,7 @@
 
 use crate::connectors::create_test_messages;
 use crate::connectors::fixtures::{
-    DEFAULT_NAMESPACE, DEFAULT_TABLE, IcebergOps, IcebergPreCreatedFixture,
+    DEFAULT_NAMESPACE, DEFAULT_TABLE, IcebergEnvAuthFixture, IcebergOps, 
IcebergPreCreatedFixture,
 };
 use bytes::Bytes;
 use iggy::prelude::{IggyMessage, Partitioning};
@@ -207,3 +207,55 @@ async fn iceberg_sink_handles_bulk_messages(
     assert_eq!(sinks.len(), 1);
     assert!(sinks[0].last_error.is_none());
 }
+
+#[iggy_harness(
+    server(connectors_runtime(config_path = 
"tests/connectors/iceberg/sink_default_credentials.toml")),
+    seed = seeds::connector_stream
+)]
+async fn iceberg_sink_uses_default_credential_chain(
+    harness: &TestHarness,
+    fixture: IcebergEnvAuthFixture,
+) {
+    let client = harness
+        .root_client()
+        .await
+        .expect("Failed to get root client");
+    let stream_id: iggy_common::Identifier =
+        seeds::names::STREAM.try_into().expect("Invalid stream id");
+    let topic_id: iggy_common::Identifier =
+        seeds::names::TOPIC.try_into().expect("Invalid topic id");
+    let test_messages = crate::connectors::create_test_messages(5);
+    let mut messages: Vec<IggyMessage> = test_messages
+        .iter()
+        .enumerate()
+        .map(|(i, msg)| {
+            let payload = serde_json::to_vec(msg).expect("Failed to serialize 
message");
+            IggyMessage::builder()
+                .id((i + 1) as u128)
+                .payload(bytes::Bytes::from(payload))
+                .build()
+                .expect("Failed to build message")
+        })
+        .collect();
+    client
+        .send_messages(
+            &stream_id,
+            &topic_id,
+            &Partitioning::partition_id(0),
+            &mut messages,
+        )
+        .await
+        .expect("Failed to send fake messages");
+    let snapshot_count = fixture
+        .wait_for_snapshots(
+            DEFAULT_NAMESPACE,
+            DEFAULT_TABLE,
+            1,
+            SNAPSHOT_POLL_ATTEMPTS,
+            SNAPSHOT_POLL_INTERVAL_MS,
+        )
+        .await
+        .expect("Data should be written to Iceberg table");
+    assert!(snapshot_count >= 1);
+    drop(fixture);
+}
diff --git 
a/core/integration/tests/connectors/iceberg/sink_default_credentials.toml 
b/core/integration/tests/connectors/iceberg/sink_default_credentials.toml
new file mode 100644
index 000000000..c9ff51ebf
--- /dev/null
+++ b/core/integration/tests/connectors/iceberg/sink_default_credentials.toml
@@ -0,0 +1,20 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+[connectors]
+config_type = "local"
+config_dir = "tests/connectors/iceberg/default_credentials_config"

Reply via email to