hubcio commented on code in PR #2889:
URL: https://github.com/apache/iggy/pull/2889#discussion_r3140609200


##########
core/integration/tests/connectors/delta/delta_sink.rs:
##########
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use crate::connectors::create_test_messages;
+use crate::connectors::fixtures::{DeltaFixture, DeltaS3Fixture};
+use bytes::Bytes;
+use iggy::prelude::{IggyMessage, Partitioning};
+use iggy_common::Identifier;
+use iggy_common::MessageClient;
+use iggy_connector_sdk::api::SinkInfoResponse;
+use integration::harness::seeds;
+use integration::iggy_harness;
+use reqwest::Client;
+
+const API_KEY: &str = "test-api-key";
+const DELTA_SINK_KEY: &str = "delta";
+const VERSION_POLL_ATTEMPTS: usize = 30;
+const VERSION_POLL_INTERVAL_MS: u64 = 500;
+const BULK_VERSION_POLL_ATTEMPTS: usize = 60;
+
+#[iggy_harness(
+    server(connectors_runtime(config_path = 
"tests/connectors/delta/sink.toml")),
+    seed = seeds::connector_stream
+)]
+async fn delta_sink_initializes_and_runs(harness: &TestHarness, fixture: 
DeltaFixture) {
+    let api_address = harness
+        .connectors_runtime()
+        .expect("connector runtime should be available")
+        .http_url();
+    let http_client = Client::new();
+
+    let response = http_client
+        .get(format!("{}/sinks", api_address))
+        .header("api-key", API_KEY)
+        .send()
+        .await
+        .expect("Failed to get sinks");
+
+    assert_eq!(response.status(), 200);
+    let sinks: Vec<SinkInfoResponse> = response.json().await.expect("Failed to 
parse sinks");
+
+    assert_eq!(sinks.len(), 1);
+    assert_eq!(sinks[0].key, DELTA_SINK_KEY);
+    assert!(sinks[0].enabled);
+
+    drop(fixture);
+}
+
+#[iggy_harness(
+    server(connectors_runtime(config_path = 
"tests/connectors/delta/sink.toml")),
+    seed = seeds::connector_stream
+)]
+async fn delta_sink_consumes_json_messages(harness: &TestHarness, fixture: 
DeltaFixture) {
+    let client = harness.root_client().await.unwrap();
+    let api_address = harness
+        .connectors_runtime()
+        .expect("connector runtime should be available")
+        .http_url();
+    let http_client = Client::new();
+
+    let stream_id: Identifier = seeds::names::STREAM.try_into().unwrap();
+    let topic_id: Identifier = seeds::names::TOPIC.try_into().unwrap();
+
+    let message_count = 5;
+    let test_messages = create_test_messages(message_count);
+    let mut messages: Vec<IggyMessage> = test_messages
+        .iter()
+        .enumerate()
+        .map(|(i, msg)| {
+            let payload = serde_json::to_vec(msg).expect("Failed to serialize 
message");
+            IggyMessage::builder()
+                .id((i + 1) as u128)
+                .payload(Bytes::from(payload))
+                .build()
+                .expect("Failed to build message")
+        })
+        .collect();
+
+    client
+        .send_messages(
+            &stream_id,
+            &topic_id,
+            &Partitioning::partition_id(0),
+            &mut messages,
+        )
+        .await
+        .expect("Failed to send messages");
+
+    let version_count = fixture
+        .wait_for_delta_log(1, VERSION_POLL_ATTEMPTS, VERSION_POLL_INTERVAL_MS)
+        .await
+        .expect("Data should be written to Delta table");
+
+    assert!(version_count >= 1);
+
+    let response = http_client
+        .get(format!("{}/sinks", api_address))
+        .header("api-key", API_KEY)
+        .send()
+        .await
+        .expect("Failed to get sinks");
+
+    assert_eq!(response.status(), 200);
+    let sinks: Vec<SinkInfoResponse> = response.json().await.expect("Failed to 
parse sinks");
+
+    assert_eq!(sinks.len(), 1);
+    assert_eq!(sinks[0].key, DELTA_SINK_KEY);
+    assert!(sinks[0].last_error.is_none());
+
+    drop(fixture);
+}
+
+#[iggy_harness(
+    server(connectors_runtime(config_path = 
"tests/connectors/delta/sink.toml")),
+    seed = seeds::connector_stream
+)]
+async fn delta_sink_handles_bulk_messages(harness: &TestHarness, fixture: 
DeltaFixture) {
+    let client = harness.root_client().await.unwrap();
+    let api_address = harness
+        .connectors_runtime()
+        .expect("connector runtime should be available")
+        .http_url();
+    let http_client = Client::new();
+
+    let stream_id: Identifier = seeds::names::STREAM.try_into().unwrap();
+    let topic_id: Identifier = seeds::names::TOPIC.try_into().unwrap();
+
+    let message_count = 100;
+    let test_messages = create_test_messages(message_count);
+    let mut messages: Vec<IggyMessage> = test_messages
+        .iter()
+        .enumerate()
+        .map(|(i, msg)| {
+            let payload = serde_json::to_vec(msg).expect("Failed to serialize 
message");
+            IggyMessage::builder()
+                .id((i + 1) as u128)
+                .payload(Bytes::from(payload))
+                .build()
+                .expect("Failed to build message")
+        })
+        .collect();
+
+    client
+        .send_messages(
+            &stream_id,
+            &topic_id,
+            &Partitioning::partition_id(0),
+            &mut messages,
+        )
+        .await
+        .expect("Failed to send messages");
+
+    let version_count = fixture
+        .wait_for_delta_log(1, BULK_VERSION_POLL_ATTEMPTS, 
VERSION_POLL_INTERVAL_MS)
+        .await
+        .expect("Data should be written to Delta table");
+
+    assert!(version_count >= 1);

Review Comment:
   same issue as `delta_sink_consumes_json_messages` above - 
`count_delta_versions >= 1` is satisfied by the fixture's version-0 log entry 
before the sink runs. 100 messages sent, 0 messages written, still green. 
please gate on row count.



##########
core/integration/tests/connectors/fixtures/delta/fixture.rs:
##########
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use async_trait::async_trait;
+use deltalake::kernel::{DataType, PrimitiveType, StructField};
+use deltalake::operations::create::CreateBuilder;
+use integration::harness::{TestBinaryError, TestFixture};
+use std::collections::HashMap;
+use std::path::PathBuf;
+use tempfile::TempDir;
+use testcontainers_modules::testcontainers::core::{IntoContainerPort, WaitFor};
+use testcontainers_modules::testcontainers::runners::AsyncRunner;
+use testcontainers_modules::testcontainers::{ContainerAsync, GenericImage, 
ImageExt};
+use tracing::info;
+use uuid::Uuid;
+
+const ENV_SINK_TABLE_URI: &str = 
"IGGY_CONNECTORS_SINK_DELTA_PLUGIN_CONFIG_TABLE_URI";
+const ENV_SINK_PATH: &str = "IGGY_CONNECTORS_SINK_DELTA_PATH";
+const ENV_SINK_STORAGE_BACKEND_TYPE: &str =
+    "IGGY_CONNECTORS_SINK_DELTA_PLUGIN_CONFIG_STORAGE_BACKEND_TYPE";
+const ENV_SINK_AWS_S3_ACCESS_KEY: &str =
+    "IGGY_CONNECTORS_SINK_DELTA_PLUGIN_CONFIG_AWS_S3_ACCESS_KEY";
+const ENV_SINK_AWS_S3_SECRET_KEY: &str =
+    "IGGY_CONNECTORS_SINK_DELTA_PLUGIN_CONFIG_AWS_S3_SECRET_KEY";
+const ENV_SINK_AWS_S3_REGION: &str = 
"IGGY_CONNECTORS_SINK_DELTA_PLUGIN_CONFIG_AWS_S3_REGION";
+const ENV_SINK_AWS_S3_ENDPOINT_URL: &str =
+    "IGGY_CONNECTORS_SINK_DELTA_PLUGIN_CONFIG_AWS_S3_ENDPOINT_URL";
+const ENV_SINK_AWS_S3_ALLOW_HTTP: &str =
+    "IGGY_CONNECTORS_SINK_DELTA_PLUGIN_CONFIG_AWS_S3_ALLOW_HTTP";
+
+const MINIO_IMAGE: &str = "minio/minio";
+const MINIO_TAG: &str = "RELEASE.2025-09-07T16-13-09Z";
+const MINIO_PORT: u16 = 9000;
+const MINIO_CONSOLE_PORT: u16 = 9001;
+const MINIO_ACCESS_KEY: &str = "admin";
+const MINIO_SECRET_KEY: &str = "password";
+const MINIO_BUCKET: &str = "delta-warehouse";
+
+pub struct DeltaFixture {
+    _temp_dir: TempDir,
+    table_path: PathBuf,
+}
+
+impl DeltaFixture {
+    pub async fn wait_for_delta_log(
+        &self,
+        min_versions: usize,
+        max_attempts: usize,
+        interval_ms: u64,
+    ) -> Result<usize, TestBinaryError> {
+        let delta_log_dir = self.table_path.join("_delta_log");
+
+        for _ in 0..max_attempts {
+            let count = Self::count_delta_versions(&delta_log_dir);
+            if count >= min_versions {
+                info!("Found {count} delta log versions (required: 
{min_versions})");
+                return Ok(count);
+            }
+            
tokio::time::sleep(std::time::Duration::from_millis(interval_ms)).await;
+        }
+
+        let final_count = Self::count_delta_versions(&delta_log_dir);
+        Err(TestBinaryError::InvalidState {
+            message: format!(
+                "Expected at least {min_versions} delta log versions, found 
{final_count} after {max_attempts} attempts"
+            ),
+        })
+    }
+
+    async fn create_table(table_uri: &str) -> Result<(), TestBinaryError> {
+        let columns = vec![
+            StructField::new("id", DataType::Primitive(PrimitiveType::Long), 
true),
+            StructField::new("name", 
DataType::Primitive(PrimitiveType::String), true),
+            StructField::new("count", 
DataType::Primitive(PrimitiveType::Integer), true),
+            StructField::new("amount", 
DataType::Primitive(PrimitiveType::Double), true),
+            StructField::new("active", 
DataType::Primitive(PrimitiveType::Boolean), true),
+            StructField::new(
+                "timestamp",
+                DataType::Primitive(PrimitiveType::TimestampNtz),
+                true,
+            ),
+        ];
+        CreateBuilder::new()
+            .with_location(table_uri)
+            .with_columns(columns)
+            .await
+            .map_err(|error| TestBinaryError::FixtureSetup {
+                fixture_type: "DeltaFixture".to_string(),
+                message: format!("Failed to create Delta table: {error}"),
+            })?;
+        Ok(())

Review Comment:
   worth noting: this call writes `_delta_log/00000000000000000000.json` 
(version 0) as part of table creation. any test assertion of 
`count_delta_versions >= 1` is trivially satisfied by this alone. capturing the 
baseline count here and exposing it on `DeltaFixture` would let tests assert 
`baseline + 1` meaningfully.



##########
core/connectors/sinks/delta_sink/src/sink.rs:
##########
@@ -0,0 +1,179 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use crate::DeltaSink;
+use crate::SinkState;
+use crate::coercions::{coerce, create_coercion_tree};
+use crate::storage::build_storage_options;
+use async_trait::async_trait;
+use deltalake::writer::{DeltaWriter, JsonWriter};
+use iggy_connector_sdk::{
+    ConsumedMessage, Error, MessagesMetadata, Payload, Sink, TopicMetadata,
+    owned_value_to_serde_json,
+};
+use tracing::{debug, error, info};
+
+// TODO: Expose metrics for observability purposes
+
+#[async_trait]
+impl Sink for DeltaSink {
+    async fn open(&mut self) -> Result<(), Error> {
+        info!(
+            "Opening Delta Lake sink connector with ID: {} for table: {}",
+            self.id, self.config.table_uri
+        );
+
+        let table_url = url::Url::parse(&self.config.table_uri).map_err(|e| {
+            error!("Failed to parse table URI '{}': {e}", 
self.config.table_uri);
+            Error::InitError(format!("Invalid table URI: {e}"))
+        })?;
+
+        info!("Parsed table URI: {}", table_url);
+
+        let storage_options = build_storage_options(&self.config).map_err(|e| {
+            error!("Invalid storage configuration: {e}");
+            Error::InitError(format!("Invalid storage configuration: {e}"))
+        })?;
+
+        let table =
+            match deltalake::open_table_with_storage_options(table_url, 
storage_options).await {
+                Ok(table) => table,
+                Err(e) => {
+                    error!("Failed to load Delta table: {e}");
+                    return Err(Error::InitError(format!("Failed to load Delta 
table: {e}")));
+                }
+            };
+
+        let kernel_schema = table
+            .snapshot()
+            .map_err(|e| {
+                error!("Failed to get table snapshot: {e}");
+                Error::InitError(format!("Failed to get table snapshot: {e}"))
+            })?
+            .schema();
+        let coercion_tree = create_coercion_tree(&kernel_schema);

Review Comment:
   the coercion tree is computed once from the schema seen at `open()` and 
never refreshed. if another writer evolves the delta schema between `open()` 
and a later `consume()` (column added, type changed), coercion for new/changed 
fields won't fire and arrow will reject rows with an opaque `JsonError` - and 
combined with the data-loss path on `flush_and_commit` failure, those rows just 
vanish.
   
   either refresh on schema version change or document that no concurrent 
writers are assumed.



##########
core/integration/tests/connectors/delta/delta_sink.rs:
##########
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use crate::connectors::create_test_messages;
+use crate::connectors::fixtures::{DeltaFixture, DeltaS3Fixture};
+use bytes::Bytes;
+use iggy::prelude::{IggyMessage, Partitioning};
+use iggy_common::Identifier;
+use iggy_common::MessageClient;
+use iggy_connector_sdk::api::SinkInfoResponse;
+use integration::harness::seeds;
+use integration::iggy_harness;
+use reqwest::Client;
+
+const API_KEY: &str = "test-api-key";
+const DELTA_SINK_KEY: &str = "delta";
+const VERSION_POLL_ATTEMPTS: usize = 30;
+const VERSION_POLL_INTERVAL_MS: u64 = 500;
+const BULK_VERSION_POLL_ATTEMPTS: usize = 60;
+
+#[iggy_harness(
+    server(connectors_runtime(config_path = 
"tests/connectors/delta/sink.toml")),
+    seed = seeds::connector_stream
+)]
+async fn delta_sink_initializes_and_runs(harness: &TestHarness, fixture: 
DeltaFixture) {
+    let api_address = harness
+        .connectors_runtime()
+        .expect("connector runtime should be available")
+        .http_url();
+    let http_client = Client::new();
+
+    let response = http_client
+        .get(format!("{}/sinks", api_address))
+        .header("api-key", API_KEY)
+        .send()
+        .await
+        .expect("Failed to get sinks");
+
+    assert_eq!(response.status(), 200);
+    let sinks: Vec<SinkInfoResponse> = response.json().await.expect("Failed to 
parse sinks");
+
+    assert_eq!(sinks.len(), 1);
+    assert_eq!(sinks[0].key, DELTA_SINK_KEY);
+    assert!(sinks[0].enabled);
+
+    drop(fixture);
+}
+
+#[iggy_harness(
+    server(connectors_runtime(config_path = 
"tests/connectors/delta/sink.toml")),
+    seed = seeds::connector_stream
+)]
+async fn delta_sink_consumes_json_messages(harness: &TestHarness, fixture: 
DeltaFixture) {
+    let client = harness.root_client().await.unwrap();
+    let api_address = harness
+        .connectors_runtime()
+        .expect("connector runtime should be available")
+        .http_url();
+    let http_client = Client::new();
+
+    let stream_id: Identifier = seeds::names::STREAM.try_into().unwrap();
+    let topic_id: Identifier = seeds::names::TOPIC.try_into().unwrap();
+
+    let message_count = 5;
+    let test_messages = create_test_messages(message_count);
+    let mut messages: Vec<IggyMessage> = test_messages
+        .iter()
+        .enumerate()
+        .map(|(i, msg)| {
+            let payload = serde_json::to_vec(msg).expect("Failed to serialize 
message");
+            IggyMessage::builder()
+                .id((i + 1) as u128)
+                .payload(Bytes::from(payload))
+                .build()
+                .expect("Failed to build message")
+        })
+        .collect();
+
+    client
+        .send_messages(
+            &stream_id,
+            &topic_id,
+            &Partitioning::partition_id(0),
+            &mut messages,
+        )
+        .await
+        .expect("Failed to send messages");
+
+    let version_count = fixture
+        .wait_for_delta_log(1, VERSION_POLL_ATTEMPTS, VERSION_POLL_INTERVAL_MS)
+        .await
+        .expect("Data should be written to Delta table");
+
+    assert!(version_count >= 1);
+
+    let response = http_client
+        .get(format!("{}/sinks", api_address))
+        .header("api-key", API_KEY)
+        .send()
+        .await
+        .expect("Failed to get sinks");
+
+    assert_eq!(response.status(), 200);
+    let sinks: Vec<SinkInfoResponse> = response.json().await.expect("Failed to 
parse sinks");
+
+    assert_eq!(sinks.len(), 1);
+    assert_eq!(sinks[0].key, DELTA_SINK_KEY);
+    assert!(sinks[0].last_error.is_none());
+
+    drop(fixture);
+}
+
+#[iggy_harness(
+    server(connectors_runtime(config_path = 
"tests/connectors/delta/sink.toml")),
+    seed = seeds::connector_stream
+)]
+async fn delta_sink_handles_bulk_messages(harness: &TestHarness, fixture: 
DeltaFixture) {
+    let client = harness.root_client().await.unwrap();
+    let api_address = harness
+        .connectors_runtime()
+        .expect("connector runtime should be available")
+        .http_url();
+    let http_client = Client::new();
+
+    let stream_id: Identifier = seeds::names::STREAM.try_into().unwrap();
+    let topic_id: Identifier = seeds::names::TOPIC.try_into().unwrap();
+
+    let message_count = 100;
+    let test_messages = create_test_messages(message_count);
+    let mut messages: Vec<IggyMessage> = test_messages
+        .iter()
+        .enumerate()
+        .map(|(i, msg)| {
+            let payload = serde_json::to_vec(msg).expect("Failed to serialize 
message");
+            IggyMessage::builder()
+                .id((i + 1) as u128)
+                .payload(Bytes::from(payload))
+                .build()
+                .expect("Failed to build message")
+        })
+        .collect();
+
+    client
+        .send_messages(
+            &stream_id,
+            &topic_id,
+            &Partitioning::partition_id(0),
+            &mut messages,
+        )
+        .await
+        .expect("Failed to send messages");
+
+    let version_count = fixture
+        .wait_for_delta_log(1, BULK_VERSION_POLL_ATTEMPTS, 
VERSION_POLL_INTERVAL_MS)
+        .await
+        .expect("Data should be written to Delta table");
+
+    assert!(version_count >= 1);
+
+    let response = http_client
+        .get(format!("{}/sinks", api_address))
+        .header("api-key", API_KEY)
+        .send()
+        .await
+        .expect("Failed to get sinks");
+
+    assert_eq!(response.status(), 200);
+    let sinks: Vec<SinkInfoResponse> = response.json().await.expect("Failed to 
parse sinks");
+
+    assert_eq!(sinks.len(), 1);
+    assert!(sinks[0].last_error.is_none());
+
+    drop(fixture);
+}
+
+#[iggy_harness(
+    server(connectors_runtime(config_path = 
"tests/connectors/delta/sink.toml")),
+    seed = seeds::connector_stream
+)]
+async fn delta_sink_writes_to_s3(harness: &TestHarness, fixture: 
DeltaS3Fixture) {
+    let client = harness.root_client().await.unwrap();
+    let api_address = harness
+        .connectors_runtime()
+        .expect("connector runtime should be available")
+        .http_url();
+    let http_client = Client::new();
+
+    let stream_id: Identifier = seeds::names::STREAM.try_into().unwrap();
+    let topic_id: Identifier = seeds::names::TOPIC.try_into().unwrap();
+
+    let message_count = 5;
+    let test_messages = create_test_messages(message_count);
+    let mut messages: Vec<IggyMessage> = test_messages
+        .iter()
+        .enumerate()
+        .map(|(i, msg)| {
+            let payload = serde_json::to_vec(msg).expect("Failed to serialize 
message");
+            IggyMessage::builder()
+                .id((i + 1) as u128)
+                .payload(Bytes::from(payload))
+                .build()
+                .expect("Failed to build message")
+        })
+        .collect();
+
+    client
+        .send_messages(
+            &stream_id,
+            &topic_id,
+            &Partitioning::partition_id(0),
+            &mut messages,
+        )
+        .await
+        .expect("Failed to send messages");
+
+    // For S3, we can't check local delta log files.
+    // Instead, poll the connector runtime API to verify successful 
consumption.
+    let max_attempts = 30;
+    let interval_ms = 500;
+    let mut last_error = None;
+
+    for _ in 0..max_attempts {
+        let response = http_client
+            .get(format!("{}/sinks", api_address))
+            .header("api-key", API_KEY)
+            .send()
+            .await
+            .expect("Failed to get sinks");
+
+        let sinks: Vec<SinkInfoResponse> = 
response.json().await.expect("Failed to parse sinks");
+        if !sinks.is_empty() && sinks[0].last_error.is_none() && 
sinks[0].enabled {
+            last_error = None;
+            // Give the sink time to process the messages
+            tokio::time::sleep(std::time::Duration::from_secs(5)).await;
+
+            // Verify still healthy after processing
+            let response = http_client
+                .get(format!("{}/sinks", api_address))
+                .header("api-key", API_KEY)
+                .send()
+                .await
+                .expect("Failed to get sinks");
+            let sinks: Vec<SinkInfoResponse> =
+                response.json().await.expect("Failed to parse sinks");
+            assert_eq!(sinks.len(), 1);
+            assert_eq!(sinks[0].key, DELTA_SINK_KEY);
+            assert!(
+                sinks[0].last_error.is_none(),
+                "Sink reported error: {:?}",
+                sinks[0].last_error
+            );
+            break;
+        }
+
+        last_error = sinks.first().and_then(|s| s.last_error.clone());
+        
tokio::time::sleep(std::time::Duration::from_millis(interval_ms)).await;
+    }
+
+    assert!(
+        last_error.is_none(),
+        "Delta S3 sink had errors: {:?}",
+        last_error
+    );

Review Comment:
   the s3 test polls the runtime api for `last_error.is_none()`. the runtime's 
`process_messages` discards the `i32` return from the FFI consume callback, so 
a per-batch `flush_and_commit` failure doesn't propagate to `last_error` (which 
is only set on task-level errors). combined with 
`AutoCommitWhen::PollingMessages` advancing the offset before consume 
completes, this test is green while messages are dropped.
   
   read back from the delta table using `DeltaTableBuilder` with the minio 
storage options and assert the row count.



##########
core/integration/tests/connectors/fixtures/delta/fixture.rs:
##########
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use async_trait::async_trait;
+use deltalake::kernel::{DataType, PrimitiveType, StructField};
+use deltalake::operations::create::CreateBuilder;
+use integration::harness::{TestBinaryError, TestFixture};
+use std::collections::HashMap;
+use std::path::PathBuf;
+use tempfile::TempDir;
+use testcontainers_modules::testcontainers::core::{IntoContainerPort, WaitFor};
+use testcontainers_modules::testcontainers::runners::AsyncRunner;
+use testcontainers_modules::testcontainers::{ContainerAsync, GenericImage, 
ImageExt};
+use tracing::info;
+use uuid::Uuid;
+
+const ENV_SINK_TABLE_URI: &str = 
"IGGY_CONNECTORS_SINK_DELTA_PLUGIN_CONFIG_TABLE_URI";
+const ENV_SINK_PATH: &str = "IGGY_CONNECTORS_SINK_DELTA_PATH";
+const ENV_SINK_STORAGE_BACKEND_TYPE: &str =
+    "IGGY_CONNECTORS_SINK_DELTA_PLUGIN_CONFIG_STORAGE_BACKEND_TYPE";
+const ENV_SINK_AWS_S3_ACCESS_KEY: &str =
+    "IGGY_CONNECTORS_SINK_DELTA_PLUGIN_CONFIG_AWS_S3_ACCESS_KEY";
+const ENV_SINK_AWS_S3_SECRET_KEY: &str =
+    "IGGY_CONNECTORS_SINK_DELTA_PLUGIN_CONFIG_AWS_S3_SECRET_KEY";
+const ENV_SINK_AWS_S3_REGION: &str = 
"IGGY_CONNECTORS_SINK_DELTA_PLUGIN_CONFIG_AWS_S3_REGION";
+const ENV_SINK_AWS_S3_ENDPOINT_URL: &str =
+    "IGGY_CONNECTORS_SINK_DELTA_PLUGIN_CONFIG_AWS_S3_ENDPOINT_URL";
+const ENV_SINK_AWS_S3_ALLOW_HTTP: &str =
+    "IGGY_CONNECTORS_SINK_DELTA_PLUGIN_CONFIG_AWS_S3_ALLOW_HTTP";
+
+const MINIO_IMAGE: &str = "minio/minio";
+const MINIO_TAG: &str = "RELEASE.2025-09-07T16-13-09Z";
+const MINIO_PORT: u16 = 9000;
+const MINIO_CONSOLE_PORT: u16 = 9001;
+const MINIO_ACCESS_KEY: &str = "admin";
+const MINIO_SECRET_KEY: &str = "password";
+const MINIO_BUCKET: &str = "delta-warehouse";
+
+pub struct DeltaFixture {
+    _temp_dir: TempDir,
+    table_path: PathBuf,
+}
+
+impl DeltaFixture {
+    pub async fn wait_for_delta_log(
+        &self,
+        min_versions: usize,
+        max_attempts: usize,
+        interval_ms: u64,
+    ) -> Result<usize, TestBinaryError> {
+        let delta_log_dir = self.table_path.join("_delta_log");
+
+        for _ in 0..max_attempts {
+            let count = Self::count_delta_versions(&delta_log_dir);
+            if count >= min_versions {
+                info!("Found {count} delta log versions (required: 
{min_versions})");
+                return Ok(count);
+            }
+            
tokio::time::sleep(std::time::Duration::from_millis(interval_ms)).await;
+        }
+
+        let final_count = Self::count_delta_versions(&delta_log_dir);
+        Err(TestBinaryError::InvalidState {
+            message: format!(
+                "Expected at least {min_versions} delta log versions, found 
{final_count} after {max_attempts} attempts"
+            ),
+        })
+    }
+
+    async fn create_table(table_uri: &str) -> Result<(), TestBinaryError> {
+        let columns = vec![
+            StructField::new("id", DataType::Primitive(PrimitiveType::Long), 
true),
+            StructField::new("name", 
DataType::Primitive(PrimitiveType::String), true),
+            StructField::new("count", 
DataType::Primitive(PrimitiveType::Integer), true),
+            StructField::new("amount", 
DataType::Primitive(PrimitiveType::Double), true),
+            StructField::new("active", 
DataType::Primitive(PrimitiveType::Boolean), true),
+            StructField::new(
+                "timestamp",
+                DataType::Primitive(PrimitiveType::TimestampNtz),
+                true,
+            ),
+        ];
+        CreateBuilder::new()
+            .with_location(table_uri)
+            .with_columns(columns)
+            .await
+            .map_err(|error| TestBinaryError::FixtureSetup {
+                fixture_type: "DeltaFixture".to_string(),
+                message: format!("Failed to create Delta table: {error}"),
+            })?;
+        Ok(())
+    }
+
+    fn count_delta_versions(delta_log_dir: &std::path::Path) -> usize {
+        std::fs::read_dir(delta_log_dir)
+            .map(|entries| {
+                entries
+                    .filter_map(|e| e.ok())
+                    .filter(|e| e.path().extension().is_some_and(|ext| ext == 
"json"))
+                    .count()
+            })
+            .unwrap_or(0)
+    }
+}
+
+#[async_trait]
+impl TestFixture for DeltaFixture {
+    async fn setup() -> Result<Self, TestBinaryError> {
+        let temp_dir = TempDir::new().map_err(|error| 
TestBinaryError::FixtureSetup {
+            fixture_type: "DeltaFixture".to_string(),
+            message: format!("Failed to create temp directory: {error}"),
+        })?;
+
+        let table_path = temp_dir.path().join("delta_table");
+        let table_uri = format!("file://{}", table_path.display());
+        Self::create_table(&table_uri).await?;
+        info!(
+            "Delta fixture created with table path: {}",
+            table_path.display()
+        );
+
+        Ok(Self {
+            _temp_dir: temp_dir,
+            table_path,
+        })
+    }
+
+    fn connectors_runtime_envs(&self) -> HashMap<String, String> {
+        let table_uri = format!("file://{}", self.table_path.display());
+
+        let mut envs = HashMap::new();
+        envs.insert(ENV_SINK_TABLE_URI.to_string(), table_uri);
+        envs.insert(
+            ENV_SINK_PATH.to_string(),
+            "../../target/debug/libiggy_connector_delta_sink".to_string(),
+        );
+        envs
+    }
+}
+
+pub struct DeltaS3Fixture {
+    #[allow(dead_code)]
+    minio: ContainerAsync<GenericImage>,
+    minio_endpoint: String,
+}
+
+impl DeltaS3Fixture {
+    async fn start_minio(
+        network: &str,
+        container_name: &str,
+    ) -> Result<(ContainerAsync<GenericImage>, String), TestBinaryError> {
+        let container = GenericImage::new(MINIO_IMAGE, MINIO_TAG)
+            .with_exposed_port(MINIO_PORT.tcp())
+            .with_exposed_port(MINIO_CONSOLE_PORT.tcp())
+            .with_wait_for(WaitFor::message_on_stderr("API:"))
+            .with_network(network)
+            .with_container_name(container_name)
+            .with_env_var("MINIO_ROOT_USER", MINIO_ACCESS_KEY)
+            .with_env_var("MINIO_ROOT_PASSWORD", MINIO_SECRET_KEY)
+            .with_cmd(vec!["server", "/data", "--console-address", ":9001"])
+            .with_mapped_port(0, MINIO_PORT.tcp())
+            .with_mapped_port(0, MINIO_CONSOLE_PORT.tcp())
+            .start()
+            .await
+            .map_err(|error| TestBinaryError::FixtureSetup {
+                fixture_type: "DeltaS3Fixture".to_string(),
+                message: format!("Failed to start MinIO container: {error}"),
+            })?;
+
+        info!("Started MinIO container for Delta S3 tests");
+
+        let mapped_port = container
+            .ports()
+            .await
+            .map_err(|error| TestBinaryError::FixtureSetup {
+                fixture_type: "DeltaS3Fixture".to_string(),
+                message: format!("Failed to get ports: {error}"),
+            })?
+            .map_to_host_port_ipv4(MINIO_PORT)
+            .ok_or_else(|| TestBinaryError::FixtureSetup {
+                fixture_type: "DeltaS3Fixture".to_string(),
+                message: "No mapping for MinIO port".to_string(),
+            })?;
+
+        let endpoint = format!("http://localhost:{mapped_port}");
+        info!("MinIO container available at {endpoint}");
+
+        Ok((container, endpoint))
+    }
+
+    async fn create_bucket(minio_endpoint: &str) -> Result<(), 
TestBinaryError> {
+        use std::process::Command;
+
+        let host = minio_endpoint.trim_start_matches("http://");
+        let mc_host = format!("http://{}:{}@{}", MINIO_ACCESS_KEY, MINIO_SECRET_KEY, host);
+
+        let output = Command::new("docker")
+            .args([
+                "run",
+                "--rm",
+                "--network=host",
+                "-e",
+                &format!("MC_HOST_minio={}", mc_host),
+                "minio/mc",
+                "mb",
+                "--ignore-existing",
+                &format!("minio/{}", MINIO_BUCKET),
+            ])
+            .output()
+            .map_err(|error| TestBinaryError::FixtureSetup {
+                fixture_type: "DeltaS3Fixture".to_string(),
+                message: format!("Failed to run mc command: {error}"),
+            })?;

Review Comment:
`std::process::Command::output()` blocks the tokio worker thread for the 
duration of the `docker run` call. use `tokio::process::Command` or wrap it in 
`spawn_blocking`. this is integration-test-only code, so it is not a functional 
bug, but it serializes with other async work on the test runtime.



##########
core/integration/tests/connectors/delta/delta_sink.rs:
##########
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use crate::connectors::create_test_messages;
+use crate::connectors::fixtures::{DeltaFixture, DeltaS3Fixture};
+use bytes::Bytes;
+use iggy::prelude::{IggyMessage, Partitioning};
+use iggy_common::Identifier;
+use iggy_common::MessageClient;
+use iggy_connector_sdk::api::SinkInfoResponse;
+use integration::harness::seeds;
+use integration::iggy_harness;
+use reqwest::Client;
+
+const API_KEY: &str = "test-api-key";
+const DELTA_SINK_KEY: &str = "delta";
+const VERSION_POLL_ATTEMPTS: usize = 30;
+const VERSION_POLL_INTERVAL_MS: u64 = 500;
+const BULK_VERSION_POLL_ATTEMPTS: usize = 60;
+
+#[iggy_harness(
+    server(connectors_runtime(config_path = 
"tests/connectors/delta/sink.toml")),
+    seed = seeds::connector_stream
+)]
+async fn delta_sink_initializes_and_runs(harness: &TestHarness, fixture: 
DeltaFixture) {
+    let api_address = harness
+        .connectors_runtime()
+        .expect("connector runtime should be available")
+        .http_url();
+    let http_client = Client::new();
+
+    let response = http_client
+        .get(format!("{}/sinks", api_address))
+        .header("api-key", API_KEY)
+        .send()
+        .await
+        .expect("Failed to get sinks");
+
+    assert_eq!(response.status(), 200);
+    let sinks: Vec<SinkInfoResponse> = response.json().await.expect("Failed to 
parse sinks");
+
+    assert_eq!(sinks.len(), 1);
+    assert_eq!(sinks[0].key, DELTA_SINK_KEY);
+    assert!(sinks[0].enabled);
+
+    drop(fixture);
+}
+
+#[iggy_harness(
+    server(connectors_runtime(config_path = 
"tests/connectors/delta/sink.toml")),
+    seed = seeds::connector_stream
+)]
+async fn delta_sink_consumes_json_messages(harness: &TestHarness, fixture: 
DeltaFixture) {
+    let client = harness.root_client().await.unwrap();
+    let api_address = harness
+        .connectors_runtime()
+        .expect("connector runtime should be available")
+        .http_url();
+    let http_client = Client::new();
+
+    let stream_id: Identifier = seeds::names::STREAM.try_into().unwrap();
+    let topic_id: Identifier = seeds::names::TOPIC.try_into().unwrap();
+
+    let message_count = 5;
+    let test_messages = create_test_messages(message_count);
+    let mut messages: Vec<IggyMessage> = test_messages
+        .iter()
+        .enumerate()
+        .map(|(i, msg)| {
+            let payload = serde_json::to_vec(msg).expect("Failed to serialize 
message");
+            IggyMessage::builder()
+                .id((i + 1) as u128)
+                .payload(Bytes::from(payload))
+                .build()
+                .expect("Failed to build message")
+        })
+        .collect();
+
+    client
+        .send_messages(
+            &stream_id,
+            &topic_id,
+            &Partitioning::partition_id(0),
+            &mut messages,
+        )
+        .await
+        .expect("Failed to send messages");
+
+    let version_count = fixture
+        .wait_for_delta_log(1, VERSION_POLL_ATTEMPTS, VERSION_POLL_INTERVAL_MS)
+        .await
+        .expect("Data should be written to Delta table");
+
+    assert!(version_count >= 1);

Review Comment:
   `DeltaFixture::create_table` already writes 
`_delta_log/00000000000000000000.json` at setup (delta creates version 0 as 
part of establishing the table). so `count_delta_versions` returns 1 before the 
sink ever runs, `wait_for_delta_log(1, ...)` resolves immediately, and 
`assert!(version_count >= 1)` passes even if the sink wrote zero messages.
   
   fix: either wait for `>= 2`, capture the baseline after `create_table` and 
wait for `baseline + 1`, or better - open the delta table and assert the row 
count.



##########
core/connectors/sinks/delta_sink/src/lib.rs:
##########
@@ -0,0 +1,99 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use deltalake::DeltaTable;
+use deltalake::writer::JsonWriter;
+use iggy_connector_sdk::sink_connector;
+use secrecy::SecretString;
+use serde::Deserialize;
+use tokio::sync::Mutex;
+
+#[derive(Debug, Deserialize)]
+#[serde(rename_all = "lowercase")]
+pub enum StorageBackendType {
+    S3,
+    Azure,
+    Gcs,
+}
+
+mod coercions;
+mod sink;
+mod storage;
+
+use crate::coercions::CoercionTree;
+
+sink_connector!(DeltaSink);

Review Comment:
   the `sink_connector!` macro expansion does `INSTANCES.insert(id, container)` 
in `core/connectors/sdk/src/sink.rs` without checking for an existing entry. a 
second `iggy_sink_open(id, ...)` with the same id overwrites the prior 
`SinkContainer`, dropping the old `DeltaTable` and `JsonWriter` without calling 
`close()` - any records buffered in the old writer are silently discarded.
   
   the runtime doesn't currently double-open, but the macro is the public SDK 
contract and delta_sink is the first sink with real buffered state where this 
actually causes data loss. worth either rejecting same-id reopens in the macro 
or documenting the contract. SDK-level fix, not a blocker here — IMO worth 
filing a follow-up issue.



##########
core/integration/tests/connectors/fixtures/delta/fixture.rs:
##########
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use async_trait::async_trait;
+use deltalake::kernel::{DataType, PrimitiveType, StructField};
+use deltalake::operations::create::CreateBuilder;
+use integration::harness::{TestBinaryError, TestFixture};
+use std::collections::HashMap;
+use std::path::PathBuf;
+use tempfile::TempDir;
+use testcontainers_modules::testcontainers::core::{IntoContainerPort, WaitFor};
+use testcontainers_modules::testcontainers::runners::AsyncRunner;
+use testcontainers_modules::testcontainers::{ContainerAsync, GenericImage, 
ImageExt};
+use tracing::info;
+use uuid::Uuid;
+
+const ENV_SINK_TABLE_URI: &str = 
"IGGY_CONNECTORS_SINK_DELTA_PLUGIN_CONFIG_TABLE_URI";
+const ENV_SINK_PATH: &str = "IGGY_CONNECTORS_SINK_DELTA_PATH";
+const ENV_SINK_STORAGE_BACKEND_TYPE: &str =
+    "IGGY_CONNECTORS_SINK_DELTA_PLUGIN_CONFIG_STORAGE_BACKEND_TYPE";
+const ENV_SINK_AWS_S3_ACCESS_KEY: &str =
+    "IGGY_CONNECTORS_SINK_DELTA_PLUGIN_CONFIG_AWS_S3_ACCESS_KEY";
+const ENV_SINK_AWS_S3_SECRET_KEY: &str =
+    "IGGY_CONNECTORS_SINK_DELTA_PLUGIN_CONFIG_AWS_S3_SECRET_KEY";
+const ENV_SINK_AWS_S3_REGION: &str = 
"IGGY_CONNECTORS_SINK_DELTA_PLUGIN_CONFIG_AWS_S3_REGION";
+const ENV_SINK_AWS_S3_ENDPOINT_URL: &str =
+    "IGGY_CONNECTORS_SINK_DELTA_PLUGIN_CONFIG_AWS_S3_ENDPOINT_URL";
+const ENV_SINK_AWS_S3_ALLOW_HTTP: &str =
+    "IGGY_CONNECTORS_SINK_DELTA_PLUGIN_CONFIG_AWS_S3_ALLOW_HTTP";
+
+const MINIO_IMAGE: &str = "minio/minio";
+const MINIO_TAG: &str = "RELEASE.2025-09-07T16-13-09Z";
+const MINIO_PORT: u16 = 9000;
+const MINIO_CONSOLE_PORT: u16 = 9001;
+const MINIO_ACCESS_KEY: &str = "admin";
+const MINIO_SECRET_KEY: &str = "password";
+const MINIO_BUCKET: &str = "delta-warehouse";
+
+pub struct DeltaFixture {
+    _temp_dir: TempDir,
+    table_path: PathBuf,
+}
+
+impl DeltaFixture {
+    pub async fn wait_for_delta_log(
+        &self,
+        min_versions: usize,
+        max_attempts: usize,
+        interval_ms: u64,
+    ) -> Result<usize, TestBinaryError> {
+        let delta_log_dir = self.table_path.join("_delta_log");
+
+        for _ in 0..max_attempts {
+            let count = Self::count_delta_versions(&delta_log_dir);
+            if count >= min_versions {
+                info!("Found {count} delta log versions (required: 
{min_versions})");
+                return Ok(count);
+            }
+            
tokio::time::sleep(std::time::Duration::from_millis(interval_ms)).await;
+        }
+
+        let final_count = Self::count_delta_versions(&delta_log_dir);
+        Err(TestBinaryError::InvalidState {
+            message: format!(
+                "Expected at least {min_versions} delta log versions, found 
{final_count} after {max_attempts} attempts"
+            ),
+        })
+    }
+
+    async fn create_table(table_uri: &str) -> Result<(), TestBinaryError> {
+        let columns = vec![
+            StructField::new("id", DataType::Primitive(PrimitiveType::Long), 
true),
+            StructField::new("name", 
DataType::Primitive(PrimitiveType::String), true),
+            StructField::new("count", 
DataType::Primitive(PrimitiveType::Integer), true),
+            StructField::new("amount", 
DataType::Primitive(PrimitiveType::Double), true),
+            StructField::new("active", 
DataType::Primitive(PrimitiveType::Boolean), true),
+            StructField::new(
+                "timestamp",
+                DataType::Primitive(PrimitiveType::TimestampNtz),
+                true,
+            ),
+        ];

Review Comment:
   identical 6-column schema duplicated at lines 246-257 
(`DeltaS3Fixture::create_table`). extract to a `fn test_columns() -> 
Vec<StructField>` helper.



##########
core/connectors/sinks/delta_sink/src/storage.rs:
##########
@@ -0,0 +1,313 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use crate::{DeltaSinkConfig, StorageBackendType};
+use iggy_connector_sdk::Error;
+use secrecy::ExposeSecret;
+use std::collections::HashMap;
+
+pub(crate) fn build_storage_options(
+    config: &DeltaSinkConfig,
+) -> Result<HashMap<String, String>, Error> {
+    let mut opts = HashMap::new();
+
+    match config.storage_backend_type {
+        Some(StorageBackendType::S3) => {
+            let access_key = config.aws_s3_access_key.as_ref().ok_or_else(|| {
+                Error::InitError("S3 backend requires 
'aws_s3_access_key'".into())
+            })?;
+            let secret_key = config.aws_s3_secret_key.as_ref().ok_or_else(|| {
+                Error::InitError("S3 backend requires 
'aws_s3_secret_key'".into())
+            })?;
+            let region = config
+                .aws_s3_region
+                .as_ref()
+                .ok_or_else(|| Error::InitError("S3 backend requires 
'aws_s3_region'".into()))?;
+
+            opts.insert("AWS_ACCESS_KEY_ID".into(), access_key.clone());
+            opts.insert(
+                "AWS_SECRET_ACCESS_KEY".into(),
+                secret_key.expose_secret().to_owned(),
+            );
+            opts.insert("AWS_REGION".into(), region.clone());
+
+            if let Some(endpoint_url) = config.aws_s3_endpoint_url.as_ref() {
+                opts.insert("AWS_ENDPOINT_URL".into(), endpoint_url.clone());
+            }
+            if let Some(allow_http) = config.aws_s3_allow_http {
+                opts.insert("AWS_ALLOW_HTTP".into(), allow_http.to_string());
+                opts.insert("AWS_S3_ALLOW_HTTP".into(), 
allow_http.to_string());
+            }
+        }
+        Some(StorageBackendType::Azure) => {
+            let account_name = 
config.azure_storage_account_name.as_ref().ok_or_else(|| {
+                Error::InitError("Azure backend requires 
'azure_storage_account_name'".into())
+            })?;
+            let container_name = 
config.azure_container_name.as_ref().ok_or_else(|| {
+                Error::InitError("Azure backend requires 
'azure_container_name'".into())
+            })?;
+
+            opts.insert("AZURE_STORAGE_ACCOUNT_NAME".into(), 
account_name.clone());
+            opts.insert("AZURE_CONTAINER_NAME".into(), container_name.clone());
+
+            match (
+                config.azure_storage_account_key.as_ref(),
+                config.azure_storage_sas_token.as_ref(),
+            ) {
+                (Some(key), None) => {
+                    opts.insert(
+                        "AZURE_STORAGE_ACCOUNT_KEY".into(),
+                        key.expose_secret().to_owned(),
+                    );
+                }
+                (None, Some(sas)) => {
+                    opts.insert(
+                        "AZURE_STORAGE_SAS_TOKEN".into(),
+                        sas.expose_secret().to_owned(),
+                    );
+                }
+                (Some(_), Some(_)) => {
+                    return Err(Error::InitError("Azure backend requires 
exactly one of 'azure_storage_account_key' or 'azure_storage_sas_token', but 
both were provided".into()));
+                }
+                (None, None) => {
+                    return Err(Error::InitError("Azure backend requires one of 
'azure_storage_account_key' or 'azure_storage_sas_token'".into()));
+                }
+            }
+        }
+        Some(StorageBackendType::Gcs) => {
+            let service_account_key = 
config.gcs_service_account_key.as_ref().ok_or_else(|| {
+                Error::InitError("GCS backend requires 
'gcs_service_account_key'".into())
+            })?;
+            let bucket = config
+                .gcs_bucket
+                .as_ref()
+                .ok_or_else(|| Error::InitError("GCS backend requires 
'gcs_bucket'".into()))?;
+
+            opts.insert(
+                "GOOGLE_SERVICE_ACCOUNT_KEY".into(),
+                service_account_key.expose_secret().to_owned(),
+            );
+            opts.insert("GCS_BUCKET".into(), bucket.clone());

Review Comment:
   `deltalake-gcp` 0.14.0's `GoogleConfigKey::from_str` only accepts 
`google_bucket | google_bucket_name | bucket | bucket_name`. `GCS_BUCKET` 
lowercases to `gcs_bucket` and gets filtered out by the backend's 
`filter_map(...).ok()?`. the bucket comes solely from the `gs://bucket/...` url 
- this field is dead config.
   
   rename to `google_bucket` or drop it + the readme entry. (azure's 
`AZURE_CONTAINER_NAME` at line 66 does work - that backend accepts it after 
lowercasing.)



##########
core/integration/tests/connectors/delta/delta_sink.rs:
##########
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use crate::connectors::create_test_messages;
+use crate::connectors::fixtures::{DeltaFixture, DeltaS3Fixture};
+use bytes::Bytes;
+use iggy::prelude::{IggyMessage, Partitioning};
+use iggy_common::Identifier;
+use iggy_common::MessageClient;
+use iggy_connector_sdk::api::SinkInfoResponse;
+use integration::harness::seeds;
+use integration::iggy_harness;
+use reqwest::Client;
+
+const API_KEY: &str = "test-api-key";
+const DELTA_SINK_KEY: &str = "delta";
+const VERSION_POLL_ATTEMPTS: usize = 30;
+const VERSION_POLL_INTERVAL_MS: u64 = 500;
+const BULK_VERSION_POLL_ATTEMPTS: usize = 60;
+
+#[iggy_harness(
+    server(connectors_runtime(config_path = 
"tests/connectors/delta/sink.toml")),
+    seed = seeds::connector_stream
+)]
+async fn delta_sink_initializes_and_runs(harness: &TestHarness, fixture: 
DeltaFixture) {
+    let api_address = harness
+        .connectors_runtime()
+        .expect("connector runtime should be available")
+        .http_url();
+    let http_client = Client::new();
+
+    let response = http_client
+        .get(format!("{}/sinks", api_address))
+        .header("api-key", API_KEY)
+        .send()
+        .await
+        .expect("Failed to get sinks");
+
+    assert_eq!(response.status(), 200);
+    let sinks: Vec<SinkInfoResponse> = response.json().await.expect("Failed to 
parse sinks");
+
+    assert_eq!(sinks.len(), 1);
+    assert_eq!(sinks[0].key, DELTA_SINK_KEY);
+    assert!(sinks[0].enabled);
+
+    drop(fixture);

Review Comment:
   `fixture` is a function parameter, already dropped at scope exit. this 
pattern repeats at the end of all four tests - safe to remove.



##########
core/connectors/sinks/delta_sink/src/sink.rs:
##########
@@ -0,0 +1,179 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use crate::DeltaSink;
+use crate::SinkState;
+use crate::coercions::{coerce, create_coercion_tree};
+use crate::storage::build_storage_options;
+use async_trait::async_trait;
+use deltalake::writer::{DeltaWriter, JsonWriter};
+use iggy_connector_sdk::{
+    ConsumedMessage, Error, MessagesMetadata, Payload, Sink, TopicMetadata,
+    owned_value_to_serde_json,
+};
+use tracing::{debug, error, info};
+
+// TODO: Expose metrics for observability purposes
+
+#[async_trait]
+impl Sink for DeltaSink {
+    async fn open(&mut self) -> Result<(), Error> {
+        info!(
+            "Opening Delta Lake sink connector with ID: {} for table: {}",
+            self.id, self.config.table_uri
+        );
+
+        let table_url = url::Url::parse(&self.config.table_uri).map_err(|e| {
+            error!("Failed to parse table URI '{}': {e}", 
self.config.table_uri);
+            Error::InitError(format!("Invalid table URI: {e}"))
+        })?;
+
+        info!("Parsed table URI: {}", table_url);
+
+        let storage_options = build_storage_options(&self.config).map_err(|e| {
+            error!("Invalid storage configuration: {e}");
+            Error::InitError(format!("Invalid storage configuration: {e}"))
+        })?;
+
+        let table =
+            match deltalake::open_table_with_storage_options(table_url, 
storage_options).await {
+                Ok(table) => table,
+                Err(e) => {
+                    error!("Failed to load Delta table: {e}");
+                    return Err(Error::InitError(format!("Failed to load Delta 
table: {e}")));
+                }
+            };
+
+        let kernel_schema = table
+            .snapshot()
+            .map_err(|e| {
+                error!("Failed to get table snapshot: {e}");
+                Error::InitError(format!("Failed to get table snapshot: {e}"))
+            })?
+            .schema();
+        let coercion_tree = create_coercion_tree(&kernel_schema);
+
+        let writer = JsonWriter::for_table(&table).map_err(|e| {
+            error!("Failed to create JsonWriter: {e}");
+            Error::InitError(format!("Failed to create JsonWriter: {e}"))
+        })?;
+
+        *self.state.lock().await = Some(SinkState {
+            table,
+            writer,
+            coercion_tree,
+        });
+
+        info!(
+            "Delta Lake sink connector with ID: {} opened successfully.",
+            self.id
+        );
+        Ok(())
+    }
+
+    async fn consume(
+        &self,
+        _topic_metadata: &TopicMetadata,
+        messages_metadata: MessagesMetadata,
+        messages: Vec<ConsumedMessage>,
+    ) -> Result<(), Error> {
+        debug!(
+            "Delta sink with ID: {} received: {} messages, partition: {}, 
offset: {}",
+            self.id,
+            messages.len(),
+            messages_metadata.partition_id,
+            messages_metadata.current_offset,
+        );
+
+        // Extract JSON values from consumed messages
+        let mut json_values: Vec<serde_json::Value> = 
Vec::with_capacity(messages.len());
+        for msg in &messages {
+            match &msg.payload {
+                Payload::Json(simd_value) => {
+                    json_values.push(owned_value_to_serde_json(simd_value));
+                }
+                other => {
+                    error!(
+                        "Unsupported payload type: {other}. Delta sink only 
supports JSON payloads."
+                    );
+                    return Err(Error::InvalidPayloadType);
+                }
+            }
+        }
+
+        if json_values.is_empty() {
+            debug!("No JSON values to write");
+            return Ok(());
+        }
+
+        // TODO: all partition consume() calls serialize on this single lock, 
holding it
+        // through flush_and_commit() I/O. fix: per-partition writers keyed by 
partition_id.
+        // Ref: 
https://github.com/apache/iggy/pull/2889/#discussion_r2936719763
+        let mut state_guard = self.state.lock().await;
+        let state = state_guard.as_mut().ok_or_else(|| {
+            error!("Delta sink state not initialized — was open() called?");
+            Error::InvalidState
+        })?;
+
+        // Apply coercions to match Delta table schema
+        for value in &mut json_values {
+            coerce(value, 
&state.coercion_tree).map_err(Error::InvalidRecordValue)?;
+        }
+
+        // Write JSON values to internal Parquet buffers
+        // TODO: Add retry mechanism if write fails.
+        if let Err(e) = state.writer.write(json_values).await {
+            state.writer.reset();
+            error!("Failed to write to Delta writer: {e}");
+            return Err(Error::Storage(format!(
+                "Failed to write to Delta writer: {e}"
+            )));
+        }
+
+        // Flush buffers to object store and commit to Delta log
+        let version = match state.writer.flush_and_commit(&mut 
state.table).await {
+            Ok(v) => v,
+            Err(e) => {
+                state.writer.reset();
+                error!("Failed to flush and commit to Delta table: {e}");
+                return Err(Error::Storage(format!("Failed to flush and commit: 
{e}")));
+            }
+        };
+
+        debug!(
+            "Delta sink with ID: {} committed version {}",
+            self.id, version
+        );
+
+        Ok(())
+    }
+
+    async fn close(&mut self) -> Result<(), Error> {
+        if let Some(mut state) = self.state.lock().await.take()
+            && let Err(e) = state.writer.flush_and_commit(&mut 
state.table).await
+        {
+            error!(
+                "Delta sink with ID: {} failed to flush on close: {e}",
+                self.id
+            );
+            return Err(Error::Storage(format!("Failed to flush on close: 
{e}")));
+        }
+        info!("Delta Lake sink connector with ID: {} is closed.", self.id);
+        Ok(())
+    }

Review Comment:
   consume path calls `writer.reset()` before returning an error; close 
doesn't. consistency nit - not load-bearing since we're shutting down anyway.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to