hubcio commented on code in PR #2886:
URL: https://github.com/apache/iggy/pull/2886#discussion_r3044822287
##########
core/connectors/sinks/clickhouse_sink/src/body.rs:
##########
@@ -0,0 +1,286 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+//! HTTP request body builders for each INSERT format.
+//!
+//! Each function accepts a slice of [`ConsumedMessage`]s and returns the raw
+//! bytes that will be sent to ClickHouse. Payloads of the wrong type for the
+//! chosen format are skipped with a warning rather than erroring, so a mixed
+//! batch never causes a complete failure.
+
+use crate::StringFormat;
+use crate::schema::Column;
+use iggy_connector_sdk::{ConsumedMessage, Error, Payload};
+use tracing::warn;
+
+// ─── Body builders ───────────────────────────────────────────────────────────
+
+/// Build a newline-delimited JSON body for `FORMAT JSONEachRow`.
+/// Each `Payload::Json` message becomes one line. Other payload types are skipped.
+pub(crate) fn build_json_body(messages: &[ConsumedMessage]) -> Vec<u8> {
+    let mut buf = Vec::with_capacity(messages.len() * 64);
+    for msg in messages {
+        match &msg.payload {
+            Payload::Json(value) => {
+                if simd_json::to_writer(&mut buf, value).is_ok() {
+                    buf.push(b'\n');
+                } else {
+                    warn!("Failed to serialise JSON payload at offset {}", msg.offset);
+                }
+            }
+            other => {
+                warn!(
+                    "JSONEachRow mode: skipping unsupported payload type {:?} at offset {}",
+                    payload_type_name(other),
+                    msg.offset
+                );
+            }
+        }
+    }
+    buf
+}
+
+/// Build a RowBinaryWithDefaults body.
+/// Each `Payload::Json` message is serialised to binary using the table schema.
+pub(crate) fn build_row_binary_body(
+    messages: &[ConsumedMessage],
+    schema: &[Column],
+) -> Result<Vec<u8>, Error> {
+    let mut buf = Vec::with_capacity(messages.len() * 128);
+    for msg in messages {
+        match &msg.payload {
+            Payload::Json(value) => {
+                crate::binary::serialize_row(value, schema, &mut buf)?;
+            }
+            other => {
+                warn!(
+                    "RowBinary mode: skipping unsupported payload type {:?} at offset {}",
+                    payload_type_name(other),
+                    msg.offset
+                );
+            }
+        }
+    }
+    Ok(buf)
+}
+
+/// Build a raw string body for CSV / TSV / JSONEachRow string passthrough.
+/// Each `Payload::Text` message is written as-is with a trailing newline
+/// appended for CSV/TSV if not already present.
+pub(crate) fn build_string_body(
+    messages: &[ConsumedMessage],
+    string_format: StringFormat,
+) -> Vec<u8> {
+    let mut buf = Vec::with_capacity(messages.len() * 64);
+    for msg in messages {
+        match &msg.payload {
+            Payload::Text(s) => {
+                buf.extend_from_slice(s.as_bytes());
+                if string_format.requires_newline() && !s.ends_with('\n') {

Review Comment:
I don't think we understood each other. The passthrough mode already takes formatting responsibility for CSV and TSV: build_string_body appends \n when requires_newline() returns true (body.rs:94). Not doing the same for JSONEachRow is inconsistent.
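To make the inconsistency concrete, here is a minimal standalone reproduction. It is a sketch, not the connector's code: StringFormat is a simplified stand-in whose Csv and Tsv variant names are assumed, payloads are plain &str instead of ConsumedMessage, and requires_newline() is assumed to return true only for CSV and TSV, as described above.

```rust
/// Simplified stand-in for the connector's `StringFormat`.
/// Assumption: variant names other than `JsonEachRow` are guessed.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum StringFormat {
    Csv,
    Tsv,
    JsonEachRow,
}

impl StringFormat {
    /// Assumed current behaviour: only CSV and TSV ask for a trailing newline.
    pub fn requires_newline(self) -> bool {
        matches!(self, StringFormat::Csv | StringFormat::Tsv)
    }
}

/// Simplified `build_string_body`: takes `&str` payloads instead of
/// `ConsumedMessage`s, but keeps the same newline rule as body.rs:94.
pub fn build_string_body(payloads: &[&str], string_format: StringFormat) -> Vec<u8> {
    let mut buf = Vec::with_capacity(payloads.len() * 64);
    for s in payloads {
        buf.extend_from_slice(s.as_bytes());
        // Append '\n' only when the format requests it and the payload lacks one.
        if string_format.requires_newline() && !s.ends_with('\n') {
            buf.push(b'\n');
        }
    }
    buf
}
```

With two JSONEachRow payloads, {"a":1} and {"b":2}, the body comes out as {"a":1}{"b":2} with no separator, whereas two CSV rows such as 1,a and 2,b each get their own newline.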
The comment at lib.rs:109 says "JSON rows are already delimited", but that's incorrect: individual JSON objects like {"a":1} have no trailing delimiter. When you batch two messages you get {"a":1}{"b":2}, which is not valid JSONEachRow; ClickHouse expects one JSON object per line. The fix is trivial: requires_newline() should return true for JsonEachRow as well, or build_string_body should always append \n for JSONEachRow. This matches what build_json_body already does (line 41: buf.push(b'\n')).

Please write a test to validate my claims, because the current test doesn't: string_body_json_each_row_does_not_append_newline only uses a single message, so it can't catch this. A two-message test would immediately show the problem.
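A sketch of the first option (making requires_newline() return true for JsonEachRow) together with the kind of two-message test I mean. It uses simplified stand-ins rather than the connector's real types: &str payloads instead of ConsumedMessage, assumed Csv/Tsv variant names, and hypothetical test names.

```rust
/// Simplified stand-in for the connector's `StringFormat` (Csv/Tsv names assumed).
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum StringFormat {
    Csv,
    Tsv,
    JsonEachRow,
}

impl StringFormat {
    /// Proposed fix: every passthrough format is line-delimited,
    /// so JSONEachRow gets a trailing newline just like CSV and TSV.
    pub fn requires_newline(self) -> bool {
        matches!(
            self,
            StringFormat::Csv | StringFormat::Tsv | StringFormat::JsonEachRow
        )
    }
}

/// Simplified `build_string_body` using `&str` payloads instead of `ConsumedMessage`.
pub fn build_string_body(payloads: &[&str], string_format: StringFormat) -> Vec<u8> {
    let mut buf = Vec::with_capacity(payloads.len() * 64);
    for s in payloads {
        buf.extend_from_slice(s.as_bytes());
        // Same rule as body.rs:94; only the `requires_newline()` result changed.
        if string_format.requires_newline() && !s.ends_with('\n') {
            buf.push(b'\n');
        }
    }
    buf
}

#[cfg(test)]
mod tests {
    use super::*;

    // Hypothetical test name: two messages are needed to expose a missing delimiter.
    #[test]
    fn string_body_json_each_row_separates_multiple_messages() {
        let body = build_string_body(&[r#"{"a":1}"#, r#"{"b":2}"#], StringFormat::JsonEachRow);
        assert_eq!(body, b"{\"a\":1}\n{\"b\":2}\n".to_vec());
    }

    // A payload that already ends in '\n' must not get a second one.
    #[test]
    fn string_body_json_each_row_keeps_existing_newline() {
        let body = build_string_body(&["{\"a\":1}\n", "{\"b\":2}"], StringFormat::JsonEachRow);
        assert_eq!(body, b"{\"a\":1}\n{\"b\":2}\n".to_vec());
    }
}
```

The second option (always appending \n for JSONEachRow inside build_string_body) would produce the same bytes; changing requires_newline() just keeps the rule in one place.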
