hubcio commented on code in PR #3045:
URL: https://github.com/apache/iggy/pull/3045#discussion_r3044526771


##########
core/integration/tests/connectors/fixtures/iceberg/container.rs:
##########
@@ -532,3 +535,39 @@ impl TestFixture for IcebergPreCreatedFixture {
         self.inner.connectors_runtime_envs()
     }
 }
+
+pub struct IcebergEnvAuthFixture {
+    inner: IcebergPreCreatedFixture,
+}
+
+impl IcebergOps for IcebergEnvAuthFixture {
+    fn catalog_url(&self) -> &str {
+        self.inner.catalog_url()
+    }
+
+    fn http_client(&self) -> &HttpClient {
+        self.inner.http_client()
+    }
+}
+
+#[async_trait]
+impl TestFixture for IcebergEnvAuthFixture {
+    async fn setup() -> Result<Self, TestBinaryError> {
+        let inner = IcebergPreCreatedFixture::setup().await?;
+        // Set credentials before test server initialization.
+        unsafe {

Review Comment:
   `unsafe { std::env::set_var }` without cleanup or safety justification. The tokio runtime is already running at this point, so a `set_var` racing with env reads on other threads is UB. There is also no `Drop` impl, so these vars leak into subsequent tests.
   
   The fix is straightforward: add `AWS_ACCESS_KEY_ID` / `AWS_SECRET_ACCESS_KEY` to the `HashMap` returned by `connectors_runtime_envs()` instead. The harness already passes that map to the child process via `Command::envs()` at `connectors_runtime.rs:157`, which eliminates the `unsafe` entirely.
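A minimal sketch of that approach. It uses hypothetical stand-ins (`PreCreatedFixture`, `EnvAuthFixture`, `build_runtime_command`, the `EXAMPLE_INNER_VAR` entry and the placeholder credential values) rather than the real harness types, and assumes `connectors_runtime_envs()` returns a `HashMap<String, String>` as described above. The credentials only ever live in the map handed to `Command::envs()`, so the test process's own environment is never mutated:

```rust
use std::collections::HashMap;
use std::process::Command;

// Hypothetical stand-in for IcebergPreCreatedFixture; its real env map is
// reduced to a single placeholder entry here.
pub struct PreCreatedFixture;

impl PreCreatedFixture {
    pub fn connectors_runtime_envs(&self) -> HashMap<String, String> {
        HashMap::from([("EXAMPLE_INNER_VAR".to_string(), "inner".to_string())])
    }
}

// Hypothetical stand-in for IcebergEnvAuthFixture: it wraps the inner fixture
// and only adds the AWS credential variables to the child-process env map.
pub struct EnvAuthFixture {
    pub inner: PreCreatedFixture,
    pub access_key_id: String,
    pub secret_access_key: String,
}

impl EnvAuthFixture {
    pub fn connectors_runtime_envs(&self) -> HashMap<String, String> {
        let mut envs = self.inner.connectors_runtime_envs();
        envs.insert("AWS_ACCESS_KEY_ID".to_string(), self.access_key_id.clone());
        envs.insert(
            "AWS_SECRET_ACCESS_KEY".to_string(),
            self.secret_access_key.clone(),
        );
        envs
    }
}

// Hypothetical helper mirroring what the harness does with the map: the vars
// are set on the Command only, so there is no unsafe set_var and nothing
// leaks into other tests running in the same process.
pub fn build_runtime_command(program: &str, envs: &HashMap<String, String>) -> Command {
    let mut cmd = Command::new(program);
    cmd.envs(envs);
    cmd
}
```

Because the child inherits the parent environment plus these overrides, the runtime's default AWS credential chain should still find the keys, while parallel tests never observe them.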



##########
core/connectors/sinks/iceberg_sink/src/props.rs:
##########
@@ -30,14 +30,49 @@ pub fn init_props(config: &IcebergSinkConfig) -> Result<HashMap<String, String>,
 fn get_props_s3(config: &IcebergSinkConfig) -> Result<HashMap<String, String>, Error> {
     let mut props: HashMap<String, String> = HashMap::new();
     props.insert("s3.region".to_string(), config.store_region.clone());
-    props.insert(
-        "s3.access-key-id".to_string(),
-        config.store_access_key_id.clone(),
-    );
-    props.insert(
-        "s3.secret-access-key".to_string(),
-        config.store_secret_access_key.clone(),
-    );
+    if let Some(access_key_id) = &config.store_access_key_id {

Review Comment:
   `get_props_s3()` handles each credential independently with separate `if let Some(...)` blocks. The partial-credential invariant (both-or-neither) is only enforced in `sink.rs::open()`, but `init_props` is `pub`, so a future caller could bypass validation and get a silently half-configured props map. Consider making `init_props` `pub(crate)` or adding both-or-neither validation here as defense-in-depth.
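A minimal sketch of the defense-in-depth option, assuming validation moves into `get_props_s3` and reuses `Error::InvalidConfigValue(String)` (mentioned in a later comment). `Config` and `Error` are simplified, hypothetical stand-ins for `IcebergSinkConfig` and the connector SDK error, trimmed to the fields this function touches:

```rust
use std::collections::HashMap;

// Hypothetical, trimmed stand-in for IcebergSinkConfig.
pub(crate) struct Config {
    pub(crate) store_region: String,
    pub(crate) store_url: String,
    pub(crate) store_access_key_id: Option<String>,
    pub(crate) store_secret_access_key: Option<String>,
}

// Hypothetical stand-in for the SDK error; only the variant used here.
#[derive(Debug, PartialEq)]
pub(crate) enum Error {
    InvalidConfigValue(String),
}

// pub(crate) keeps callers inside the crate; the match enforces
// both-or-neither regardless of whether open() ran first.
pub(crate) fn get_props_s3(config: &Config) -> Result<HashMap<String, String>, Error> {
    let mut props = HashMap::new();
    props.insert("s3.region".to_string(), config.store_region.clone());
    match (&config.store_access_key_id, &config.store_secret_access_key) {
        (Some(access_key_id), Some(secret_access_key)) => {
            props.insert("s3.access-key-id".to_string(), access_key_id.clone());
            props.insert("s3.secret-access-key".to_string(), secret_access_key.clone());
        }
        // Neither set: omit both keys so the default credential chain is used.
        (None, None) => {}
        _ => {
            return Err(Error::InvalidConfigValue(
                "store_access_key_id and store_secret_access_key must both be set or both be omitted"
                    .to_string(),
            ));
        }
    }
    props.insert("s3.endpoint".to_string(), config.store_url.clone());
    Ok(props)
}
```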



##########
core/integration/tests/connectors/iceberg/iceberg_sink.rs:
##########
@@ -207,3 +207,50 @@ async fn iceberg_sink_handles_bulk_messages(
     assert_eq!(sinks.len(), 1);
     assert!(sinks[0].last_error.is_none());
 }
+
+#[iggy_harness(
+    server(connectors_runtime(config_path = "tests/connectors/iceberg/sink_default_credentials.toml")),
+    seed = seeds::connector_stream
+)]
+async fn iceberg_sink_uses_default_credential_chain(
+    harness: &TestHarness,
+    fixture: IcebergEnvAuthFixture,
+) {
+    let client = harness.root_client().await.unwrap();

Review Comment:
   Bare `.unwrap()` calls here and at lines 220, 221, 227, 232. The existing tests in this file consistently use `.expect("...")` with context messages.



##########
core/connectors/sinks/iceberg_sink/src/sink.rs:
##########
@@ -30,22 +30,32 @@ use tracing::{debug, error, info};
 #[async_trait]
 impl Sink for IcebergSink {
     async fn open(&mut self) -> Result<(), Error> {
-        let redacted_store_key = self
-            .config
-            .store_access_key_id
-            .chars()
-            .take(3)
-            .collect::<String>();
-        let redacted_store_secret = self
-            .config
-            .store_secret_access_key
-            .chars()
-            .take(3)
-            .collect::<String>();
-        info!(
-            "Opened Iceberg sink connector with ID: {} for URL: {}, store access key ID: {redacted_store_key}***  store secret: {redacted_store_secret}***",
-            self.id, self.config.uri
-        );
+        match (
+            &self.config.store_access_key_id,
+            &self.config.store_secret_access_key,
+        ) {
+            (Some(store_access_key_id), Some(store_secret_access_key)) => {
+                let redacted_store_key = store_access_key_id.chars().take(3).collect::<String>();
+                let redacted_store_secret =
+                    store_secret_access_key.chars().take(3).collect::<String>();
+                info!(
+                    "Opened Iceberg sink connector with ID: {} for URL: {}, store access key ID: {redacted_store_key}***  store secret: {redacted_store_secret}***",
+                    self.id, self.config.uri
+                );
+            }
+            (None, None) => {
+                info!(
+                    "Opened Iceberg sink connector with ID: {} for URL: {}. No explicit credentials provided, falling back to default credential provider chain",
+                    self.id, self.config.uri
+                );
+            }
+            _ => {
+                error!(
+                    "Partially configured Iceberg credentials. You must provide both store_access_key_id and store_secret_access_key, or omit both."
+                );
+                return Err(Error::InvalidConfig);

Review Comment:
   `Error::InvalidConfig` produces a bare "Invalid config" message. `Error::InvalidConfigValue(String)` exists (used by the influxdb connectors) and carries a description. Use it so the error is self-explanatory without needing to correlate it with the `error!()` log above.
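A sketch of the `open()` credential check returning a descriptive error. `Error` is a hypothetical single-variant stand-in for the SDK enum, and `Credentials` / `resolve_credentials` are illustrative names, not existing code. Splitting the mixed case into two arms also lets the message name the missing field:

```rust
// Hypothetical stand-in for the SDK error, reduced to the variant the review suggests.
#[derive(Debug, PartialEq)]
pub enum Error {
    InvalidConfigValue(String),
}

// Illustrative outcome of credential resolution in open().
#[derive(Debug, PartialEq)]
pub enum Credentials<'a> {
    Explicit {
        access_key_id: &'a str,
        secret_access_key: &'a str,
    },
    DefaultChain,
}

pub fn resolve_credentials<'a>(
    access_key_id: Option<&'a str>,
    secret_access_key: Option<&'a str>,
) -> Result<Credentials<'a>, Error> {
    match (access_key_id, secret_access_key) {
        (Some(access_key_id), Some(secret_access_key)) => Ok(Credentials::Explicit {
            access_key_id,
            secret_access_key,
        }),
        // Neither set: fall back to the default credential provider chain.
        (None, None) => Ok(Credentials::DefaultChain),
        // Partial configuration: the error text alone explains the problem.
        (Some(_), None) => Err(Error::InvalidConfigValue(
            "store_access_key_id is set but store_secret_access_key is missing; provide both or omit both"
                .to_string(),
        )),
        (None, Some(_)) => Err(Error::InvalidConfigValue(
            "store_secret_access_key is set but store_access_key_id is missing; provide both or omit both"
                .to_string(),
        )),
    }
}
```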



##########
core/connectors/sinks/iceberg_sink/src/props.rs:
##########
@@ -30,14 +30,49 @@ pub fn init_props(config: &IcebergSinkConfig) -> Result<HashMap<String, String>,
 fn get_props_s3(config: &IcebergSinkConfig) -> Result<HashMap<String, String>, Error> {
     let mut props: HashMap<String, String> = HashMap::new();
     props.insert("s3.region".to_string(), config.store_region.clone());
-    props.insert(
-        "s3.access-key-id".to_string(),
-        config.store_access_key_id.clone(),
-    );
-    props.insert(
-        "s3.secret-access-key".to_string(),
-        config.store_secret_access_key.clone(),
-    );
+    if let Some(access_key_id) = &config.store_access_key_id {
+        props.insert("s3.access-key-id".to_string(), access_key_id.clone());
+    }
+    if let Some(secret_access_key) = &config.store_secret_access_key {
+        props.insert(
+            "s3.secret-access-key".to_string(),
+            secret_access_key.clone(),
+        );
+    }
     props.insert("s3.endpoint".to_string(), config.store_url.clone());
     Ok(props)
 }
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::{IcebergSinkConfig, IcebergSinkStoreClass, IcebergSinkTypes};
+
+    #[test]
+    fn test_get_props_s3() {

Review Comment:
   The test covers both-`None` and both-`Some` but not the mixed case (one `Some`, one `None`). If validation moves into `get_props_s3` per the comment above, this test should verify the error path too.
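A sketch of a table-driven test covering all four `Some`/`None` combinations, assuming validation has moved into `get_props_s3`. To stay self-contained, `s3_credential_props` is a hypothetical stand-in that builds only the two credential keys (region and endpoint omitted); in the real module the test would construct an `IcebergSinkConfig` and call `get_props_s3`:

```rust
use std::collections::HashMap;

// Hypothetical stand-in for get_props_s3 with both-or-neither validation.
pub fn s3_credential_props(
    access_key_id: Option<&str>,
    secret_access_key: Option<&str>,
) -> Result<HashMap<String, String>, String> {
    let mut props = HashMap::new();
    match (access_key_id, secret_access_key) {
        (Some(id), Some(secret)) => {
            props.insert("s3.access-key-id".to_string(), id.to_string());
            props.insert("s3.secret-access-key".to_string(), secret.to_string());
        }
        (None, None) => {}
        _ => return Err("provide both credentials or omit both".to_string()),
    }
    Ok(props)
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn credential_props_cover_all_combinations() {
        // (access_key_id, secret_access_key, expected to succeed)
        let cases = [
            (None, None, true),
            (Some("id"), Some("secret"), true),
            (Some("id"), None, false),
            (None, Some("secret"), false),
        ];
        for (id, secret, should_succeed) in cases {
            let result = s3_credential_props(id, secret);
            assert_eq!(result.is_ok(), should_succeed, "id={id:?}, secret={secret:?}");
            if let Ok(props) = result {
                // On success, each key is present exactly when its credential was given.
                assert_eq!(props.contains_key("s3.access-key-id"), id.is_some());
                assert_eq!(props.contains_key("s3.secret-access-key"), secret.is_some());
            }
        }
    }
}
```

Listing every combination in one table makes a missing case (like the mixed one) visible at a glance.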



##########
core/integration/tests/connectors/iceberg/default_credentials_config/config.toml:
##########
@@ -0,0 +1,45 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+type = "sink"
+key = "iceberg"
+enabled = true
+version = 0
+name = "Iceberg sink"
+path = "../../target/release/libiggy_connector_iceberg_sink"

Review Comment:
   `path` points to `target/release`, but `ENV_SINK_PATH` always overrides it with the debug build at runtime. Not broken, but misleading. This is the same pattern as the original `config.toml`, so it was not introduced by this PR.



-- 
This is an automated message from the Apache Git Service.