hubcio commented on code in PR #2889:
URL: https://github.com/apache/iggy/pull/2889#discussion_r3147043124


##########
core/connectors/sinks/delta_sink/src/coercions.rs:
##########
@@ -0,0 +1,620 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use deltalake::kernel::Schema as DeltaSchema;
+use deltalake::kernel::{DataType, PrimitiveType};
+
+use chrono::prelude::*;
+use serde_json::Value;
+use std::collections::HashMap;
+use std::str::FromStr;
+
+#[derive(Debug, Clone, PartialEq)]
+enum CoercionNode {
+    Coercion(Coercion),
+    Tree(CoercionTree),
+    ArrayTree(CoercionTree),
+    ArrayPrimitive(Coercion),
+}
+
+#[derive(Debug, Clone, PartialEq)]
+enum Coercion {
+    ToString,
+    ToTimestamp,
+}
+
+#[derive(Debug, Clone, PartialEq)]
+pub(crate) struct CoercionTree {
+    root: HashMap<String, CoercionNode>,
+}
+
+/// Returns a [`CoercionTree`] so the schema can be walked efficiently level 
by level when performing conversions.
+pub(crate) fn create_coercion_tree(schema: &DeltaSchema) -> CoercionTree {
+    let mut root = HashMap::new();
+
+    for field in schema.fields() {
+        if let Some(node) = build_coercion_node(field.data_type()) {
+            root.insert(field.name().to_string(), node);
+        }
+    }
+
+    CoercionTree { root }
+}
+
+fn build_coercion_node(data_type: &DataType) -> Option<CoercionNode> {
+    match data_type {
+        DataType::Primitive(primitive) => match primitive {
+            PrimitiveType::String => 
Some(CoercionNode::Coercion(Coercion::ToString)),
+            PrimitiveType::Timestamp | PrimitiveType::TimestampNtz => {
+                Some(CoercionNode::Coercion(Coercion::ToTimestamp))
+            }
+            _ => None,
+        },
+        DataType::Struct(st) => {
+            let nested_context = create_coercion_tree(st);
+            if !nested_context.root.is_empty() {
+                Some(CoercionNode::Tree(nested_context))
+            } else {
+                None
+            }
+        }
+        DataType::Array(array) => {
+            build_coercion_node(array.element_type()).and_then(|node| match 
node {
+                CoercionNode::Coercion(c) => 
Some(CoercionNode::ArrayPrimitive(c)),
+                CoercionNode::Tree(t) => Some(CoercionNode::ArrayTree(t)),
+                _ => None,
+            })
+        }
+        // TODO: Map and Variant column types are not coerced. Values inside 
these columns
+        // pass through to the Delta writer unchanged. Add support if these 
types are needed.
+        _ => None,
+    }
+}
+
+/// Applies all data coercions specified by the [`CoercionTree`] to the 
[`Value`].
+///
+/// Returns an error if a string value in a timestamp field cannot be parsed 
as a timestamp.
+/// The caller should reject the record (or batch) on error rather than 
forwarding an
+/// unparsable string to Arrow, which would produce an opaque 
`ArrowError::JsonError`.
+pub(crate) fn coerce(value: &mut Value, coercion_tree: &CoercionTree) -> 
Result<(), String> {
+    if let Some(context) = value.as_object_mut() {
+        for (field_name, coercion) in coercion_tree.root.iter() {
+            if let Some(value) = context.get_mut(field_name) {
+                apply_coercion(value, coercion)?;
+            }
+        }
+    }
+    Ok(())
+}
+
+fn apply_coercion(value: &mut Value, node: &CoercionNode) -> Result<(), 
String> {
+    match node {
+        CoercionNode::Coercion(Coercion::ToString) => {
+            if !value.is_null() && !value.is_string() {
+                *value = Value::String(value.to_string());
+            }
+        }
+        CoercionNode::Coercion(Coercion::ToTimestamp) => {
+            if let Some(as_str) = value.as_str() {
+                *value = string_to_timestamp(as_str)?;
+            } else if value.is_i64() || value.is_u64() {
+                // Already epoch microseconds — valid timestamp 
representation, pass through.
+            }
+        }
+        CoercionNode::Tree(tree) => {
+            for (name, node) in tree.root.iter() {
+                let fields = value.as_object_mut();
+                if let Some(fields) = fields
+                    && let Some(value) = fields.get_mut(name)
+                {
+                    apply_coercion(value, node)?;
+                }
+            }
+        }
+        CoercionNode::ArrayPrimitive(coercion) => {
+            let values = value.as_array_mut();
+            if let Some(values) = values {
+                let node = CoercionNode::Coercion(coercion.clone());
+                for value in values {
+                    apply_coercion(value, &node)?;
+                }
+            }
+        }
+        CoercionNode::ArrayTree(tree) => {
+            if let Some(values) = value.as_array_mut() {
+                for value in values {
+                    for (name, node) in tree.root.iter() {
+                        if let Some(fields) = value.as_object_mut()
+                            && let Some(field_value) = fields.get_mut(name)
+                        {
+                            apply_coercion(field_value, node)?;
+                        }
+                    }
+                }
+            }
+        }
+    }
+    Ok(())
+}
+
+fn string_to_timestamp(string: &str) -> Result<Value, String> {
+    // Try strict RFC 3339 / ISO 8601 with T separator first.
+    if let Ok(dt) = DateTime::<Utc>::from_str(string) {
+        return Ok(Value::Number(dt.timestamp_micros().into()));
+    }
+
+    // Arrow's parser also accepts ' ' as a date/time separator (e.g. 
"2021-11-11 22:11:58").
+    // Handle that here so the value is normalised to epoch microseconds 
rather than left as a
+    // raw string that Arrow would silently accept without going through this 
coercion layer.
+    let ndt = NaiveDateTime::parse_from_str(string, "%Y-%m-%d %H:%M:%S%.f")
+        .or_else(|_| NaiveDateTime::parse_from_str(string, "%Y-%m-%d 
%H:%M:%S"));
+    if let Ok(ndt) = ndt {
+        return Ok(Value::Number(ndt.and_utc().timestamp_micros().into()));
+    }
+
+    Err(format!("cannot parse \"{string}\" as a timestamp"))

Review Comment:
   `cannot parse "{string}" as a timestamp` has no field-name context. With 
nested structs/arrays, the operator can't tell which column contained the bad 
value when this bubbles up via `Error::InvalidRecordValue`. Propagate the field 
path (e.g. `level2.level2_timestamp`) by threading a path stack through 
`apply_coercion` — a useful diagnostic in production when bad records show up.



##########
core/connectors/runtime/example_config/connectors/delta_sink.toml:
##########
@@ -0,0 +1,35 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+type = "sink"
+key = "delta"
+enabled = true
+version = 0
+name = "Delta Lake sink"
+path = "target/release/libiggy_connector_delta_sink"
+verbose = true
+
+[[streams]]
+stream = "qw"

Review Comment:
   `stream = "qw"` and `topics = ["records"]` (line 28) read like leftover 
copy-paste placeholders (from `quickwit`, perhaps?). Rename them to descriptive 
defaults (e.g. `stream = "iggy_stream"`, `topics = ["delta_topic"]`) so users 
copying the example get something readable.



##########
core/integration/tests/connectors/fixtures/delta/fixture.rs:
##########
@@ -0,0 +1,391 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use async_trait::async_trait;
+use deltalake::kernel::{DataType, PrimitiveType, StructField};
+use deltalake::operations::create::CreateBuilder;
+use integration::harness::{TestBinaryError, TestFixture};
+use std::collections::HashMap;
+use std::path::PathBuf;
+use tempfile::TempDir;
+use testcontainers_modules::testcontainers::core::{IntoContainerPort, WaitFor};
+use testcontainers_modules::testcontainers::runners::AsyncRunner;
+use testcontainers_modules::testcontainers::{ContainerAsync, GenericImage, 
ImageExt};
+use tracing::info;
+use uuid::Uuid;
+
+const ENV_SINK_TABLE_URI: &str = 
"IGGY_CONNECTORS_SINK_DELTA_PLUGIN_CONFIG_TABLE_URI";
+const ENV_SINK_PATH: &str = "IGGY_CONNECTORS_SINK_DELTA_PATH";
+const ENV_SINK_STORAGE_BACKEND_TYPE: &str =
+    "IGGY_CONNECTORS_SINK_DELTA_PLUGIN_CONFIG_STORAGE_BACKEND_TYPE";
+const ENV_SINK_AWS_S3_ACCESS_KEY: &str =
+    "IGGY_CONNECTORS_SINK_DELTA_PLUGIN_CONFIG_AWS_S3_ACCESS_KEY";
+const ENV_SINK_AWS_S3_SECRET_KEY: &str =
+    "IGGY_CONNECTORS_SINK_DELTA_PLUGIN_CONFIG_AWS_S3_SECRET_KEY";
+const ENV_SINK_AWS_S3_REGION: &str = 
"IGGY_CONNECTORS_SINK_DELTA_PLUGIN_CONFIG_AWS_S3_REGION";
+const ENV_SINK_AWS_S3_ENDPOINT_URL: &str =
+    "IGGY_CONNECTORS_SINK_DELTA_PLUGIN_CONFIG_AWS_S3_ENDPOINT_URL";
+const ENV_SINK_AWS_S3_ALLOW_HTTP: &str =
+    "IGGY_CONNECTORS_SINK_DELTA_PLUGIN_CONFIG_AWS_S3_ALLOW_HTTP";
+
+const MINIO_IMAGE: &str = "minio/minio";
+const MINIO_TAG: &str = "RELEASE.2025-09-07T16-13-09Z";
+const MINIO_PORT: u16 = 9000;
+const MINIO_CONSOLE_PORT: u16 = 9001;
+const MINIO_ACCESS_KEY: &str = "admin";
+const MINIO_SECRET_KEY: &str = "password";
+const MINIO_BUCKET: &str = "delta-warehouse";
+
+pub struct DeltaFixture {
+    _temp_dir: TempDir,
+    table_path: PathBuf,
+}
+
+async fn count_rows(
+    table_uri: url::Url,
+    storage_options: HashMap<String, String>,
+) -> Result<usize, TestBinaryError> {
+    use deltalake::arrow::array::Int64Array;
+
+    let table = deltalake::open_table_with_storage_options(table_uri, 
storage_options)
+        .await
+        .map_err(|e| TestBinaryError::InvalidState {
+            message: format!("Failed to open delta table: {e}"),
+        })?;
+
+    let batch = table
+        .snapshot()
+        .map_err(|e| TestBinaryError::InvalidState {
+            message: format!("Failed to get table snapshot: {e}"),
+        })?
+        .add_actions_table(false)
+        .map_err(|e| TestBinaryError::InvalidState {
+            message: format!("Failed to get add actions table: {e}"),
+        })?;
+
+    let total = batch
+        .column_by_name("num_records")
+        .and_then(|col| col.as_any().downcast_ref::<Int64Array>())
+        .map(|arr| arr.iter().flatten().sum::<i64>() as usize)
+        .unwrap_or(0);
+
+    Ok(total)
+}
+
+async fn wait_for_row_count(
+    table_uri: url::Url,
+    storage_options: HashMap<String, String>,
+    expected_rows: usize,
+    max_attempts: usize,
+    interval_ms: u64,
+) -> Result<usize, TestBinaryError> {
+    for _ in 0..max_attempts {
+        let count = count_rows(table_uri.clone(), storage_options.clone())
+            .await
+            .unwrap_or(0);
+        if count >= expected_rows {
+            info!("Found {count} rows in delta table (required: 
{expected_rows})");
+            return Ok(count);
+        }
+        
tokio::time::sleep(std::time::Duration::from_millis(interval_ms)).await;
+    }
+
+    let final_count = count_rows(table_uri, 
storage_options).await.unwrap_or(0);
+    Err(TestBinaryError::InvalidState {
+        message: format!(
+            "Expected at least {expected_rows} rows, found {final_count} after 
{max_attempts} attempts"
+        ),
+    })
+}
+
+fn table_columns() -> Vec<StructField> {
+    vec![
+        StructField::new("id", DataType::Primitive(PrimitiveType::Long), true),
+        StructField::new("name", DataType::Primitive(PrimitiveType::String), 
true),
+        StructField::new("count", DataType::Primitive(PrimitiveType::Integer), 
true),
+        StructField::new("amount", DataType::Primitive(PrimitiveType::Double), 
true),
+        StructField::new("active", 
DataType::Primitive(PrimitiveType::Boolean), true),
+        StructField::new(
+            "timestamp",
+            DataType::Primitive(PrimitiveType::TimestampNtz),

Review Comment:
   The column type was correctly changed to `TimestampNtz` per the earlier 
review, but `TestMessage.timestamp` in 
`core/integration/tests/connectors/mod.rs:46,58` is still `i64` (epoch micros). 
That `i64` payload hits the `is_i64()/is_u64()` passthrough branch in 
`coercions.rs:115-117`, so `string_to_timestamp` is never exercised 
end-to-end. Add a test case (or a new `TestMessageWithIsoTimestamp` variant) 
that sends `"2026-01-01T00:00:00Z"` into the same column so the coercion logic 
is actually integration-tested.



##########
core/connectors/sinks/delta_sink/src/lib.rs:
##########
@@ -0,0 +1,97 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use deltalake::DeltaTable;
+use deltalake::writer::JsonWriter;
+use iggy_connector_sdk::sink_connector;
+use secrecy::SecretString;
+use serde::Deserialize;
+use tokio::sync::Mutex;
+
+#[derive(Debug, Deserialize)]
+#[serde(rename_all = "lowercase")]
+pub enum StorageBackendType {
+    S3,
+    Azure,
+    Gcs,
+}
+
+mod coercions;
+mod sink;
+mod storage;
+
+use crate::coercions::CoercionTree;
+
+sink_connector!(DeltaSink);
+
+#[derive(Debug)]
+pub struct DeltaSink {
+    id: u32,
+    config: DeltaSinkConfig,
+    state: Mutex<Option<SinkState>>,
+}
+
+#[derive(Debug)]
+struct SinkState {
+    table: DeltaTable,
+    writer: JsonWriter,
+    coercion_tree: CoercionTree,
+}
+
+#[derive(Debug, Deserialize)]
+pub struct DeltaSinkConfig {
+    pub table_uri: String,
+    #[serde(default)]
+    pub storage_backend_type: Option<StorageBackendType>,
+
+    // AWS S3
+    #[serde(default)]
+    pub aws_s3_access_key: Option<String>,

Review Comment:
   `aws_s3_access_key: Option<String>` should be `Option<SecretString>`. AWS 
access keys are credential material, and the derived `Debug` impl on 
`DeltaSinkConfig` leaks them to logs. The other secret fields 
(`aws_s3_secret_key`, `azure_storage_account_key`, `azure_storage_sas_token`, 
`gcs_service_account_key`) already use `SecretString`; apply the same 
treatment here for consistency.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to