ryerraguntla commented on code in PR #3140:
URL: https://github.com/apache/iggy/pull/3140#discussion_r3142150288


##########
core/connectors/sources/influxdb_source/src/v3.rs:
##########
@@ -0,0 +1,1083 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+//! InfluxDB V3 source — SQL queries, JSONL responses, Bearer auth.
+//!
+//! V3 uses strict `> cursor` semantics. DataFusion/Parquet does not guarantee
+//! stable ordering for rows that share the same timestamp, so the V2 skip-N
+//! approach is not safe here. If all rows in a batch share the same timestamp,
+//! the cursor cannot advance — the effective batch size is doubled each poll
+//! up to `stuck_batch_cap_factor × batch_size`. If the cap is reached, the
+//! circuit breaker is tripped.
+
+use crate::common::{
+    DEFAULT_V3_CURSOR_FIELD, PayloadFormat, Row, RowContext, V3SourceConfig, 
V3State,
+    apply_query_params, is_timestamp_after, parse_jsonl_rows, parse_scalar, 
validate_cursor,
+};
+use base64::{Engine as _, engine::general_purpose};
+use iggy_connector_sdk::{Error, ProducedMessage, Schema};
+use reqwest::Url;
+use reqwest_middleware::ClientWithMiddleware;
+use serde_json::json;
+use tracing::warn;
+use uuid::Uuid;
+
+/// Default multiplier applied to `batch_size` to derive the stuck-batch cap
+/// when the user does not configure `stuck_batch_cap_factor`.
+pub(crate) const DEFAULT_STUCK_CAP_FACTOR: u32 = 10;
+/// Upper bound for `stuck_batch_cap_factor`. A value of 1000 with batch_size=1000
+/// would issue 1,000,000-row queries before tripping the circuit breaker.
+pub(crate) const MAX_STUCK_CAP_FACTOR: u32 = 100;
+
+/// InfluxDB V3 query endpoint expects this exact string for JSONL response format.
+const QUERY_FORMAT_JSONL: &str = "jsonl";
+
+fn build_query(base: &str, query: &str, db: &str) -> Result<(Url, 
serde_json::Value), Error> {
+    let url = Url::parse(&format!("{base}/api/v3/query_sql"))
+        .map_err(|e| Error::InvalidConfigValue(format!("Invalid InfluxDB URL: 
{e}")))?;
+    let body = json!({
+        "db":     db,
+        "q":      query,
+        "format": QUERY_FORMAT_JSONL
+    });
+    Ok((url, body))
+}
+
+// ── Query execution 
───────────────────────────────────────────────────────────
+
+pub(crate) async fn run_query(
+    client: &ClientWithMiddleware,
+    config: &V3SourceConfig,
+    auth: &str,
+    cursor: &str,
+    effective_batch: u32,
+) -> Result<String, Error> {
+    validate_cursor(cursor)?;
+    let q = apply_query_params(&config.query, cursor, 
&effective_batch.to_string());
+    let base = config.url.trim_end_matches('/');
+    let (url, body) = build_query(base, &q, &config.db)?;
+
+    let response = client
+        .post(url)
+        .header("Authorization", auth)
+        .header("Content-Type", "application/json")
+        .header("Accept", "application/json")
+        .json(&body)
+        .send()
+        .await
+        .map_err(|e| Error::Storage(format!("InfluxDB V3 query failed: 
{e}")))?;
+
+    let status = response.status();
+    if status.is_success() {
+        return response
+            .text()
+            .await
+            .map_err(|e| Error::Storage(format!("Failed to read V3 response: 
{e}")));
+    }
+
+    let body_text = response
+        .text()
+        .await
+        .unwrap_or_else(|_| "failed to read response body".to_string());
+
+    // 404 "database not found" means the namespace has not been written to 
yet;
+    // treat it as empty rather than a failure so the circuit breaker stays 
healthy.
+    if status.as_u16() == 404 && body_text.contains("database not found") {
+        return Ok(String::new());
+    }
+
+    if iggy_connector_sdk::retry::is_transient_status(status) {
+        Err(Error::Storage(format!(
+            "InfluxDB V3 query failed with status {status}: {body_text}"
+        )))
+    } else {
+        Err(Error::PermanentHttpError(format!(
+            "InfluxDB V3 query failed with status {status}: {body_text}"
+        )))
+    }
+}
+
+// ── Message building 
──────────────────────────────────────────────────────────
+
+fn build_payload(
+    row: &Row,
+    payload_column: Option<&str>,
+    payload_format: PayloadFormat,
+    include_metadata: bool,
+    cursor_field: &str,
+) -> Result<Vec<u8>, Error> {
+    if let Some(col) = payload_column {
+        let raw = row
+            .get(col)
+            .cloned()
+            .ok_or_else(|| Error::InvalidRecordValue(format!("Missing payload 
column '{col}'")))?;
+        return match payload_format {
+            PayloadFormat::Json => {
+                let v: serde_json::Value = 
serde_json::from_str(&raw).map_err(|e| {
+                    Error::InvalidRecordValue(format!(
+                        "Payload column '{col}' is not valid JSON: {e}"
+                    ))
+                })?;
+                serde_json::to_vec(&v)
+                    .map_err(|e| Error::Serialization(format!("JSON 
serialization failed: {e}")))
+            }
+            PayloadFormat::Text => Ok(raw.into_bytes()),
+            PayloadFormat::Raw => general_purpose::STANDARD
+                .decode(raw.as_bytes())
+                .map_err(|e| {
+                    Error::InvalidRecordValue(format!("Failed to decode 
payload as base64: {e}"))
+                }),
+        };
+    }
+
+    // V3 rows are flat SQL results. When include_metadata=false, exclude the
+    // cursor column (timestamp) — it is used for deduplication, not user data.
+    let json_row: serde_json::Map<_, _> = row
+        .iter()
+        .filter(|(k, _)| include_metadata || k.as_str() != cursor_field)
+        .map(|(k, v)| (k.clone(), parse_scalar(v)))
+        .collect();
+    serde_json::to_vec(&json_row)
+        .map_err(|e| Error::Serialization(format!("JSON serialization failed: 
{e}")))
+}
+
/// Compute the next effective batch size when a batch is stuck on a single
/// timestamp.
///
/// The size doubles on each call, clamped (with saturating arithmetic) to
/// `base * cap_factor`. Returns `None` once `current` has already reached the
/// cap — the caller treats that as the signal to trip the circuit breaker.
pub(crate) fn next_stuck_batch_size(current: u32, base: u32, cap_factor: u32) -> Option<u32> {
    let ceiling = base.saturating_mul(cap_factor);
    (current < ceiling).then(|| current.saturating_mul(2).min(ceiling))
}
+
+// ── Poll 
──────────────────────────────────────────────────────────────────────
+
+/// Outcome of a single V3 poll cycle, handed back to the caller.
+pub(crate) struct PollResult {
+    /// Messages produced from the rows returned by this poll.
+    pub messages: Vec<ProducedMessage>,
+    /// Updated V3 source state (cursor position) to persist for the next poll.
+    pub new_state: V3State,
+    /// Schema of the produced message payloads.
+    pub schema: Schema,
+    /// Set to true when the stuck-timestamp cap was reached and the circuit
+    /// breaker should be tripped by the caller.
+    pub trip_circuit_breaker: bool,
+}
+
+// ── Row processing (pure, testable without HTTP) 
──────────────────────────────
+
+/// Result of processing a batch of V3 rows into Iggy messages.
+#[derive(Debug)]
+pub(crate) struct RowProcessingResult {
+    /// One produced message per input row (V3 emits every row; no skipping).
+    pub messages: Vec<ProducedMessage>,
+    /// Largest cursor value observed across the batch; `None` when no row
+    /// carried a cursor value.
+    pub max_cursor: Option<String>,
+    /// `true` when every row's `cursor_field` value equals `current_cursor`.
+    /// Combined with `rows.len() >= effective_batch`, this signals a stuck batch:
+    /// all returned rows are at the current cursor, meaning the cursor cannot
+    /// advance with `> cursor` semantics.
+    pub all_at_cursor: bool,
+}
+
+/// Convert a slice of V3 query rows into Iggy messages.
+///
+/// Also detects whether all rows share the same cursor value as 
`ctx.current_cursor`
+/// (the `all_at_cursor` flag). The caller uses this together with batch 
fullness
+/// to decide whether to inflate the batch size for the next poll.
+///
+/// Unlike V2, V3 uses strict `> cursor` semantics, so there is no 
row-skipping.
+/// All rows in the slice are emitted as messages.
+pub(crate) fn process_rows(
+    rows: &[Row],
+    ctx: &RowContext<'_>,
+) -> Result<RowProcessingResult, Error> {
+    let mut messages = Vec::with_capacity(rows.len());
+    let mut max_cursor: Option<String> = None;
+    // Starts true for non-empty batches; flipped to false as soon as any row
+    // either has a different cursor value or has no cursor field at all.
+    let mut all_at_cursor = !rows.is_empty();
+    // Generate the base UUID once per poll; derive per-message IDs by 
addition.
+    // This is O(1) PRNG calls per batch instead of O(n), measurable at batch 
≥ 100.
+    let id_base = Uuid::new_v4().as_u128();
+
+    for row in rows.iter() {
+        if let Some(cv) = row.get(ctx.cursor_field) {
+            if cv != ctx.current_cursor {
+                all_at_cursor = false;
+            }
+            match &max_cursor {

Review Comment:
   fixed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to