ryerraguntla commented on code in PR #3140:
URL: https://github.com/apache/iggy/pull/3140#discussion_r3232900698


##########
core/connectors/sources/influxdb_source/src/v2.rs:
##########
@@ -0,0 +1,912 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+//! InfluxDB V2 source — Flux queries, annotated-CSV responses, Token auth.
+
+use crate::common::{
+    PayloadFormat, Row, RowContext, V2SourceConfig, V2State, 
apply_query_params,
+    is_timestamp_after, parse_csv_rows, parse_scalar, validate_cursor,
+};
+use base64::{Engine as _, engine::general_purpose};
+use chrono::{DateTime, Utc};
+use iggy_connector_sdk::{Error, ProducedMessage, Schema};
+use reqwest::Url;
+use reqwest_middleware::ClientWithMiddleware;
+use serde_json::json;
+use uuid::Uuid;
+
+fn build_query(
+    base: &str,
+    query: &str,
+    org: Option<&str>,
+) -> Result<(Url, serde_json::Value), Error> {
+    let mut url = Url::parse(&format!("{base}/api/v2/query"))
+        .map_err(|e| Error::InvalidConfigValue(format!("Invalid InfluxDB URL: 
{e}")))?;
+    if let Some(o) = org {
+        url.query_pairs_mut().append_pair("org", o);
+    }
+    let body = json!({
+        "query": query,
+        "dialect": {
+            "annotations": ["datatype", "group", "default"],
+            "delimiter": ",",
+            "header": true,
+            "commentPrefix": "#"
+        }
+    });
+    Ok((url, body))
+}
+
+/// Maximum multiple of `batch_size` by which `already_seen` may inflate the
+/// query limit. Prevents an unbounded request to InfluxDB when the cursor
+/// is stuck at the same timestamp for many consecutive polls (analogous to
+/// V3's `stuck_batch_cap_factor`).
+const MAX_SKIP_INFLATION_FACTOR: u64 = 10;
+
+/// Render the final Flux query by substituting `$cursor` and `$limit`.
+///
+/// The limit is inflated by `already_seen` (rows at the current cursor
+/// timestamp that were delivered in a previous batch) so that re-fetching
+/// with `>= cursor` returns enough rows to skip them and still fill a full
+/// batch. Inflation is capped at `MAX_SKIP_INFLATION_FACTOR × batch_size`
+/// to prevent excessively large queries when the cursor is stuck.
+fn render_query(config: &V2SourceConfig, cursor: &str, already_seen: u64) -> 
Result<String, Error> {
+    validate_cursor(cursor)?;
+    let batch = config.batch_size.unwrap_or(500) as u64;
+    // Cap inflation so a stuck cursor cannot issue arbitrarily large queries.
+    let capped_seen = 
already_seen.min(batch.saturating_mul(MAX_SKIP_INFLATION_FACTOR));
+    let limit = batch.saturating_add(capped_seen).to_string();
+    Ok(apply_query_params(&config.query, cursor, &limit, ""))
+}
+
#[cfg(test)]
mod tests {
    use super::*;
    use crate::common::{Row, RowContext};

    const BASE_CURSOR: &str = "1970-01-01T00:00:00Z";
    const T1: &str = "2024-01-01T00:00:00Z";
    const T2: &str = "2024-01-01T00:00:01Z";
    const T3: &str = "2024-01-01T00:00:02Z";

    /// Builds a `Row` from string key/value pairs.
    fn row(pairs: &[(&str, &str)]) -> Row {
        pairs
            .iter()
            .map(|&(key, value)| (key.to_owned(), serde_json::Value::String(value.to_owned())))
            .collect()
    }

    /// Builds a `RowContext` with the defaults these tests rely on:
    /// `_time` cursor field, JSON payload, metadata included.
    fn ctx(current_cursor: &str, now_micros: u64) -> RowContext<'_> {
        RowContext {
            cursor_field: "_time",
            current_cursor,
            include_metadata: true,
            payload_col: None,
            payload_format: PayloadFormat::Json,
            now_micros,
        }
    }

    #[test]
    fn process_rows_empty_returns_empty() {
        let out = process_rows(&[], &ctx(BASE_CURSOR, 1000), 0).unwrap();
        assert!(out.messages.is_empty());
        assert!(out.max_cursor.is_none());
        assert_eq!(out.skipped, 0);
        assert_eq!(out.rows_at_max_cursor, 0);
    }

    #[test]
    fn process_rows_single_row_produces_one_message() {
        let input = vec![row(&[("_time", T1), ("_value", "42")])];
        let out = process_rows(&input, &ctx(BASE_CURSOR, 1000), 0).unwrap();
        assert_eq!(out.messages.len(), 1);
        assert_eq!(out.max_cursor.as_deref(), Some(T1));
        assert_eq!(out.rows_at_max_cursor, 1);
        assert_eq!(out.skipped, 0);
    }

    #[test]
    fn process_rows_skips_already_seen_at_cursor() {
        // Three rows all at T1, cursor=T1, already_seen=1 → skip first, produce two.
        let input: Vec<Row> = ["1", "2", "3"]
            .iter()
            .map(|v| row(&[("_time", T1), ("_value", v)]))
            .collect();
        let out = process_rows(&input, &ctx(T1, 1000), 1).unwrap();
        assert_eq!(out.skipped, 1);
        assert_eq!(out.messages.len(), 2);
    }

    #[test]
    fn process_rows_does_not_skip_beyond_already_seen() {
        // already_seen=1 but there are 3 rows at cursor; only the first should be skipped.
        let input: Vec<Row> = (0..3).map(|_| row(&[("_time", T1)])).collect();
        let out = process_rows(&input, &ctx(T1, 1000), 1).unwrap();
        assert_eq!(out.skipped, 1);
        assert_eq!(out.messages.len(), 2);
    }

    #[test]
    fn process_rows_tracks_latest_max_cursor() {
        // Out-of-order timestamps: the maximum (T3) wins regardless of position.
        let input: Vec<Row> = [T1, T3, T2].iter().map(|t| row(&[("_time", t)])).collect();
        let out = process_rows(&input, &ctx(BASE_CURSOR, 1000), 0).unwrap();
        assert_eq!(out.max_cursor.as_deref(), Some(T3));
        assert_eq!(out.rows_at_max_cursor, 1);
    }

    #[test]
    fn process_rows_counts_rows_at_max_cursor() {
        // Two rows share the maximum timestamp T2; both must be counted.
        let input: Vec<Row> = [T1, T2, T2].iter().map(|t| row(&[("_time", t)])).collect();
        let out = process_rows(&input, &ctx(BASE_CURSOR, 1000), 0).unwrap();
        assert_eq!(out.max_cursor.as_deref(), Some(T2));
        assert_eq!(out.rows_at_max_cursor, 2);
    }

    #[test]
    fn process_rows_message_ids_are_some_and_unique() {
        let input = vec![row(&[("_time", T1)]), row(&[("_time", T2)])];
        let out = process_rows(&input, &ctx(BASE_CURSOR, 1000), 0).unwrap();
        let (first, second) = (&out.messages[0], &out.messages[1]);
        assert!(first.id.is_some());
        assert!(second.id.is_some());
        assert_ne!(first.id, second.id);
    }

    #[test]
    fn process_rows_message_timestamps_use_now_micros() {
        let input = vec![row(&[("_time", T1)])];
        let out = process_rows(&input, &ctx(BASE_CURSOR, 999_999), 0).unwrap();
        assert_eq!(out.messages[0].timestamp, Some(999_999));
        assert_eq!(out.messages[0].origin_timestamp, Some(999_999));
    }

    #[test]
    fn process_rows_row_without_cursor_field_still_produces_message() {
        // No _time field: the row is still emitted, but the cursor cannot advance.
        let input = vec![row(&[("_value", "42")])];
        let out = process_rows(&input, &ctx(BASE_CURSOR, 1000), 0).unwrap();
        assert_eq!(out.messages.len(), 1);
        assert!(out.max_cursor.is_none());
    }
}
+
+// ── Query execution 
───────────────────────────────────────────────────────────
+
+pub(crate) async fn run_query(
+    client: &ClientWithMiddleware,
+    config: &V2SourceConfig,
+    auth: &str,
+    cursor: &str,
+    already_seen: u64,
+) -> Result<String, Error> {
+    let query = render_query(config, cursor, already_seen)?;
+    let base = config.url.trim_end_matches('/');
+    let (url, body) = build_query(base, &query, Some(&config.org))?;
+
+    let response = client
+        .post(url)
+        .header("Authorization", auth)
+        .header("Content-Type", "application/json")
+        .header("Accept", "text/csv")
+        .json(&body)
+        .send()
+        .await
+        .map_err(|e| Error::Storage(format!("InfluxDB V2 query failed: 
{e}")))?;
+
+    let status = response.status();
+    if status.is_success() {
+        return response

Review Comment:
   Done!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to