hubcio commented on code in PR #2889:
URL: https://github.com/apache/iggy/pull/2889#discussion_r3034648884


##########
core/connectors/sinks/delta_sink/src/coercions.rs:
##########
@@ -0,0 +1,600 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use deltalake::kernel::Schema as DeltaSchema;
+use deltalake::kernel::{DataType, PrimitiveType};
+
+use chrono::prelude::*;
+use serde_json::Value;
+use std::collections::HashMap;
+use std::str::FromStr;
+
+#[derive(Debug, Clone, PartialEq)]
+#[allow(unused)]
+enum CoercionNode {
+    Coercion(Coercion),
+    Tree(CoercionTree),
+    ArrayTree(CoercionTree),
+    ArrayPrimitive(Coercion),
+}
+
+#[derive(Debug, Clone, PartialEq)]
+enum Coercion {
+    ToString,
+    ToTimestamp,
+}
+
+#[derive(Debug, Clone, PartialEq)]
+pub(crate) struct CoercionTree {
+    root: HashMap<String, CoercionNode>,
+}
+
+/// Returns a [`CoercionTree`] so the schema can be walked efficiently level 
by level when performing conversions.
+pub(crate) fn create_coercion_tree(schema: &DeltaSchema) -> CoercionTree {
+    let mut root = HashMap::new();
+
+    for field in schema.fields() {
+        if let Some(node) = build_coercion_node(field.data_type()) {
+            root.insert(field.name().to_string(), node);
+        }
+    }
+
+    CoercionTree { root }
+}
+
+fn build_coercion_node(data_type: &DataType) -> Option<CoercionNode> {
+    match data_type {
+        DataType::Primitive(primitive) => match primitive {
+            PrimitiveType::String => 
Some(CoercionNode::Coercion(Coercion::ToString)),
+            PrimitiveType::Timestamp => 
Some(CoercionNode::Coercion(Coercion::ToTimestamp)),
+            _ => None,
+        },
+        DataType::Struct(st) => {
+            let nested_context = create_coercion_tree(st);
+            if !nested_context.root.is_empty() {
+                Some(CoercionNode::Tree(nested_context))
+            } else {
+                None
+            }
+        }
+        DataType::Array(array) => {
+            build_coercion_node(array.element_type()).and_then(|node| match 
node {
+                CoercionNode::Coercion(c) => 
Some(CoercionNode::ArrayPrimitive(c)),
+                CoercionNode::Tree(t) => Some(CoercionNode::ArrayTree(t)),
+                _ => None,
+            })
+        }
+        _ => None,
+    }
+}
+
+/// Applies all data coercions specified by the [`CoercionTree`] to the 
[`Value`].
+pub(crate) fn coerce(value: &mut Value, coercion_tree: &CoercionTree) {
+    if let Some(context) = value.as_object_mut() {
+        for (field_name, coercion) in coercion_tree.root.iter() {
+            if let Some(value) = context.get_mut(field_name) {
+                apply_coercion(value, coercion);
+            }
+        }
+    }
+}
+
+fn apply_coercion(value: &mut Value, node: &CoercionNode) {
+    match node {
+        CoercionNode::Coercion(Coercion::ToString) => {
+            if !value.is_string() {
+                *value = Value::String(value.to_string());
+            }
+        }
+        CoercionNode::Coercion(Coercion::ToTimestamp) => {

Review Comment:
   1. Arrow's string_to_datetime (arrow-cast/parse.rs:195) accepts ' ' as a 
timestamp separator — so "2021-11-11 22:11:58" passes Arrow but fails chrono's 
DateTime::from_str. For these strings the coercion is a no-op and you get a 
spurious warn! log, but the write succeeds.
   2. For strings that BOTH parsers reject (e.g., "This definitely is not a 
timestamp"), Arrow fails at timestamp_array.rs:63-68 with 
ArrowError::JsonError, which fails record_batch_from_message and rejects the 
entire batch. These strings are left as-is by the coercion layer and then blow 
up downstream.



##########
core/connectors/sinks/delta_sink/src/sink.rs:
##########
@@ -0,0 +1,308 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use crate::DeltaSink;
+use crate::SinkState;
+use crate::coercions::{coerce, create_coercion_tree};
+use crate::storage::build_storage_options;
+use async_trait::async_trait;
+use deltalake::writer::{DeltaWriter, JsonWriter};
+use iggy_connector_sdk::{ConsumedMessage, Error, MessagesMetadata, Payload, 
Sink, TopicMetadata};
+use tracing::{debug, error, info};
+
+#[async_trait]
+impl Sink for DeltaSink {
+    async fn open(&mut self) -> Result<(), Error> {
+        info!(
+            "Opening Delta Lake sink connector with ID: {} for table: {}",
+            self.id, self.config.table_uri
+        );
+
+        let table_url = url::Url::parse(&self.config.table_uri).map_err(|e| {
+            error!("Failed to parse table URI '{}': {e}", 
self.config.table_uri);
+            Error::InitError(format!("Invalid table URI: {e}"))
+        })?;
+
+        info!("Parsed table URI: {}", table_url);
+
+        let storage_options = build_storage_options(&self.config).map_err(|e| {
+            error!("Invalid storage configuration: {e}");
+            Error::InitError(format!("Invalid storage configuration: {e}"))
+        })?;
+
+        let table =
+            match deltalake::open_table_with_storage_options(table_url, 
storage_options.clone())
+                .await
+            {
+                Ok(table) => table,
+                Err(e) => {
+                    error!("Failed to load Delta table: {e}");
+                    return Err(Error::InitError(format!("Failed to load Delta 
table: {e}")));
+                }
+            };
+
+        let kernel_schema = table
+            .snapshot()
+            .map_err(|e| {
+                error!("Failed to get table snapshot: {e}");
+                Error::InitError(format!("Failed to get table snapshot: {e}"))
+            })?
+            .schema();
+        let coercion_tree = create_coercion_tree(&kernel_schema);
+
+        let writer = JsonWriter::for_table(&table).map_err(|e| {
+            error!("Failed to create JsonWriter: {e}");
+            Error::InitError(format!("Failed to create JsonWriter: {e}"))
+        })?;
+
+        *self.state.lock().await = Some(SinkState {
+            table,
+            writer,
+            coercion_tree,
+        });
+
+        info!(
+            "Delta Lake sink connector with ID: {} opened successfully.",
+            self.id
+        );
+        Ok(())
+    }
+
+    async fn consume(
+        &self,
+        _topic_metadata: &TopicMetadata,
+        messages_metadata: MessagesMetadata,
+        messages: Vec<ConsumedMessage>,
+    ) -> Result<(), Error> {
+        debug!(
+            "Delta sink with ID: {} received: {} messages, partition: {}, 
offset: {}",
+            self.id,
+            messages.len(),
+            messages_metadata.partition_id,
+            messages_metadata.current_offset,
+        );
+
+        // Extract JSON values from consumed messages
+        let mut json_values: Vec<serde_json::Value> = 
Vec::with_capacity(messages.len());
+        for msg in &messages {
+            match &msg.payload {
+                Payload::Json(simd_value) => {
+                    json_values.push(owned_value_to_serde_json(simd_value));
+                }
+                other => {
+                    error!(
+                        "Unsupported payload type: {other}. Delta sink only 
supports JSON payloads."
+                    );
+                    return Err(Error::InvalidPayloadType);
+                }
+            }
+        }
+
+        if json_values.is_empty() {
+            debug!("No JSON values to write");
+            return Ok(());
+        }
+
+        let mut state_guard = self.state.lock().await;
+        let state = state_guard.as_mut().ok_or_else(|| {
+            error!("Delta sink state not initialized — was open() called?");
+            Error::InvalidConfig
+        })?;
+
+        // Apply coercions to match Delta table schema
+        for value in &mut json_values {
+            coerce(value, &state.coercion_tree);
+        }
+
+        // Write JSON values to internal Parquet buffers
+        state.writer.write(json_values).await.map_err(|e| {
+            error!("Failed to write to Delta writer: {e}");
+            Error::Storage(format!("Failed to write to Delta writer: {e}"))
+        })?;
+
+        // Flush buffers to object store and commit to Delta log
+        let version = match state.writer.flush_and_commit(&mut 
state.table).await {
+            Ok(v) => v,
+            Err(e) => {
+                state.writer.reset();
+                error!("Failed to flush and commit to Delta table: {e}");
+                return Err(Error::Storage(format!("Failed to flush and commit: 
{e}")));
+            }
+        };
+
+        debug!(
+            "Delta sink with ID: {} committed version {}",
+            self.id, version
+        );
+
+        Ok(())
+    }
+
+    async fn close(&mut self) -> Result<(), Error> {
+        if let Some(mut state) = self.state.lock().await.take()
+            && let Err(e) = state.writer.flush_and_commit(&mut 
state.table).await
+        {
+            error!(
+                "Delta sink with ID: {} failed to flush on close: {e}",
+                self.id
+            );
+            return Err(Error::Storage(format!("Failed to flush on close: 
{e}")));
+        }
+        info!("Delta Lake sink connector with ID: {} is closed.", self.id);
+        Ok(())
+    }
+}
+
+fn owned_value_to_serde_json(value: &simd_json::OwnedValue) -> 
serde_json::Value {
+    match value {
+        simd_json::OwnedValue::Static(s) => match s {
+            simd_json::StaticNode::Null => serde_json::Value::Null,
+            simd_json::StaticNode::Bool(b) => serde_json::Value::Bool(*b),
+            simd_json::StaticNode::I64(n) => 
serde_json::Value::Number((*n).into()),
+            simd_json::StaticNode::U64(n) => 
serde_json::Value::Number((*n).into()),
+            simd_json::StaticNode::F64(n) => serde_json::Number::from_f64(*n)

Review Comment:
   Don't drop the message — use the SDK's owned_value_to_serde_json from 
iggy_connector_sdk::convert (convert.rs:29). It maps non-finite floats to null 
instead of returning None. If the column is nullable, null is correct. If 
non-nullable, the Delta writer will reject it with a clear error — better than 
silently losing the entire message. IMHO it's better this way.



##########
core/integration/tests/connectors/fixtures/delta/fixture.rs:
##########
@@ -0,0 +1,316 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use async_trait::async_trait;
+use deltalake::kernel::{DataType, PrimitiveType, StructField};
+use deltalake::operations::create::CreateBuilder;
+use integration::harness::{TestBinaryError, TestFixture};
+use std::collections::HashMap;
+use std::path::PathBuf;
+use tempfile::TempDir;
+use testcontainers_modules::testcontainers::core::{IntoContainerPort, WaitFor};
+use testcontainers_modules::testcontainers::runners::AsyncRunner;
+use testcontainers_modules::testcontainers::{ContainerAsync, GenericImage, 
ImageExt};
+use tracing::info;
+use uuid::Uuid;
+
+const ENV_SINK_TABLE_URI: &str = 
"IGGY_CONNECTORS_SINK_DELTA_PLUGIN_CONFIG_TABLE_URI";
+const ENV_SINK_PATH: &str = "IGGY_CONNECTORS_SINK_DELTA_PATH";
+const ENV_SINK_STORAGE_BACKEND_TYPE: &str =
+    "IGGY_CONNECTORS_SINK_DELTA_PLUGIN_CONFIG_STORAGE_BACKEND_TYPE";
+const ENV_SINK_AWS_S3_ACCESS_KEY: &str =
+    "IGGY_CONNECTORS_SINK_DELTA_PLUGIN_CONFIG_AWS_S3_ACCESS_KEY";
+const ENV_SINK_AWS_S3_SECRET_KEY: &str =
+    "IGGY_CONNECTORS_SINK_DELTA_PLUGIN_CONFIG_AWS_S3_SECRET_KEY";
+const ENV_SINK_AWS_S3_REGION: &str = 
"IGGY_CONNECTORS_SINK_DELTA_PLUGIN_CONFIG_AWS_S3_REGION";
+const ENV_SINK_AWS_S3_ENDPOINT_URL: &str =
+    "IGGY_CONNECTORS_SINK_DELTA_PLUGIN_CONFIG_AWS_S3_ENDPOINT_URL";
+const ENV_SINK_AWS_S3_ALLOW_HTTP: &str =
+    "IGGY_CONNECTORS_SINK_DELTA_PLUGIN_CONFIG_AWS_S3_ALLOW_HTTP";
+
+const MINIO_IMAGE: &str = "minio/minio";
+const MINIO_TAG: &str = "RELEASE.2025-09-07T16-13-09Z";
+const MINIO_PORT: u16 = 9000;
+const MINIO_CONSOLE_PORT: u16 = 9001;
+const MINIO_ACCESS_KEY: &str = "admin";
+const MINIO_SECRET_KEY: &str = "password";
+const MINIO_BUCKET: &str = "delta-warehouse";
+
+pub struct DeltaFixture {
+    _temp_dir: TempDir,
+    table_path: PathBuf,
+}
+
+impl DeltaFixture {
+    pub async fn wait_for_delta_log(
+        &self,
+        min_versions: usize,
+        max_attempts: usize,
+        interval_ms: u64,
+    ) -> Result<usize, TestBinaryError> {
+        let delta_log_dir = self.table_path.join("_delta_log");
+
+        for _ in 0..max_attempts {
+            let count = Self::count_delta_versions(&delta_log_dir);
+            if count >= min_versions {
+                info!("Found {count} delta log versions (required: 
{min_versions})");
+                return Ok(count);
+            }
+            
tokio::time::sleep(std::time::Duration::from_millis(interval_ms)).await;
+        }
+
+        let final_count = Self::count_delta_versions(&delta_log_dir);
+        Err(TestBinaryError::InvalidState {
+            message: format!(
+                "Expected at least {min_versions} delta log versions, found 
{final_count} after {max_attempts} attempts"
+            ),
+        })
+    }
+
+    async fn create_table(table_uri: &str) -> Result<(), TestBinaryError> {
+        let columns = vec![
+            StructField::new("id", DataType::Primitive(PrimitiveType::Long), 
true),
+            StructField::new("name", 
DataType::Primitive(PrimitiveType::String), true),
+            StructField::new("count", 
DataType::Primitive(PrimitiveType::Integer), true),
+            StructField::new("amount", 
DataType::Primitive(PrimitiveType::Double), true),
+            StructField::new("active", 
DataType::Primitive(PrimitiveType::Boolean), true),
+            StructField::new("timestamp", 
DataType::Primitive(PrimitiveType::Long), true),

Review Comment:
   Fair point, resolving.



##########
core/connectors/sinks/delta_sink/src/sink.rs:
##########
@@ -0,0 +1,308 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use crate::DeltaSink;
+use crate::SinkState;
+use crate::coercions::{coerce, create_coercion_tree};
+use crate::storage::build_storage_options;
+use async_trait::async_trait;
+use deltalake::writer::{DeltaWriter, JsonWriter};
+use iggy_connector_sdk::{ConsumedMessage, Error, MessagesMetadata, Payload, 
Sink, TopicMetadata};
+use tracing::{debug, error, info};
+
+#[async_trait]
+impl Sink for DeltaSink {
+    async fn open(&mut self) -> Result<(), Error> {
+        info!(
+            "Opening Delta Lake sink connector with ID: {} for table: {}",
+            self.id, self.config.table_uri
+        );
+
+        let table_url = url::Url::parse(&self.config.table_uri).map_err(|e| {
+            error!("Failed to parse table URI '{}': {e}", 
self.config.table_uri);
+            Error::InitError(format!("Invalid table URI: {e}"))
+        })?;
+
+        info!("Parsed table URI: {}", table_url);
+
+        let storage_options = build_storage_options(&self.config).map_err(|e| {
+            error!("Invalid storage configuration: {e}");
+            Error::InitError(format!("Invalid storage configuration: {e}"))
+        })?;
+
+        let table =
+            match deltalake::open_table_with_storage_options(table_url, 
storage_options.clone())
+                .await
+            {
+                Ok(table) => table,
+                Err(e) => {
+                    error!("Failed to load Delta table: {e}");
+                    return Err(Error::InitError(format!("Failed to load Delta 
table: {e}")));
+                }
+            };
+
+        let kernel_schema = table
+            .snapshot()
+            .map_err(|e| {
+                error!("Failed to get table snapshot: {e}");
+                Error::InitError(format!("Failed to get table snapshot: {e}"))
+            })?
+            .schema();
+        let coercion_tree = create_coercion_tree(&kernel_schema);
+
+        let writer = JsonWriter::for_table(&table).map_err(|e| {
+            error!("Failed to create JsonWriter: {e}");
+            Error::InitError(format!("Failed to create JsonWriter: {e}"))
+        })?;
+
+        *self.state.lock().await = Some(SinkState {
+            table,
+            writer,
+            coercion_tree,
+        });
+
+        info!(
+            "Delta Lake sink connector with ID: {} opened successfully.",
+            self.id
+        );
+        Ok(())
+    }
+
+    async fn consume(
+        &self,
+        _topic_metadata: &TopicMetadata,
+        messages_metadata: MessagesMetadata,
+        messages: Vec<ConsumedMessage>,
+    ) -> Result<(), Error> {
+        debug!(
+            "Delta sink with ID: {} received: {} messages, partition: {}, 
offset: {}",
+            self.id,
+            messages.len(),
+            messages_metadata.partition_id,
+            messages_metadata.current_offset,
+        );
+
+        // Extract JSON values from consumed messages
+        let mut json_values: Vec<serde_json::Value> = 
Vec::with_capacity(messages.len());
+        for msg in &messages {
+            match &msg.payload {
+                Payload::Json(simd_value) => {
+                    json_values.push(owned_value_to_serde_json(simd_value));
+                }
+                other => {
+                    error!(
+                        "Unsupported payload type: {other}. Delta sink only 
supports JSON payloads."
+                    );
+                    return Err(Error::InvalidPayloadType);
+                }
+            }
+        }
+
+        if json_values.is_empty() {
+            debug!("No JSON values to write");
+            return Ok(());
+        }
+
+        let mut state_guard = self.state.lock().await;
+        let state = state_guard.as_mut().ok_or_else(|| {
+            error!("Delta sink state not initialized — was open() called?");
+            Error::InvalidConfig
+        })?;
+
+        // Apply coercions to match Delta table schema
+        for value in &mut json_values {
+            coerce(value, &state.coercion_tree);
+        }
+
+        // Write JSON values to internal Parquet buffers
+        state.writer.write(json_values).await.map_err(|e| {
+            error!("Failed to write to Delta writer: {e}");
+            Error::Storage(format!("Failed to write to Delta writer: {e}"))
+        })?;
+
+        // Flush buffers to object store and commit to Delta log
+        let version = match state.writer.flush_and_commit(&mut 
state.table).await {

Review Comment:
   Fair point. Let's leave it as is.



##########
core/connectors/sinks/delta_sink/src/sink.rs:
##########
@@ -0,0 +1,322 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use crate::DeltaSink;
+use crate::SinkState;
+use crate::coercions::{coerce, create_coercion_tree};
+use crate::storage::build_storage_options;
+use async_trait::async_trait;
+use deltalake::writer::{DeltaWriter, JsonWriter};
+use iggy_connector_sdk::{ConsumedMessage, Error, MessagesMetadata, Payload, 
Sink, TopicMetadata};
+use tracing::{debug, error, info};
+
+// TODO: Expose metrics for observability purposes
+
+#[async_trait]
+impl Sink for DeltaSink {
+    async fn open(&mut self) -> Result<(), Error> {
+        info!(
+            "Opening Delta Lake sink connector with ID: {} for table: {}",
+            self.id, self.config.table_uri
+        );
+
+        let table_url = url::Url::parse(&self.config.table_uri).map_err(|e| {
+            error!("Failed to parse table URI '{}': {e}", 
self.config.table_uri);
+            Error::InitError(format!("Invalid table URI: {e}"))
+        })?;
+
+        info!("Parsed table URI: {}", table_url);
+
+        let storage_options = build_storage_options(&self.config).map_err(|e| {
+            error!("Invalid storage configuration: {e}");
+            Error::InitError(format!("Invalid storage configuration: {e}"))
+        })?;
+
+        let table =
+            match deltalake::open_table_with_storage_options(table_url, 
storage_options).await {
+                Ok(table) => table,
+                Err(e) => {
+                    error!("Failed to load Delta table: {e}");
+                    return Err(Error::InitError(format!("Failed to load Delta 
table: {e}")));
+                }
+            };
+
+        let kernel_schema = table
+            .snapshot()
+            .map_err(|e| {
+                error!("Failed to get table snapshot: {e}");
+                Error::InitError(format!("Failed to get table snapshot: {e}"))
+            })?
+            .schema();
+        let coercion_tree = create_coercion_tree(&kernel_schema);
+
+        let writer = JsonWriter::for_table(&table).map_err(|e| {
+            error!("Failed to create JsonWriter: {e}");
+            Error::InitError(format!("Failed to create JsonWriter: {e}"))
+        })?;
+
+        *self.state.lock().await = Some(SinkState {
+            table,
+            writer,
+            coercion_tree,
+        });
+
+        info!(
+            "Delta Lake sink connector with ID: {} opened successfully.",
+            self.id
+        );
+        Ok(())
+    }
+
+    async fn consume(
+        &self,
+        _topic_metadata: &TopicMetadata,
+        messages_metadata: MessagesMetadata,
+        messages: Vec<ConsumedMessage>,
+    ) -> Result<(), Error> {
+        debug!(
+            "Delta sink with ID: {} received: {} messages, partition: {}, 
offset: {}",
+            self.id,
+            messages.len(),
+            messages_metadata.partition_id,
+            messages_metadata.current_offset,
+        );
+
+        // Extract JSON values from consumed messages
+        let mut json_values: Vec<serde_json::Value> = 
Vec::with_capacity(messages.len());
+        for msg in &messages {
+            match &msg.payload {
+                Payload::Json(simd_value) => match 
owned_value_to_serde_json(simd_value) {
+                    Some(v) => json_values.push(v),
+                    None => continue,
+                },
+                other => {
+                    error!(
+                        "Unsupported payload type: {other}. Delta sink only 
supports JSON payloads."
+                    );
+                    return Err(Error::InvalidPayloadType);
+                }
+            }
+        }
+
+        if json_values.is_empty() {
+            debug!("No JSON values to write");
+            return Ok(());
+        }
+
+        // TODO: all partition consume() calls serialize on this single lock, 
holding it
+        // through flush_and_commit() I/O. fix: per-partition writers keyed by 
partition_id.
+        // Ref: 
https://github.com/apache/iggy/pull/2889/#discussion_r2936719763
+        let mut state_guard = self.state.lock().await;
+        let state = state_guard.as_mut().ok_or_else(|| {
+            error!("Delta sink state not initialized — was open() called?");
+            Error::InvalidState
+        })?;
+
+        // Apply coercions to match Delta table schema
+        for value in &mut json_values {
+            coerce(value, &state.coercion_tree);
+        }
+
+        // Write JSON values to internal Parquet buffers
+        // TODO: Add retry mechanism if write fails.
+        if let Err(e) = state.writer.write(json_values).await {
+            state.writer.reset();
+            error!("Failed to write to Delta writer: {e}");
+            return Err(Error::Storage(format!(
+                "Failed to write to Delta writer: {e}"
+            )));
+        }
+
+        // Flush buffers to object store and commit to Delta log
+        let version = match state.writer.flush_and_commit(&mut 
state.table).await {
+            Ok(v) => v,
+            Err(e) => {
+                state.writer.reset();
+                error!("Failed to flush and commit to Delta table: {e}");
+                return Err(Error::Storage(format!("Failed to flush and commit: 
{e}")));
+            }
+        };
+
+        debug!(
+            "Delta sink with ID: {} committed version {}",
+            self.id, version
+        );
+
+        Ok(())
+    }
+
+    async fn close(&mut self) -> Result<(), Error> {
+        if let Some(mut state) = self.state.lock().await.take()
+            && let Err(e) = state.writer.flush_and_commit(&mut 
state.table).await
+        {
+            error!(
+                "Delta sink with ID: {} failed to flush on close: {e}",
+                self.id
+            );
+            return Err(Error::Storage(format!("Failed to flush on close: 
{e}")));
+        }
+        info!("Delta Lake sink connector with ID: {} is closed.", self.id);
+        Ok(())
+    }
+}
+
+// TODO: Similar logic exists in the Elasticsearch sink.
+// Refactor to use a common utility when it is implemented.
+fn owned_value_to_serde_json(value: &simd_json::OwnedValue) -> 
Option<serde_json::Value> {

Review Comment:
   You can use this from the connectors SDK; see 
`core/connectors/sdk/src/convert.rs`.



##########
core/connectors/sinks/delta_sink/Cargo.toml:
##########
@@ -0,0 +1,49 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+[package]
+name = "iggy_connector_delta_sink"
+version = "0.1.0"
+edition = "2024"
+license = "Apache-2.0"
+keywords = ["iggy", "messaging", "streaming"]
+categories = ["command-line-utilities", "database", "network-programming"]
+homepage = "https://iggy.apache.org";
+documentation = "https://iggy.apache.org/docs";
+repository = "https://github.com/apache/iggy";
+readme = "../../README.md"
+
+[package.metadata.cargo-machete]
+ignored = ["dashmap", "once_cell"]
+
+[lib]
+crate-type = ["cdylib", "lib"]
+
+[dependencies]
+async-trait = { workspace = true }
+chrono = { workspace = true }
+dashmap = { workspace = true }
+deltalake = { workspace = true }
+iggy_connector_sdk = { workspace = true }
+once_cell = { workspace = true }
+secrecy = { workspace = true }
+serde = { workspace = true }
+serde_json = { workspace = true }
+simd-json = { workspace = true }
+tokio = { workspace = true }
+tracing = { workspace = true }
+url = "2"

Review Comment:
   We track all dependency versions in the main `Cargo.toml`, and don't use 
versions pinned only to the major (e.g. `url = "2"`) — use the full semver version.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to