kriti-sc commented on code in PR #2886:
URL: https://github.com/apache/iggy/pull/2886#discussion_r3032763581


##########
core/connectors/sinks/clickhouse_sink/src/body.rs:
##########
@@ -0,0 +1,286 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+//! HTTP request body builders for each INSERT format.
+//!
+//! Each function accepts a slice of [`ConsumedMessage`]s and returns the raw
+//! bytes that will be sent to ClickHouse. Payloads of the wrong type for the
+//! chosen format are skipped with a warning rather than erroring, so a mixed
+//! batch never causes a complete failure.
+
+use crate::StringFormat;
+use crate::schema::Column;
+use iggy_connector_sdk::{ConsumedMessage, Error, Payload};
+use tracing::warn;
+
+// ─── Body builders 
───────────────────────────────────────────────────────────
+
+/// Build a newline-delimited JSON body for `FORMAT JSONEachRow`.
+/// Each `Payload::Json` message becomes one line. Other payload types are 
skipped.
+pub(crate) fn build_json_body(messages: &[ConsumedMessage]) -> Vec<u8> {
+    let mut buf = Vec::with_capacity(messages.len() * 64);
+    for msg in messages {
+        match &msg.payload {
+            Payload::Json(value) => {
+                if simd_json::to_writer(&mut buf, value).is_ok() {
+                    buf.push(b'\n');
+                } else {
+                    warn!("Failed to serialise JSON payload at offset {}", 
msg.offset);
+                }
+            }
+            other => {
+                warn!(
+                    "JSONEachRow mode: skipping unsupported payload type {:?} 
at offset {}",
+                    payload_type_name(other),
+                    msg.offset
+                );
+            }
+        }
+    }
+    buf
+}
+
+/// Build a RowBinaryWithDefaults body.
+/// Each `Payload::Json` message is serialised to binary using the table 
schema.
+pub(crate) fn build_row_binary_body(
+    messages: &[ConsumedMessage],
+    schema: &[Column],
+) -> Result<Vec<u8>, Error> {
+    let mut buf = Vec::with_capacity(messages.len() * 128);
+    for msg in messages {
+        match &msg.payload {
+            Payload::Json(value) => {
+                crate::binary::serialize_row(value, schema, &mut buf)?;
+            }
+            other => {
+                warn!(
+                    "RowBinary mode: skipping unsupported payload type {:?} at 
offset {}",
+                    payload_type_name(other),
+                    msg.offset
+                );
+            }
+        }
+    }
+    Ok(buf)
+}
+
+/// Build a raw string body for CSV / TSV / JSONEachRow string passthrough.
+/// Each `Payload::Text` message is written as-is with a trailing newline
+/// appended for CSV/TSV if not already present.
+pub(crate) fn build_string_body(
+    messages: &[ConsumedMessage],
+    string_format: StringFormat,
+) -> Vec<u8> {
+    let mut buf = Vec::with_capacity(messages.len() * 64);
+    for msg in messages {
+        match &msg.payload {
+            Payload::Text(s) => {
+                buf.extend_from_slice(s.as_bytes());
+                if string_format.requires_newline() && !s.ends_with('\n') {

Review Comment:
   `insert_format = "string"` is a _passthrough_ path, i.e., the payload is 
inserted into ClickHouse _as-is_ (apart from a trailing newline being appended 
to CSV/TSV rows that lack one). Responsibility for correctly formatting the 
payload rests with the user. 
   
   Please let me know whether this addresses your concern, @hubcio. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to