hubcio commented on code in PR #2889: URL: https://github.com/apache/iggy/pull/2889#discussion_r2936719770
########## core/connectors/sinks/delta_sink/src/sink.rs: ########## @@ -0,0 +1,308 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +use crate::DeltaSink; +use crate::SinkState; +use crate::coercions::{coerce, create_coercion_tree}; +use crate::storage::build_storage_options; +use async_trait::async_trait; +use deltalake::writer::{DeltaWriter, JsonWriter}; +use iggy_connector_sdk::{ConsumedMessage, Error, MessagesMetadata, Payload, Sink, TopicMetadata}; +use tracing::{debug, error, info}; + +#[async_trait] +impl Sink for DeltaSink { + async fn open(&mut self) -> Result<(), Error> { + info!( + "Opening Delta Lake sink connector with ID: {} for table: {}", + self.id, self.config.table_uri + ); + + let table_url = url::Url::parse(&self.config.table_uri).map_err(|e| { + error!("Failed to parse table URI '{}': {e}", self.config.table_uri); + Error::InitError(format!("Invalid table URI: {e}")) + })?; + + info!("Parsed table URI: {}", table_url); + + let storage_options = build_storage_options(&self.config).map_err(|e| { + error!("Invalid storage configuration: {e}"); + Error::InitError(format!("Invalid storage configuration: {e}")) + })?; + + let table = + match 
deltalake::open_table_with_storage_options(table_url, storage_options.clone()) + .await + { + Ok(table) => table, + Err(e) => { + error!("Failed to load Delta table: {e}"); + return Err(Error::InitError(format!("Failed to load Delta table: {e}"))); + } + }; + + let kernel_schema = table + .snapshot() + .map_err(|e| { + error!("Failed to get table snapshot: {e}"); + Error::InitError(format!("Failed to get table snapshot: {e}")) + })? + .schema(); + let coercion_tree = create_coercion_tree(&kernel_schema); + + let writer = JsonWriter::for_table(&table).map_err(|e| { + error!("Failed to create JsonWriter: {e}"); + Error::InitError(format!("Failed to create JsonWriter: {e}")) + })?; + + *self.state.lock().await = Some(SinkState { + table, + writer, + coercion_tree, + }); + + info!( + "Delta Lake sink connector with ID: {} opened successfully.", + self.id + ); + Ok(()) + } + + async fn consume( + &self, + _topic_metadata: &TopicMetadata, + messages_metadata: MessagesMetadata, + messages: Vec<ConsumedMessage>, + ) -> Result<(), Error> { + debug!( + "Delta sink with ID: {} received: {} messages, partition: {}, offset: {}", + self.id, + messages.len(), + messages_metadata.partition_id, + messages_metadata.current_offset, + ); + + // Extract JSON values from consumed messages + let mut json_values: Vec<serde_json::Value> = Vec::with_capacity(messages.len()); + for msg in &messages { + match &msg.payload { + Payload::Json(simd_value) => { + json_values.push(owned_value_to_serde_json(simd_value)); + } + other => { + error!( + "Unsupported payload type: {other}. Delta sink only supports JSON payloads." 
+ ); + return Err(Error::InvalidPayloadType); + } + } + } + + if json_values.is_empty() { + debug!("No JSON values to write"); + return Ok(()); + } + + let mut state_guard = self.state.lock().await; + let state = state_guard.as_mut().ok_or_else(|| { Review Comment: when `state` is `None` (meaning `open()` was not called or failed), returning `Error::InvalidConfig` is the wrong semantics - this is a lifecycle/invariant violation, not a config error. use `Error::InitError("Delta sink state not initialized - was open() called?".to_string())` to match the actual problem. ########## core/connectors/sinks/delta_sink/src/coercions.rs: ########## @@ -0,0 +1,600 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +use deltalake::kernel::Schema as DeltaSchema; +use deltalake::kernel::{DataType, PrimitiveType}; + +use chrono::prelude::*; +use serde_json::Value; +use std::collections::HashMap; +use std::str::FromStr; + +#[derive(Debug, Clone, PartialEq)] +#[allow(unused)] Review Comment: all four `CoercionNode` variants are constructed and matched in the codebase. the `#[allow(unused)]` is unnecessary and should be removed. 
########## core/integration/tests/connectors/fixtures/delta/fixture.rs: ########## @@ -0,0 +1,316 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +use async_trait::async_trait; +use deltalake::kernel::{DataType, PrimitiveType, StructField}; +use deltalake::operations::create::CreateBuilder; +use integration::harness::{TestBinaryError, TestFixture}; +use std::collections::HashMap; +use std::path::PathBuf; +use tempfile::TempDir; +use testcontainers_modules::testcontainers::core::{IntoContainerPort, WaitFor}; +use testcontainers_modules::testcontainers::runners::AsyncRunner; +use testcontainers_modules::testcontainers::{ContainerAsync, GenericImage, ImageExt}; +use tracing::info; +use uuid::Uuid; + +const ENV_SINK_TABLE_URI: &str = "IGGY_CONNECTORS_SINK_DELTA_PLUGIN_CONFIG_TABLE_URI"; +const ENV_SINK_PATH: &str = "IGGY_CONNECTORS_SINK_DELTA_PATH"; +const ENV_SINK_STORAGE_BACKEND_TYPE: &str = + "IGGY_CONNECTORS_SINK_DELTA_PLUGIN_CONFIG_STORAGE_BACKEND_TYPE"; +const ENV_SINK_AWS_S3_ACCESS_KEY: &str = + "IGGY_CONNECTORS_SINK_DELTA_PLUGIN_CONFIG_AWS_S3_ACCESS_KEY"; +const ENV_SINK_AWS_S3_SECRET_KEY: &str = + "IGGY_CONNECTORS_SINK_DELTA_PLUGIN_CONFIG_AWS_S3_SECRET_KEY"; +const ENV_SINK_AWS_S3_REGION: &str = 
"IGGY_CONNECTORS_SINK_DELTA_PLUGIN_CONFIG_AWS_S3_REGION"; +const ENV_SINK_AWS_S3_ENDPOINT_URL: &str = + "IGGY_CONNECTORS_SINK_DELTA_PLUGIN_CONFIG_AWS_S3_ENDPOINT_URL"; +const ENV_SINK_AWS_S3_ALLOW_HTTP: &str = + "IGGY_CONNECTORS_SINK_DELTA_PLUGIN_CONFIG_AWS_S3_ALLOW_HTTP"; + +const MINIO_IMAGE: &str = "minio/minio"; +const MINIO_TAG: &str = "RELEASE.2025-09-07T16-13-09Z"; +const MINIO_PORT: u16 = 9000; +const MINIO_CONSOLE_PORT: u16 = 9001; +const MINIO_ACCESS_KEY: &str = "admin"; +const MINIO_SECRET_KEY: &str = "password"; +const MINIO_BUCKET: &str = "delta-warehouse"; + +pub struct DeltaFixture { + _temp_dir: TempDir, + table_path: PathBuf, +} + +impl DeltaFixture { + pub async fn wait_for_delta_log( + &self, + min_versions: usize, + max_attempts: usize, + interval_ms: u64, + ) -> Result<usize, TestBinaryError> { + let delta_log_dir = self.table_path.join("_delta_log"); + + for _ in 0..max_attempts { + let count = Self::count_delta_versions(&delta_log_dir); + if count >= min_versions { + info!("Found {count} delta log versions (required: {min_versions})"); + return Ok(count); + } + tokio::time::sleep(std::time::Duration::from_millis(interval_ms)).await; + } + + let final_count = Self::count_delta_versions(&delta_log_dir); + Err(TestBinaryError::InvalidState { + message: format!( + "Expected at least {min_versions} delta log versions, found {final_count} after {max_attempts} attempts" + ), + }) + } + + async fn create_table(table_uri: &str) -> Result<(), TestBinaryError> { + let columns = vec![ + StructField::new("id", DataType::Primitive(PrimitiveType::Long), true), + StructField::new("name", DataType::Primitive(PrimitiveType::String), true), + StructField::new("count", DataType::Primitive(PrimitiveType::Integer), true), + StructField::new("amount", DataType::Primitive(PrimitiveType::Double), true), + StructField::new("active", DataType::Primitive(PrimitiveType::Boolean), true), + StructField::new("timestamp", DataType::Primitive(PrimitiveType::Long), true), 
Review Comment: the test fixture declares `timestamp` as `PrimitiveType::Long`, not `PrimitiveType::Timestamp`. this means all integration tests bypass the coercion logic entirely. an e2e test with a `Timestamp`-typed column would catch issues like the `TimestampNtz` gap and null-to-"null" coercion bug at the integration level. ########## core/connectors/sinks/delta_sink/src/sink.rs: ########## @@ -0,0 +1,308 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +use crate::DeltaSink; +use crate::SinkState; +use crate::coercions::{coerce, create_coercion_tree}; +use crate::storage::build_storage_options; +use async_trait::async_trait; +use deltalake::writer::{DeltaWriter, JsonWriter}; +use iggy_connector_sdk::{ConsumedMessage, Error, MessagesMetadata, Payload, Sink, TopicMetadata}; +use tracing::{debug, error, info}; + +#[async_trait] +impl Sink for DeltaSink { + async fn open(&mut self) -> Result<(), Error> { + info!( + "Opening Delta Lake sink connector with ID: {} for table: {}", + self.id, self.config.table_uri + ); + + let table_url = url::Url::parse(&self.config.table_uri).map_err(|e| { + error!("Failed to parse table URI '{}': {e}", self.config.table_uri); + Error::InitError(format!("Invalid table URI: {e}")) + })?; + + info!("Parsed table URI: {}", table_url); + + let storage_options = build_storage_options(&self.config).map_err(|e| { + error!("Invalid storage configuration: {e}"); + Error::InitError(format!("Invalid storage configuration: {e}")) + })?; + + let table = + match deltalake::open_table_with_storage_options(table_url, storage_options.clone()) + .await + { + Ok(table) => table, + Err(e) => { + error!("Failed to load Delta table: {e}"); + return Err(Error::InitError(format!("Failed to load Delta table: {e}"))); + } + }; + + let kernel_schema = table + .snapshot() + .map_err(|e| { + error!("Failed to get table snapshot: {e}"); + Error::InitError(format!("Failed to get table snapshot: {e}")) + })? 
+ .schema(); + let coercion_tree = create_coercion_tree(&kernel_schema); + + let writer = JsonWriter::for_table(&table).map_err(|e| { + error!("Failed to create JsonWriter: {e}"); + Error::InitError(format!("Failed to create JsonWriter: {e}")) + })?; + + *self.state.lock().await = Some(SinkState { + table, + writer, + coercion_tree, + }); + + info!( + "Delta Lake sink connector with ID: {} opened successfully.", + self.id + ); + Ok(()) + } + + async fn consume( + &self, + _topic_metadata: &TopicMetadata, + messages_metadata: MessagesMetadata, + messages: Vec<ConsumedMessage>, + ) -> Result<(), Error> { + debug!( + "Delta sink with ID: {} received: {} messages, partition: {}, offset: {}", + self.id, + messages.len(), + messages_metadata.partition_id, + messages_metadata.current_offset, + ); + + // Extract JSON values from consumed messages + let mut json_values: Vec<serde_json::Value> = Vec::with_capacity(messages.len()); + for msg in &messages { + match &msg.payload { + Payload::Json(simd_value) => { + json_values.push(owned_value_to_serde_json(simd_value)); + } + other => { + error!( + "Unsupported payload type: {other}. Delta sink only supports JSON payloads." + ); + return Err(Error::InvalidPayloadType); + } + } + } + + if json_values.is_empty() { + debug!("No JSON values to write"); + return Ok(()); + } + + let mut state_guard = self.state.lock().await; + let state = state_guard.as_mut().ok_or_else(|| { + error!("Delta sink state not initialized — was open() called?"); + Error::InvalidConfig + })?; + + // Apply coercions to match Delta table schema + for value in &mut json_values { + coerce(value, &state.coercion_tree); + } + + // Write JSON values to internal Parquet buffers + state.writer.write(json_values).await.map_err(|e| { Review Comment: if `JsonWriter::write()` partially buffers data before failing, the next `consume()` call will call `flush_and_commit` on a buffer containing stale/partial data from the failed batch. 
`reset()` is correctly done for `flush_and_commit` failures (line 142) but is missing here for `write` failures. either add `reset()` on write error, or verify in deltalake docs that `write` is atomic. ########## core/connectors/sinks/delta_sink/src/coercions.rs: ########## @@ -0,0 +1,600 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +use deltalake::kernel::Schema as DeltaSchema; +use deltalake::kernel::{DataType, PrimitiveType}; + +use chrono::prelude::*; +use serde_json::Value; +use std::collections::HashMap; +use std::str::FromStr; + +#[derive(Debug, Clone, PartialEq)] +#[allow(unused)] +enum CoercionNode { + Coercion(Coercion), + Tree(CoercionTree), + ArrayTree(CoercionTree), + ArrayPrimitive(Coercion), +} + +#[derive(Debug, Clone, PartialEq)] +enum Coercion { + ToString, + ToTimestamp, +} + +#[derive(Debug, Clone, PartialEq)] +pub(crate) struct CoercionTree { + root: HashMap<String, CoercionNode>, +} + +/// Returns a [`CoercionTree`] so the schema can be walked efficiently level by level when performing conversions. 
+pub(crate) fn create_coercion_tree(schema: &DeltaSchema) -> CoercionTree { + let mut root = HashMap::new(); + + for field in schema.fields() { + if let Some(node) = build_coercion_node(field.data_type()) { + root.insert(field.name().to_string(), node); + } + } + + CoercionTree { root } +} + +fn build_coercion_node(data_type: &DataType) -> Option<CoercionNode> { + match data_type { + DataType::Primitive(primitive) => match primitive { + PrimitiveType::String => Some(CoercionNode::Coercion(Coercion::ToString)), + PrimitiveType::Timestamp => Some(CoercionNode::Coercion(Coercion::ToTimestamp)), + _ => None, + }, + DataType::Struct(st) => { + let nested_context = create_coercion_tree(st); + if !nested_context.root.is_empty() { + Some(CoercionNode::Tree(nested_context)) + } else { + None + } + } + DataType::Array(array) => { + build_coercion_node(array.element_type()).and_then(|node| match node { + CoercionNode::Coercion(c) => Some(CoercionNode::ArrayPrimitive(c)), + CoercionNode::Tree(t) => Some(CoercionNode::ArrayTree(t)), + _ => None, + }) + } + _ => None, + } +} + +/// Applies all data coercions specified by the [`CoercionTree`] to the [`Value`]. +pub(crate) fn coerce(value: &mut Value, coercion_tree: &CoercionTree) { + if let Some(context) = value.as_object_mut() { + for (field_name, coercion) in coercion_tree.root.iter() { + if let Some(value) = context.get_mut(field_name) { + apply_coercion(value, coercion); + } + } + } +} + +fn apply_coercion(value: &mut Value, node: &CoercionNode) { + match node { + CoercionNode::Coercion(Coercion::ToString) => { + if !value.is_string() { Review Comment: when a value is JSON `null` and the delta column is `String`, `value.to_string()` produces the literal 4-char string `"null"` rather than preserving null. downstream consumers reading the delta table cannot distinguish actual nulls from the string `"null"`. the test at line 512-514 asserts this behavior, but it's semantically wrong - JSON null should map to SQL NULL. 
fix: add `if value.is_null() { return; }` before the `is_string()` check. ########## core/connectors/sinks/delta_sink/src/storage.rs: ########## @@ -0,0 +1,277 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +use crate::DeltaSinkConfig; +use iggy_connector_sdk::Error; +use std::collections::HashMap; + +pub(crate) fn build_storage_options( + config: &DeltaSinkConfig, +) -> Result<HashMap<String, String>, Error> { + let mut opts = HashMap::new(); + + match config.storage_backend_type.as_deref() { + Some("s3") => { + let access_key = config + .aws_s3_access_key + .as_ref() + .ok_or(Error::InvalidConfig)?; + let secret_key = config + .aws_s3_secret_key + .as_ref() + .ok_or(Error::InvalidConfig)?; + let region = config.aws_s3_region.as_ref().ok_or(Error::InvalidConfig)?; + let endpoint_url = config + .aws_s3_endpoint_url + .as_ref() + .ok_or(Error::InvalidConfig)?; + let allow_http = config.aws_s3_allow_http.ok_or(Error::InvalidConfig)?; + + opts.insert("AWS_ACCESS_KEY_ID".into(), access_key.clone()); + opts.insert("AWS_SECRET_ACCESS_KEY".into(), secret_key.clone()); + opts.insert("AWS_REGION".into(), region.clone()); + opts.insert("AWS_ENDPOINT_URL".into(), endpoint_url.clone()); + 
opts.insert("AWS_ALLOW_HTTP".into(), allow_http.to_string()); + opts.insert("AWS_S3_ALLOW_HTTP".into(), allow_http.to_string()); + } + Some("azure") => { Review Comment: the azure path requires all four fields: `account_name`, `account_key`, `sas_token`, and `container_name`. in practice, azure auth uses either an account key OR a SAS token, never both. requiring both means users who only have a SAS token (common in restricted environments) can't use the connector. either make these two auth fields alternatives with validation, or make each optional and pass through whichever is provided. ########## core/connectors/sinks/delta_sink/src/sink.rs: ########## @@ -0,0 +1,308 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +use crate::DeltaSink; +use crate::SinkState; +use crate::coercions::{coerce, create_coercion_tree}; +use crate::storage::build_storage_options; +use async_trait::async_trait; +use deltalake::writer::{DeltaWriter, JsonWriter}; +use iggy_connector_sdk::{ConsumedMessage, Error, MessagesMetadata, Payload, Sink, TopicMetadata}; +use tracing::{debug, error, info}; + +#[async_trait] +impl Sink for DeltaSink { + async fn open(&mut self) -> Result<(), Error> { + info!( + "Opening Delta Lake sink connector with ID: {} for table: {}", + self.id, self.config.table_uri + ); + + let table_url = url::Url::parse(&self.config.table_uri).map_err(|e| { + error!("Failed to parse table URI '{}': {e}", self.config.table_uri); + Error::InitError(format!("Invalid table URI: {e}")) + })?; + + info!("Parsed table URI: {}", table_url); + + let storage_options = build_storage_options(&self.config).map_err(|e| { + error!("Invalid storage configuration: {e}"); + Error::InitError(format!("Invalid storage configuration: {e}")) + })?; + + let table = + match deltalake::open_table_with_storage_options(table_url, storage_options.clone()) + .await + { + Ok(table) => table, + Err(e) => { + error!("Failed to load Delta table: {e}"); + return Err(Error::InitError(format!("Failed to load Delta table: {e}"))); + } + }; + + let kernel_schema = table + .snapshot() + .map_err(|e| { + error!("Failed to get table snapshot: {e}"); + Error::InitError(format!("Failed to get table snapshot: {e}")) + })? 
+ .schema(); + let coercion_tree = create_coercion_tree(&kernel_schema); + + let writer = JsonWriter::for_table(&table).map_err(|e| { + error!("Failed to create JsonWriter: {e}"); + Error::InitError(format!("Failed to create JsonWriter: {e}")) + })?; + + *self.state.lock().await = Some(SinkState { + table, + writer, + coercion_tree, + }); + + info!( + "Delta Lake sink connector with ID: {} opened successfully.", + self.id + ); + Ok(()) + } + + async fn consume( + &self, + _topic_metadata: &TopicMetadata, + messages_metadata: MessagesMetadata, + messages: Vec<ConsumedMessage>, + ) -> Result<(), Error> { + debug!( + "Delta sink with ID: {} received: {} messages, partition: {}, offset: {}", + self.id, + messages.len(), + messages_metadata.partition_id, + messages_metadata.current_offset, + ); + + // Extract JSON values from consumed messages + let mut json_values: Vec<serde_json::Value> = Vec::with_capacity(messages.len()); + for msg in &messages { + match &msg.payload { + Payload::Json(simd_value) => { + json_values.push(owned_value_to_serde_json(simd_value)); + } + other => { + error!( + "Unsupported payload type: {other}. Delta sink only supports JSON payloads." 
+ ); + return Err(Error::InvalidPayloadType); + } + } + } + + if json_values.is_empty() { + debug!("No JSON values to write"); + return Ok(()); + } + + let mut state_guard = self.state.lock().await; + let state = state_guard.as_mut().ok_or_else(|| { + error!("Delta sink state not initialized — was open() called?"); + Error::InvalidConfig + })?; + + // Apply coercions to match Delta table schema + for value in &mut json_values { + coerce(value, &state.coercion_tree); + } + + // Write JSON values to internal Parquet buffers + state.writer.write(json_values).await.map_err(|e| { + error!("Failed to write to Delta writer: {e}"); + Error::Storage(format!("Failed to write to Delta writer: {e}")) + })?; + + // Flush buffers to object store and commit to Delta log + let version = match state.writer.flush_and_commit(&mut state.table).await { Review Comment: every `consume()` call does `flush_and_commit`, which creates a new parquet file and a new JSON transaction log entry in `_delta_log/`. at high throughput with, say, 1000-message batches at 1M ops/sec, that's 1000 parquet files and 1000 log entries per second. delta lake degrades catastrophically under this - metadata parsing slows, checkpoint overhead grows, and cloud object store LIST calls become a bottleneck. the sink needs a buffering strategy: accumulate across multiple `consume()` calls and flush on a configurable row count, byte threshold, or time interval - not per batch. ########## core/connectors/sinks/delta_sink/src/lib.rs: ########## @@ -0,0 +1,90 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +use deltalake::DeltaTable; +use deltalake::writer::JsonWriter; +use iggy_connector_sdk::sink_connector; +use serde::{Deserialize, Serialize}; +use tokio::sync::Mutex; + +mod coercions; +mod sink; +mod storage; + +use crate::coercions::CoercionTree; + +sink_connector!(DeltaSink); + +#[derive(Debug)] +pub struct DeltaSink { + id: u32, + config: DeltaSinkConfig, + state: Mutex<Option<SinkState>>, +} + +#[derive(Debug)] +struct SinkState { + table: DeltaTable, + writer: JsonWriter, + coercion_tree: CoercionTree, +} + +#[derive(Debug, Serialize, Deserialize)] Review Comment: `DeltaSinkConfig` derives `Debug`, which means AWS secret keys, Azure account keys, and GCS service account keys will appear in debug output / logs. consider implementing `Debug` manually to redact sensitive fields, or wrapping them in a newtype that redacts on display. ########## core/connectors/sinks/delta_sink/src/storage.rs: ########## @@ -0,0 +1,277 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +use crate::DeltaSinkConfig; +use iggy_connector_sdk::Error; +use std::collections::HashMap; + +pub(crate) fn build_storage_options( + config: &DeltaSinkConfig, +) -> Result<HashMap<String, String>, Error> { + let mut opts = HashMap::new(); + + match config.storage_backend_type.as_deref() { + Some("s3") => { + let access_key = config + .aws_s3_access_key + .as_ref() + .ok_or(Error::InvalidConfig)?; + let secret_key = config + .aws_s3_secret_key + .as_ref() + .ok_or(Error::InvalidConfig)?; + let region = config.aws_s3_region.as_ref().ok_or(Error::InvalidConfig)?; + let endpoint_url = config Review Comment: also line 43. `endpoint_url` and `allow_http` are only needed for S3-compatible services (MinIO, LocalStack), not actual AWS S3. making them mandatory breaks the vanilla AWS use case. these should be optional with sensible defaults (no custom endpoint, HTTP disallowed). ########## core/connectors/sinks/delta_sink/src/coercions.rs: ########## @@ -0,0 +1,600 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +use deltalake::kernel::Schema as DeltaSchema; +use deltalake::kernel::{DataType, PrimitiveType}; + +use chrono::prelude::*; +use serde_json::Value; +use std::collections::HashMap; +use std::str::FromStr; + +#[derive(Debug, Clone, PartialEq)] +#[allow(unused)] +enum CoercionNode { + Coercion(Coercion), + Tree(CoercionTree), + ArrayTree(CoercionTree), + ArrayPrimitive(Coercion), +} + +#[derive(Debug, Clone, PartialEq)] +enum Coercion { + ToString, + ToTimestamp, +} + +#[derive(Debug, Clone, PartialEq)] +pub(crate) struct CoercionTree { + root: HashMap<String, CoercionNode>, +} + +/// Returns a [`CoercionTree`] so the schema can be walked efficiently level by level when performing conversions. 
+pub(crate) fn create_coercion_tree(schema: &DeltaSchema) -> CoercionTree { + let mut root = HashMap::new(); + + for field in schema.fields() { + if let Some(node) = build_coercion_node(field.data_type()) { + root.insert(field.name().to_string(), node); + } + } + + CoercionTree { root } +} + +fn build_coercion_node(data_type: &DataType) -> Option<CoercionNode> { + match data_type { + DataType::Primitive(primitive) => match primitive { + PrimitiveType::String => Some(CoercionNode::Coercion(Coercion::ToString)), + PrimitiveType::Timestamp => Some(CoercionNode::Coercion(Coercion::ToTimestamp)), + _ => None, + }, + DataType::Struct(st) => { + let nested_context = create_coercion_tree(st); + if !nested_context.root.is_empty() { + Some(CoercionNode::Tree(nested_context)) + } else { + None + } + } + DataType::Array(array) => { + build_coercion_node(array.element_type()).and_then(|node| match node { + CoercionNode::Coercion(c) => Some(CoercionNode::ArrayPrimitive(c)), + CoercionNode::Tree(t) => Some(CoercionNode::ArrayTree(t)), + _ => None, + }) + } + _ => None, + } +} + +/// Applies all data coercions specified by the [`CoercionTree`] to the [`Value`]. 
+pub(crate) fn coerce(value: &mut Value, coercion_tree: &CoercionTree) { + if let Some(context) = value.as_object_mut() { + for (field_name, coercion) in coercion_tree.root.iter() { + if let Some(value) = context.get_mut(field_name) { + apply_coercion(value, coercion); + } + } + } +} + +fn apply_coercion(value: &mut Value, node: &CoercionNode) { + match node { + CoercionNode::Coercion(Coercion::ToString) => { + if !value.is_string() { + *value = Value::String(value.to_string()); + } + } + CoercionNode::Coercion(Coercion::ToTimestamp) => { + if let Some(as_str) = value.as_str() + && let Some(parsed) = string_to_timestamp(as_str) + { + *value = parsed + } + } + CoercionNode::Tree(tree) => { + for (name, node) in tree.root.iter() { + let fields = value.as_object_mut(); + if let Some(fields) = fields + && let Some(value) = fields.get_mut(name) + { + apply_coercion(value, node); + } + } + } + CoercionNode::ArrayPrimitive(coercion) => { + let values = value.as_array_mut(); + if let Some(values) = values { + let node = CoercionNode::Coercion(coercion.clone()); + for value in values { + apply_coercion(value, &node); + } + } + } + CoercionNode::ArrayTree(tree) => { + let values = value.as_array_mut(); + if let Some(values) = values { + let node = CoercionNode::Tree(tree.clone()); + for value in values { + apply_coercion(value, &node); + } + } + } + } +} + +fn string_to_timestamp(string: &str) -> Option<Value> { + let parsed = DateTime::from_str(string); + if let Err(e) = parsed { + tracing::error!( Review Comment: an unparseable timestamp string is an expected data quality issue, not an infrastructure error. using `error!` level here will flood monitoring/alerting under normal operation with dirty data. downgrade to `warn!`. ########## core/connectors/sinks/delta_sink/src/sink.rs: ########## @@ -0,0 +1,308 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +use crate::DeltaSink; +use crate::SinkState; +use crate::coercions::{coerce, create_coercion_tree}; +use crate::storage::build_storage_options; +use async_trait::async_trait; +use deltalake::writer::{DeltaWriter, JsonWriter}; +use iggy_connector_sdk::{ConsumedMessage, Error, MessagesMetadata, Payload, Sink, TopicMetadata}; +use tracing::{debug, error, info}; + +#[async_trait] +impl Sink for DeltaSink { + async fn open(&mut self) -> Result<(), Error> { + info!( + "Opening Delta Lake sink connector with ID: {} for table: {}", + self.id, self.config.table_uri + ); + + let table_url = url::Url::parse(&self.config.table_uri).map_err(|e| { + error!("Failed to parse table URI '{}': {e}", self.config.table_uri); + Error::InitError(format!("Invalid table URI: {e}")) + })?; + + info!("Parsed table URI: {}", table_url); + + let storage_options = build_storage_options(&self.config).map_err(|e| { + error!("Invalid storage configuration: {e}"); + Error::InitError(format!("Invalid storage configuration: {e}")) + })?; + + let table = + match deltalake::open_table_with_storage_options(table_url, storage_options.clone()) + .await + { + Ok(table) => table, + Err(e) => { + error!("Failed to load Delta table: {e}"); + return Err(Error::InitError(format!("Failed to load Delta table: 
{e}"))); + } + }; + + let kernel_schema = table + .snapshot() + .map_err(|e| { + error!("Failed to get table snapshot: {e}"); + Error::InitError(format!("Failed to get table snapshot: {e}")) + })? + .schema(); + let coercion_tree = create_coercion_tree(&kernel_schema); + + let writer = JsonWriter::for_table(&table).map_err(|e| { + error!("Failed to create JsonWriter: {e}"); + Error::InitError(format!("Failed to create JsonWriter: {e}")) + })?; + + *self.state.lock().await = Some(SinkState { + table, + writer, + coercion_tree, + }); + + info!( + "Delta Lake sink connector with ID: {} opened successfully.", + self.id + ); + Ok(()) + } + + async fn consume( + &self, + _topic_metadata: &TopicMetadata, + messages_metadata: MessagesMetadata, + messages: Vec<ConsumedMessage>, + ) -> Result<(), Error> { + debug!( + "Delta sink with ID: {} received: {} messages, partition: {}, offset: {}", + self.id, + messages.len(), + messages_metadata.partition_id, + messages_metadata.current_offset, + ); + + // Extract JSON values from consumed messages + let mut json_values: Vec<serde_json::Value> = Vec::with_capacity(messages.len()); + for msg in &messages { + match &msg.payload { + Payload::Json(simd_value) => { + json_values.push(owned_value_to_serde_json(simd_value)); + } + other => { + error!( + "Unsupported payload type: {other}. Delta sink only supports JSON payloads." 
+ ); + return Err(Error::InvalidPayloadType); + } + } + } + + if json_values.is_empty() { + debug!("No JSON values to write"); + return Ok(()); + } + + let mut state_guard = self.state.lock().await; + let state = state_guard.as_mut().ok_or_else(|| { + error!("Delta sink state not initialized — was open() called?"); + Error::InvalidConfig + })?; + + // Apply coercions to match Delta table schema + for value in &mut json_values { + coerce(value, &state.coercion_tree); + } + + // Write JSON values to internal Parquet buffers + state.writer.write(json_values).await.map_err(|e| { + error!("Failed to write to Delta writer: {e}"); + Error::Storage(format!("Failed to write to Delta writer: {e}")) + })?; + + // Flush buffers to object store and commit to Delta log + let version = match state.writer.flush_and_commit(&mut state.table).await { + Ok(v) => v, + Err(e) => { + state.writer.reset(); + error!("Failed to flush and commit to Delta table: {e}"); + return Err(Error::Storage(format!("Failed to flush and commit: {e}"))); + } + }; + + debug!( + "Delta sink with ID: {} committed version {}", + self.id, version + ); + + Ok(()) + } + + async fn close(&mut self) -> Result<(), Error> { + if let Some(mut state) = self.state.lock().await.take() + && let Err(e) = state.writer.flush_and_commit(&mut state.table).await + { + error!( + "Delta sink with ID: {} failed to flush on close: {e}", + self.id + ); + return Err(Error::Storage(format!("Failed to flush on close: {e}"))); + } + info!("Delta Lake sink connector with ID: {} is closed.", self.id); + Ok(()) + } +} + +fn owned_value_to_serde_json(value: &simd_json::OwnedValue) -> serde_json::Value { Review Comment: this `simd_json::OwnedValue -> serde_json::Value` conversion is copy-pasted identically in `elasticsearch_sink/src/lib.rs:39`. should live in `iggy_connector_sdk` or a shared utility crate. when the conversion strategy changes, both need updating independently right now. 
i commented same in https://github.com/apache/iggy/pull/2925 ########## core/connectors/sinks/delta_sink/src/sink.rs: ########## @@ -0,0 +1,308 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +use crate::DeltaSink; +use crate::SinkState; +use crate::coercions::{coerce, create_coercion_tree}; +use crate::storage::build_storage_options; +use async_trait::async_trait; +use deltalake::writer::{DeltaWriter, JsonWriter}; +use iggy_connector_sdk::{ConsumedMessage, Error, MessagesMetadata, Payload, Sink, TopicMetadata}; +use tracing::{debug, error, info}; + +#[async_trait] +impl Sink for DeltaSink { + async fn open(&mut self) -> Result<(), Error> { + info!( + "Opening Delta Lake sink connector with ID: {} for table: {}", + self.id, self.config.table_uri + ); + + let table_url = url::Url::parse(&self.config.table_uri).map_err(|e| { + error!("Failed to parse table URI '{}': {e}", self.config.table_uri); + Error::InitError(format!("Invalid table URI: {e}")) + })?; + + info!("Parsed table URI: {}", table_url); + + let storage_options = build_storage_options(&self.config).map_err(|e| { + error!("Invalid storage configuration: {e}"); + Error::InitError(format!("Invalid storage configuration: {e}")) + })?; + + let table = 
+ match deltalake::open_table_with_storage_options(table_url, storage_options.clone()) Review Comment: `build_storage_options` returns an owned `HashMap` which is then `.clone()`-d just to pass to `open_table_with_storage_options`. since the original is never used again after this call, just pass the owned map directly and drop the clone. ########## core/connectors/sinks/delta_sink/src/coercions.rs: ########## @@ -0,0 +1,600 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +use deltalake::kernel::Schema as DeltaSchema; +use deltalake::kernel::{DataType, PrimitiveType}; + +use chrono::prelude::*; +use serde_json::Value; +use std::collections::HashMap; +use std::str::FromStr; + +#[derive(Debug, Clone, PartialEq)] +#[allow(unused)] +enum CoercionNode { + Coercion(Coercion), + Tree(CoercionTree), + ArrayTree(CoercionTree), + ArrayPrimitive(Coercion), +} + +#[derive(Debug, Clone, PartialEq)] +enum Coercion { + ToString, + ToTimestamp, +} + +#[derive(Debug, Clone, PartialEq)] +pub(crate) struct CoercionTree { + root: HashMap<String, CoercionNode>, +} + +/// Returns a [`CoercionTree`] so the schema can be walked efficiently level by level when performing conversions. 
+pub(crate) fn create_coercion_tree(schema: &DeltaSchema) -> CoercionTree { + let mut root = HashMap::new(); + + for field in schema.fields() { + if let Some(node) = build_coercion_node(field.data_type()) { + root.insert(field.name().to_string(), node); + } + } + + CoercionTree { root } +} + +fn build_coercion_node(data_type: &DataType) -> Option<CoercionNode> { + match data_type { + DataType::Primitive(primitive) => match primitive { + PrimitiveType::String => Some(CoercionNode::Coercion(Coercion::ToString)), + PrimitiveType::Timestamp => Some(CoercionNode::Coercion(Coercion::ToTimestamp)), + _ => None, + }, + DataType::Struct(st) => { + let nested_context = create_coercion_tree(st); + if !nested_context.root.is_empty() { + Some(CoercionNode::Tree(nested_context)) + } else { + None + } + } + DataType::Array(array) => { + build_coercion_node(array.element_type()).and_then(|node| match node { + CoercionNode::Coercion(c) => Some(CoercionNode::ArrayPrimitive(c)), + CoercionNode::Tree(t) => Some(CoercionNode::ArrayTree(t)), + _ => None, + }) + } + _ => None, + } +} + +/// Applies all data coercions specified by the [`CoercionTree`] to the [`Value`]. 
+pub(crate) fn coerce(value: &mut Value, coercion_tree: &CoercionTree) { + if let Some(context) = value.as_object_mut() { + for (field_name, coercion) in coercion_tree.root.iter() { + if let Some(value) = context.get_mut(field_name) { + apply_coercion(value, coercion); + } + } + } +} + +fn apply_coercion(value: &mut Value, node: &CoercionNode) { + match node { + CoercionNode::Coercion(Coercion::ToString) => { + if !value.is_string() { + *value = Value::String(value.to_string()); + } + } + CoercionNode::Coercion(Coercion::ToTimestamp) => { + if let Some(as_str) = value.as_str() + && let Some(parsed) = string_to_timestamp(as_str) + { + *value = parsed + } + } + CoercionNode::Tree(tree) => { + for (name, node) in tree.root.iter() { + let fields = value.as_object_mut(); + if let Some(fields) = fields + && let Some(value) = fields.get_mut(name) + { + apply_coercion(value, node); + } + } + } + CoercionNode::ArrayPrimitive(coercion) => { + let values = value.as_array_mut(); + if let Some(values) = values { + let node = CoercionNode::Coercion(coercion.clone()); Review Comment: also line 133. `coercion.clone()` and `tree.clone()` are called once per array-typed field per message. `CoercionTree` contains a `HashMap<String, CoercionNode>`, so cloning it for every array in every message is a heap allocation storm on wide schemas. refactor to pass `&Coercion` / `&CoercionTree` directly instead of wrapping in a new `CoercionNode`. ########## core/connectors/sinks/delta_sink/src/coercions.rs: ########## @@ -0,0 +1,600 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +use deltalake::kernel::Schema as DeltaSchema; +use deltalake::kernel::{DataType, PrimitiveType}; + +use chrono::prelude::*; +use serde_json::Value; +use std::collections::HashMap; +use std::str::FromStr; + +#[derive(Debug, Clone, PartialEq)] +#[allow(unused)] +enum CoercionNode { + Coercion(Coercion), + Tree(CoercionTree), + ArrayTree(CoercionTree), + ArrayPrimitive(Coercion), +} + +#[derive(Debug, Clone, PartialEq)] +enum Coercion { + ToString, + ToTimestamp, +} + +#[derive(Debug, Clone, PartialEq)] +pub(crate) struct CoercionTree { + root: HashMap<String, CoercionNode>, +} + +/// Returns a [`CoercionTree`] so the schema can be walked efficiently level by level when performing conversions. 
+pub(crate) fn create_coercion_tree(schema: &DeltaSchema) -> CoercionTree { + let mut root = HashMap::new(); + + for field in schema.fields() { + if let Some(node) = build_coercion_node(field.data_type()) { + root.insert(field.name().to_string(), node); + } + } + + CoercionTree { root } +} + +fn build_coercion_node(data_type: &DataType) -> Option<CoercionNode> { + match data_type { + DataType::Primitive(primitive) => match primitive { + PrimitiveType::String => Some(CoercionNode::Coercion(Coercion::ToString)), + PrimitiveType::Timestamp => Some(CoercionNode::Coercion(Coercion::ToTimestamp)), + _ => None, + }, + DataType::Struct(st) => { + let nested_context = create_coercion_tree(st); + if !nested_context.root.is_empty() { + Some(CoercionNode::Tree(nested_context)) + } else { + None + } + } + DataType::Array(array) => { + build_coercion_node(array.element_type()).and_then(|node| match node { + CoercionNode::Coercion(c) => Some(CoercionNode::ArrayPrimitive(c)), + CoercionNode::Tree(t) => Some(CoercionNode::ArrayTree(t)), + _ => None, + }) + } + _ => None, Review Comment: `build_coercion_node` handles `Primitive`, `Struct`, and `Array` but falls through to `None` for `Map` and `Variant`. if a delta table has a `Map<String, Timestamp>` column, timestamp values inside the map won't be coerced. if intentional, add a comment documenting this as a known limitation so future maintainers know it's deliberate. ########## core/connectors/sinks/delta_sink/Cargo.toml: ########## @@ -0,0 +1,48 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +[package] +name = "iggy_connector_delta_sink" +version = "0.1.0" +edition = "2024" +license = "Apache-2.0" +keywords = ["iggy", "messaging", "streaming"] +categories = ["command-line-utilities", "database", "network-programming"] +homepage = "https://iggy.apache.org" +documentation = "https://iggy.apache.org/docs" +repository = "https://github.com/apache/iggy" +readme = "../../README.md" + +[package.metadata.cargo-machete] +ignored = ["dashmap", "once_cell"] + +[lib] +crate-type = ["cdylib", "lib"] + +[dependencies] +async-trait = { workspace = true } +chrono = { workspace = true } +dashmap = { workspace = true } Review Comment: also line 42. `dashmap` and `once_cell` are listed in `[dependencies]` and then suppressed via `[package.metadata.cargo-machete] ignored`. if they're truly unused, remove them rather than suppressing the warning. ########## core/connectors/sinks/delta_sink/src/storage.rs: ########## @@ -0,0 +1,277 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +use crate::DeltaSinkConfig; +use iggy_connector_sdk::Error; +use std::collections::HashMap; + +pub(crate) fn build_storage_options( + config: &DeltaSinkConfig, +) -> Result<HashMap<String, String>, Error> { + let mut opts = HashMap::new(); + + match config.storage_backend_type.as_deref() { + Some("s3") => { + let access_key = config + .aws_s3_access_key + .as_ref() + .ok_or(Error::InvalidConfig)?; Review Comment: also at lines 37, 38, 42, 43, 56, 60, 64, 68, 80, 89. every `.ok_or(Error::InvalidConfig)?` returns the same variant with no indication of which field is missing or which backend was selected. a user who misconfigures one field gets `"Invalid config"` with zero diagnostic. use `Error::InitError(format!("S3 backend requires 'aws_s3_access_key'"))` or similar so the error message actually tells the user what to fix. ########## core/connectors/sinks/delta_sink/src/coercions.rs: ########## @@ -0,0 +1,600 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +use deltalake::kernel::Schema as DeltaSchema; +use deltalake::kernel::{DataType, PrimitiveType}; + +use chrono::prelude::*; +use serde_json::Value; +use std::collections::HashMap; +use std::str::FromStr; + +#[derive(Debug, Clone, PartialEq)] +#[allow(unused)] +enum CoercionNode { + Coercion(Coercion), + Tree(CoercionTree), + ArrayTree(CoercionTree), + ArrayPrimitive(Coercion), +} + +#[derive(Debug, Clone, PartialEq)] +enum Coercion { + ToString, + ToTimestamp, +} + +#[derive(Debug, Clone, PartialEq)] +pub(crate) struct CoercionTree { + root: HashMap<String, CoercionNode>, +} + +/// Returns a [`CoercionTree`] so the schema can be walked efficiently level by level when performing conversions. 
+pub(crate) fn create_coercion_tree(schema: &DeltaSchema) -> CoercionTree { + let mut root = HashMap::new(); + + for field in schema.fields() { + if let Some(node) = build_coercion_node(field.data_type()) { + root.insert(field.name().to_string(), node); + } + } + + CoercionTree { root } +} + +fn build_coercion_node(data_type: &DataType) -> Option<CoercionNode> { + match data_type { + DataType::Primitive(primitive) => match primitive { + PrimitiveType::String => Some(CoercionNode::Coercion(Coercion::ToString)), + PrimitiveType::Timestamp => Some(CoercionNode::Coercion(Coercion::ToTimestamp)), + _ => None, + }, + DataType::Struct(st) => { + let nested_context = create_coercion_tree(st); + if !nested_context.root.is_empty() { + Some(CoercionNode::Tree(nested_context)) + } else { + None + } + } + DataType::Array(array) => { + build_coercion_node(array.element_type()).and_then(|node| match node { + CoercionNode::Coercion(c) => Some(CoercionNode::ArrayPrimitive(c)), + CoercionNode::Tree(t) => Some(CoercionNode::ArrayTree(t)), + _ => None, + }) + } + _ => None, + } +} + +/// Applies all data coercions specified by the [`CoercionTree`] to the [`Value`]. +pub(crate) fn coerce(value: &mut Value, coercion_tree: &CoercionTree) { + if let Some(context) = value.as_object_mut() { + for (field_name, coercion) in coercion_tree.root.iter() { + if let Some(value) = context.get_mut(field_name) { + apply_coercion(value, coercion); + } + } + } +} + +fn apply_coercion(value: &mut Value, node: &CoercionNode) { + match node { + CoercionNode::Coercion(Coercion::ToString) => { + if !value.is_string() { + *value = Value::String(value.to_string()); + } + } + CoercionNode::Coercion(Coercion::ToTimestamp) => { Review Comment: when `string_to_timestamp` returns `None` (unparseable string), the value remains as a JSON string. 
`JsonWriter` then receives a string where it expects an i64 microsecond timestamp, causing `writer.write()` at sink.rs:133 to fail - rejecting the entire batch (potentially hundreds of valid messages) due to a single malformed timestamp in one message. either fail fast with an explicit error naming the field/value, or set the value to null (if the column is nullable) to preserve the rest of the batch. ########## core/connectors/sinks/delta_sink/src/sink.rs: ########## @@ -0,0 +1,308 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +use crate::DeltaSink; +use crate::SinkState; +use crate::coercions::{coerce, create_coercion_tree}; +use crate::storage::build_storage_options; +use async_trait::async_trait; +use deltalake::writer::{DeltaWriter, JsonWriter}; +use iggy_connector_sdk::{ConsumedMessage, Error, MessagesMetadata, Payload, Sink, TopicMetadata}; +use tracing::{debug, error, info}; + +#[async_trait] +impl Sink for DeltaSink { + async fn open(&mut self) -> Result<(), Error> { + info!( + "Opening Delta Lake sink connector with ID: {} for table: {}", + self.id, self.config.table_uri + ); + + let table_url = url::Url::parse(&self.config.table_uri).map_err(|e| { + error!("Failed to parse table URI '{}': {e}", self.config.table_uri); + Error::InitError(format!("Invalid table URI: {e}")) + })?; + + info!("Parsed table URI: {}", table_url); + + let storage_options = build_storage_options(&self.config).map_err(|e| { + error!("Invalid storage configuration: {e}"); + Error::InitError(format!("Invalid storage configuration: {e}")) + })?; + + let table = + match deltalake::open_table_with_storage_options(table_url, storage_options.clone()) + .await + { + Ok(table) => table, + Err(e) => { + error!("Failed to load Delta table: {e}"); + return Err(Error::InitError(format!("Failed to load Delta table: {e}"))); + } + }; + + let kernel_schema = table + .snapshot() + .map_err(|e| { + error!("Failed to get table snapshot: {e}"); + Error::InitError(format!("Failed to get table snapshot: {e}")) + })? 
+ .schema(); + let coercion_tree = create_coercion_tree(&kernel_schema); + + let writer = JsonWriter::for_table(&table).map_err(|e| { + error!("Failed to create JsonWriter: {e}"); + Error::InitError(format!("Failed to create JsonWriter: {e}")) + })?; + + *self.state.lock().await = Some(SinkState { + table, + writer, + coercion_tree, + }); + + info!( + "Delta Lake sink connector with ID: {} opened successfully.", + self.id + ); + Ok(()) + } + + async fn consume( + &self, + _topic_metadata: &TopicMetadata, + messages_metadata: MessagesMetadata, + messages: Vec<ConsumedMessage>, + ) -> Result<(), Error> { + debug!( + "Delta sink with ID: {} received: {} messages, partition: {}, offset: {}", + self.id, + messages.len(), + messages_metadata.partition_id, + messages_metadata.current_offset, + ); + + // Extract JSON values from consumed messages + let mut json_values: Vec<serde_json::Value> = Vec::with_capacity(messages.len()); + for msg in &messages { + match &msg.payload { + Payload::Json(simd_value) => { + json_values.push(owned_value_to_serde_json(simd_value)); + } + other => { + error!( + "Unsupported payload type: {other}. Delta sink only supports JSON payloads." 
+ ); + return Err(Error::InvalidPayloadType); + } + } + } + + if json_values.is_empty() { + debug!("No JSON values to write"); + return Ok(()); + } + + let mut state_guard = self.state.lock().await; + let state = state_guard.as_mut().ok_or_else(|| { + error!("Delta sink state not initialized — was open() called?"); + Error::InvalidConfig + })?; + + // Apply coercions to match Delta table schema + for value in &mut json_values { + coerce(value, &state.coercion_tree); + } + + // Write JSON values to internal Parquet buffers + state.writer.write(json_values).await.map_err(|e| { + error!("Failed to write to Delta writer: {e}"); + Error::Storage(format!("Failed to write to Delta writer: {e}")) + })?; + + // Flush buffers to object store and commit to Delta log + let version = match state.writer.flush_and_commit(&mut state.table).await { + Ok(v) => v, + Err(e) => { + state.writer.reset(); Review Comment: when `write()` succeeds (line 133) but `flush_and_commit()` fails here, `reset()` clears the internal parquet buffer. those messages are permanently lost with no retry path. since the connector runtime uses `AutoCommitWhen::PollingMessages` (offset committed before consume), the consumer offset may already have been advanced past these messages. there's no retry, no DLQ, no metric - the messages just vanish. ########## core/connectors/sinks/delta_sink/src/sink.rs: ########## @@ -0,0 +1,308 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +use crate::DeltaSink; +use crate::SinkState; +use crate::coercions::{coerce, create_coercion_tree}; +use crate::storage::build_storage_options; +use async_trait::async_trait; +use deltalake::writer::{DeltaWriter, JsonWriter}; +use iggy_connector_sdk::{ConsumedMessage, Error, MessagesMetadata, Payload, Sink, TopicMetadata}; +use tracing::{debug, error, info}; + +#[async_trait] +impl Sink for DeltaSink { + async fn open(&mut self) -> Result<(), Error> { + info!( + "Opening Delta Lake sink connector with ID: {} for table: {}", + self.id, self.config.table_uri + ); + + let table_url = url::Url::parse(&self.config.table_uri).map_err(|e| { + error!("Failed to parse table URI '{}': {e}", self.config.table_uri); + Error::InitError(format!("Invalid table URI: {e}")) + })?; + + info!("Parsed table URI: {}", table_url); + + let storage_options = build_storage_options(&self.config).map_err(|e| { + error!("Invalid storage configuration: {e}"); + Error::InitError(format!("Invalid storage configuration: {e}")) + })?; + + let table = + match deltalake::open_table_with_storage_options(table_url, storage_options.clone()) + .await + { + Ok(table) => table, + Err(e) => { + error!("Failed to load Delta table: {e}"); + return Err(Error::InitError(format!("Failed to load Delta table: {e}"))); + } + }; + + let kernel_schema = table + .snapshot() + .map_err(|e| { + error!("Failed to get table snapshot: {e}"); + Error::InitError(format!("Failed to get table snapshot: {e}")) + })? 
+ .schema(); + let coercion_tree = create_coercion_tree(&kernel_schema); + + let writer = JsonWriter::for_table(&table).map_err(|e| { + error!("Failed to create JsonWriter: {e}"); + Error::InitError(format!("Failed to create JsonWriter: {e}")) + })?; + + *self.state.lock().await = Some(SinkState { + table, + writer, + coercion_tree, + }); + + info!( + "Delta Lake sink connector with ID: {} opened successfully.", + self.id + ); + Ok(()) + } + + async fn consume( + &self, + _topic_metadata: &TopicMetadata, + messages_metadata: MessagesMetadata, + messages: Vec<ConsumedMessage>, + ) -> Result<(), Error> { + debug!( + "Delta sink with ID: {} received: {} messages, partition: {}, offset: {}", + self.id, + messages.len(), + messages_metadata.partition_id, + messages_metadata.current_offset, + ); + + // Extract JSON values from consumed messages + let mut json_values: Vec<serde_json::Value> = Vec::with_capacity(messages.len()); + for msg in &messages { + match &msg.payload { + Payload::Json(simd_value) => { + json_values.push(owned_value_to_serde_json(simd_value)); + } + other => { + error!( + "Unsupported payload type: {other}. Delta sink only supports JSON payloads." 
+ ); + return Err(Error::InvalidPayloadType); + } + } + } + + if json_values.is_empty() { + debug!("No JSON values to write"); + return Ok(()); + } + + let mut state_guard = self.state.lock().await; + let state = state_guard.as_mut().ok_or_else(|| { + error!("Delta sink state not initialized — was open() called?"); + Error::InvalidConfig + })?; + + // Apply coercions to match Delta table schema + for value in &mut json_values { + coerce(value, &state.coercion_tree); + } + + // Write JSON values to internal Parquet buffers + state.writer.write(json_values).await.map_err(|e| { + error!("Failed to write to Delta writer: {e}"); + Error::Storage(format!("Failed to write to Delta writer: {e}")) + })?; + + // Flush buffers to object store and commit to Delta log + let version = match state.writer.flush_and_commit(&mut state.table).await { + Ok(v) => v, + Err(e) => { + state.writer.reset(); + error!("Failed to flush and commit to Delta table: {e}"); + return Err(Error::Storage(format!("Failed to flush and commit: {e}"))); + } + }; + + debug!( + "Delta sink with ID: {} committed version {}", + self.id, version + ); + + Ok(()) + } + + async fn close(&mut self) -> Result<(), Error> { + if let Some(mut state) = self.state.lock().await.take() + && let Err(e) = state.writer.flush_and_commit(&mut state.table).await + { + error!( + "Delta sink with ID: {} failed to flush on close: {e}", + self.id + ); + return Err(Error::Storage(format!("Failed to flush on close: {e}"))); + } + info!("Delta Lake sink connector with ID: {} is closed.", self.id); + Ok(()) + } +} + +fn owned_value_to_serde_json(value: &simd_json::OwnedValue) -> serde_json::Value { + match value { + simd_json::OwnedValue::Static(s) => match s { + simd_json::StaticNode::Null => serde_json::Value::Null, + simd_json::StaticNode::Bool(b) => serde_json::Value::Bool(*b), + simd_json::StaticNode::I64(n) => serde_json::Value::Number((*n).into()), + simd_json::StaticNode::U64(n) => serde_json::Value::Number((*n).into()), + 
simd_json::StaticNode::F64(n) => serde_json::Number::from_f64(*n) Review Comment: non-finite f64 values (NaN, Infinity) are silently mapped to `serde_json::Value::Null` here. for a delta column typed as `Double`, writing null where the source had NaN is data loss. if the column is non-nullable, this causes a confusing write failure downstream. add a `tracing::warn!` when this conversion happens so operators can diagnose data quality issues. ########## core/connectors/sinks/delta_sink/src/coercions.rs: ########## @@ -0,0 +1,600 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +use deltalake::kernel::Schema as DeltaSchema; +use deltalake::kernel::{DataType, PrimitiveType}; + +use chrono::prelude::*; +use serde_json::Value; +use std::collections::HashMap; +use std::str::FromStr; + +#[derive(Debug, Clone, PartialEq)] +#[allow(unused)] +enum CoercionNode { + Coercion(Coercion), + Tree(CoercionTree), + ArrayTree(CoercionTree), + ArrayPrimitive(Coercion), +} + +#[derive(Debug, Clone, PartialEq)] +enum Coercion { + ToString, + ToTimestamp, +} + +#[derive(Debug, Clone, PartialEq)] +pub(crate) struct CoercionTree { + root: HashMap<String, CoercionNode>, +} + +/// Returns a [`CoercionTree`] so the schema can be walked efficiently level by level when performing conversions. +pub(crate) fn create_coercion_tree(schema: &DeltaSchema) -> CoercionTree { + let mut root = HashMap::new(); + + for field in schema.fields() { + if let Some(node) = build_coercion_node(field.data_type()) { + root.insert(field.name().to_string(), node); + } + } + + CoercionTree { root } +} + +fn build_coercion_node(data_type: &DataType) -> Option<CoercionNode> { + match data_type { + DataType::Primitive(primitive) => match primitive { + PrimitiveType::String => Some(CoercionNode::Coercion(Coercion::ToString)), + PrimitiveType::Timestamp => Some(CoercionNode::Coercion(Coercion::ToTimestamp)), Review Comment: `build_coercion_node` handles `PrimitiveType::Timestamp` but not `PrimitiveType::TimestampNtz`. delta lake has both `Timestamp` (microseconds, UTC-adjusted) and `TimestampNtz` (microseconds, no timezone). if a delta table uses `TimestampNtz` columns and receives ISO 8601 strings, they pass through uncoerced and `JsonWriter` receives a string where it expects an i64 - causing a write failure. fix: add `PrimitiveType::TimestampNtz` alongside `Timestamp` in this match. ########## core/connectors/sinks/delta_sink/src/sink.rs: ########## @@ -0,0 +1,308 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +use crate::DeltaSink; +use crate::SinkState; +use crate::coercions::{coerce, create_coercion_tree}; +use crate::storage::build_storage_options; +use async_trait::async_trait; +use deltalake::writer::{DeltaWriter, JsonWriter}; +use iggy_connector_sdk::{ConsumedMessage, Error, MessagesMetadata, Payload, Sink, TopicMetadata}; +use tracing::{debug, error, info}; + +#[async_trait] +impl Sink for DeltaSink { + async fn open(&mut self) -> Result<(), Error> { + info!( + "Opening Delta Lake sink connector with ID: {} for table: {}", + self.id, self.config.table_uri + ); + + let table_url = url::Url::parse(&self.config.table_uri).map_err(|e| { + error!("Failed to parse table URI '{}': {e}", self.config.table_uri); + Error::InitError(format!("Invalid table URI: {e}")) + })?; + + info!("Parsed table URI: {}", table_url); + + let storage_options = build_storage_options(&self.config).map_err(|e| { + error!("Invalid storage configuration: {e}"); + Error::InitError(format!("Invalid storage configuration: {e}")) + })?; + + let table = + match deltalake::open_table_with_storage_options(table_url, storage_options.clone()) + .await + { + Ok(table) => table, + Err(e) => { + error!("Failed to load Delta table: {e}"); + return Err(Error::InitError(format!("Failed to load Delta table: 
{e}"))); + } + }; + + let kernel_schema = table + .snapshot() + .map_err(|e| { + error!("Failed to get table snapshot: {e}"); + Error::InitError(format!("Failed to get table snapshot: {e}")) + })? + .schema(); + let coercion_tree = create_coercion_tree(&kernel_schema); + + let writer = JsonWriter::for_table(&table).map_err(|e| { + error!("Failed to create JsonWriter: {e}"); + Error::InitError(format!("Failed to create JsonWriter: {e}")) + })?; + + *self.state.lock().await = Some(SinkState { + table, + writer, + coercion_tree, + }); + + info!( + "Delta Lake sink connector with ID: {} opened successfully.", + self.id + ); + Ok(()) + } + + async fn consume( + &self, + _topic_metadata: &TopicMetadata, + messages_metadata: MessagesMetadata, + messages: Vec<ConsumedMessage>, + ) -> Result<(), Error> { + debug!( + "Delta sink with ID: {} received: {} messages, partition: {}, offset: {}", + self.id, + messages.len(), + messages_metadata.partition_id, + messages_metadata.current_offset, + ); + + // Extract JSON values from consumed messages + let mut json_values: Vec<serde_json::Value> = Vec::with_capacity(messages.len()); + for msg in &messages { + match &msg.payload { + Payload::Json(simd_value) => { + json_values.push(owned_value_to_serde_json(simd_value)); + } + other => { + error!( + "Unsupported payload type: {other}. Delta sink only supports JSON payloads." + ); + return Err(Error::InvalidPayloadType); + } + } + } + + if json_values.is_empty() { + debug!("No JSON values to write"); + return Ok(()); + } + + let mut state_guard = self.state.lock().await; Review Comment: the tokio mutex is acquired here and held through both `writer.write()` (in-memory parquet encoding) and `writer.flush_and_commit()` (network/disk I/O + delta log commit). if `consume()` is called concurrently from multiple partitions (the SDK uses `&self` not `&mut self`), every caller serializes on this lock including the I/O wait. 
Combined with the per-batch flush, a single slow object store response blocks all partitions. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
