This is an automated email from the ASF dual-hosted git repository.
hgruszecki pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/master by this push:
new af9273081 fix(connectors): support default credential provider chain
for iceberg sink (#3045)
af9273081 is described below
commit af9273081b6f0771a48e4b58ec08f1d513bb271c
Author: omnitrix <[email protected]>
AuthorDate: Sat Apr 11 05:16:34 2026 -0700
fix(connectors): support default credential provider chain for iceberg sink
(#3045)
---
core/connectors/sinks/iceberg_sink/src/lib.rs | 4 +-
core/connectors/sinks/iceberg_sink/src/props.rs | 93 ++++++++++++++++++++--
core/connectors/sinks/iceberg_sink/src/sink.rs | 41 ++++++----
.../tests/connectors/fixtures/iceberg/container.rs | 43 ++++++++++
.../tests/connectors/fixtures/iceberg/mod.rs | 4 +-
core/integration/tests/connectors/fixtures/mod.rs | 4 +-
.../iceberg/default_credentials_config/config.toml | 45 +++++++++++
.../tests/connectors/iceberg/iceberg_sink.rs | 54 ++++++++++++-
.../iceberg/sink_default_credentials.toml | 20 +++++
9 files changed, 279 insertions(+), 29 deletions(-)
diff --git a/core/connectors/sinks/iceberg_sink/src/lib.rs
b/core/connectors/sinks/iceberg_sink/src/lib.rs
index 4d3954086..33c091728 100644
--- a/core/connectors/sinks/iceberg_sink/src/lib.rs
+++ b/core/connectors/sinks/iceberg_sink/src/lib.rs
@@ -60,8 +60,8 @@ pub struct IcebergSinkConfig {
pub dynamic_routing: bool,
pub dynamic_route_field: String,
pub store_url: String,
- pub store_access_key_id: String,
- pub store_secret_access_key: String,
+ pub store_access_key_id: Option<String>,
+ pub store_secret_access_key: Option<String>,
pub store_region: String,
pub store_class: IcebergSinkStoreClass,
}
diff --git a/core/connectors/sinks/iceberg_sink/src/props.rs
b/core/connectors/sinks/iceberg_sink/src/props.rs
index aa01d6d00..b2107de15 100644
--- a/core/connectors/sinks/iceberg_sink/src/props.rs
+++ b/core/connectors/sinks/iceberg_sink/src/props.rs
@@ -30,14 +30,91 @@ pub fn init_props(config: &IcebergSinkConfig) ->
Result<HashMap<String, String>,
fn get_props_s3(config: &IcebergSinkConfig) -> Result<HashMap<String, String>,
Error> {
let mut props: HashMap<String, String> = HashMap::new();
props.insert("s3.region".to_string(), config.store_region.clone());
- props.insert(
- "s3.access-key-id".to_string(),
- config.store_access_key_id.clone(),
- );
- props.insert(
- "s3.secret-access-key".to_string(),
- config.store_secret_access_key.clone(),
- );
props.insert("s3.endpoint".to_string(), config.store_url.clone());
+ match (&config.store_access_key_id, &config.store_secret_access_key) {
+ (Some(access_key_id), Some(secret_access_key)) => {
+ props.insert("s3.access-key-id".to_string(),
access_key_id.clone());
+ props.insert(
+ "s3.secret-access-key".to_string(),
+ secret_access_key.clone(),
+ );
+ }
+ (None, None) => {}
+ _ => {
+ return Err(Error::InvalidConfigValue("Partially configured
credentials. You must provide both store_access_key_id and
store_secret_access_key, or omit both.".to_owned()));
+ }
+ }
Ok(props)
}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use crate::{IcebergSinkConfig, IcebergSinkStoreClass, IcebergSinkTypes};
+
+ fn base_config() -> IcebergSinkConfig {
+ IcebergSinkConfig {
+ tables: vec![],
+ catalog_type: IcebergSinkTypes::REST,
+ warehouse: "warehouse".to_string(),
+ uri: "http://localhost:8181".to_string(),
+ dynamic_routing: false,
+ dynamic_route_field: "".to_string(),
+ store_url: "http://localhost:9000".to_string(),
+ store_access_key_id: None,
+ store_secret_access_key: None,
+ store_region: "us-east-1".to_string(),
+ store_class: IcebergSinkStoreClass::S3,
+ }
+ }
+
+ #[test]
+ fn test_get_props_s3_no_credentials() {
+ let config = base_config();
+ let props = get_props_s3(&config).expect("Should succeed without
credentials");
+ assert_eq!(props.get("s3.region").unwrap(), "us-east-1");
+ assert_eq!(props.get("s3.endpoint").unwrap(), "http://localhost:9000");
+ assert!(!props.contains_key("s3.access-key-id"));
+ assert!(!props.contains_key("s3.secret-access-key"));
+ }
+
+ #[test]
+ fn test_get_props_s3_full_credentials() {
+ let config = IcebergSinkConfig {
+ store_access_key_id: Some("admin".to_string()),
+ store_secret_access_key: Some("password".to_string()),
+ ..base_config()
+ };
+ let props = get_props_s3(&config).expect("Should succeed with full
credentials");
+ assert_eq!(props.get("s3.region").unwrap(), "us-east-1");
+ assert_eq!(props.get("s3.endpoint").unwrap(), "http://localhost:9000");
+ assert_eq!(props.get("s3.access-key-id").unwrap(), "admin");
+ assert_eq!(props.get("s3.secret-access-key").unwrap(), "password");
+ }
+
+ #[test]
+ fn test_get_props_s3_partial_access_key() {
+ let config = IcebergSinkConfig {
+ store_access_key_id: Some("admin".to_string()),
+ store_secret_access_key: None,
+ ..base_config()
+ };
+ assert!(
+ get_props_s3(&config).is_err(),
+ "Partial credentials (only access_key_id) should be rejected"
+ );
+ }
+
+ #[test]
+ fn test_get_props_s3_partial_secret_key() {
+ let config = IcebergSinkConfig {
+ store_access_key_id: None,
+ store_secret_access_key: Some("password".to_string()),
+ ..base_config()
+ };
+ assert!(
+ get_props_s3(&config).is_err(),
+ "Partial credentials (only secret_access_key) should be rejected"
+ );
+ }
+}
diff --git a/core/connectors/sinks/iceberg_sink/src/sink.rs
b/core/connectors/sinks/iceberg_sink/src/sink.rs
index 1616e68aa..2cc128226 100644
--- a/core/connectors/sinks/iceberg_sink/src/sink.rs
+++ b/core/connectors/sinks/iceberg_sink/src/sink.rs
@@ -30,22 +30,31 @@ use tracing::{debug, error, info};
#[async_trait]
impl Sink for IcebergSink {
async fn open(&mut self) -> Result<(), Error> {
- let redacted_store_key = self
- .config
- .store_access_key_id
- .chars()
- .take(3)
- .collect::<String>();
- let redacted_store_secret = self
- .config
- .store_secret_access_key
- .chars()
- .take(3)
- .collect::<String>();
- info!(
- "Opened Iceberg sink connector with ID: {} for URL: {}, store
access key ID: {redacted_store_key}*** store secret:
{redacted_store_secret}***",
- self.id, self.config.uri
- );
+ match (
+ &self.config.store_access_key_id,
+ &self.config.store_secret_access_key,
+ ) {
+ (Some(store_access_key_id), Some(store_secret_access_key)) => {
+ let redacted_store_key =
store_access_key_id.chars().take(3).collect::<String>();
+ let redacted_store_secret =
+
store_secret_access_key.chars().take(3).collect::<String>();
+ info!(
+ "Opened Iceberg sink connector with ID: {} for URL: {},
store access key ID: {redacted_store_key}*** store secret:
{redacted_store_secret}***",
+ self.id, self.config.uri
+ );
+ }
+ (None, None) => {
+ info!(
+ "Opened Iceberg sink connector with ID: {} for URL: {}. No
explicit credentials provided, falling back to default credential provider
chain",
+ self.id, self.config.uri
+ );
+ }
+ _ => {
+ return Err(Error::InvalidConfigValue(
+ "Partially configured credentials. You must provide both
store_access_key_id and store_secret_access_key, or omit both.".to_owned(),
+ ));
+ }
+ }
info!(
"Configuring Iceberg catalog with the following config:\n-region:
{}\n-url: {}\n-store class: {}\n-catalog type: {}\n",
diff --git a/core/integration/tests/connectors/fixtures/iceberg/container.rs
b/core/integration/tests/connectors/fixtures/iceberg/container.rs
index e0fb27215..4c0f4088b 100644
--- a/core/integration/tests/connectors/fixtures/iceberg/container.rs
+++ b/core/integration/tests/connectors/fixtures/iceberg/container.rs
@@ -52,6 +52,9 @@ pub const ENV_SINK_STORE_SECRET: &str =
pub const ENV_SINK_STORE_REGION: &str =
"IGGY_CONNECTORS_SINK_ICEBERG_PLUGIN_CONFIG_STORE_REGION";
pub const ENV_SINK_PATH: &str = "IGGY_CONNECTORS_SINK_ICEBERG_PATH";
+pub const ENV_AWS_ACCESS_KEY_ID: &str = "AWS_ACCESS_KEY_ID";
+pub const ENV_AWS_SECRET_ACCESS_KEY: &str = "AWS_SECRET_ACCESS_KEY";
+
pub struct MinioContainer {
#[allow(dead_code)]
container: ContainerAsync<GenericImage>,
@@ -532,3 +535,43 @@ impl TestFixture for IcebergPreCreatedFixture {
self.inner.connectors_runtime_envs()
}
}
+
+pub struct IcebergEnvAuthFixture {
+ inner: IcebergPreCreatedFixture,
+}
+
+impl IcebergOps for IcebergEnvAuthFixture {
+ fn catalog_url(&self) -> &str {
+ self.inner.catalog_url()
+ }
+
+ fn http_client(&self) -> &HttpClient {
+ self.inner.http_client()
+ }
+}
+
+#[async_trait]
+impl TestFixture for IcebergEnvAuthFixture {
+ async fn setup() -> Result<Self, TestBinaryError> {
+ let inner = IcebergPreCreatedFixture::setup().await?;
+ Ok(Self { inner })
+ }
+
+ fn connectors_runtime_envs(&self) -> HashMap<String, String> {
+ let mut envs = self.inner.connectors_runtime_envs();
+ // Remove the explicit credentials injected by the underlying fixture.
+ // This forces the Iceberg Sink to use the default credential provider chain instead of explicit config.
+ envs.remove(ENV_SINK_STORE_ACCESS_KEY);
+ envs.remove(ENV_SINK_STORE_SECRET);
+ // Inject standard AWS env vars to test the default credential provider chain.
+ envs.insert(
+ ENV_AWS_ACCESS_KEY_ID.to_string(),
+ MINIO_ACCESS_KEY.to_string(),
+ );
+ envs.insert(
+ ENV_AWS_SECRET_ACCESS_KEY.to_string(),
+ MINIO_SECRET_KEY.to_string(),
+ );
+ envs
+ }
+}
diff --git a/core/integration/tests/connectors/fixtures/iceberg/mod.rs
b/core/integration/tests/connectors/fixtures/iceberg/mod.rs
index 30c047f35..4724d841d 100644
--- a/core/integration/tests/connectors/fixtures/iceberg/mod.rs
+++ b/core/integration/tests/connectors/fixtures/iceberg/mod.rs
@@ -19,4 +19,6 @@
mod container;
-pub use container::{DEFAULT_NAMESPACE, DEFAULT_TABLE, IcebergOps,
IcebergPreCreatedFixture};
+pub use container::{
+ DEFAULT_NAMESPACE, DEFAULT_TABLE, IcebergEnvAuthFixture, IcebergOps,
IcebergPreCreatedFixture,
+};
diff --git a/core/integration/tests/connectors/fixtures/mod.rs
b/core/integration/tests/connectors/fixtures/mod.rs
index 8e0009bb3..2aebf97fd 100644
--- a/core/integration/tests/connectors/fixtures/mod.rs
+++ b/core/integration/tests/connectors/fixtures/mod.rs
@@ -31,7 +31,9 @@ pub use http::{
HttpSinkIndividualFixture, HttpSinkJsonArrayFixture,
HttpSinkMultiTopicFixture,
HttpSinkNdjsonFixture, HttpSinkNoMetadataFixture, HttpSinkRawFixture,
};
-pub use iceberg::{DEFAULT_NAMESPACE, DEFAULT_TABLE, IcebergOps,
IcebergPreCreatedFixture};
+pub use iceberg::{
+ DEFAULT_NAMESPACE, DEFAULT_TABLE, IcebergEnvAuthFixture, IcebergOps,
IcebergPreCreatedFixture,
+};
pub use influxdb::{
InfluxDbSinkBase64Fixture, InfluxDbSinkFixture,
InfluxDbSinkNoMetadataFixture,
InfluxDbSinkNsPrecisionFixture, InfluxDbSinkTextFixture,
InfluxDbSourceFixture,
diff --git
a/core/integration/tests/connectors/iceberg/default_credentials_config/config.toml
b/core/integration/tests/connectors/iceberg/default_credentials_config/config.toml
new file mode 100644
index 000000000..5c6ffc6b7
--- /dev/null
+++
b/core/integration/tests/connectors/iceberg/default_credentials_config/config.toml
@@ -0,0 +1,45 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+type = "sink"
+key = "iceberg"
+enabled = true
+version = 0
+name = "Iceberg sink"
+path = "../../target/debug/libiggy_connector_iceberg_sink"
+verbose = true
+
+[[streams]]
+stream = "test_stream"
+topics = ["test_topic"]
+schema = "json"
+batch_length = 100
+poll_interval = "5ms"
+consumer_group = "iceberg_sink_connector"
+
+# Notice: This configuration deliberately omits 'store_access_key_id' and 'store_secret_access_key'.
+# It is used exclusively by integration tests to test the default credential provider chain fallback behavior.
+[plugin_config]
+tables = ["test.messages"]
+catalog_type = "rest"
+warehouse = "warehouse"
+uri = "http://localhost:8181"
+dynamic_routing = false
+dynamic_route_field = ""
+store_url = "http://localhost:9000"
+store_region = "us-east-1"
+store_class = "s3"
diff --git a/core/integration/tests/connectors/iceberg/iceberg_sink.rs
b/core/integration/tests/connectors/iceberg/iceberg_sink.rs
index 777214116..b40688fdc 100644
--- a/core/integration/tests/connectors/iceberg/iceberg_sink.rs
+++ b/core/integration/tests/connectors/iceberg/iceberg_sink.rs
@@ -19,7 +19,7 @@
use crate::connectors::create_test_messages;
use crate::connectors::fixtures::{
- DEFAULT_NAMESPACE, DEFAULT_TABLE, IcebergOps, IcebergPreCreatedFixture,
+ DEFAULT_NAMESPACE, DEFAULT_TABLE, IcebergEnvAuthFixture, IcebergOps,
IcebergPreCreatedFixture,
};
use bytes::Bytes;
use iggy::prelude::{IggyMessage, Partitioning};
@@ -207,3 +207,55 @@ async fn iceberg_sink_handles_bulk_messages(
assert_eq!(sinks.len(), 1);
assert!(sinks[0].last_error.is_none());
}
+
+#[iggy_harness(
+ server(connectors_runtime(config_path =
"tests/connectors/iceberg/sink_default_credentials.toml")),
+ seed = seeds::connector_stream
+)]
+async fn iceberg_sink_uses_default_credential_chain(
+ harness: &TestHarness,
+ fixture: IcebergEnvAuthFixture,
+) {
+ let client = harness
+ .root_client()
+ .await
+ .expect("Failed to get root client");
+ let stream_id: iggy_common::Identifier =
+ seeds::names::STREAM.try_into().expect("Invalid stream id");
+ let topic_id: iggy_common::Identifier =
+ seeds::names::TOPIC.try_into().expect("Invalid topic id");
+ let test_messages = crate::connectors::create_test_messages(5);
+ let mut messages: Vec<IggyMessage> = test_messages
+ .iter()
+ .enumerate()
+ .map(|(i, msg)| {
+ let payload = serde_json::to_vec(msg).expect("Failed to serialize
message");
+ IggyMessage::builder()
+ .id((i + 1) as u128)
+ .payload(bytes::Bytes::from(payload))
+ .build()
+ .expect("Failed to build message")
+ })
+ .collect();
+ client
+ .send_messages(
+ &stream_id,
+ &topic_id,
+ &Partitioning::partition_id(0),
+ &mut messages,
+ )
+ .await
+ .expect("Failed to send fake messages");
+ let snapshot_count = fixture
+ .wait_for_snapshots(
+ DEFAULT_NAMESPACE,
+ DEFAULT_TABLE,
+ 1,
+ SNAPSHOT_POLL_ATTEMPTS,
+ SNAPSHOT_POLL_INTERVAL_MS,
+ )
+ .await
+ .expect("Data should be written to Iceberg table");
+ assert!(snapshot_count >= 1);
+ drop(fixture);
+}
diff --git
a/core/integration/tests/connectors/iceberg/sink_default_credentials.toml
b/core/integration/tests/connectors/iceberg/sink_default_credentials.toml
new file mode 100644
index 000000000..c9ff51ebf
--- /dev/null
+++ b/core/integration/tests/connectors/iceberg/sink_default_credentials.toml
@@ -0,0 +1,20 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+[connectors]
+config_type = "local"
+config_dir = "tests/connectors/iceberg/default_credentials_config"