hubcio commented on code in PR #2889:
URL: https://github.com/apache/iggy/pull/2889#discussion_r2936719770


##########
core/connectors/sinks/delta_sink/src/sink.rs:
##########
@@ -0,0 +1,308 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use crate::DeltaSink;
+use crate::SinkState;
+use crate::coercions::{coerce, create_coercion_tree};
+use crate::storage::build_storage_options;
+use async_trait::async_trait;
+use deltalake::writer::{DeltaWriter, JsonWriter};
+use iggy_connector_sdk::{ConsumedMessage, Error, MessagesMetadata, Payload, 
Sink, TopicMetadata};
+use tracing::{debug, error, info};
+
+#[async_trait]
+impl Sink for DeltaSink {
+    async fn open(&mut self) -> Result<(), Error> {
+        info!(
+            "Opening Delta Lake sink connector with ID: {} for table: {}",
+            self.id, self.config.table_uri
+        );
+
+        let table_url = url::Url::parse(&self.config.table_uri).map_err(|e| {
+            error!("Failed to parse table URI '{}': {e}", 
self.config.table_uri);
+            Error::InitError(format!("Invalid table URI: {e}"))
+        })?;
+
+        info!("Parsed table URI: {}", table_url);
+
+        let storage_options = build_storage_options(&self.config).map_err(|e| {
+            error!("Invalid storage configuration: {e}");
+            Error::InitError(format!("Invalid storage configuration: {e}"))
+        })?;
+
+        let table =
+            match deltalake::open_table_with_storage_options(table_url, 
storage_options.clone())
+                .await
+            {
+                Ok(table) => table,
+                Err(e) => {
+                    error!("Failed to load Delta table: {e}");
+                    return Err(Error::InitError(format!("Failed to load Delta 
table: {e}")));
+                }
+            };
+
+        let kernel_schema = table
+            .snapshot()
+            .map_err(|e| {
+                error!("Failed to get table snapshot: {e}");
+                Error::InitError(format!("Failed to get table snapshot: {e}"))
+            })?
+            .schema();
+        let coercion_tree = create_coercion_tree(&kernel_schema);
+
+        let writer = JsonWriter::for_table(&table).map_err(|e| {
+            error!("Failed to create JsonWriter: {e}");
+            Error::InitError(format!("Failed to create JsonWriter: {e}"))
+        })?;
+
+        *self.state.lock().await = Some(SinkState {
+            table,
+            writer,
+            coercion_tree,
+        });
+
+        info!(
+            "Delta Lake sink connector with ID: {} opened successfully.",
+            self.id
+        );
+        Ok(())
+    }
+
+    async fn consume(
+        &self,
+        _topic_metadata: &TopicMetadata,
+        messages_metadata: MessagesMetadata,
+        messages: Vec<ConsumedMessage>,
+    ) -> Result<(), Error> {
+        debug!(
+            "Delta sink with ID: {} received: {} messages, partition: {}, 
offset: {}",
+            self.id,
+            messages.len(),
+            messages_metadata.partition_id,
+            messages_metadata.current_offset,
+        );
+
+        // Extract JSON values from consumed messages
+        let mut json_values: Vec<serde_json::Value> = 
Vec::with_capacity(messages.len());
+        for msg in &messages {
+            match &msg.payload {
+                Payload::Json(simd_value) => {
+                    json_values.push(owned_value_to_serde_json(simd_value));
+                }
+                other => {
+                    error!(
+                        "Unsupported payload type: {other}. Delta sink only 
supports JSON payloads."
+                    );
+                    return Err(Error::InvalidPayloadType);
+                }
+            }
+        }
+
+        if json_values.is_empty() {
+            debug!("No JSON values to write");
+            return Ok(());
+        }
+
+        let mut state_guard = self.state.lock().await;
+        let state = state_guard.as_mut().ok_or_else(|| {

Review Comment:
   When `state` is `None` (meaning `open()` was not called, or failed), 
returning `Error::InvalidConfig` has the wrong semantics — this is a 
lifecycle/invariant violation, not a configuration error. Use 
`Error::InitError("Delta sink state not initialized - was open() called?".to_string())` 
to match the actual problem.



##########
core/connectors/sinks/delta_sink/src/coercions.rs:
##########
@@ -0,0 +1,600 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use deltalake::kernel::Schema as DeltaSchema;
+use deltalake::kernel::{DataType, PrimitiveType};
+
+use chrono::prelude::*;
+use serde_json::Value;
+use std::collections::HashMap;
+use std::str::FromStr;
+
+#[derive(Debug, Clone, PartialEq)]
+#[allow(unused)]

Review Comment:
   All four `CoercionNode` variants are constructed and matched in the 
codebase, so the `#[allow(unused)]` attribute is unnecessary and should be removed.



##########
core/integration/tests/connectors/fixtures/delta/fixture.rs:
##########
@@ -0,0 +1,316 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use async_trait::async_trait;
+use deltalake::kernel::{DataType, PrimitiveType, StructField};
+use deltalake::operations::create::CreateBuilder;
+use integration::harness::{TestBinaryError, TestFixture};
+use std::collections::HashMap;
+use std::path::PathBuf;
+use tempfile::TempDir;
+use testcontainers_modules::testcontainers::core::{IntoContainerPort, WaitFor};
+use testcontainers_modules::testcontainers::runners::AsyncRunner;
+use testcontainers_modules::testcontainers::{ContainerAsync, GenericImage, 
ImageExt};
+use tracing::info;
+use uuid::Uuid;
+
+const ENV_SINK_TABLE_URI: &str = 
"IGGY_CONNECTORS_SINK_DELTA_PLUGIN_CONFIG_TABLE_URI";
+const ENV_SINK_PATH: &str = "IGGY_CONNECTORS_SINK_DELTA_PATH";
+const ENV_SINK_STORAGE_BACKEND_TYPE: &str =
+    "IGGY_CONNECTORS_SINK_DELTA_PLUGIN_CONFIG_STORAGE_BACKEND_TYPE";
+const ENV_SINK_AWS_S3_ACCESS_KEY: &str =
+    "IGGY_CONNECTORS_SINK_DELTA_PLUGIN_CONFIG_AWS_S3_ACCESS_KEY";
+const ENV_SINK_AWS_S3_SECRET_KEY: &str =
+    "IGGY_CONNECTORS_SINK_DELTA_PLUGIN_CONFIG_AWS_S3_SECRET_KEY";
+const ENV_SINK_AWS_S3_REGION: &str = 
"IGGY_CONNECTORS_SINK_DELTA_PLUGIN_CONFIG_AWS_S3_REGION";
+const ENV_SINK_AWS_S3_ENDPOINT_URL: &str =
+    "IGGY_CONNECTORS_SINK_DELTA_PLUGIN_CONFIG_AWS_S3_ENDPOINT_URL";
+const ENV_SINK_AWS_S3_ALLOW_HTTP: &str =
+    "IGGY_CONNECTORS_SINK_DELTA_PLUGIN_CONFIG_AWS_S3_ALLOW_HTTP";
+
+const MINIO_IMAGE: &str = "minio/minio";
+const MINIO_TAG: &str = "RELEASE.2025-09-07T16-13-09Z";
+const MINIO_PORT: u16 = 9000;
+const MINIO_CONSOLE_PORT: u16 = 9001;
+const MINIO_ACCESS_KEY: &str = "admin";
+const MINIO_SECRET_KEY: &str = "password";
+const MINIO_BUCKET: &str = "delta-warehouse";
+
+pub struct DeltaFixture {
+    _temp_dir: TempDir,
+    table_path: PathBuf,
+}
+
+impl DeltaFixture {
+    pub async fn wait_for_delta_log(
+        &self,
+        min_versions: usize,
+        max_attempts: usize,
+        interval_ms: u64,
+    ) -> Result<usize, TestBinaryError> {
+        let delta_log_dir = self.table_path.join("_delta_log");
+
+        for _ in 0..max_attempts {
+            let count = Self::count_delta_versions(&delta_log_dir);
+            if count >= min_versions {
+                info!("Found {count} delta log versions (required: 
{min_versions})");
+                return Ok(count);
+            }
+            
tokio::time::sleep(std::time::Duration::from_millis(interval_ms)).await;
+        }
+
+        let final_count = Self::count_delta_versions(&delta_log_dir);
+        Err(TestBinaryError::InvalidState {
+            message: format!(
+                "Expected at least {min_versions} delta log versions, found 
{final_count} after {max_attempts} attempts"
+            ),
+        })
+    }
+
+    async fn create_table(table_uri: &str) -> Result<(), TestBinaryError> {
+        let columns = vec![
+            StructField::new("id", DataType::Primitive(PrimitiveType::Long), 
true),
+            StructField::new("name", 
DataType::Primitive(PrimitiveType::String), true),
+            StructField::new("count", 
DataType::Primitive(PrimitiveType::Integer), true),
+            StructField::new("amount", 
DataType::Primitive(PrimitiveType::Double), true),
+            StructField::new("active", 
DataType::Primitive(PrimitiveType::Boolean), true),
+            StructField::new("timestamp", 
DataType::Primitive(PrimitiveType::Long), true),

Review Comment:
   The test fixture declares `timestamp` as `PrimitiveType::Long`, not 
`PrimitiveType::Timestamp`, which means all integration tests bypass the 
coercion logic entirely. An e2e test with a `Timestamp`-typed column would 
catch issues like the `TimestampNtz` gap and the null-to-"null" coercion bug 
at the integration level.



##########
core/connectors/sinks/delta_sink/src/sink.rs:
##########
@@ -0,0 +1,308 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use crate::DeltaSink;
+use crate::SinkState;
+use crate::coercions::{coerce, create_coercion_tree};
+use crate::storage::build_storage_options;
+use async_trait::async_trait;
+use deltalake::writer::{DeltaWriter, JsonWriter};
+use iggy_connector_sdk::{ConsumedMessage, Error, MessagesMetadata, Payload, 
Sink, TopicMetadata};
+use tracing::{debug, error, info};
+
+#[async_trait]
+impl Sink for DeltaSink {
+    async fn open(&mut self) -> Result<(), Error> {
+        info!(
+            "Opening Delta Lake sink connector with ID: {} for table: {}",
+            self.id, self.config.table_uri
+        );
+
+        let table_url = url::Url::parse(&self.config.table_uri).map_err(|e| {
+            error!("Failed to parse table URI '{}': {e}", 
self.config.table_uri);
+            Error::InitError(format!("Invalid table URI: {e}"))
+        })?;
+
+        info!("Parsed table URI: {}", table_url);
+
+        let storage_options = build_storage_options(&self.config).map_err(|e| {
+            error!("Invalid storage configuration: {e}");
+            Error::InitError(format!("Invalid storage configuration: {e}"))
+        })?;
+
+        let table =
+            match deltalake::open_table_with_storage_options(table_url, 
storage_options.clone())
+                .await
+            {
+                Ok(table) => table,
+                Err(e) => {
+                    error!("Failed to load Delta table: {e}");
+                    return Err(Error::InitError(format!("Failed to load Delta 
table: {e}")));
+                }
+            };
+
+        let kernel_schema = table
+            .snapshot()
+            .map_err(|e| {
+                error!("Failed to get table snapshot: {e}");
+                Error::InitError(format!("Failed to get table snapshot: {e}"))
+            })?
+            .schema();
+        let coercion_tree = create_coercion_tree(&kernel_schema);
+
+        let writer = JsonWriter::for_table(&table).map_err(|e| {
+            error!("Failed to create JsonWriter: {e}");
+            Error::InitError(format!("Failed to create JsonWriter: {e}"))
+        })?;
+
+        *self.state.lock().await = Some(SinkState {
+            table,
+            writer,
+            coercion_tree,
+        });
+
+        info!(
+            "Delta Lake sink connector with ID: {} opened successfully.",
+            self.id
+        );
+        Ok(())
+    }
+
+    async fn consume(
+        &self,
+        _topic_metadata: &TopicMetadata,
+        messages_metadata: MessagesMetadata,
+        messages: Vec<ConsumedMessage>,
+    ) -> Result<(), Error> {
+        debug!(
+            "Delta sink with ID: {} received: {} messages, partition: {}, 
offset: {}",
+            self.id,
+            messages.len(),
+            messages_metadata.partition_id,
+            messages_metadata.current_offset,
+        );
+
+        // Extract JSON values from consumed messages
+        let mut json_values: Vec<serde_json::Value> = 
Vec::with_capacity(messages.len());
+        for msg in &messages {
+            match &msg.payload {
+                Payload::Json(simd_value) => {
+                    json_values.push(owned_value_to_serde_json(simd_value));
+                }
+                other => {
+                    error!(
+                        "Unsupported payload type: {other}. Delta sink only 
supports JSON payloads."
+                    );
+                    return Err(Error::InvalidPayloadType);
+                }
+            }
+        }
+
+        if json_values.is_empty() {
+            debug!("No JSON values to write");
+            return Ok(());
+        }
+
+        let mut state_guard = self.state.lock().await;
+        let state = state_guard.as_mut().ok_or_else(|| {
+            error!("Delta sink state not initialized — was open() called?");
+            Error::InvalidConfig
+        })?;
+
+        // Apply coercions to match Delta table schema
+        for value in &mut json_values {
+            coerce(value, &state.coercion_tree);
+        }
+
+        // Write JSON values to internal Parquet buffers
+        state.writer.write(json_values).await.map_err(|e| {

Review Comment:
   If `JsonWriter::write()` partially buffers data before failing, the next 
`consume()` call will invoke `flush_and_commit` on a buffer containing 
stale/partial data from the failed batch. `reset()` is correctly performed for 
`flush_and_commit` failures (line 142) but is missing here for `write` 
failures. Either add `reset()` on write error, or verify in the deltalake docs 
that `write` is atomic.



##########
core/connectors/sinks/delta_sink/src/coercions.rs:
##########
@@ -0,0 +1,600 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use deltalake::kernel::Schema as DeltaSchema;
+use deltalake::kernel::{DataType, PrimitiveType};
+
+use chrono::prelude::*;
+use serde_json::Value;
+use std::collections::HashMap;
+use std::str::FromStr;
+
+#[derive(Debug, Clone, PartialEq)]
+#[allow(unused)]
+enum CoercionNode {
+    Coercion(Coercion),
+    Tree(CoercionTree),
+    ArrayTree(CoercionTree),
+    ArrayPrimitive(Coercion),
+}
+
+#[derive(Debug, Clone, PartialEq)]
+enum Coercion {
+    ToString,
+    ToTimestamp,
+}
+
+#[derive(Debug, Clone, PartialEq)]
+pub(crate) struct CoercionTree {
+    root: HashMap<String, CoercionNode>,
+}
+
+/// Returns a [`CoercionTree`] so the schema can be walked efficiently level 
by level when performing conversions.
+pub(crate) fn create_coercion_tree(schema: &DeltaSchema) -> CoercionTree {
+    let mut root = HashMap::new();
+
+    for field in schema.fields() {
+        if let Some(node) = build_coercion_node(field.data_type()) {
+            root.insert(field.name().to_string(), node);
+        }
+    }
+
+    CoercionTree { root }
+}
+
+fn build_coercion_node(data_type: &DataType) -> Option<CoercionNode> {
+    match data_type {
+        DataType::Primitive(primitive) => match primitive {
+            PrimitiveType::String => 
Some(CoercionNode::Coercion(Coercion::ToString)),
+            PrimitiveType::Timestamp => 
Some(CoercionNode::Coercion(Coercion::ToTimestamp)),
+            _ => None,
+        },
+        DataType::Struct(st) => {
+            let nested_context = create_coercion_tree(st);
+            if !nested_context.root.is_empty() {
+                Some(CoercionNode::Tree(nested_context))
+            } else {
+                None
+            }
+        }
+        DataType::Array(array) => {
+            build_coercion_node(array.element_type()).and_then(|node| match 
node {
+                CoercionNode::Coercion(c) => 
Some(CoercionNode::ArrayPrimitive(c)),
+                CoercionNode::Tree(t) => Some(CoercionNode::ArrayTree(t)),
+                _ => None,
+            })
+        }
+        _ => None,
+    }
+}
+
+/// Applies all data coercions specified by the [`CoercionTree`] to the 
[`Value`].
+pub(crate) fn coerce(value: &mut Value, coercion_tree: &CoercionTree) {
+    if let Some(context) = value.as_object_mut() {
+        for (field_name, coercion) in coercion_tree.root.iter() {
+            if let Some(value) = context.get_mut(field_name) {
+                apply_coercion(value, coercion);
+            }
+        }
+    }
+}
+
+fn apply_coercion(value: &mut Value, node: &CoercionNode) {
+    match node {
+        CoercionNode::Coercion(Coercion::ToString) => {
+            if !value.is_string() {

Review Comment:
   When a value is JSON `null` and the Delta column is `String`, 
`value.to_string()` produces the literal 4-character string `"null"` rather 
than preserving the null. Downstream consumers reading the Delta table cannot 
distinguish actual nulls from the string `"null"`. The test at lines 512-514 
asserts this behavior, but it is semantically wrong — JSON null should map to 
SQL NULL. Fix: add `if value.is_null() { return; }` before the `is_string()` 
check.



##########
core/connectors/sinks/delta_sink/src/storage.rs:
##########
@@ -0,0 +1,277 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use crate::DeltaSinkConfig;
+use iggy_connector_sdk::Error;
+use std::collections::HashMap;
+
+pub(crate) fn build_storage_options(
+    config: &DeltaSinkConfig,
+) -> Result<HashMap<String, String>, Error> {
+    let mut opts = HashMap::new();
+
+    match config.storage_backend_type.as_deref() {
+        Some("s3") => {
+            let access_key = config
+                .aws_s3_access_key
+                .as_ref()
+                .ok_or(Error::InvalidConfig)?;
+            let secret_key = config
+                .aws_s3_secret_key
+                .as_ref()
+                .ok_or(Error::InvalidConfig)?;
+            let region = 
config.aws_s3_region.as_ref().ok_or(Error::InvalidConfig)?;
+            let endpoint_url = config
+                .aws_s3_endpoint_url
+                .as_ref()
+                .ok_or(Error::InvalidConfig)?;
+            let allow_http = 
config.aws_s3_allow_http.ok_or(Error::InvalidConfig)?;
+
+            opts.insert("AWS_ACCESS_KEY_ID".into(), access_key.clone());
+            opts.insert("AWS_SECRET_ACCESS_KEY".into(), secret_key.clone());
+            opts.insert("AWS_REGION".into(), region.clone());
+            opts.insert("AWS_ENDPOINT_URL".into(), endpoint_url.clone());
+            opts.insert("AWS_ALLOW_HTTP".into(), allow_http.to_string());
+            opts.insert("AWS_S3_ALLOW_HTTP".into(), allow_http.to_string());
+        }
+        Some("azure") => {

Review Comment:
   The Azure path requires all four fields: `account_name`, `account_key`, 
`sas_token`, and `container_name`. In practice, Azure authentication uses 
either an account key or a SAS token, never both. Requiring both means users 
who only have a SAS token (common in restricted environments) cannot use the 
connector. Either make the two auth fields mutually exclusive alternatives 
with validation, or make each optional and pass through whichever is provided.



##########
core/connectors/sinks/delta_sink/src/sink.rs:
##########
@@ -0,0 +1,308 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use crate::DeltaSink;
+use crate::SinkState;
+use crate::coercions::{coerce, create_coercion_tree};
+use crate::storage::build_storage_options;
+use async_trait::async_trait;
+use deltalake::writer::{DeltaWriter, JsonWriter};
+use iggy_connector_sdk::{ConsumedMessage, Error, MessagesMetadata, Payload, 
Sink, TopicMetadata};
+use tracing::{debug, error, info};
+
+#[async_trait]
+impl Sink for DeltaSink {
+    async fn open(&mut self) -> Result<(), Error> {
+        info!(
+            "Opening Delta Lake sink connector with ID: {} for table: {}",
+            self.id, self.config.table_uri
+        );
+
+        let table_url = url::Url::parse(&self.config.table_uri).map_err(|e| {
+            error!("Failed to parse table URI '{}': {e}", 
self.config.table_uri);
+            Error::InitError(format!("Invalid table URI: {e}"))
+        })?;
+
+        info!("Parsed table URI: {}", table_url);
+
+        let storage_options = build_storage_options(&self.config).map_err(|e| {
+            error!("Invalid storage configuration: {e}");
+            Error::InitError(format!("Invalid storage configuration: {e}"))
+        })?;
+
+        let table =
+            match deltalake::open_table_with_storage_options(table_url, 
storage_options.clone())
+                .await
+            {
+                Ok(table) => table,
+                Err(e) => {
+                    error!("Failed to load Delta table: {e}");
+                    return Err(Error::InitError(format!("Failed to load Delta 
table: {e}")));
+                }
+            };
+
+        let kernel_schema = table
+            .snapshot()
+            .map_err(|e| {
+                error!("Failed to get table snapshot: {e}");
+                Error::InitError(format!("Failed to get table snapshot: {e}"))
+            })?
+            .schema();
+        let coercion_tree = create_coercion_tree(&kernel_schema);
+
+        let writer = JsonWriter::for_table(&table).map_err(|e| {
+            error!("Failed to create JsonWriter: {e}");
+            Error::InitError(format!("Failed to create JsonWriter: {e}"))
+        })?;
+
+        *self.state.lock().await = Some(SinkState {
+            table,
+            writer,
+            coercion_tree,
+        });
+
+        info!(
+            "Delta Lake sink connector with ID: {} opened successfully.",
+            self.id
+        );
+        Ok(())
+    }
+
+    async fn consume(
+        &self,
+        _topic_metadata: &TopicMetadata,
+        messages_metadata: MessagesMetadata,
+        messages: Vec<ConsumedMessage>,
+    ) -> Result<(), Error> {
+        debug!(
+            "Delta sink with ID: {} received: {} messages, partition: {}, 
offset: {}",
+            self.id,
+            messages.len(),
+            messages_metadata.partition_id,
+            messages_metadata.current_offset,
+        );
+
+        // Extract JSON values from consumed messages
+        let mut json_values: Vec<serde_json::Value> = 
Vec::with_capacity(messages.len());
+        for msg in &messages {
+            match &msg.payload {
+                Payload::Json(simd_value) => {
+                    json_values.push(owned_value_to_serde_json(simd_value));
+                }
+                other => {
+                    error!(
+                        "Unsupported payload type: {other}. Delta sink only 
supports JSON payloads."
+                    );
+                    return Err(Error::InvalidPayloadType);
+                }
+            }
+        }
+
+        if json_values.is_empty() {
+            debug!("No JSON values to write");
+            return Ok(());
+        }
+
+        let mut state_guard = self.state.lock().await;
+        let state = state_guard.as_mut().ok_or_else(|| {
+            error!("Delta sink state not initialized — was open() called?");
+            Error::InvalidConfig
+        })?;
+
+        // Apply coercions to match Delta table schema
+        for value in &mut json_values {
+            coerce(value, &state.coercion_tree);
+        }
+
+        // Write JSON values to internal Parquet buffers
+        state.writer.write(json_values).await.map_err(|e| {
+            error!("Failed to write to Delta writer: {e}");
+            Error::Storage(format!("Failed to write to Delta writer: {e}"))
+        })?;
+
+        // Flush buffers to object store and commit to Delta log
+        let version = match state.writer.flush_and_commit(&mut 
state.table).await {

Review Comment:
   Every `consume()` call performs `flush_and_commit`, which creates a new 
Parquet file and a new JSON transaction log entry in `_delta_log/`. At high 
throughput — say, 1000-message batches at 1M ops/sec — that is 1000 Parquet 
files and 1000 log entries per second. Delta Lake degrades catastrophically 
under this load: metadata parsing slows, checkpoint overhead grows, and cloud 
object store LIST calls become a bottleneck. The sink needs a buffering 
strategy: accumulate across multiple `consume()` calls and flush on a 
configurable row count, byte threshold, or time interval — not per batch.



##########
core/connectors/sinks/delta_sink/src/lib.rs:
##########
@@ -0,0 +1,90 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use deltalake::DeltaTable;
+use deltalake::writer::JsonWriter;
+use iggy_connector_sdk::sink_connector;
+use serde::{Deserialize, Serialize};
+use tokio::sync::Mutex;
+
+mod coercions;
+mod sink;
+mod storage;
+
+use crate::coercions::CoercionTree;
+
+sink_connector!(DeltaSink);
+
+#[derive(Debug)]
+pub struct DeltaSink {
+    id: u32,
+    config: DeltaSinkConfig,
+    state: Mutex<Option<SinkState>>,
+}
+
+#[derive(Debug)]
+struct SinkState {
+    table: DeltaTable,
+    writer: JsonWriter,
+    coercion_tree: CoercionTree,
+}
+
+#[derive(Debug, Serialize, Deserialize)]

Review Comment:
   `DeltaSinkConfig` derives `Debug`, which means AWS secret keys, Azure 
account keys, and GCS service account keys will appear in debug output and 
logs. Consider implementing `Debug` manually to redact sensitive fields, or 
wrapping them in a newtype that redacts on display.



##########
core/connectors/sinks/delta_sink/src/storage.rs:
##########
@@ -0,0 +1,277 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use crate::DeltaSinkConfig;
+use iggy_connector_sdk::Error;
+use std::collections::HashMap;
+
+pub(crate) fn build_storage_options(
+    config: &DeltaSinkConfig,
+) -> Result<HashMap<String, String>, Error> {
+    let mut opts = HashMap::new();
+
+    match config.storage_backend_type.as_deref() {
+        Some("s3") => {
+            let access_key = config
+                .aws_s3_access_key
+                .as_ref()
+                .ok_or(Error::InvalidConfig)?;
+            let secret_key = config
+                .aws_s3_secret_key
+                .as_ref()
+                .ok_or(Error::InvalidConfig)?;
+            let region = 
config.aws_s3_region.as_ref().ok_or(Error::InvalidConfig)?;
+            let endpoint_url = config

Review Comment:
   Also line 43.
   
   `endpoint_url` and `allow_http` are only needed for S3-compatible services 
(MinIO, LocalStack), not actual AWS S3. Making them mandatory breaks the 
vanilla AWS use case. These should be optional with sensible defaults (no 
custom endpoint, HTTP disallowed).



##########
core/connectors/sinks/delta_sink/src/coercions.rs:
##########
@@ -0,0 +1,600 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use deltalake::kernel::Schema as DeltaSchema;
+use deltalake::kernel::{DataType, PrimitiveType};
+
+use chrono::prelude::*;
+use serde_json::Value;
+use std::collections::HashMap;
+use std::str::FromStr;
+
+#[derive(Debug, Clone, PartialEq)]
+#[allow(unused)]
+enum CoercionNode {
+    Coercion(Coercion),
+    Tree(CoercionTree),
+    ArrayTree(CoercionTree),
+    ArrayPrimitive(Coercion),
+}
+
+#[derive(Debug, Clone, PartialEq)]
+enum Coercion {
+    ToString,
+    ToTimestamp,
+}
+
+#[derive(Debug, Clone, PartialEq)]
+pub(crate) struct CoercionTree {
+    root: HashMap<String, CoercionNode>,
+}
+
+/// Returns a [`CoercionTree`] so the schema can be walked efficiently level 
by level when performing conversions.
+pub(crate) fn create_coercion_tree(schema: &DeltaSchema) -> CoercionTree {
+    let mut root = HashMap::new();
+
+    for field in schema.fields() {
+        if let Some(node) = build_coercion_node(field.data_type()) {
+            root.insert(field.name().to_string(), node);
+        }
+    }
+
+    CoercionTree { root }
+}
+
+fn build_coercion_node(data_type: &DataType) -> Option<CoercionNode> {
+    match data_type {
+        DataType::Primitive(primitive) => match primitive {
+            PrimitiveType::String => 
Some(CoercionNode::Coercion(Coercion::ToString)),
+            PrimitiveType::Timestamp => 
Some(CoercionNode::Coercion(Coercion::ToTimestamp)),
+            _ => None,
+        },
+        DataType::Struct(st) => {
+            let nested_context = create_coercion_tree(st);
+            if !nested_context.root.is_empty() {
+                Some(CoercionNode::Tree(nested_context))
+            } else {
+                None
+            }
+        }
+        DataType::Array(array) => {
+            build_coercion_node(array.element_type()).and_then(|node| match 
node {
+                CoercionNode::Coercion(c) => 
Some(CoercionNode::ArrayPrimitive(c)),
+                CoercionNode::Tree(t) => Some(CoercionNode::ArrayTree(t)),
+                _ => None,
+            })
+        }
+        _ => None,
+    }
+}
+
+/// Applies all data coercions specified by the [`CoercionTree`] to the 
[`Value`].
+pub(crate) fn coerce(value: &mut Value, coercion_tree: &CoercionTree) {
+    if let Some(context) = value.as_object_mut() {
+        for (field_name, coercion) in coercion_tree.root.iter() {
+            if let Some(value) = context.get_mut(field_name) {
+                apply_coercion(value, coercion);
+            }
+        }
+    }
+}
+
+fn apply_coercion(value: &mut Value, node: &CoercionNode) {
+    match node {
+        CoercionNode::Coercion(Coercion::ToString) => {
+            if !value.is_string() {
+                *value = Value::String(value.to_string());
+            }
+        }
+        CoercionNode::Coercion(Coercion::ToTimestamp) => {
+            if let Some(as_str) = value.as_str()
+                && let Some(parsed) = string_to_timestamp(as_str)
+            {
+                *value = parsed
+            }
+        }
+        CoercionNode::Tree(tree) => {
+            for (name, node) in tree.root.iter() {
+                let fields = value.as_object_mut();
+                if let Some(fields) = fields
+                    && let Some(value) = fields.get_mut(name)
+                {
+                    apply_coercion(value, node);
+                }
+            }
+        }
+        CoercionNode::ArrayPrimitive(coercion) => {
+            let values = value.as_array_mut();
+            if let Some(values) = values {
+                let node = CoercionNode::Coercion(coercion.clone());
+                for value in values {
+                    apply_coercion(value, &node);
+                }
+            }
+        }
+        CoercionNode::ArrayTree(tree) => {
+            let values = value.as_array_mut();
+            if let Some(values) = values {
+                let node = CoercionNode::Tree(tree.clone());
+                for value in values {
+                    apply_coercion(value, &node);
+                }
+            }
+        }
+    }
+}
+
+fn string_to_timestamp(string: &str) -> Option<Value> {
+    let parsed = DateTime::from_str(string);
+    if let Err(e) = parsed {
+        tracing::error!(

Review Comment:
   An unparseable timestamp string is an expected data-quality issue, not an 
infrastructure error. Using `error!` level here will flood monitoring/alerting 
during normal operation with dirty data. Downgrade it to `warn!`.



##########
core/connectors/sinks/delta_sink/src/sink.rs:
##########
@@ -0,0 +1,308 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use crate::DeltaSink;
+use crate::SinkState;
+use crate::coercions::{coerce, create_coercion_tree};
+use crate::storage::build_storage_options;
+use async_trait::async_trait;
+use deltalake::writer::{DeltaWriter, JsonWriter};
+use iggy_connector_sdk::{ConsumedMessage, Error, MessagesMetadata, Payload, 
Sink, TopicMetadata};
+use tracing::{debug, error, info};
+
+#[async_trait]
+impl Sink for DeltaSink {
+    async fn open(&mut self) -> Result<(), Error> {
+        info!(
+            "Opening Delta Lake sink connector with ID: {} for table: {}",
+            self.id, self.config.table_uri
+        );
+
+        let table_url = url::Url::parse(&self.config.table_uri).map_err(|e| {
+            error!("Failed to parse table URI '{}': {e}", 
self.config.table_uri);
+            Error::InitError(format!("Invalid table URI: {e}"))
+        })?;
+
+        info!("Parsed table URI: {}", table_url);
+
+        let storage_options = build_storage_options(&self.config).map_err(|e| {
+            error!("Invalid storage configuration: {e}");
+            Error::InitError(format!("Invalid storage configuration: {e}"))
+        })?;
+
+        let table =
+            match deltalake::open_table_with_storage_options(table_url, 
storage_options.clone())
+                .await
+            {
+                Ok(table) => table,
+                Err(e) => {
+                    error!("Failed to load Delta table: {e}");
+                    return Err(Error::InitError(format!("Failed to load Delta 
table: {e}")));
+                }
+            };
+
+        let kernel_schema = table
+            .snapshot()
+            .map_err(|e| {
+                error!("Failed to get table snapshot: {e}");
+                Error::InitError(format!("Failed to get table snapshot: {e}"))
+            })?
+            .schema();
+        let coercion_tree = create_coercion_tree(&kernel_schema);
+
+        let writer = JsonWriter::for_table(&table).map_err(|e| {
+            error!("Failed to create JsonWriter: {e}");
+            Error::InitError(format!("Failed to create JsonWriter: {e}"))
+        })?;
+
+        *self.state.lock().await = Some(SinkState {
+            table,
+            writer,
+            coercion_tree,
+        });
+
+        info!(
+            "Delta Lake sink connector with ID: {} opened successfully.",
+            self.id
+        );
+        Ok(())
+    }
+
+    async fn consume(
+        &self,
+        _topic_metadata: &TopicMetadata,
+        messages_metadata: MessagesMetadata,
+        messages: Vec<ConsumedMessage>,
+    ) -> Result<(), Error> {
+        debug!(
+            "Delta sink with ID: {} received: {} messages, partition: {}, 
offset: {}",
+            self.id,
+            messages.len(),
+            messages_metadata.partition_id,
+            messages_metadata.current_offset,
+        );
+
+        // Extract JSON values from consumed messages
+        let mut json_values: Vec<serde_json::Value> = 
Vec::with_capacity(messages.len());
+        for msg in &messages {
+            match &msg.payload {
+                Payload::Json(simd_value) => {
+                    json_values.push(owned_value_to_serde_json(simd_value));
+                }
+                other => {
+                    error!(
+                        "Unsupported payload type: {other}. Delta sink only 
supports JSON payloads."
+                    );
+                    return Err(Error::InvalidPayloadType);
+                }
+            }
+        }
+
+        if json_values.is_empty() {
+            debug!("No JSON values to write");
+            return Ok(());
+        }
+
+        let mut state_guard = self.state.lock().await;
+        let state = state_guard.as_mut().ok_or_else(|| {
+            error!("Delta sink state not initialized — was open() called?");
+            Error::InvalidConfig
+        })?;
+
+        // Apply coercions to match Delta table schema
+        for value in &mut json_values {
+            coerce(value, &state.coercion_tree);
+        }
+
+        // Write JSON values to internal Parquet buffers
+        state.writer.write(json_values).await.map_err(|e| {
+            error!("Failed to write to Delta writer: {e}");
+            Error::Storage(format!("Failed to write to Delta writer: {e}"))
+        })?;
+
+        // Flush buffers to object store and commit to Delta log
+        let version = match state.writer.flush_and_commit(&mut 
state.table).await {
+            Ok(v) => v,
+            Err(e) => {
+                state.writer.reset();
+                error!("Failed to flush and commit to Delta table: {e}");
+                return Err(Error::Storage(format!("Failed to flush and commit: 
{e}")));
+            }
+        };
+
+        debug!(
+            "Delta sink with ID: {} committed version {}",
+            self.id, version
+        );
+
+        Ok(())
+    }
+
+    async fn close(&mut self) -> Result<(), Error> {
+        if let Some(mut state) = self.state.lock().await.take()
+            && let Err(e) = state.writer.flush_and_commit(&mut 
state.table).await
+        {
+            error!(
+                "Delta sink with ID: {} failed to flush on close: {e}",
+                self.id
+            );
+            return Err(Error::Storage(format!("Failed to flush on close: 
{e}")));
+        }
+        info!("Delta Lake sink connector with ID: {} is closed.", self.id);
+        Ok(())
+    }
+}
+
+fn owned_value_to_serde_json(value: &simd_json::OwnedValue) -> 
serde_json::Value {

Review Comment:
   This `simd_json::OwnedValue -> serde_json::Value` conversion is copy-pasted 
identically in `elasticsearch_sink/src/lib.rs:39`. It should live in 
`iggy_connector_sdk` or a shared utility crate; as it stands, whenever the 
conversion strategy changes, both copies must be updated independently.
   
   I left the same comment in https://github.com/apache/iggy/pull/2925



##########
core/connectors/sinks/delta_sink/src/sink.rs:
##########
@@ -0,0 +1,308 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use crate::DeltaSink;
+use crate::SinkState;
+use crate::coercions::{coerce, create_coercion_tree};
+use crate::storage::build_storage_options;
+use async_trait::async_trait;
+use deltalake::writer::{DeltaWriter, JsonWriter};
+use iggy_connector_sdk::{ConsumedMessage, Error, MessagesMetadata, Payload, 
Sink, TopicMetadata};
+use tracing::{debug, error, info};
+
+#[async_trait]
+impl Sink for DeltaSink {
+    async fn open(&mut self) -> Result<(), Error> {
+        info!(
+            "Opening Delta Lake sink connector with ID: {} for table: {}",
+            self.id, self.config.table_uri
+        );
+
+        let table_url = url::Url::parse(&self.config.table_uri).map_err(|e| {
+            error!("Failed to parse table URI '{}': {e}", 
self.config.table_uri);
+            Error::InitError(format!("Invalid table URI: {e}"))
+        })?;
+
+        info!("Parsed table URI: {}", table_url);
+
+        let storage_options = build_storage_options(&self.config).map_err(|e| {
+            error!("Invalid storage configuration: {e}");
+            Error::InitError(format!("Invalid storage configuration: {e}"))
+        })?;
+
+        let table =
+            match deltalake::open_table_with_storage_options(table_url, 
storage_options.clone())

Review Comment:
   `build_storage_options` returns an owned `HashMap` which is then 
`.clone()`-d just to pass to `open_table_with_storage_options`. Since the 
original is never used again after this call, just pass the owned map directly 
and drop the clone.



##########
core/connectors/sinks/delta_sink/src/coercions.rs:
##########
@@ -0,0 +1,600 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use deltalake::kernel::Schema as DeltaSchema;
+use deltalake::kernel::{DataType, PrimitiveType};
+
+use chrono::prelude::*;
+use serde_json::Value;
+use std::collections::HashMap;
+use std::str::FromStr;
+
+#[derive(Debug, Clone, PartialEq)]
+#[allow(unused)]
+enum CoercionNode {
+    Coercion(Coercion),
+    Tree(CoercionTree),
+    ArrayTree(CoercionTree),
+    ArrayPrimitive(Coercion),
+}
+
+#[derive(Debug, Clone, PartialEq)]
+enum Coercion {
+    ToString,
+    ToTimestamp,
+}
+
+#[derive(Debug, Clone, PartialEq)]
+pub(crate) struct CoercionTree {
+    root: HashMap<String, CoercionNode>,
+}
+
+/// Returns a [`CoercionTree`] so the schema can be walked efficiently level 
by level when performing conversions.
+pub(crate) fn create_coercion_tree(schema: &DeltaSchema) -> CoercionTree {
+    let mut root = HashMap::new();
+
+    for field in schema.fields() {
+        if let Some(node) = build_coercion_node(field.data_type()) {
+            root.insert(field.name().to_string(), node);
+        }
+    }
+
+    CoercionTree { root }
+}
+
+fn build_coercion_node(data_type: &DataType) -> Option<CoercionNode> {
+    match data_type {
+        DataType::Primitive(primitive) => match primitive {
+            PrimitiveType::String => 
Some(CoercionNode::Coercion(Coercion::ToString)),
+            PrimitiveType::Timestamp => 
Some(CoercionNode::Coercion(Coercion::ToTimestamp)),
+            _ => None,
+        },
+        DataType::Struct(st) => {
+            let nested_context = create_coercion_tree(st);
+            if !nested_context.root.is_empty() {
+                Some(CoercionNode::Tree(nested_context))
+            } else {
+                None
+            }
+        }
+        DataType::Array(array) => {
+            build_coercion_node(array.element_type()).and_then(|node| match 
node {
+                CoercionNode::Coercion(c) => 
Some(CoercionNode::ArrayPrimitive(c)),
+                CoercionNode::Tree(t) => Some(CoercionNode::ArrayTree(t)),
+                _ => None,
+            })
+        }
+        _ => None,
+    }
+}
+
+/// Applies all data coercions specified by the [`CoercionTree`] to the 
[`Value`].
+pub(crate) fn coerce(value: &mut Value, coercion_tree: &CoercionTree) {
+    if let Some(context) = value.as_object_mut() {
+        for (field_name, coercion) in coercion_tree.root.iter() {
+            if let Some(value) = context.get_mut(field_name) {
+                apply_coercion(value, coercion);
+            }
+        }
+    }
+}
+
+fn apply_coercion(value: &mut Value, node: &CoercionNode) {
+    match node {
+        CoercionNode::Coercion(Coercion::ToString) => {
+            if !value.is_string() {
+                *value = Value::String(value.to_string());
+            }
+        }
+        CoercionNode::Coercion(Coercion::ToTimestamp) => {
+            if let Some(as_str) = value.as_str()
+                && let Some(parsed) = string_to_timestamp(as_str)
+            {
+                *value = parsed
+            }
+        }
+        CoercionNode::Tree(tree) => {
+            for (name, node) in tree.root.iter() {
+                let fields = value.as_object_mut();
+                if let Some(fields) = fields
+                    && let Some(value) = fields.get_mut(name)
+                {
+                    apply_coercion(value, node);
+                }
+            }
+        }
+        CoercionNode::ArrayPrimitive(coercion) => {
+            let values = value.as_array_mut();
+            if let Some(values) = values {
+                let node = CoercionNode::Coercion(coercion.clone());

Review Comment:
   Also line 133.
   
   `coercion.clone()` and `tree.clone()` are called once per array-typed field 
per message. Since `CoercionTree` contains a `HashMap<String, CoercionNode>`, 
cloning it for every array in every message is a heap-allocation storm on wide 
schemas. Refactor to pass `&Coercion` / `&CoercionTree` directly instead of 
wrapping them in a new `CoercionNode`.



##########
core/connectors/sinks/delta_sink/src/coercions.rs:
##########
@@ -0,0 +1,600 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use deltalake::kernel::Schema as DeltaSchema;
+use deltalake::kernel::{DataType, PrimitiveType};
+
+use chrono::prelude::*;
+use serde_json::Value;
+use std::collections::HashMap;
+use std::str::FromStr;
+
+#[derive(Debug, Clone, PartialEq)]
+#[allow(unused)]
+enum CoercionNode {
+    Coercion(Coercion),
+    Tree(CoercionTree),
+    ArrayTree(CoercionTree),
+    ArrayPrimitive(Coercion),
+}
+
+#[derive(Debug, Clone, PartialEq)]
+enum Coercion {
+    ToString,
+    ToTimestamp,
+}
+
+#[derive(Debug, Clone, PartialEq)]
+pub(crate) struct CoercionTree {
+    root: HashMap<String, CoercionNode>,
+}
+
+/// Returns a [`CoercionTree`] so the schema can be walked efficiently level 
by level when performing conversions.
+pub(crate) fn create_coercion_tree(schema: &DeltaSchema) -> CoercionTree {
+    let mut root = HashMap::new();
+
+    for field in schema.fields() {
+        if let Some(node) = build_coercion_node(field.data_type()) {
+            root.insert(field.name().to_string(), node);
+        }
+    }
+
+    CoercionTree { root }
+}
+
+fn build_coercion_node(data_type: &DataType) -> Option<CoercionNode> {
+    match data_type {
+        DataType::Primitive(primitive) => match primitive {
+            PrimitiveType::String => 
Some(CoercionNode::Coercion(Coercion::ToString)),
+            PrimitiveType::Timestamp => 
Some(CoercionNode::Coercion(Coercion::ToTimestamp)),
+            _ => None,
+        },
+        DataType::Struct(st) => {
+            let nested_context = create_coercion_tree(st);
+            if !nested_context.root.is_empty() {
+                Some(CoercionNode::Tree(nested_context))
+            } else {
+                None
+            }
+        }
+        DataType::Array(array) => {
+            build_coercion_node(array.element_type()).and_then(|node| match 
node {
+                CoercionNode::Coercion(c) => 
Some(CoercionNode::ArrayPrimitive(c)),
+                CoercionNode::Tree(t) => Some(CoercionNode::ArrayTree(t)),
+                _ => None,
+            })
+        }
+        _ => None,

Review Comment:
   `build_coercion_node` handles `Primitive`, `Struct`, and `Array` but falls 
through to `None` for `Map` and `Variant`. If a Delta table has a `Map<String, 
Timestamp>` column, timestamp values inside the map won't be coerced. If this 
is intentional, add a comment documenting it as a known limitation so future 
maintainers know it's deliberate.



##########
core/connectors/sinks/delta_sink/Cargo.toml:
##########
@@ -0,0 +1,48 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+[package]
+name = "iggy_connector_delta_sink"
+version = "0.1.0"
+edition = "2024"
+license = "Apache-2.0"
+keywords = ["iggy", "messaging", "streaming"]
+categories = ["command-line-utilities", "database", "network-programming"]
+homepage = "https://iggy.apache.org";
+documentation = "https://iggy.apache.org/docs";
+repository = "https://github.com/apache/iggy";
+readme = "../../README.md"
+
+[package.metadata.cargo-machete]
+ignored = ["dashmap", "once_cell"]
+
+[lib]
+crate-type = ["cdylib", "lib"]
+
+[dependencies]
+async-trait = { workspace = true }
+chrono = { workspace = true }
+dashmap = { workspace = true }

Review Comment:
   Also line 42.
   
   `dashmap` and `once_cell` are listed in `[dependencies]` and then suppressed 
via `[package.metadata.cargo-machete] ignored`. If they're truly unused, remove 
them rather than suppressing the warning.



##########
core/connectors/sinks/delta_sink/src/storage.rs:
##########
@@ -0,0 +1,277 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use crate::DeltaSinkConfig;
+use iggy_connector_sdk::Error;
+use std::collections::HashMap;
+
+pub(crate) fn build_storage_options(
+    config: &DeltaSinkConfig,
+) -> Result<HashMap<String, String>, Error> {
+    let mut opts = HashMap::new();
+
+    match config.storage_backend_type.as_deref() {
+        Some("s3") => {
+            let access_key = config
+                .aws_s3_access_key
+                .as_ref()
+                .ok_or(Error::InvalidConfig)?;

Review Comment:
   Also at lines 37, 38, 42, 43, 56, 60, 64, 68, 80, 89.
   
   Every `.ok_or(Error::InvalidConfig)?` returns the same variant with no 
indication of which field is missing or which backend was selected. A user who 
misconfigures one field gets `"Invalid config"` with no diagnostic detail. Use 
`Error::InitError(format!("S3 backend requires 'aws_s3_access_key'"))` or 
similar so the error message actually tells the user what to fix.



##########
core/connectors/sinks/delta_sink/src/coercions.rs:
##########
@@ -0,0 +1,600 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use deltalake::kernel::Schema as DeltaSchema;
+use deltalake::kernel::{DataType, PrimitiveType};
+
+use chrono::prelude::*;
+use serde_json::Value;
+use std::collections::HashMap;
+use std::str::FromStr;
+
+#[derive(Debug, Clone, PartialEq)]
+#[allow(unused)]
+enum CoercionNode {
+    Coercion(Coercion),
+    Tree(CoercionTree),
+    ArrayTree(CoercionTree),
+    ArrayPrimitive(Coercion),
+}
+
+#[derive(Debug, Clone, PartialEq)]
+enum Coercion {
+    ToString,
+    ToTimestamp,
+}
+
+#[derive(Debug, Clone, PartialEq)]
+pub(crate) struct CoercionTree {
+    root: HashMap<String, CoercionNode>,
+}
+
+/// Returns a [`CoercionTree`] so the schema can be walked efficiently level 
by level when performing conversions.
+pub(crate) fn create_coercion_tree(schema: &DeltaSchema) -> CoercionTree {
+    let mut root = HashMap::new();
+
+    for field in schema.fields() {
+        if let Some(node) = build_coercion_node(field.data_type()) {
+            root.insert(field.name().to_string(), node);
+        }
+    }
+
+    CoercionTree { root }
+}
+
+fn build_coercion_node(data_type: &DataType) -> Option<CoercionNode> {
+    match data_type {
+        DataType::Primitive(primitive) => match primitive {
+            PrimitiveType::String => 
Some(CoercionNode::Coercion(Coercion::ToString)),
+            PrimitiveType::Timestamp => 
Some(CoercionNode::Coercion(Coercion::ToTimestamp)),
+            _ => None,
+        },
+        DataType::Struct(st) => {
+            let nested_context = create_coercion_tree(st);
+            if !nested_context.root.is_empty() {
+                Some(CoercionNode::Tree(nested_context))
+            } else {
+                None
+            }
+        }
+        DataType::Array(array) => {
+            build_coercion_node(array.element_type()).and_then(|node| match 
node {
+                CoercionNode::Coercion(c) => 
Some(CoercionNode::ArrayPrimitive(c)),
+                CoercionNode::Tree(t) => Some(CoercionNode::ArrayTree(t)),
+                _ => None,
+            })
+        }
+        _ => None,
+    }
+}
+
+/// Applies all data coercions specified by the [`CoercionTree`] to the 
[`Value`].
+pub(crate) fn coerce(value: &mut Value, coercion_tree: &CoercionTree) {
+    if let Some(context) = value.as_object_mut() {
+        for (field_name, coercion) in coercion_tree.root.iter() {
+            if let Some(value) = context.get_mut(field_name) {
+                apply_coercion(value, coercion);
+            }
+        }
+    }
+}
+
+fn apply_coercion(value: &mut Value, node: &CoercionNode) {
+    match node {
+        CoercionNode::Coercion(Coercion::ToString) => {
+            if !value.is_string() {
+                *value = Value::String(value.to_string());
+            }
+        }
+        CoercionNode::Coercion(Coercion::ToTimestamp) => {

Review Comment:
   When `string_to_timestamp` returns `None` (unparseable string), the value 
remains a JSON string. `JsonWriter` then receives a string where it expects 
an i64 microsecond timestamp, causing `writer.write()` at sink.rs:133 to fail — 
rejecting the entire batch (potentially hundreds of valid messages) due to a 
single malformed timestamp in one message. Either fail fast with an explicit 
error naming the field/value, or set the value to null (if the column is 
nullable) to preserve the rest of the batch.



##########
core/connectors/sinks/delta_sink/src/sink.rs:
##########
@@ -0,0 +1,308 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use crate::DeltaSink;
+use crate::SinkState;
+use crate::coercions::{coerce, create_coercion_tree};
+use crate::storage::build_storage_options;
+use async_trait::async_trait;
+use deltalake::writer::{DeltaWriter, JsonWriter};
+use iggy_connector_sdk::{ConsumedMessage, Error, MessagesMetadata, Payload, 
Sink, TopicMetadata};
+use tracing::{debug, error, info};
+
+#[async_trait]
+impl Sink for DeltaSink {
    /// Opens the Delta Lake sink: validates the table URI and storage
    /// configuration, loads the target Delta table, derives a schema
    /// coercion tree, and stores a ready-to-use `JsonWriter` in `self.state`.
    ///
    /// # Errors
    /// Returns [`Error::InitError`] when the URI is invalid, the storage
    /// configuration is rejected, the table cannot be loaded, the snapshot
    /// is unavailable, or the writer cannot be created.
    async fn open(&mut self) -> Result<(), Error> {
        info!(
            "Opening Delta Lake sink connector with ID: {} for table: {}",
            self.id, self.config.table_uri
        );

        // The table URI must be an absolute URL (e.g. s3://..., file://...).
        let table_url = url::Url::parse(&self.config.table_uri).map_err(|e| {
            error!("Failed to parse table URI '{}': {e}", self.config.table_uri);
            Error::InitError(format!("Invalid table URI: {e}"))
        })?;

        info!("Parsed table URI: {}", table_url);

        // Translate sink config into backend-specific object-store options.
        let storage_options = build_storage_options(&self.config).map_err(|e| {
            error!("Invalid storage configuration: {e}");
            Error::InitError(format!("Invalid storage configuration: {e}"))
        })?;

        let table =
            match deltalake::open_table_with_storage_options(table_url, storage_options.clone())
                .await
            {
                Ok(table) => table,
                Err(e) => {
                    error!("Failed to load Delta table: {e}");
                    return Err(Error::InitError(format!("Failed to load Delta table: {e}")));
                }
            };

        // Build the coercion tree once from the current table schema so that
        // consume() can cheaply coerce each incoming JSON value per batch.
        let kernel_schema = table
            .snapshot()
            .map_err(|e| {
                error!("Failed to get table snapshot: {e}");
                Error::InitError(format!("Failed to get table snapshot: {e}"))
            })?
            .schema();
        let coercion_tree = create_coercion_tree(&kernel_schema);

        let writer = JsonWriter::for_table(&table).map_err(|e| {
            error!("Failed to create JsonWriter: {e}");
            Error::InitError(format!("Failed to create JsonWriter: {e}"))
        })?;

        // Publish the fully initialized state; consume() requires this to be Some.
        *self.state.lock().await = Some(SinkState {
            table,
            writer,
            coercion_tree,
        });

        info!(
            "Delta Lake sink connector with ID: {} opened successfully.",
            self.id
        );
        Ok(())
    }
+
+    async fn consume(
+        &self,
+        _topic_metadata: &TopicMetadata,
+        messages_metadata: MessagesMetadata,
+        messages: Vec<ConsumedMessage>,
+    ) -> Result<(), Error> {
+        debug!(
+            "Delta sink with ID: {} received: {} messages, partition: {}, 
offset: {}",
+            self.id,
+            messages.len(),
+            messages_metadata.partition_id,
+            messages_metadata.current_offset,
+        );
+
+        // Extract JSON values from consumed messages
+        let mut json_values: Vec<serde_json::Value> = 
Vec::with_capacity(messages.len());
+        for msg in &messages {
+            match &msg.payload {
+                Payload::Json(simd_value) => {
+                    json_values.push(owned_value_to_serde_json(simd_value));
+                }
+                other => {
+                    error!(
+                        "Unsupported payload type: {other}. Delta sink only 
supports JSON payloads."
+                    );
+                    return Err(Error::InvalidPayloadType);
+                }
+            }
+        }
+
+        if json_values.is_empty() {
+            debug!("No JSON values to write");
+            return Ok(());
+        }
+
+        let mut state_guard = self.state.lock().await;
+        let state = state_guard.as_mut().ok_or_else(|| {
+            error!("Delta sink state not initialized — was open() called?");
+            Error::InvalidConfig
+        })?;
+
+        // Apply coercions to match Delta table schema
+        for value in &mut json_values {
+            coerce(value, &state.coercion_tree);
+        }
+
+        // Write JSON values to internal Parquet buffers
+        state.writer.write(json_values).await.map_err(|e| {
+            error!("Failed to write to Delta writer: {e}");
+            Error::Storage(format!("Failed to write to Delta writer: {e}"))
+        })?;
+
+        // Flush buffers to object store and commit to Delta log
+        let version = match state.writer.flush_and_commit(&mut 
state.table).await {
+            Ok(v) => v,
+            Err(e) => {
+                state.writer.reset();

Review Comment:
   when `write()` succeeds (line 133) but `flush_and_commit()` fails here, 
`reset()` clears the internal parquet buffer. those messages are permanently 
lost with no retry path. since the connector runtime uses 
`AutoCommitWhen::PollingMessages` (offset committed before consume), the 
consumer offset may already have been advanced past these messages. there's no 
retry, no DLQ, no metric - the messages just vanish.



##########
core/connectors/sinks/delta_sink/src/sink.rs:
##########
@@ -0,0 +1,308 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use crate::DeltaSink;
+use crate::SinkState;
+use crate::coercions::{coerce, create_coercion_tree};
+use crate::storage::build_storage_options;
+use async_trait::async_trait;
+use deltalake::writer::{DeltaWriter, JsonWriter};
+use iggy_connector_sdk::{ConsumedMessage, Error, MessagesMetadata, Payload, 
Sink, TopicMetadata};
+use tracing::{debug, error, info};
+
+#[async_trait]
+impl Sink for DeltaSink {
+    async fn open(&mut self) -> Result<(), Error> {
+        info!(
+            "Opening Delta Lake sink connector with ID: {} for table: {}",
+            self.id, self.config.table_uri
+        );
+
+        let table_url = url::Url::parse(&self.config.table_uri).map_err(|e| {
+            error!("Failed to parse table URI '{}': {e}", 
self.config.table_uri);
+            Error::InitError(format!("Invalid table URI: {e}"))
+        })?;
+
+        info!("Parsed table URI: {}", table_url);
+
+        let storage_options = build_storage_options(&self.config).map_err(|e| {
+            error!("Invalid storage configuration: {e}");
+            Error::InitError(format!("Invalid storage configuration: {e}"))
+        })?;
+
+        let table =
+            match deltalake::open_table_with_storage_options(table_url, 
storage_options.clone())
+                .await
+            {
+                Ok(table) => table,
+                Err(e) => {
+                    error!("Failed to load Delta table: {e}");
+                    return Err(Error::InitError(format!("Failed to load Delta 
table: {e}")));
+                }
+            };
+
+        let kernel_schema = table
+            .snapshot()
+            .map_err(|e| {
+                error!("Failed to get table snapshot: {e}");
+                Error::InitError(format!("Failed to get table snapshot: {e}"))
+            })?
+            .schema();
+        let coercion_tree = create_coercion_tree(&kernel_schema);
+
+        let writer = JsonWriter::for_table(&table).map_err(|e| {
+            error!("Failed to create JsonWriter: {e}");
+            Error::InitError(format!("Failed to create JsonWriter: {e}"))
+        })?;
+
+        *self.state.lock().await = Some(SinkState {
+            table,
+            writer,
+            coercion_tree,
+        });
+
+        info!(
+            "Delta Lake sink connector with ID: {} opened successfully.",
+            self.id
+        );
+        Ok(())
+    }
+
+    async fn consume(
+        &self,
+        _topic_metadata: &TopicMetadata,
+        messages_metadata: MessagesMetadata,
+        messages: Vec<ConsumedMessage>,
+    ) -> Result<(), Error> {
+        debug!(
+            "Delta sink with ID: {} received: {} messages, partition: {}, 
offset: {}",
+            self.id,
+            messages.len(),
+            messages_metadata.partition_id,
+            messages_metadata.current_offset,
+        );
+
+        // Extract JSON values from consumed messages
+        let mut json_values: Vec<serde_json::Value> = 
Vec::with_capacity(messages.len());
+        for msg in &messages {
+            match &msg.payload {
+                Payload::Json(simd_value) => {
+                    json_values.push(owned_value_to_serde_json(simd_value));
+                }
+                other => {
+                    error!(
+                        "Unsupported payload type: {other}. Delta sink only 
supports JSON payloads."
+                    );
+                    return Err(Error::InvalidPayloadType);
+                }
+            }
+        }
+
+        if json_values.is_empty() {
+            debug!("No JSON values to write");
+            return Ok(());
+        }
+
+        let mut state_guard = self.state.lock().await;
+        let state = state_guard.as_mut().ok_or_else(|| {
+            error!("Delta sink state not initialized — was open() called?");
+            Error::InvalidConfig
+        })?;
+
+        // Apply coercions to match Delta table schema
+        for value in &mut json_values {
+            coerce(value, &state.coercion_tree);
+        }
+
+        // Write JSON values to internal Parquet buffers
+        state.writer.write(json_values).await.map_err(|e| {
+            error!("Failed to write to Delta writer: {e}");
+            Error::Storage(format!("Failed to write to Delta writer: {e}"))
+        })?;
+
+        // Flush buffers to object store and commit to Delta log
+        let version = match state.writer.flush_and_commit(&mut 
state.table).await {
+            Ok(v) => v,
+            Err(e) => {
+                state.writer.reset();
+                error!("Failed to flush and commit to Delta table: {e}");
+                return Err(Error::Storage(format!("Failed to flush and commit: 
{e}")));
+            }
+        };
+
+        debug!(
+            "Delta sink with ID: {} committed version {}",
+            self.id, version
+        );
+
+        Ok(())
+    }
+
+    async fn close(&mut self) -> Result<(), Error> {
+        if let Some(mut state) = self.state.lock().await.take()
+            && let Err(e) = state.writer.flush_and_commit(&mut 
state.table).await
+        {
+            error!(
+                "Delta sink with ID: {} failed to flush on close: {e}",
+                self.id
+            );
+            return Err(Error::Storage(format!("Failed to flush on close: 
{e}")));
+        }
+        info!("Delta Lake sink connector with ID: {} is closed.", self.id);
+        Ok(())
+    }
+}
+
+fn owned_value_to_serde_json(value: &simd_json::OwnedValue) -> 
serde_json::Value {
+    match value {
+        simd_json::OwnedValue::Static(s) => match s {
+            simd_json::StaticNode::Null => serde_json::Value::Null,
+            simd_json::StaticNode::Bool(b) => serde_json::Value::Bool(*b),
+            simd_json::StaticNode::I64(n) => 
serde_json::Value::Number((*n).into()),
+            simd_json::StaticNode::U64(n) => 
serde_json::Value::Number((*n).into()),
+            simd_json::StaticNode::F64(n) => serde_json::Number::from_f64(*n)

Review Comment:
   non-finite f64 values (NaN, Infinity) are silently mapped to 
`serde_json::Value::Null` here. for a delta column typed as `Double`, writing 
null where the source had NaN is data loss. if the column is non-nullable, this 
causes a confusing write failure downstream. add a `tracing::warn!` when this 
conversion happens so operators can diagnose data quality issues.



##########
core/connectors/sinks/delta_sink/src/coercions.rs:
##########
@@ -0,0 +1,600 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use deltalake::kernel::Schema as DeltaSchema;
+use deltalake::kernel::{DataType, PrimitiveType};
+
+use chrono::prelude::*;
+use serde_json::Value;
+use std::collections::HashMap;
+use std::str::FromStr;
+
/// One step in the coercion plan for a single field of the table schema.
#[derive(Debug, Clone, PartialEq)]
#[allow(unused)]
enum CoercionNode {
    /// Coerce a primitive value in place.
    Coercion(Coercion),
    /// Recurse into the fields of a nested struct.
    Tree(CoercionTree),
    /// Apply a nested tree to every element of an array of structs.
    ArrayTree(CoercionTree),
    /// Apply a primitive coercion to every element of an array.
    ArrayPrimitive(Coercion),
}

/// The primitive conversions supported when matching JSON input to the
/// Delta table schema.
#[derive(Debug, Clone, PartialEq)]
enum Coercion {
    // Stringify non-string JSON values for String columns.
    ToString,
    // Convert timestamp-like values (presumably ISO 8601 strings — see
    // apply_coercion) into the representation the writer expects.
    ToTimestamp,
}

/// Per-field coercion plan derived from the Delta table schema; keys of
/// `root` are top-level field names.
#[derive(Debug, Clone, PartialEq)]
pub(crate) struct CoercionTree {
    root: HashMap<String, CoercionNode>,
}
+
+/// Returns a [`CoercionTree`] so the schema can be walked efficiently level 
by level when performing conversions.
+pub(crate) fn create_coercion_tree(schema: &DeltaSchema) -> CoercionTree {
+    let mut root = HashMap::new();
+
+    for field in schema.fields() {
+        if let Some(node) = build_coercion_node(field.data_type()) {
+            root.insert(field.name().to_string(), node);
+        }
+    }
+
+    CoercionTree { root }
+}
+
+fn build_coercion_node(data_type: &DataType) -> Option<CoercionNode> {
+    match data_type {
+        DataType::Primitive(primitive) => match primitive {
+            PrimitiveType::String => 
Some(CoercionNode::Coercion(Coercion::ToString)),
+            PrimitiveType::Timestamp => 
Some(CoercionNode::Coercion(Coercion::ToTimestamp)),

Review Comment:
   `build_coercion_node` handles `PrimitiveType::Timestamp` but not 
`PrimitiveType::TimestampNtz`. delta lake has both `Timestamp` (microseconds, 
UTC-adjusted) and `TimestampNtz` (microseconds, no timezone). if a delta table 
uses `TimestampNtz` columns and receives ISO 8601 strings, they pass through 
uncoerced and `JsonWriter` receives a string where it expects an i64 - causing 
a write failure. fix: add `PrimitiveType::TimestampNtz` alongside `Timestamp` 
in this match.



##########
core/connectors/sinks/delta_sink/src/sink.rs:
##########
@@ -0,0 +1,308 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use crate::DeltaSink;
+use crate::SinkState;
+use crate::coercions::{coerce, create_coercion_tree};
+use crate::storage::build_storage_options;
+use async_trait::async_trait;
+use deltalake::writer::{DeltaWriter, JsonWriter};
+use iggy_connector_sdk::{ConsumedMessage, Error, MessagesMetadata, Payload, 
Sink, TopicMetadata};
+use tracing::{debug, error, info};
+
+#[async_trait]
+impl Sink for DeltaSink {
    /// Opens the Delta Lake sink connector identified by `self.id`.
    ///
    /// Parses and validates `config.table_uri`, builds the object-store
    /// options, loads the Delta table, derives a coercion tree from the
    /// table's schema, creates a `JsonWriter`, and stores all of it in
    /// `self.state` for later use by `consume()`.
    ///
    /// # Errors
    /// Returns [`Error::InitError`] when any of the above steps fails.
    async fn open(&mut self) -> Result<(), Error> {
        info!(
            "Opening Delta Lake sink connector with ID: {} for table: {}",
            self.id, self.config.table_uri
        );

        // The table URI must parse as an absolute URL.
        let table_url = url::Url::parse(&self.config.table_uri).map_err(|e| {
            error!("Failed to parse table URI '{}': {e}", self.config.table_uri);
            Error::InitError(format!("Invalid table URI: {e}"))
        })?;

        info!("Parsed table URI: {}", table_url);

        // Derive backend-specific storage options from the sink configuration.
        let storage_options = build_storage_options(&self.config).map_err(|e| {
            error!("Invalid storage configuration: {e}");
            Error::InitError(format!("Invalid storage configuration: {e}"))
        })?;

        let table =
            match deltalake::open_table_with_storage_options(table_url, storage_options.clone())
                .await
            {
                Ok(table) => table,
                Err(e) => {
                    error!("Failed to load Delta table: {e}");
                    return Err(Error::InitError(format!("Failed to load Delta table: {e}")));
                }
            };

        // Computed once here so per-batch coercion in consume() is cheap.
        let kernel_schema = table
            .snapshot()
            .map_err(|e| {
                error!("Failed to get table snapshot: {e}");
                Error::InitError(format!("Failed to get table snapshot: {e}"))
            })?
            .schema();
        let coercion_tree = create_coercion_tree(&kernel_schema);

        let writer = JsonWriter::for_table(&table).map_err(|e| {
            error!("Failed to create JsonWriter: {e}");
            Error::InitError(format!("Failed to create JsonWriter: {e}"))
        })?;

        // consume() fails with InvalidConfig until this state is set.
        *self.state.lock().await = Some(SinkState {
            table,
            writer,
            coercion_tree,
        });

        info!(
            "Delta Lake sink connector with ID: {} opened successfully.",
            self.id
        );
        Ok(())
    }
+
+    async fn consume(
+        &self,
+        _topic_metadata: &TopicMetadata,
+        messages_metadata: MessagesMetadata,
+        messages: Vec<ConsumedMessage>,
+    ) -> Result<(), Error> {
+        debug!(
+            "Delta sink with ID: {} received: {} messages, partition: {}, 
offset: {}",
+            self.id,
+            messages.len(),
+            messages_metadata.partition_id,
+            messages_metadata.current_offset,
+        );
+
+        // Extract JSON values from consumed messages
+        let mut json_values: Vec<serde_json::Value> = 
Vec::with_capacity(messages.len());
+        for msg in &messages {
+            match &msg.payload {
+                Payload::Json(simd_value) => {
+                    json_values.push(owned_value_to_serde_json(simd_value));
+                }
+                other => {
+                    error!(
+                        "Unsupported payload type: {other}. Delta sink only 
supports JSON payloads."
+                    );
+                    return Err(Error::InvalidPayloadType);
+                }
+            }
+        }
+
+        if json_values.is_empty() {
+            debug!("No JSON values to write");
+            return Ok(());
+        }
+
+        let mut state_guard = self.state.lock().await;

Review Comment:
   the tokio mutex is acquired here and held through both `writer.write()` 
(in-memory parquet encoding) and `writer.flush_and_commit()` (network/disk I/O 
+ delta log commit). if `consume()` is called concurrently from multiple 
partitions (the SDK uses `&self` not `&mut self`), every caller serializes on 
this lock including the I/O wait. combined with per-batch flush, a slow object 
store response blocks all partitions.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to