hubcio commented on code in PR #3140: URL: https://github.com/apache/iggy/pull/3140#discussion_r3187492543
########## core/connectors/sources/influxdb_source/src/v3.rs: ########## @@ -0,0 +1,1298 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +//! InfluxDB V3 source — SQL queries, JSONL responses, Bearer auth. +//! +//! V3 uses strict `> cursor` semantics. DataFusion/Parquet does not guarantee +//! stable ordering for rows that share the same timestamp, so the V2 skip-N +//! approach is not safe here. If all rows in a batch share the same timestamp, +//! the cursor cannot advance — the effective batch size is doubled each poll +//! up to `stuck_batch_cap_factor × batch_size`. If the cap is reached, the +//! circuit breaker is tripped. + +use crate::common::{ + DEFAULT_V3_CURSOR_FIELD, PayloadFormat, Row, RowContext, V3SourceConfig, V3State, + apply_query_params, is_timestamp_after, parse_jsonl_rows, timestamps_equal, validate_cursor, +}; +use base64::{Engine as _, engine::general_purpose}; +use chrono::{DateTime, Utc}; +use iggy_connector_sdk::{Error, ProducedMessage, Schema}; +use reqwest::Url; +use reqwest_middleware::ClientWithMiddleware; +use serde_json::json; +use tracing::warn; +use uuid::Uuid; + +pub(crate) const DEFAULT_STUCK_CAP_FACTOR: u32 = 10; +/// Upper bound for `stuck_batch_cap_factor`. A value of 1000 with batch_size=1000 +/// would issue 1,000,000-row queries before tripping the circuit breaker. +pub(crate) const MAX_STUCK_CAP_FACTOR: u32 = 100; + +/// Hard cap on buffered JSONL response body size. +/// +/// `MAX_STUCK_CAP_FACTOR` can inflate the effective batch to 100 × `batch_size`, +/// making unbounded `response.text()` a real OOM vector under misconfiguration. +/// Streaming stops and returns an error once this many bytes have been read. +const MAX_RESPONSE_BODY_BYTES: usize = 256 * 1024 * 1024; // 256 MiB + +/// InfluxDB V3 query endpoint expects this exact string for JSONL response format. 
+const QUERY_FORMAT_JSONL: &str = "jsonl"; + +fn build_query(base: &str, query: &str, db: &str) -> Result<(Url, serde_json::Value), Error> { + let url = Url::parse(&format!("{base}/api/v3/query_sql")) + .map_err(|e| Error::InvalidConfigValue(format!("Invalid InfluxDB URL: {e}")))?; + let body = json!({ + "db": db, + "q": query, + "format": QUERY_FORMAT_JSONL + }); + Ok((url, body)) +} + +// ── Query execution ─────────────────────────────────────────────────────────── + +pub(crate) async fn run_query( + client: &ClientWithMiddleware, + config: &V3SourceConfig, + auth: &str, + cursor: &str, + effective_batch: u32, + offset: u64, +) -> Result<String, Error> { + validate_cursor(cursor)?; + let q = apply_query_params( + &config.query, + cursor, + &effective_batch.to_string(), + &offset.to_string(), /* &str */ + ); + let base = config.url.trim_end_matches('/'); + let (url, body) = build_query(base, &q, &config.db)?; + + let mut response = client + .post(url) + .header("Authorization", auth) + .header("Content-Type", "application/json") + .header("Accept", "application/json") + .json(&body) + .send() + .await + .map_err(|e| Error::Storage(format!("InfluxDB V3 query failed: {e}")))?; + + let status = response.status(); + if status.is_success() { + // Stream chunk-by-chunk with a hard byte cap to prevent OOM when + // MAX_STUCK_CAP_FACTOR inflates the effective batch to 100 × batch_size. + if response + .content_length() + .is_some_and(|n| n as usize > MAX_RESPONSE_BODY_BYTES) + { + return Err(Error::Storage(format!( + "InfluxDB V3 response body exceeds {MAX_RESPONSE_BODY_BYTES} byte cap; \ + reduce batch_size to avoid OOM" + ))); + } + let mut buf: Vec<u8> = Vec::new(); + while let Some(chunk) = response + .chunk() + .await + .map_err(|e| Error::Storage(format!("Failed to read V3 response: {e}")))? + { + buf.extend_from_slice(&chunk); + if buf.len() > MAX_RESPONSE_BODY_BYTES { + return Err(Error::Storage(format!( + "InfluxDB V3 response body exceeded {MAX_RESPONSE_BODY_BYTES} byte cap \ + while streaming; reduce batch_size to avoid OOM" + ))); + } + } + return String::from_utf8(buf) + .map_err(|e| Error::Storage(format!("V3 response body is not valid UTF-8: {e}"))); + } + + let body_text = response + .text() + .await + .unwrap_or_else(|_| "failed to read response body".to_string()); + + // 404 "database not found" means the namespace has not been written to yet; + // treat it as empty rather than a failure so the circuit breaker stays healthy. + // Any other 404 (e.g. "table not found") is a permanent error — don't swallow it. 
+ if status.as_u16() == 404 { + if body_text.to_lowercase().contains("database not found") { + return Ok(String::new()); + } + return Err(Error::PermanentHttpError(format!( + "InfluxDB V3 query failed with status {status}: {body_text}" + ))); + } + + if iggy_connector_sdk::retry::is_transient_status(status) { + Err(Error::Storage(format!( + "InfluxDB V3 query failed with status {status}: {body_text}" + ))) + } else { + Err(Error::PermanentHttpError(format!( + "InfluxDB V3 query failed with status {status}: {body_text}" + ))) + } +} + +// ── Message building ────────────────────────────────────────────────────────── + +fn build_payload( + row: &Row, + payload_column: Option<&str>, + payload_format: PayloadFormat, + include_metadata: bool, + cursor_field: &str, +) -> Result<Vec<u8>, Error> { + if let Some(col) = payload_column { + let raw = row + .get(col) + .cloned() + .ok_or_else(|| Error::InvalidRecordValue(format!("Missing payload column '{col}'")))?; + return match payload_format { + // raw is already a serde_json::Value — serialize directly, no re-parse. + PayloadFormat::Json => serde_json::to_vec(&raw) + .map_err(|e| Error::Serialization(format!("JSON serialization failed: {e}"))), + PayloadFormat::Text => match raw { + serde_json::Value::String(s) => Ok(s.into_bytes()), + other => serde_json::to_vec(&other) + .map_err(|e| Error::Serialization(format!("JSON serialization failed: {e}"))), + }, + PayloadFormat::Raw => { + let s = raw.as_str().ok_or_else(|| { + Error::InvalidRecordValue(format!( + "Payload column '{col}' must be a string value for Raw format" + )) + })?; + general_purpose::STANDARD.decode(s.as_bytes()).map_err(|e| { + Error::InvalidRecordValue(format!("Failed to decode payload as base64: {e}")) + }) + } + }; + } + + // V3 rows carry typed serde_json::Values — clone directly, no parse_scalar needed. + // When include_metadata=false, exclude the cursor column (timestamp). + let json_row: serde_json::Map<_, _> = row + .iter() + .filter(|(k, _)| include_metadata || k.as_str() != cursor_field) + .map(|(k, v)| (k.clone(), v.clone())) + .collect(); + serde_json::to_vec(&json_row) + .map_err(|e| Error::Serialization(format!("JSON serialization failed: {e}"))) +} + +/// Compute the next effective batch size when the batch is stuck. +/// Doubles until it reaches `cap`. Returns `None` if already at cap. +pub(crate) fn next_stuck_batch_size(current: u32, base: u32, cap_factor: u32) -> Option<u32> { + let cap = base.saturating_mul(cap_factor); + if current >= cap { + None + } else { + Some(current.saturating_mul(2).min(cap)) + } +} + +// ── Poll ────────────────────────────────────────────────────────────────────── + +pub(crate) struct PollResult { + pub messages: Vec<ProducedMessage>, + pub new_state: V3State, + pub schema: Schema, + /// Set to true when the stuck-timestamp cap was reached and the circuit + /// breaker should be tripped by the caller. + pub trip_circuit_breaker: bool, +} + +// ── Row processing (pure, testable without HTTP) ────────────────────────────── + +/// Normalize a raw timestamp from InfluxDB V3 JSONL into a cursor-safe RFC 3339 string. +/// +/// InfluxDB 3 Core returns timestamps without a timezone suffix and with nanosecond +/// precision (e.g. `"2026-04-26T02:32:20.526360865"`). The only required fix is +/// appending `"Z"` when no timezone suffix is present (InfluxDB always stores UTC). 
+/// +/// Full nanosecond precision is intentionally preserved — truncating to milliseconds +/// would place the cursor BEFORE the actual row timestamps within the same millisecond, +/// causing `WHERE time > '$cursor'` to re-deliver already-seen rows on subsequent polls. +/// InfluxDB 3's DataFusion SQL engine handles RFC 3339 strings with any number of +/// fractional digits in WHERE clause timestamp comparisons. +fn normalize_v3_timestamp(ts: &str) -> String { + // Fast path: already a valid RFC 3339 timestamp with timezone suffix. + if chrono::DateTime::parse_from_rfc3339(ts).is_ok() { + return ts.to_string(); + } + // Slow path: no timezone suffix — append "Z" to make it RFC 3339 compliant. + format!("{ts}Z") +} + +/// Result of processing a batch of V3 rows into Iggy messages. +#[derive(Debug)] +pub(crate) struct RowProcessingResult { + pub messages: Vec<ProducedMessage>, + pub max_cursor: Option<String>, + /// `true` when every row's `cursor_field` value equals `current_cursor`. + /// Combined with `rows.len() >= effective_batch`, this signals a stuck batch: + /// all returned rows are at the current cursor, meaning the cursor cannot + /// advance with `> cursor` semantics. + pub all_at_cursor: bool, + /// Count of rows whose cursor == max_cursor (for tiebreaker offset). + pub rows_at_max_cursor: u64, +} + +/// Convert a slice of V3 query rows into Iggy messages. +/// +/// Also detects whether all rows share the same cursor value as `ctx.current_cursor` +/// (the `all_at_cursor` flag). The caller uses this together with batch fullness +/// to decide whether to inflate the batch size for the next poll. +/// +/// Unlike V2, V3 uses strict `> cursor` semantics, so there is no row-skipping. +/// All rows in the slice are emitted as messages. +pub(crate) fn process_rows( + rows: &[Row], + ctx: &RowContext<'_>, +) -> Result<RowProcessingResult, Error> { + let mut messages = Vec::with_capacity(rows.len()); + let mut max_cursor: Option<String> = None; + let mut max_cursor_parsed: Option<DateTime<Utc>> = None; // cache parsed form + // Starts true for non-empty batches; flipped to false as soon as any row + // either has a different cursor value or has no cursor field at all. + let mut all_at_cursor = !rows.is_empty(); + // Generate the base UUID once per poll; derive per-message IDs by addition. + // This is O(1) PRNG calls per batch instead of O(n), measurable at batch ≥ 100. + let id_base = Uuid::new_v4().as_u128(); + for row in rows.iter() { + if let Some(raw_cv) = row.get(ctx.cursor_field).and_then(|v| v.as_str()) { + let cv_owned = normalize_v3_timestamp(raw_cv); + let cv = cv_owned.as_str(); + if !timestamps_equal(cv, ctx.current_cursor) { + all_at_cursor = false; + } + validate_cursor(cv)?; + let cv_parsed = cv.parse::<DateTime<Utc>>().ok(); + match (cv_parsed, max_cursor_parsed) { + (Some(new_dt), Some(cur_dt)) if new_dt > cur_dt => { + max_cursor = Some(cv.to_string()); + max_cursor_parsed = Some(new_dt); + } + (Some(new_dt), None) => { + max_cursor = Some(cv.to_string()); + max_cursor_parsed = Some(new_dt); + } + (None, _) if max_cursor_parsed.is_none() => { + // Unparsable cursor — still track it (string fallback) if no + // parsable cursor has been seen yet. 
+ max_cursor = Some(cv.to_string()); + } + _ => {} + } + } else { + all_at_cursor = false; + } + + let payload = build_payload( + row, + ctx.payload_col, + ctx.payload_format, + ctx.include_metadata, + ctx.cursor_field, + )?; + messages.push(ProducedMessage { + id: Some(id_base.wrapping_add(messages.len() as u128)), + checksum: None, + timestamp: Some(ctx.now_micros), + origin_timestamp: Some(ctx.now_micros), + headers: None, + payload, + }); + } + + let rows_at_max_cursor = rows + .iter() + .filter(|r| { + max_cursor.as_deref().is_some_and(|mc| { + r.get(ctx.cursor_field) + .and_then(|v| v.as_str()) + .is_some_and(|cv| normalize_v3_timestamp(cv) == mc) + }) + }) + .count() as u64; + + if !rows.is_empty() && max_cursor.is_none() { + return Err(Error::InvalidRecordValue(format!( + "No '{}' field found in any returned row — cursor cannot advance; \ + the connector would re-deliver the same rows on every poll. \ + Ensure your query selects the cursor column.", + ctx.cursor_field + ))); + } + + Ok(RowProcessingResult { + messages, + max_cursor, + all_at_cursor, + rows_at_max_cursor, + }) +} + +pub(crate) async fn poll( + client: &ClientWithMiddleware, + config: &V3SourceConfig, + auth: &str, + state: &V3State, + payload_format: PayloadFormat, + include_metadata: bool, +) -> Result<PollResult, Error> { + // Access config.initial_offset directly (not via the enum accessor) because + // poll() receives &V3SourceConfig — the inner struct — already matched by the + // caller in lib.rs. The enum accessor InfluxDbSourceConfig::initial_offset() + // is not available here. + let cursor = state + .last_timestamp + .clone() + .or_else(|| config.initial_offset.clone()) + .unwrap_or_else(|| "1970-01-01T00:00:00Z".to_string()); + + let base_batch = config.batch_size.unwrap_or(500); + let effective_batch = if state.effective_batch_size == 0 { + base_batch + } else { + state.effective_batch_size + }; + + let response_data = run_query( + client, + config, + auth, + &cursor, + effective_batch, + state.last_timestamp_row_offset, + ) + .await?; + let rows = parse_jsonl_rows(&response_data)?; + + let cap_factor = config + .stuck_batch_cap_factor + .unwrap_or(DEFAULT_STUCK_CAP_FACTOR); + let ctx = RowContext { + cursor_field: config + .cursor_field + .as_deref() + .unwrap_or(DEFAULT_V3_CURSOR_FIELD), + current_cursor: &cursor, + include_metadata, + payload_col: config.payload_column.as_deref(), + payload_format, + now_micros: iggy_common::Utc::now().timestamp_micros() as u64, + }; + + let result = process_rows(&rows, &ctx)?; + + // Stuck-timestamp detection: if every row is at the current cursor + // and the batch was full, inflate and request more next time. + let stuck = result.all_at_cursor && rows.len() >= effective_batch as usize; Review Comment: stuck-batch detection is unreachable with the default strict `> '$cursor'` query semantics. `all_at_cursor` at `v3.rs:286-288` flips false unless every returned row's cursor value equals the input cursor - but strict `>` filters rows AT the cursor, so the input cursor never appears in results. default fixture (`integration/tests/connectors/fixtures/influxdb/source_v3.rs:107`) and unit-test config (`v3.rs:793`) both use `WHERE time > '$cursor'`. concrete data-loss scenario: 600 rows at one timestamp T_new, cursor=T_old, batch=100. first poll returns 100 rows at T_new, `all_at_cursor=false` (input cursor T_old != T_new), cursor advances to T_new, next poll's `> T_new` filters all 500 remaining T_new siblings - silent drop. 
fix: gate stuck on `rows_at_max_cursor >= effective_batch as u64` (the actual signal) and validate that the user query contains `$offset` at `open()` when `stuck_batch_cap_factor > 0`. ########## core/connectors/sources/influxdb_source/src/v2.rs: ########## @@ -0,0 +1,912 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +//! InfluxDB V2 source — Flux queries, annotated-CSV responses, Token auth. + +use crate::common::{ + PayloadFormat, Row, RowContext, V2SourceConfig, V2State, apply_query_params, + is_timestamp_after, parse_csv_rows, parse_scalar, validate_cursor, +}; +use base64::{Engine as _, engine::general_purpose}; +use chrono::{DateTime, Utc}; +use iggy_connector_sdk::{Error, ProducedMessage, Schema}; +use reqwest::Url; +use reqwest_middleware::ClientWithMiddleware; +use serde_json::json; +use uuid::Uuid; + +fn build_query( + base: &str, + query: &str, + org: Option<&str>, +) -> Result<(Url, serde_json::Value), Error> { + let mut url = Url::parse(&format!("{base}/api/v2/query")) + .map_err(|e| Error::InvalidConfigValue(format!("Invalid InfluxDB URL: {e}")))?; + if let Some(o) = org { + url.query_pairs_mut().append_pair("org", o); + } + let body = json!({ + "query": query, + "dialect": { + "annotations": ["datatype", "group", "default"], + "delimiter": ",", + "header": true, + "commentPrefix": "#" + } + }); + Ok((url, body)) +} + +/// Maximum multiple of `batch_size` by which `already_seen` may inflate the +/// query limit. Prevents an unbounded request to InfluxDB when the cursor +/// is stuck at the same timestamp for many consecutive polls (analogous to +/// V3's `stuck_batch_cap_factor`). +const MAX_SKIP_INFLATION_FACTOR: u64 = 10; + +/// Render the final Flux query by substituting `$cursor` and `$limit`. +/// +/// The limit is inflated by `already_seen` (rows at the current cursor +/// timestamp that were delivered in a previous batch) so that re-fetching +/// with `>= cursor` returns enough rows to skip them and still fill a full +/// batch. Inflation is capped at `MAX_SKIP_INFLATION_FACTOR × batch_size` +/// to prevent excessively large queries when the cursor is stuck. +fn render_query(config: &V2SourceConfig, cursor: &str, already_seen: u64) -> Result<String, Error> { + validate_cursor(cursor)?; + let batch = config.batch_size.unwrap_or(500) as u64; + // Cap inflation so a stuck cursor cannot issue arbitrarily large queries. 
+ let capped_seen = already_seen.min(batch.saturating_mul(MAX_SKIP_INFLATION_FACTOR)); + let limit = batch.saturating_add(capped_seen).to_string(); + Ok(apply_query_params(&config.query, cursor, &limit, "")) +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::common::{Row, RowContext}; + + fn row(pairs: &[(&str, &str)]) -> Row { + pairs + .iter() + .map(|(k, v)| (k.to_string(), serde_json::Value::String(v.to_string()))) + .collect() + } + + const BASE_CURSOR: &str = "1970-01-01T00:00:00Z"; + const T1: &str = "2024-01-01T00:00:00Z"; + const T2: &str = "2024-01-01T00:00:01Z"; + const T3: &str = "2024-01-01T00:00:02Z"; + + fn ctx(current_cursor: &str, now_micros: u64) -> RowContext<'_> { + RowContext { + cursor_field: "_time", + current_cursor, + include_metadata: true, + payload_col: None, + payload_format: PayloadFormat::Json, + now_micros, + } + } + + #[test] + fn process_rows_empty_returns_empty() { + let result = process_rows(&[], &ctx(BASE_CURSOR, 1000), 0).unwrap(); + assert!(result.messages.is_empty()); + assert!(result.max_cursor.is_none()); + assert_eq!(result.skipped, 0); + assert_eq!(result.rows_at_max_cursor, 0); + } + + #[test] + fn process_rows_single_row_produces_one_message() { + let rows = vec![row(&[("_time", T1), ("_value", "42")])]; + let result = process_rows(&rows, &ctx(BASE_CURSOR, 1000), 0).unwrap(); + assert_eq!(result.messages.len(), 1); + assert_eq!(result.max_cursor.as_deref(), Some(T1)); + assert_eq!(result.rows_at_max_cursor, 1); + assert_eq!(result.skipped, 0); + } + + #[test] + fn process_rows_skips_already_seen_at_cursor() { + // Three rows all at T1, cursor=T1, already_seen=1 → skip first, produce two. + let rows = vec![ + row(&[("_time", T1), ("_value", "1")]), + row(&[("_time", T1), ("_value", "2")]), + row(&[("_time", T1), ("_value", "3")]), + ]; + let result = process_rows(&rows, &ctx(T1, 1000), 1).unwrap(); + assert_eq!(result.skipped, 1); + assert_eq!(result.messages.len(), 2); + } + + #[test] + fn process_rows_does_not_skip_beyond_already_seen() { + // already_seen=1 but there are 3 rows at cursor; only the first should be skipped. 
+ let rows = vec![ + row(&[("_time", T1)]), + row(&[("_time", T1)]), + row(&[("_time", T1)]), + ]; + let result = process_rows(&rows, &ctx(T1, 1000), 1).unwrap(); + assert_eq!(result.skipped, 1); + assert_eq!(result.messages.len(), 2); + } + + #[test] + fn process_rows_tracks_latest_max_cursor() { + let rows = vec![ + row(&[("_time", T1)]), + row(&[("_time", T3)]), + row(&[("_time", T2)]), + ]; + let result = process_rows(&rows, &ctx(BASE_CURSOR, 1000), 0).unwrap(); + assert_eq!(result.max_cursor.as_deref(), Some(T3)); + assert_eq!(result.rows_at_max_cursor, 1); + } + + #[test] + fn process_rows_counts_rows_at_max_cursor() { + let rows = vec![ + row(&[("_time", T1)]), + row(&[("_time", T2)]), + row(&[("_time", T2)]), + ]; + let result = process_rows(&rows, &ctx(BASE_CURSOR, 1000), 0).unwrap(); + assert_eq!(result.max_cursor.as_deref(), Some(T2)); + assert_eq!(result.rows_at_max_cursor, 2); + } + + #[test] + fn process_rows_message_ids_are_some_and_unique() { + let rows = vec![row(&[("_time", T1)]), row(&[("_time", T2)])]; + let result = process_rows(&rows, &ctx(BASE_CURSOR, 1000), 0).unwrap(); + assert!(result.messages[0].id.is_some()); + assert!(result.messages[1].id.is_some()); + assert_ne!(result.messages[0].id, result.messages[1].id); + } + + #[test] + fn process_rows_message_timestamps_use_now_micros() { + let rows = vec![row(&[("_time", T1)])]; + let result = process_rows(&rows, &ctx(BASE_CURSOR, 999_999), 0).unwrap(); + assert_eq!(result.messages[0].timestamp, Some(999_999)); + assert_eq!(result.messages[0].origin_timestamp, Some(999_999)); + } + + #[test] + fn process_rows_row_without_cursor_field_still_produces_message() { + let rows = vec![row(&[("_value", "42")])]; // no _time field + let result = process_rows(&rows, &ctx(BASE_CURSOR, 1000), 0).unwrap(); + assert_eq!(result.messages.len(), 1); + assert!(result.max_cursor.is_none()); + } +} + +// ── Query execution ─────────────────────────────────────────────────────────── + +pub(crate) async fn run_query( + client: &ClientWithMiddleware, + config: &V2SourceConfig, + auth: &str, + cursor: &str, + already_seen: u64, +) -> Result<String, Error> { + let query = render_query(config, cursor, already_seen)?; + let base = config.url.trim_end_matches('/'); + let (url, body) = build_query(base, &query, Some(&config.org))?; + + let response = client + .post(url) + .header("Authorization", auth) + .header("Content-Type", "application/json") + .header("Accept", "text/csv") + .json(&body) + .send() + .await + .map_err(|e| Error::Storage(format!("InfluxDB V2 query failed: {e}")))?; + + let status = response.status(); + if status.is_success() { + return response Review Comment: unbounded `response.text()` here vs the v3 path which streams chunk-by-chunk with a 256 MiB cap (`v3.rs:99-126`). `MAX_SKIP_INFLATION_FACTOR=10` at `v2.rs:59` flows through `render_query` (`v2.rs:71-73`) so LIMIT can reach `batch + min(already_seen, batch*10)`, up to ~11x batch_size. asymmetric DoS surface vs v3. mirror the v3 streaming cap: pre-check `Content-Length`, then `response.chunk()` loop that bails on `len > MAX_RESPONSE_BODY_BYTES`. ########## core/connectors/sources/influxdb_source/src/v3.rs: ########## @@ -0,0 +1,1298 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +//! InfluxDB V3 source — SQL queries, JSONL responses, Bearer auth. +//! +//! V3 uses strict `> cursor` semantics. DataFusion/Parquet does not guarantee +//! stable ordering for rows that share the same timestamp, so the V2 skip-N +//! approach is not safe here. If all rows in a batch share the same timestamp, +//! the cursor cannot advance — the effective batch size is doubled each poll +//! up to `stuck_batch_cap_factor × batch_size`. If the cap is reached, the +//! circuit breaker is tripped. + +use crate::common::{ + DEFAULT_V3_CURSOR_FIELD, PayloadFormat, Row, RowContext, V3SourceConfig, V3State, + apply_query_params, is_timestamp_after, parse_jsonl_rows, timestamps_equal, validate_cursor, +}; +use base64::{Engine as _, engine::general_purpose}; +use chrono::{DateTime, Utc}; +use iggy_connector_sdk::{Error, ProducedMessage, Schema}; +use reqwest::Url; +use reqwest_middleware::ClientWithMiddleware; +use serde_json::json; +use tracing::warn; +use uuid::Uuid; + +pub(crate) const DEFAULT_STUCK_CAP_FACTOR: u32 = 10; +/// Upper bound for `stuck_batch_cap_factor`. A value of 1000 with batch_size=1000 +/// would issue 1,000,000-row queries before tripping the circuit breaker. +pub(crate) const MAX_STUCK_CAP_FACTOR: u32 = 100; + +/// Hard cap on buffered JSONL response body size. +/// +/// `MAX_STUCK_CAP_FACTOR` can inflate the effective batch to 100 × `batch_size`, +/// making unbounded `response.text()` a real OOM vector under misconfiguration. +/// Streaming stops and returns an error once this many bytes have been read. +const MAX_RESPONSE_BODY_BYTES: usize = 256 * 1024 * 1024; // 256 MiB + +/// InfluxDB V3 query endpoint expects this exact string for JSONL response format. 
+const QUERY_FORMAT_JSONL: &str = "jsonl"; + +fn build_query(base: &str, query: &str, db: &str) -> Result<(Url, serde_json::Value), Error> { + let url = Url::parse(&format!("{base}/api/v3/query_sql")) + .map_err(|e| Error::InvalidConfigValue(format!("Invalid InfluxDB URL: {e}")))?; + let body = json!({ + "db": db, + "q": query, + "format": QUERY_FORMAT_JSONL + }); + Ok((url, body)) +} + +// ── Query execution ─────────────────────────────────────────────────────────── + +pub(crate) async fn run_query( + client: &ClientWithMiddleware, + config: &V3SourceConfig, + auth: &str, + cursor: &str, + effective_batch: u32, + offset: u64, +) -> Result<String, Error> { + validate_cursor(cursor)?; + let q = apply_query_params( + &config.query, + cursor, + &effective_batch.to_string(), + &offset.to_string(), /* &str */ + ); + let base = config.url.trim_end_matches('/'); + let (url, body) = build_query(base, &q, &config.db)?; + + let mut response = client + .post(url) + .header("Authorization", auth) + .header("Content-Type", "application/json") + .header("Accept", "application/json") + .json(&body) + .send() + .await + .map_err(|e| Error::Storage(format!("InfluxDB V3 query failed: {e}")))?; + + let status = response.status(); + if status.is_success() { + // Stream chunk-by-chunk with a hard byte cap to prevent OOM when + // MAX_STUCK_CAP_FACTOR inflates the effective batch to 100 × batch_size. + if response + .content_length() + .is_some_and(|n| n as usize > MAX_RESPONSE_BODY_BYTES) + { + return Err(Error::Storage(format!( + "InfluxDB V3 response body exceeds {MAX_RESPONSE_BODY_BYTES} byte cap; \ + reduce batch_size to avoid OOM" + ))); + } + let mut buf: Vec<u8> = Vec::new(); + while let Some(chunk) = response + .chunk() + .await + .map_err(|e| Error::Storage(format!("Failed to read V3 response: {e}")))? + { + buf.extend_from_slice(&chunk); + if buf.len() > MAX_RESPONSE_BODY_BYTES { + return Err(Error::Storage(format!( + "InfluxDB V3 response body exceeded {MAX_RESPONSE_BODY_BYTES} byte cap \ + while streaming; reduce batch_size to avoid OOM" + ))); + } + } + return String::from_utf8(buf) + .map_err(|e| Error::Storage(format!("V3 response body is not valid UTF-8: {e}"))); + } + + let body_text = response + .text() + .await + .unwrap_or_else(|_| "failed to read response body".to_string()); + + // 404 "database not found" means the namespace has not been written to yet; + // treat it as empty rather than a failure so the circuit breaker stays healthy. + // Any other 404 (e.g. "table not found") is a permanent error — don't swallow it. 
+ if status.as_u16() == 404 { + if body_text.to_lowercase().contains("database not found") { + return Ok(String::new()); + } + return Err(Error::PermanentHttpError(format!( + "InfluxDB V3 query failed with status {status}: {body_text}" + ))); + } + + if iggy_connector_sdk::retry::is_transient_status(status) { + Err(Error::Storage(format!( + "InfluxDB V3 query failed with status {status}: {body_text}" + ))) + } else { + Err(Error::PermanentHttpError(format!( + "InfluxDB V3 query failed with status {status}: {body_text}" + ))) + } +} + +// ── Message building ────────────────────────────────────────────────────────── + +fn build_payload( + row: &Row, + payload_column: Option<&str>, + payload_format: PayloadFormat, + include_metadata: bool, + cursor_field: &str, +) -> Result<Vec<u8>, Error> { + if let Some(col) = payload_column { + let raw = row + .get(col) + .cloned() + .ok_or_else(|| Error::InvalidRecordValue(format!("Missing payload column '{col}'")))?; + return match payload_format { + // raw is already a serde_json::Value — serialize directly, no re-parse. + PayloadFormat::Json => serde_json::to_vec(&raw) + .map_err(|e| Error::Serialization(format!("JSON serialization failed: {e}"))), + PayloadFormat::Text => match raw { + serde_json::Value::String(s) => Ok(s.into_bytes()), + other => serde_json::to_vec(&other) + .map_err(|e| Error::Serialization(format!("JSON serialization failed: {e}"))), + }, + PayloadFormat::Raw => { + let s = raw.as_str().ok_or_else(|| { + Error::InvalidRecordValue(format!( + "Payload column '{col}' must be a string value for Raw format" + )) + })?; + general_purpose::STANDARD.decode(s.as_bytes()).map_err(|e| { + Error::InvalidRecordValue(format!("Failed to decode payload as base64: {e}")) + }) + } + }; + } + + // V3 rows carry typed serde_json::Values — clone directly, no parse_scalar needed. + // When include_metadata=false, exclude the cursor column (timestamp). + let json_row: serde_json::Map<_, _> = row + .iter() + .filter(|(k, _)| include_metadata || k.as_str() != cursor_field) + .map(|(k, v)| (k.clone(), v.clone())) + .collect(); + serde_json::to_vec(&json_row) + .map_err(|e| Error::Serialization(format!("JSON serialization failed: {e}"))) +} + +/// Compute the next effective batch size when the batch is stuck. +/// Doubles until it reaches `cap`. Returns `None` if already at cap. +pub(crate) fn next_stuck_batch_size(current: u32, base: u32, cap_factor: u32) -> Option<u32> { + let cap = base.saturating_mul(cap_factor); + if current >= cap { + None + } else { + Some(current.saturating_mul(2).min(cap)) + } +} + +// ── Poll ────────────────────────────────────────────────────────────────────── + +pub(crate) struct PollResult { + pub messages: Vec<ProducedMessage>, + pub new_state: V3State, + pub schema: Schema, + /// Set to true when the stuck-timestamp cap was reached and the circuit + /// breaker should be tripped by the caller. + pub trip_circuit_breaker: bool, +} + +// ── Row processing (pure, testable without HTTP) ────────────────────────────── + +/// Normalize a raw timestamp from InfluxDB V3 JSONL into a cursor-safe RFC 3339 string. +/// +/// InfluxDB 3 Core returns timestamps without a timezone suffix and with nanosecond +/// precision (e.g. `"2026-04-26T02:32:20.526360865"`). The only required fix is +/// appending `"Z"` when no timezone suffix is present (InfluxDB always stores UTC). 
+/// +/// Full nanosecond precision is intentionally preserved — truncating to milliseconds +/// would place the cursor BEFORE the actual row timestamps within the same millisecond, +/// causing `WHERE time > '$cursor'` to re-deliver already-seen rows on subsequent polls. +/// InfluxDB 3's DataFusion SQL engine handles RFC 3339 strings with any number of +/// fractional digits in WHERE clause timestamp comparisons. +fn normalize_v3_timestamp(ts: &str) -> String { + // Fast path: already a valid RFC 3339 timestamp with timezone suffix. + if chrono::DateTime::parse_from_rfc3339(ts).is_ok() { + return ts.to_string(); + } + // Slow path: no timezone suffix — append "Z" to make it RFC 3339 compliant. + format!("{ts}Z") +} + +/// Result of processing a batch of V3 rows into Iggy messages. +#[derive(Debug)] +pub(crate) struct RowProcessingResult { + pub messages: Vec<ProducedMessage>, + pub max_cursor: Option<String>, + /// `true` when every row's `cursor_field` value equals `current_cursor`. + /// Combined with `rows.len() >= effective_batch`, this signals a stuck batch: + /// all returned rows are at the current cursor, meaning the cursor cannot + /// advance with `> cursor` semantics. + pub all_at_cursor: bool, + /// Count of rows whose cursor == max_cursor (for tiebreaker offset). + pub rows_at_max_cursor: u64, +} + +/// Convert a slice of V3 query rows into Iggy messages. +/// +/// Also detects whether all rows share the same cursor value as `ctx.current_cursor` +/// (the `all_at_cursor` flag). The caller uses this together with batch fullness +/// to decide whether to inflate the batch size for the next poll. +/// +/// Unlike V2, V3 uses strict `> cursor` semantics, so there is no row-skipping. +/// All rows in the slice are emitted as messages. +pub(crate) fn process_rows( + rows: &[Row], + ctx: &RowContext<'_>, +) -> Result<RowProcessingResult, Error> { + let mut messages = Vec::with_capacity(rows.len()); + let mut max_cursor: Option<String> = None; + let mut max_cursor_parsed: Option<DateTime<Utc>> = None; // cache parsed form + // Starts true for non-empty batches; flipped to false as soon as any row + // either has a different cursor value or has no cursor field at all. + let mut all_at_cursor = !rows.is_empty(); + // Generate the base UUID once per poll; derive per-message IDs by addition. + // This is O(1) PRNG calls per batch instead of O(n), measurable at batch ≥ 100. + let id_base = Uuid::new_v4().as_u128(); + for row in rows.iter() { + if let Some(raw_cv) = row.get(ctx.cursor_field).and_then(|v| v.as_str()) { + let cv_owned = normalize_v3_timestamp(raw_cv); + let cv = cv_owned.as_str(); + if !timestamps_equal(cv, ctx.current_cursor) { + all_at_cursor = false; + } + validate_cursor(cv)?; + let cv_parsed = cv.parse::<DateTime<Utc>>().ok(); + match (cv_parsed, max_cursor_parsed) { + (Some(new_dt), Some(cur_dt)) if new_dt > cur_dt => { + max_cursor = Some(cv.to_string()); + max_cursor_parsed = Some(new_dt); + } + (Some(new_dt), None) => { + max_cursor = Some(cv.to_string()); + max_cursor_parsed = Some(new_dt); + } + (None, _) if max_cursor_parsed.is_none() => { + // Unparsable cursor — still track it (string fallback) if no + // parsable cursor has been seen yet. 
+ max_cursor = Some(cv.to_string()); + } + _ => {} + } + } else { + all_at_cursor = false; + } + + let payload = build_payload( + row, + ctx.payload_col, + ctx.payload_format, + ctx.include_metadata, + ctx.cursor_field, + )?; + messages.push(ProducedMessage { + id: Some(id_base.wrapping_add(messages.len() as u128)), + checksum: None, + timestamp: Some(ctx.now_micros), + origin_timestamp: Some(ctx.now_micros), + headers: None, + payload, + }); + } + + let rows_at_max_cursor = rows + .iter() + .filter(|r| { + max_cursor.as_deref().is_some_and(|mc| { + r.get(ctx.cursor_field) + .and_then(|v| v.as_str()) + .is_some_and(|cv| normalize_v3_timestamp(cv) == mc) + }) + }) + .count() as u64; + + if !rows.is_empty() && max_cursor.is_none() { + return Err(Error::InvalidRecordValue(format!( + "No '{}' field found in any returned row — cursor cannot advance; \ + the connector would re-deliver the same rows on every poll. \ + Ensure your query selects the cursor column.", + ctx.cursor_field + ))); + } + + Ok(RowProcessingResult { + messages, + max_cursor, + all_at_cursor, + rows_at_max_cursor, + }) +} + +pub(crate) async fn poll( + client: &ClientWithMiddleware, + config: &V3SourceConfig, + auth: &str, + state: &V3State, + payload_format: PayloadFormat, + include_metadata: bool, +) -> Result<PollResult, Error> { + // Access config.initial_offset directly (not via the enum accessor) because + // poll() receives &V3SourceConfig — the inner struct — already matched by the + // caller in lib.rs. The enum accessor InfluxDbSourceConfig::initial_offset() + // is not available here. + let cursor = state + .last_timestamp + .clone() + .or_else(|| config.initial_offset.clone()) + .unwrap_or_else(|| "1970-01-01T00:00:00Z".to_string()); + + let base_batch = config.batch_size.unwrap_or(500); + let effective_batch = if state.effective_batch_size == 0 { + base_batch + } else { + state.effective_batch_size + }; + + let response_data = run_query( + client, + config, + auth, + &cursor, + effective_batch, + state.last_timestamp_row_offset, + ) + .await?; + let rows = parse_jsonl_rows(&response_data)?; + + let cap_factor = config + .stuck_batch_cap_factor + .unwrap_or(DEFAULT_STUCK_CAP_FACTOR); + let ctx = RowContext { + cursor_field: config + .cursor_field + .as_deref() + .unwrap_or(DEFAULT_V3_CURSOR_FIELD), + current_cursor: &cursor, + include_metadata, + payload_col: config.payload_column.as_deref(), + payload_format, + now_micros: iggy_common::Utc::now().timestamp_micros() as u64, + }; + + let result = process_rows(&rows, &ctx)?; + + // Stuck-timestamp detection: if every row is at the current cursor + // and the batch was full, inflate and request more next time. + let stuck = result.all_at_cursor && rows.len() >= effective_batch as usize; + + if stuck { + return match next_stuck_batch_size(effective_batch, base_batch, cap_factor) { + Some(next_batch) => { + warn!( + "InfluxDB V3 source — all {} rows share timestamp {cursor:?}; \ + inflating batch size {} → {} (cap={}×{}={})", + rows.len(), + effective_batch, + next_batch, + cap_factor, + base_batch, + base_batch.saturating_mul(cap_factor) + ); + Ok(PollResult { + messages: vec![], + new_state: V3State { + last_timestamp: state.last_timestamp.clone(), + processed_rows: state.processed_rows, + effective_batch_size: next_batch, + last_timestamp_row_offset: result.rows_at_max_cursor, + }, + schema: Schema::Json, + trip_circuit_breaker: false, + }) + } + None => { Review Comment: ref to lines 438-454. 
on stuck-cap circuit-breaker trip, `new_state` keeps `effective_batch_size: effective_batch` (which is at cap) and `last_timestamp` unchanged, then this state is persisted unconditionally at `lib.rs:436-445`. the only path that resets `effective_batch_size` to `base_batch` is the successful-advance path at `v3.rs:481` - which never fires when data is genuinely stuck. after circuit cool-down, the next poll re-enters at cap and immediately re-trips. permanent stall, persisted across restart. fix: do not mutate `effective_batch_size` or `last_timestamp_row_offset` on the trip path, or reset `effective_batch_size = base_batch` when tripping the breaker. also note `last_timestamp_row_offset: result.rows_at_max_cursor` is written here on a path that emitted no messages, which is suspect on its own. ########## core/connectors/sources/influxdb_source/src/lib.rs: ########## @@ -752,118 +315,250 @@ impl Source for InfluxDbSource { }); } - match self.poll_messages().await { - Ok((messages, max_cursor, rows_at_max_cursor, skipped)) => { - // Successful poll — reset circuit breaker - self.circuit_breaker.record_success(); - - let mut state = self.state.lock().await; - state.last_poll_time = Utc::now(); - state.processed_rows += messages.len() as u64; - match max_cursor { - Some(ref new_cursor) - if state.last_timestamp.as_deref() != Some(new_cursor.as_str()) => - { - // Cursor advanced to a new timestamp — reset the row counter. - state.last_timestamp = max_cursor.clone(); - state.cursor_row_count = rows_at_max_cursor; - } - Some(_) => { - // Cursor stayed at the same timestamp — accumulate so the - // next poll skips all already-delivered rows at this timestamp. - state.cursor_row_count = - state.cursor_row_count.saturating_add(rows_at_max_cursor); - } - None => { - // No rows delivered. If we skipped some rows it means - // every row in the result was at the current cursor - // timestamp and had already been seen. `skipped` is - // therefore the true row count at that timestamp for - // this query result, so we correct cursor_row_count to - // that value. This prevents a permanently-inflated - // counter (e.g. after rows are deleted or compacted in - // InfluxDB) from causing the skip logic to over-skip on - // every subsequent poll and stall the connector. - if skipped > 0 { - state.cursor_row_count = skipped; + let client = self.get_client()?; + let auth = self + .auth_header + .as_ref() + .map(|s| s.expose_secret().as_str()) + .ok_or_else(|| { + Error::Connection("auth_header not initialised — was open() called?".to_string()) + })?; + match &self.version_state { + VersionState::V2(state_mu) => { + let InfluxDbSourceConfig::V2(cfg) = &self.config else { + return Err(Error::InvalidState); + }; + + let state_snap = state_mu.lock().await.clone(); + match v2::poll( + client, + cfg, + auth, + &state_snap, + self.payload_format, + self.config.include_metadata(), + ) + .await + { + Ok(result) => { + self.circuit_breaker.record_success(); + let mut state = state_mu.lock().await; + state.processed_rows += result.messages.len() as u64; + apply_v2_cursor_advance( + &mut state, + result.max_cursor, + result.rows_at_max_cursor, + result.skipped, + ); + + if self.config.verbose_logging() { + info!( + "{CONNECTOR_NAME} ID: {} produced {} messages (V2). \ + Total: {}. Cursor: {:?}", + self.id, + result.messages.len(), + state.processed_rows, + state.last_timestamp + ); + } else { + debug!( + "{CONNECTOR_NAME} ID: {} produced {} messages (V2). \ + Total: {}. 
Cursor: {:?}", + self.id, + result.messages.len(), + state.processed_rows, + state.last_timestamp + ); } - } - } - if self.verbose { - info!( - "InfluxDB source ID: {} produced {} messages. \ - Total processed: {}. Cursor: {:?}", - self.id, - messages.len(), - state.processed_rows, - state.last_timestamp - ); - } else { - debug!( - "InfluxDB source ID: {} produced {} messages. \ - Total processed: {}. Cursor: {:?}", - self.id, - messages.len(), - state.processed_rows, - state.last_timestamp - ); + let persisted = ConnectorState::serialize( + &PersistedState::V2(V2State { + last_timestamp: state.last_timestamp.clone(), + processed_rows: state.processed_rows, + cursor_row_count: state.cursor_row_count, + }), + CONNECTOR_NAME, + self.id, + ); + + Ok(ProducedMessages { + schema: result.schema, + messages: result.messages, + state: persisted, + }) + } + Err(e) => self.handle_poll_error(e).await, } + } - let schema = if self.config.payload_column.is_some() { - self.payload_format().schema() - } else { - Schema::Json + VersionState::V3(state_mu) => { + let InfluxDbSourceConfig::V3(cfg) = &self.config else { + return Err(Error::InvalidState); }; - let persisted_state = self.serialize_state(&state); + let state_snap = state_mu.lock().await.clone(); + match v3::poll( + client, + cfg, + auth, + &state_snap, + self.payload_format, + self.config.include_metadata(), + ) + .await + { + Ok(result) => { + if result.trip_circuit_breaker { + self.circuit_breaker.record_failure().await; + } else { + self.circuit_breaker.record_success(); + } - Ok(ProducedMessages { - schema, - messages, - state: persisted_state, - }) - } - Err(e) => { - // Only count transient/connectivity failures toward the - // circuit breaker. PermanentHttpError (400, 401, etc.) are - // config/data issues that retrying will not fix. - if !matches!(e, Error::PermanentHttpError(_)) { - self.circuit_breaker.record_failure().await; + let new = result.new_state; + let msg_count = result.messages.len(); + let mut state = state_mu.lock().await; + *state = new; + + if self.config.verbose_logging() { + info!( + "{CONNECTOR_NAME} ID: {} produced {} messages (V3). \ + Total: {}. Cursor: {:?}", + self.id, msg_count, state.processed_rows, state.last_timestamp + ); + } else { + debug!( + "{CONNECTOR_NAME} ID: {} produced {} messages (V3). \ + Total: {}. Cursor: {:?}", + self.id, msg_count, state.processed_rows, state.last_timestamp + ); + } + + let persisted = ConnectorState::serialize( + &PersistedState::V3(V3State { + last_timestamp: state.last_timestamp.clone(), + processed_rows: state.processed_rows, + effective_batch_size: state.effective_batch_size, + last_timestamp_row_offset: state.last_timestamp_row_offset, + }), + CONNECTOR_NAME, + self.id, + ); + + Ok(ProducedMessages { + schema: result.schema, + messages: result.messages, + state: persisted, + }) + } + Err(e) => self.handle_poll_error(e).await, } - error!( - "InfluxDB source ID: {} poll failed: {e}. \ - Consecutive failures tracked by circuit breaker.", - self.id - ); - tokio::time::sleep(self.poll_interval).await; - Err(e) } } } async fn close(&mut self) -> Result<(), Error> { self.client = None; - let state = self.state.lock().await; + let processed = match &self.version_state { + VersionState::V2(mu) => mu.lock().await.processed_rows, + VersionState::V3(mu) => mu.lock().await.processed_rows, + }; info!( - "InfluxDB source connector ID: {} closed. Total rows processed: {}", - self.id, state.processed_rows + "{CONNECTOR_NAME} ID: {} closed. 
Total rows processed: {processed}", + self.id ); Ok(()) } } -// --------------------------------------------------------------------------- -// Unit tests -// --------------------------------------------------------------------------- +impl InfluxDbSource { + async fn handle_poll_error(&self, e: Error) -> Result<ProducedMessages, Error> { + if !matches!(e, Error::PermanentHttpError(_)) { + self.circuit_breaker.record_failure().await; + } + error!("{CONNECTOR_NAME} ID: {} poll failed: {e}", self.id); + tokio::time::sleep(self.poll_interval).await; + Err(e) + } +} + +// ── Sort heuristic ──────────────────────────────────────────────────────────── + +/// Return `true` if `query` contains a `sort(` call that is not part of a longer +/// identifier (e.g. `mysort(` is excluded; `|> sort(` and bare `sort(` are included). +/// Best-effort check: warns if `sort(` does not appear outside +/// line comments. Not a full parser; documents its limitations. +fn query_has_sort_call(query: &str) -> bool { + query.lines().any(|line| { + // Strip line comments before checking for sort( + let code = match line.find("//") { + Some(pos) => &line[..pos], + None => line, + }; + // Find all occurrences of "sort(" and verify none are preceded by a + // word character (letter, digit, underscore) — that would mean the + // "sort" is part of a longer identifier like "mysort" or "do_sort". + let mut search = code; + while let Some(pos) = search.find("sort(") { + let preceded_by_word_char = search[..pos] + .chars() + .last() + .is_some_and(|c| c.is_alphanumeric() || c == '_'); + if !preceded_by_word_char { + return true; + } + search = &search[pos + 5..]; // skip past "sort(" + } + false + }) +} + +// ── V2 cursor advance logic ─────────────────────────────────────────────────── + +/// Update V2 polling state after a successful poll. +/// +/// V2 uses `>= $cursor` semantics, so the first batch after a cursor advance +/// will include rows already delivered at the previous max timestamp. The +/// `cursor_row_count` tracks how many such rows to skip on the next poll. +/// +/// - New cursor → store it with the count of rows that landed at that timestamp. +/// - Same cursor → accumulate: more rows at this timestamp were delivered. +/// - No new cursor (all skipped) → correct `cursor_row_count` to `skipped` +/// so the skip counter reflects reality rather than a stale inflated value. +fn apply_v2_cursor_advance( + state: &mut V2State, + max_cursor: Option<String>, + rows_at_max_cursor: u64, + skipped: u64, +) { + if let Some(ref new_cursor) = max_cursor { + let should_advance = match state.last_timestamp.as_deref() { Review Comment: ref to lines 533-546. `is_ok_and(...)` returns false on parse failure of `state.last_timestamp`, so `should_advance=false`, control falls into the else-branch which silently bumps `cursor_row_count` without emitting a warning or returning an error. `validate_cursor` only guards `initial_offset` at `open()` (`lib.rs:205-207`); a corrupted persisted cursor bypasses validation through `restore_v2_state` (`lib.rs:127-156`) since it deserializes without re-validating cursor strings. fix: on parse failure, return `Err(Error::InvalidState)` (with a message, see comment above) or at minimum `error!()` log so operators get a signal. 
########## core/connectors/sinks/influxdb_sink/src/lib.rs: ########## @@ -479,20 +573,34 @@ impl InfluxDbSink { message.timestamp }; let ts = self.to_precision_timestamp(base_micros); + write!(buf, " {ts}").expect("infallible"); - { - use std::fmt::Write as _; - write!(buf, " {ts}").expect("write to String is infallible"); - } - - debug!( - "InfluxDB sink ID: {} point — offset={}, raw_ts={}, influx_ts={ts}", - self.id, message.offset, message.timestamp - ); - + debug!("sink ID: {} — offset={}, ts={ts}", self.id, message.offset); Ok(()) } + /// Build the newline-separated line-protocol body for a batch of messages. + /// Pure function — no I/O; extracted for testability. The empty-slice path is + /// unreachable in production (process_batch returns early when messages is empty) + /// but is exercised by unit tests for defensive completeness. + fn build_body( + &self, + topic_metadata: &TopicMetadata, + messages_metadata: &MessagesMetadata, + messages: &[ConsumedMessage], + ) -> Result<String, Error> { + // 1 KiB per message is a conservative estimate that accommodates JSON + // payloads without excessive reallocation. + let mut body = String::with_capacity(messages.len() * 1024); Review Comment: `String::with_capacity(messages.len() * 1024)` reserves ~512 KiB per call at default batch=500. typical telemetry line is 250-600 bytes, so the heuristic is 2-4x over for normal payloads, dead-on for fat JSON. single allocation per batch (no buffer reuse across batches) means each batch is a malloc + free. low-impact at default batch sizes since one alloc dwarfs HTTP+serialize, but the real win is to keep a `String` buffer on `&mut self` and `clear()` between batches rather than tuning the capacity heuristic. ########## core/connectors/sources/influxdb_source/src/common.rs: ########## @@ -0,0 +1,1060 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +use iggy_common::serde_secret::serialize_secret; +use iggy_common::{DateTime, Utc}; +use iggy_connector_sdk::{Error, Schema}; +use secrecy::SecretString; +use serde::{Deserialize, Serialize}; +use std::sync::OnceLock; +use tracing::warn; + +pub(crate) use crate::row::{Row, parse_csv_rows, parse_jsonl_rows}; + +// ── Constants ───────────────────────────────────────────────────────────────── + +/// Default cursor column for V2 (Flux annotated-CSV timestamp annotation). +pub(crate) const DEFAULT_V2_CURSOR_FIELD: &str = "_time"; +/// Default cursor column for V3 (SQL timestamp column name). 
+pub(crate) const DEFAULT_V3_CURSOR_FIELD: &str = "time"; + +// ── Config ──────────────────────────────────────────────────────────────────── +// +// Uses `#[serde(tag = "version")]` instead of `#[serde(flatten)]` because +// serde's flatten interacts poorly with tagged enums — the tag field can be +// consumed before the variant content is parsed, causing deserialization to fail. + +#[derive(Debug, Clone, Serialize)] +#[serde(tag = "version")] +pub enum InfluxDbSourceConfig { + #[serde(rename = "v2")] + V2(V2SourceConfig), + #[serde(rename = "v3")] + V3(V3SourceConfig), +} + +/// Deserializes `InfluxDbSourceConfig` with backward-compatible version defaulting. +/// +/// Existing V2 configs that omit the `version` field are treated as `"v2"` so +/// deployments can upgrade without touching their config files. Explicitly +/// unknown version strings are rejected with a clear error. +impl<'de> serde::Deserialize<'de> for InfluxDbSourceConfig { + fn deserialize<D: serde::Deserializer<'de>>(d: D) -> Result<Self, D::Error> { + let raw = serde_json::Value::deserialize(d)?; + let version = match raw.get("version") { + None => "v2", // absent key → backward compat default + Some(v) => v.as_str().ok_or_else(|| { + serde::de::Error::custom(format!( + "\"version\" must be a string (e.g. \"v2\" or \"v3\"), got: {v}" + )) + })?, + }; + match version { + "v2" => serde_json::from_value::<V2SourceConfig>(raw) + .map(Self::V2) + .map_err(serde::de::Error::custom), + "v3" => serde_json::from_value::<V3SourceConfig>(raw) + .map(Self::V3) + .map_err(serde::de::Error::custom), + other => Err(serde::de::Error::custom(format!( + "unknown InfluxDB version {other:?}; expected \"v2\" or \"v3\"" + ))), + } + } +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct V2SourceConfig { + pub(crate) url: String, + pub(crate) org: String, + #[serde(serialize_with = "serialize_secret")] + pub(crate) token: SecretString, + pub(crate) query: String, + pub(crate) poll_interval: Option<String>, + pub(crate) batch_size: Option<u32>, + pub(crate) cursor_field: Option<String>, + pub(crate) initial_offset: Option<String>, + pub(crate) payload_column: Option<String>, + pub(crate) payload_format: Option<String>, + pub(crate) include_metadata: Option<bool>, + pub(crate) verbose_logging: Option<bool>, + pub(crate) max_retries: Option<u32>, + pub(crate) retry_delay: Option<String>, + pub(crate) timeout: Option<String>, + pub(crate) max_open_retries: Option<u32>, + pub(crate) open_retry_max_delay: Option<String>, + pub(crate) retry_max_delay: Option<String>, + pub(crate) circuit_breaker_threshold: Option<u32>, + pub(crate) circuit_breaker_cool_down: Option<String>, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct V3SourceConfig { + pub(crate) url: String, + pub(crate) db: String, + #[serde(serialize_with = "serialize_secret")] + pub(crate) token: SecretString, + pub(crate) query: String, + pub(crate) poll_interval: Option<String>, + pub(crate) batch_size: Option<u32>, + pub(crate) cursor_field: Option<String>, + pub(crate) initial_offset: Option<String>, + pub(crate) payload_column: Option<String>, + pub(crate) payload_format: Option<String>, + /// When `false`, the cursor column (`time` by default) is excluded from the + /// emitted JSON payload. Useful when consumers don't need the timestamp in + /// the message body since it's available as message metadata. 
+ pub(crate) include_metadata: Option<bool>, + pub(crate) verbose_logging: Option<bool>, + pub(crate) max_retries: Option<u32>, + pub(crate) retry_delay: Option<String>, + pub(crate) timeout: Option<String>, + pub(crate) max_open_retries: Option<u32>, + pub(crate) open_retry_max_delay: Option<String>, + pub(crate) retry_max_delay: Option<String>, + pub(crate) circuit_breaker_threshold: Option<u32>, + pub(crate) circuit_breaker_cool_down: Option<String>, + /// Maximum factor by which batch_size may be inflated before the stuck-timestamp + /// circuit breaker trips. Defaults to 10 (i.e. up to 10× the configured batch_size). + /// Maximum accepted value is 100; higher values risk OOM-inducing queries. + pub(crate) stuck_batch_cap_factor: Option<u32>, +} + +// Eliminates the repetitive "match self { V2(c) => …, V3(c) => … }" pattern for +// fields that are identical across all config variants. Methods with version-specific +// logic (cursor_field, max_retries, version_label) remain explicit. +// +// Supported patterns: +// delegate!(ref self.url) → &String (borrow) +// delegate!(opt self.poll_interval) → Option<&str> +// delegate!(unwrap self.batch_size, 500) → T: Copy with value fallback +// +// Not supported (use explicit match arms instead): +// Fields with version-specific defaults (e.g. cursor_field: "_time" vs "time") +// Fields with chained transformations (e.g. max_retries + .max(1)) +// Fields that only exist on one variant (e.g. V3's stuck_batch_cap_factor) +macro_rules! delegate { + // &T field reference → fn foo(&self) -> &T + (ref $self:ident . $field:ident) => { + match $self { + Self::V2(c) => &c.$field, + Self::V3(c) => &c.$field, + } + }; + // Option<String> → Option<&str> + (opt $self:ident . $field:ident) => { + match $self { + Self::V2(c) => c.$field.as_deref(), + Self::V3(c) => c.$field.as_deref(), + } + }; + // Option<T: Copy> → T with fallback + (unwrap $self:ident . $field:ident, $default:expr) => { + match $self { + Self::V2(c) => c.$field.unwrap_or($default), + Self::V3(c) => c.$field.unwrap_or($default), + } + }; +} + +impl InfluxDbSourceConfig { + pub fn url(&self) -> &str { + delegate!(ref self.url) + } + pub fn token_secret(&self) -> &SecretString { + delegate!(ref self.token) + } + pub fn poll_interval(&self) -> Option<&str> { + delegate!(opt self.poll_interval) + } + pub fn batch_size(&self) -> u32 { + // Floor at 1 — callers build LIMIT $limit queries; LIMIT 0 stalls silently. + // open() also rejects 0 explicitly, but defense-in-depth here costs nothing. 
+ delegate!(unwrap self.batch_size, 500).max(1) + } + pub fn initial_offset(&self) -> Option<&str> { + delegate!(opt self.initial_offset) + } + pub fn payload_column(&self) -> Option<&str> { + delegate!(opt self.payload_column) + } + pub fn payload_format(&self) -> Option<&str> { + delegate!(opt self.payload_format) + } + pub fn verbose_logging(&self) -> bool { + delegate!(unwrap self.verbose_logging, false) + } + pub fn retry_delay(&self) -> Option<&str> { + delegate!(opt self.retry_delay) + } + pub fn timeout(&self) -> Option<&str> { + delegate!(opt self.timeout) + } + pub fn max_open_retries(&self) -> u32 { + delegate!(unwrap self.max_open_retries, 10) + } + pub fn open_retry_max_delay(&self) -> Option<&str> { + delegate!(opt self.open_retry_max_delay) + } + pub fn retry_max_delay(&self) -> Option<&str> { + delegate!(opt self.retry_max_delay) + } + pub fn circuit_breaker_threshold(&self) -> u32 { + delegate!(unwrap self.circuit_breaker_threshold, 5) + } + pub fn circuit_breaker_cool_down(&self) -> Option<&str> { + delegate!(opt self.circuit_breaker_cool_down) + } + + // V2 and V3 use different default cursor column names. + pub fn cursor_field(&self) -> &str { + match self { + Self::V2(c) => c.cursor_field.as_deref().unwrap_or(DEFAULT_V2_CURSOR_FIELD), + Self::V3(c) => c.cursor_field.as_deref().unwrap_or(DEFAULT_V3_CURSOR_FIELD), + } + } + + pub fn include_metadata(&self) -> bool { + delegate!(unwrap self.include_metadata, true) + } + + // Both arms are identical; `delegate!` is not used because the `.max(1)` chain + // cannot be expressed in the macro without adding a new variant. + pub fn max_retries(&self) -> u32 { + match self { + Self::V2(c) => c.max_retries.unwrap_or(3).max(1), + Self::V3(c) => c.max_retries.unwrap_or(3).max(1), + } + } + + pub fn version_label(&self) -> &'static str { + match self { + Self::V2(_) => "v2", + Self::V3(_) => "v3", + } + } + + /// URL with any trailing slash stripped — used as the base for all endpoint URLs. + pub(crate) fn base_url(&self) -> &str { + self.url().trim_end_matches('/') + } +} + +// ── Row processing context ──────────────────────────────────────────────────── + +/// Per-poll fields that are constant across all rows in a batch. +/// Passed by reference to `process_rows` so the function signature stays at ≤ 3 parameters. +#[derive(Debug, Clone, Copy)] +pub(crate) struct RowContext<'a> { + pub cursor_field: &'a str, + pub current_cursor: &'a str, + pub include_metadata: bool, + pub payload_col: Option<&'a str>, + pub payload_format: PayloadFormat, + pub now_micros: u64, +} + +// ── Persisted state ─────────────────────────────────────────────────────────── + +#[derive(Debug, Serialize, Deserialize)] +#[serde(tag = "version")] +pub enum PersistedState { + #[serde(rename = "v2")] + V2(V2State), + #[serde(rename = "v3")] + V3(V3State), +} + +#[derive(Debug, Default, Clone, Serialize, Deserialize)] +pub struct V2State { + pub last_timestamp: Option<String>, + pub processed_rows: u64, + /// Rows at `last_timestamp` already delivered; used to skip them when the + /// Flux query uses `>= $cursor` and a batch boundary lands mid-timestamp. + pub cursor_row_count: u64, +} + +#[derive(Debug, Default, Clone, Serialize, Deserialize)] +pub struct V3State { + pub last_timestamp: Option<String>, + pub processed_rows: u64, + /// Current effective batch size after stuck-timestamp inflation. + /// Reset to the configured base value when the cursor advances. 
+ pub effective_batch_size: u32, + /// Row offset within the last timestamp group — used as a tiebreaker + /// so that siblings at the same timestamp are not silently dropped. + pub last_timestamp_row_offset: u64, +} + +// ── Payload format ──────────────────────────────────────────────────────────── + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)] +pub enum PayloadFormat { + #[default] + Json, + Text, + Raw, +} + +impl PayloadFormat { + pub fn from_config(value: Option<&str>) -> Self { + match value.map(|v| v.to_ascii_lowercase()).as_deref() { + Some("text") | Some("utf8") => PayloadFormat::Text, + Some("raw") | Some("base64") => PayloadFormat::Raw, + Some("json") => PayloadFormat::Json, + other => { + if other.is_some() { + warn!( + "Unrecognized payload_format {:?}, falling back to JSON", + other + ); + } + PayloadFormat::Json + } + } + } + + pub fn schema(self) -> Schema { + match self { + PayloadFormat::Json => Schema::Json, + PayloadFormat::Text => Schema::Text, + PayloadFormat::Raw => Schema::Raw, + } + } +} + +// ── Cursor validation ───────────────────────────────────────────────────────── + +static CURSOR_RE: OnceLock<regex::Regex> = OnceLock::new(); + +pub fn cursor_re() -> &'static regex::Regex { + CURSOR_RE.get_or_init(|| { + // Validates RFC 3339 timestamp structure with proper field ranges: + // month 01-12, day 01-31, hour 00-23, minute/second 00-59. + // Timezone suffix is required: a naive timestamp without Z or +HH:MM + // is rejected to prevent silent UTC-vs-local ambiguity between V2 (Flux + // always treats timestamps as UTC) and V3 (SQL engine timezone depends + // on server config). + // Note: day 29-31 validity for a given month is not checked by the regex; + // chrono parsing inside validate_cursor handles that for tz-aware timestamps. + regex::Regex::new( + r"(?-u)^\d{4}-(0[1-9]|1[0-2])-(0[1-9]|[12]\d|3[01])T([01]\d|2[0-3]):[0-5]\d:[0-5]\d(\.\d+)?(Z|[+-]\d{2}:\d{2})$" + ) + .expect("hardcoded regex is valid") + }) +} + +pub fn validate_cursor(cursor: &str) -> Result<(), Error> { + if !cursor_re().is_match(cursor) { + return Err(Error::InvalidConfigValue(format!( + "cursor value {cursor:?} is not a valid RFC 3339 timestamp" + ))); + } + // Chain chrono parse to catch calendar-invalid dates (e.g. Feb 30) + chrono::DateTime::parse_from_rfc3339(cursor).map_err(|e| { + Error::InvalidConfigValue(format!( + "cursor value {cursor:?} failed chrono validation: {e}" + )) + })?; + Ok(()) +} + +/// Validate `cursor_field` for the given connector version. +/// +/// `version` should be `"v2"` or `"v3"`. The function is version-strict: `"_time"` +/// is only valid for V2 (Flux annotation column) and `"time"` is only valid for V3 +/// (SQL timestamp column). Swapping them silently would produce empty result sets +/// or query errors at the InfluxDB level. 
+pub fn validate_cursor_field(field: &str, version: &str) -> Result<(), Error> { + if field.is_empty() { + return Err(Error::InvalidConfigValue(format!( + "cursor_field must not be empty for {version} — \ + use \"_time\" for v2 or \"time\" for v3" + ))); + } + match (field, version) { + ("time", "v2") => Err(Error::InvalidConfigValue( + "cursor_field \"time\" is not valid for v2 — use \"_time\" \ + (the Flux annotated-CSV timestamp column)" + .into(), + )), + ("_time", "v3") => Err(Error::InvalidConfigValue( + "cursor_field \"_time\" is not valid for v3 — use \"time\" \ + (the SQL timestamp column)" + .into(), + )), + // Allow everything else — custom column names are valid + _ => Ok(()), + } +} + +// ── Timestamp helpers ───────────────────────────────────────────────────────── + +/// Return `true` if timestamp string `a` is strictly after the pre-parsed `b`. +/// +/// `b` is accepted as an already-parsed `DateTime<Utc>` so callers that compare +/// against the same cursor on every row in a batch parse it once, not O(n) times. +/// `a` is parsed on each call. Returns `false` conservatively when `a` fails to +/// parse — do NOT advance the cursor when comparison is ambiguous. Lexicographic +/// comparison is incorrect for timestamps with different timezone offsets +/// (e.g. `+05:30` vs `Z`) and would silently produce wrong cursor advancement. +pub fn is_timestamp_after(a: &str, b_parsed: DateTime<Utc>) -> bool { + match a.parse::<DateTime<Utc>>() { + Ok(dt_a) => dt_a > b_parsed, + Err(_) => { + warn!( + "is_timestamp_after: could not parse {a:?} as RFC 3339; \ + refusing to advance cursor" + ); + false + } + } +} + +/// Return `true` if timestamps `a` and `b` represent the same instant, +/// regardless of timezone format differences. +/// +/// Raw string equality is wrong here: `"2024-01-01T00:00:00Z"` and +/// `"2024-01-01T00:00:00+00:00"` are the same instant but differ lexically. +/// This causes `all_at_cursor` to flip `false` incorrectly for one poll round, +/// producing duplicate delivery that self-heals next poll once the cursor +/// string is overwritten. +/// +/// Falls back to string equality if either value fails to parse — conservative, +/// avoids a false "not equal" that would produce unnecessary duplicates. +pub(crate) fn timestamps_equal(a: &str, b: &str) -> bool { + match (a.parse::<DateTime<Utc>>(), b.parse::<DateTime<Utc>>()) { + (Ok(dt_a), Ok(dt_b)) => dt_a == dt_b, + _ => { + warn!( + "timestamps_equal: could not parse timestamps as RFC 3339 \ + ({a:?} vs {b:?}); falling back to string equality" + ); + a == b + } + } +} +// ── Scalar parsing ──────────────────────────────────────────────────────────── + +/// Parse a string value from InfluxDB into the most specific JSON scalar type. +/// +/// Tries `bool`, then `i64`, then `f64`; falls back to `String`. An empty +/// string becomes `null`. `NaN` and `±Infinity` are emitted as strings because +/// JSON has no representation for non-finite floats +/// (`serde_json::Number::from_f64` returns `None` for them). 
+pub fn parse_scalar(value: &str) -> serde_json::Value { + if value.is_empty() { + return serde_json::Value::Null; + } + if let Ok(v) = value.parse::<bool>() { + return serde_json::Value::Bool(v); + } + if let Ok(v) = value.parse::<i64>() { + return serde_json::Value::Number(v.into()); + } + if let Ok(v) = value.parse::<f64>() + && let Some(number) = serde_json::Number::from_f64(v) + { + return serde_json::Value::Number(number); + } + serde_json::Value::String(value.to_string()) +} + +// ── Query template substitution ─────────────────────────────────────────────── + +/// Substitute `$cursor` and `$limit` placeholders in a query template in a +/// single pass, avoiding the two intermediate `String` allocations that +/// `clone() + replace() + replace()` would produce. +pub(crate) fn apply_query_params( Review Comment: `apply_query_params` silently no-ops when `$offset` is absent from the user template. v3 stuck-batch logic at `v3.rs:432,449` writes `last_timestamp_row_offset` into state on the assumption that the next query will use it via `OFFSET $offset` - but `open()` (`lib.rs:193-302`) never validates that the v3 query string contains `$offset` when `stuck_batch_cap_factor > 1`. default v3 fixture (`integration/tests/connectors/influxdb/source_v3.toml`) and the source `config.toml` template lack `OFFSET $offset` entirely. result: stuck-cap inflation runs without the offset clause, the same head rows get re-fetched and re-emitted on every poll - duplicate delivery. fix: at `open()`, when `stuck_batch_cap_factor > 0` (or just unconditionally for v3), reject queries whose template doesn't contain the literal `$offset` token. ########## core/integration/tests/connectors/fixtures/wiremock.rs: ########## @@ -77,12 +77,19 @@ impl WireMockContainer { message: format!("Failed to get host: {e}"), })?; - let host_port = container - .get_host_port_ipv4(WIREMOCK_PORT) + let ports = container Review Comment: unrelated scope: this IPv4/IPv6 fallback change is for the wiremock fixture, not influxdb. influxdb fixtures (`fixtures/influxdb/*.rs`) contain zero references to wiremock. split into its own PR per the single-purpose-PR policy. ########## core/connectors/sources/influxdb_source/src/v3.rs: ########## @@ -0,0 +1,1298 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +//! InfluxDB V3 source — SQL queries, JSONL responses, Bearer auth. +//! +//! V3 uses strict `> cursor` semantics. DataFusion/Parquet does not guarantee +//! stable ordering for rows that share the same timestamp, so the V2 skip-N +//! approach is not safe here. If all rows in a batch share the same timestamp, +//! the cursor cannot advance — the effective batch size is doubled each poll +//! up to `stuck_batch_cap_factor × batch_size`. If the cap is reached, the +//! 
circuit breaker is tripped. + +use crate::common::{ + DEFAULT_V3_CURSOR_FIELD, PayloadFormat, Row, RowContext, V3SourceConfig, V3State, + apply_query_params, is_timestamp_after, parse_jsonl_rows, timestamps_equal, validate_cursor, +}; +use base64::{Engine as _, engine::general_purpose}; +use chrono::{DateTime, Utc}; +use iggy_connector_sdk::{Error, ProducedMessage, Schema}; +use reqwest::Url; +use reqwest_middleware::ClientWithMiddleware; +use serde_json::json; +use tracing::warn; +use uuid::Uuid; + +pub(crate) const DEFAULT_STUCK_CAP_FACTOR: u32 = 10; +/// Upper bound for `stuck_batch_cap_factor`. A value of 1000 with batch_size=1000 +/// would issue 1,000,000-row queries before tripping the circuit breaker. +pub(crate) const MAX_STUCK_CAP_FACTOR: u32 = 100; + +/// Hard cap on buffered JSONL response body size. +/// +/// `MAX_STUCK_CAP_FACTOR` can inflate the effective batch to 100 × `batch_size`, +/// making unbounded `response.text()` a real OOM vector under misconfiguration. +/// Streaming stops and returns an error once this many bytes have been read. +const MAX_RESPONSE_BODY_BYTES: usize = 256 * 1024 * 1024; // 256 MiB + +/// InfluxDB V3 query endpoint expects this exact string for JSONL response format. +const QUERY_FORMAT_JSONL: &str = "jsonl"; + +fn build_query(base: &str, query: &str, db: &str) -> Result<(Url, serde_json::Value), Error> { + let url = Url::parse(&format!("{base}/api/v3/query_sql")) + .map_err(|e| Error::InvalidConfigValue(format!("Invalid InfluxDB URL: {e}")))?; + let body = json!({ + "db": db, + "q": query, + "format": QUERY_FORMAT_JSONL + }); + Ok((url, body)) +} + +// ── Query execution ─────────────────────────────────────────────────────────── + +pub(crate) async fn run_query( + client: &ClientWithMiddleware, + config: &V3SourceConfig, + auth: &str, + cursor: &str, + effective_batch: u32, + offset: u64, +) -> Result<String, Error> { + validate_cursor(cursor)?; + let q = apply_query_params( + &config.query, + cursor, + &effective_batch.to_string(), + &offset.to_string(), /* &str */ + ); + let base = config.url.trim_end_matches('/'); + let (url, body) = build_query(base, &q, &config.db)?; + + let mut response = client + .post(url) + .header("Authorization", auth) + .header("Content-Type", "application/json") + .header("Accept", "application/json") + .json(&body) + .send() + .await + .map_err(|e| Error::Storage(format!("InfluxDB V3 query failed: {e}")))?; + + let status = response.status(); + if status.is_success() { + // Stream chunk-by-chunk with a hard byte cap to prevent OOM when + // MAX_STUCK_CAP_FACTOR inflates the effective batch to 100 × batch_size. + if response + .content_length() + .is_some_and(|n| n as usize > MAX_RESPONSE_BODY_BYTES) + { + return Err(Error::Storage(format!( + "InfluxDB V3 response body exceeds {MAX_RESPONSE_BODY_BYTES} byte cap; \ + reduce batch_size to avoid OOM" + ))); + } + let mut buf: Vec<u8> = Vec::new(); + while let Some(chunk) = response + .chunk() + .await + .map_err(|e| Error::Storage(format!("Failed to read V3 response: {e}")))? 
+ { + buf.extend_from_slice(&chunk); + if buf.len() > MAX_RESPONSE_BODY_BYTES { + return Err(Error::Storage(format!( + "InfluxDB V3 response body exceeded {MAX_RESPONSE_BODY_BYTES} byte cap \ + while streaming; reduce batch_size to avoid OOM" + ))); + } + } + return String::from_utf8(buf) + .map_err(|e| Error::Storage(format!("V3 response body is not valid UTF-8: {e}"))); + } + + let body_text = response + .text() + .await + .unwrap_or_else(|_| "failed to read response body".to_string()); + + // 404 "database not found" means the namespace has not been written to yet; + // treat it as empty rather than a failure so the circuit breaker stays healthy. + // Any other 404 (e.g. "table not found") is a permanent error — don't swallow it. + if status.as_u16() == 404 { + if body_text.to_lowercase().contains("database not found") { + return Ok(String::new()); + } + return Err(Error::PermanentHttpError(format!( + "InfluxDB V3 query failed with status {status}: {body_text}" + ))); + } + + if iggy_connector_sdk::retry::is_transient_status(status) { + Err(Error::Storage(format!( + "InfluxDB V3 query failed with status {status}: {body_text}" + ))) + } else { + Err(Error::PermanentHttpError(format!( + "InfluxDB V3 query failed with status {status}: {body_text}" + ))) + } +} + +// ── Message building ────────────────────────────────────────────────────────── + +fn build_payload( + row: &Row, + payload_column: Option<&str>, + payload_format: PayloadFormat, + include_metadata: bool, + cursor_field: &str, +) -> Result<Vec<u8>, Error> { + if let Some(col) = payload_column { + let raw = row + .get(col) + .cloned() + .ok_or_else(|| Error::InvalidRecordValue(format!("Missing payload column '{col}'")))?; + return match payload_format { + // raw is already a serde_json::Value — serialize directly, no re-parse. + PayloadFormat::Json => serde_json::to_vec(&raw) + .map_err(|e| Error::Serialization(format!("JSON serialization failed: {e}"))), + PayloadFormat::Text => match raw { + serde_json::Value::String(s) => Ok(s.into_bytes()), + other => serde_json::to_vec(&other) + .map_err(|e| Error::Serialization(format!("JSON serialization failed: {e}"))), + }, + PayloadFormat::Raw => { + let s = raw.as_str().ok_or_else(|| { + Error::InvalidRecordValue(format!( + "Payload column '{col}' must be a string value for Raw format" + )) + })?; + general_purpose::STANDARD.decode(s.as_bytes()).map_err(|e| { + Error::InvalidRecordValue(format!("Failed to decode payload as base64: {e}")) + }) + } + }; + } + + // V3 rows carry typed serde_json::Values — clone directly, no parse_scalar needed. + // When include_metadata=false, exclude the cursor column (timestamp). + let json_row: serde_json::Map<_, _> = row Review Comment: ref to lines 194-198. `(k.clone(), v.clone())` per field per row in `build_payload`. at default batch=500 with ~20 fields per row that's ~10k `String` + `serde_json::Value` clones per poll. called from `process_rows` at `v3.rs:311-317`, fully in the hot path. fix: emit directly to a writer without materializing the filtered map - either a custom `Serialize` impl over `(&Row, &str, bool)` that filters during serialization, or a borrowing `serde_json::Map<&str, &Value>` written via `serde_json::to_writer`. ########## core/connectors/sources/influxdb_source/src/lib.rs: ########## @@ -678,70 +195,116 @@ impl Source for InfluxDbSource { return Err(Error::InvalidState); } + let ver = self.config.version_label(); info!( - "Opening InfluxDB source connector with ID: {}. 
Org: {}", - self.id, self.config.org + "Opening {CONNECTOR_NAME} with ID: {} (version={ver})", + self.id ); - // Build the raw client first and use it for the startup connectivity - // check. The connectivity retry loop uses separate delay bounds - // (open_retry_max_delay) from the per-query middleware retries, so - // we keep them independent. - let raw_client = self.build_raw_client()?; + validate_cursor_field(self.config.cursor_field(), self.config.version_label())?; + if let Some(offset) = self.config.initial_offset() { + validate_cursor(offset)?; + } + + if let InfluxDbSourceConfig::V3(cfg) = &self.config + && let Some(cap) = cfg.stuck_batch_cap_factor + && cap > v3::MAX_STUCK_CAP_FACTOR + { + return Err(Error::InvalidConfigValue(format!( + "stuck_batch_cap_factor {cap} exceeds maximum of {}; \ + reduce it to avoid querying up to {}×batch_size rows per poll.", + v3::MAX_STUCK_CAP_FACTOR, + v3::MAX_STUCK_CAP_FACTOR + ))); + } + + if let InfluxDbSourceConfig::V3(_) = &self.config + && self.config.batch_size() == 0 + { + return Err(Error::InvalidConfigValue( + "batch_size must be >= 1; got 0. \ + A LIMIT 0 query would return no rows and stall the connector." + .into(), + )); + } - // Validate cursor_field before touching the network: string comparison - // is only safe for timestamp columns. See validate_cursor_field for details. - Self::validate_cursor_field(self.cursor_field())?; + // Skip-N dedup for V2 requires rows to arrive sorted by time. If the Flux + // query lacks an explicit sort, InfluxDB may return rows in storage order, + // causing the dedup to skip the wrong rows silently. + if let InfluxDbSourceConfig::V2(cfg) = &self.config + && !query_has_sort_call(&cfg.query) + { + warn!( Review Comment: ref to lines 231-244. v2 sort heuristic only `warn!`s. skip-N at `v2.rs:387` is order-dependent: with `>=` query semantics and no `|> sort(columns: ["_time"])`, influxdb may return rows in storage order, causing dedup to skip the wrong rows silently. shipped default config uses strict `>` so skip-N is currently dead code, but any user that switches to `>=` (per the comment's own implication) silently gets wrong rows. fix: hard-error at `open()` when query lacks the sort call, or reject `>=`-style queries that don't pass the heuristic. 
########## core/connectors/sinks/influxdb_sink/src/lib.rs: ########## @@ -443,33 +543,27 @@ impl InfluxDbSink { buf.push('"'); } PayloadFormat::Text => { - let payload_bytes = message.payload.try_to_bytes().map_err(|e| { - Error::CannotStoreData(format!("Failed to convert payload to bytes: {e}")) + let bytes = message.payload.try_to_bytes().map_err(|e| { + Error::CannotStoreData(format!("Payload conversion failed: {e}")) })?; - let text = String::from_utf8(payload_bytes).map_err(|e| { - Error::CannotStoreData(format!( - "Payload format is text but payload is invalid UTF-8: {e}" - )) + let text = String::from_utf8(bytes).map_err(|e| { + Error::CannotStoreData(format!("Payload is not valid UTF-8: {e}")) })?; buf.push_str(",payload_text=\""); write_field_string(buf, &text); buf.push('"'); } PayloadFormat::Base64 => { - let payload_bytes = message.payload.try_to_bytes().map_err(|e| { - Error::CannotStoreData(format!("Failed to convert payload to bytes: {e}")) + let bytes = message.payload.try_to_bytes().map_err(|e| { + Error::CannotStoreData(format!("Payload conversion failed: {e}")) })?; - let encoded = general_purpose::STANDARD.encode(&payload_bytes); + let encoded = general_purpose::STANDARD.encode(&bytes); buf.push_str(",payload_base64=\""); write_field_string(buf, &encoded); buf.push('"'); } } - // ── Timestamp ──────────────────────────────────────────────────────── - // message.timestamp is microseconds since Unix epoch. - // Fall back to now() when unset (0) so points are not stored at the - // Unix epoch (year 1970), which falls outside every range(start:-1h). let base_micros = if message.timestamp == 0 { Review Comment: ref to lines 567-574. `if message.timestamp == 0` is a magic-value sentinel, theoretically colliding with a legitimate UNIX_EPOCH (1970-01-01T00:00:00Z) message. iggy server overwrites `header.timestamp` with `IggyTimestamp::now().as_micros()` during ingest (see `core/common/src/types/message/messages_batch_mut.rs`), so live production traffic should never present zero. the collision is real only for externally-imported journal/snapshot data. lower priority but the pattern is fragile - prefer `Option<u64>` on `ConsumedMessage.timestamp` or a named const + explicit log when substituting `SystemTime::now()`. ########## core/connectors/sinks/influxdb_sink/src/lib.rs: ########## @@ -125,316 +370,171 @@ enum PayloadFormat { impl PayloadFormat { fn from_config(value: Option<&str>) -> Self { match value.map(|v| v.to_ascii_lowercase()).as_deref() { - Some("text") | Some("utf8") => PayloadFormat::Text, - Some("base64") | Some("raw") => PayloadFormat::Base64, - Some("json") => PayloadFormat::Json, - other => { - warn!( - "Unrecognized payload_format value {:?}, falling back to JSON. \ - Valid values are: \"json\", \"text\", \"utf8\", \"base64\", \"raw\".", - other - ); - PayloadFormat::Json - } - } - } -} - -// --------------------------------------------------------------------------- -// Helpers -// --------------------------------------------------------------------------- - -/// Write an escaped measurement name into `buf`. -/// Escapes: `\` → `\\`, `,` → `\,`, ` ` → `\ `, `\n` → `\\n`, `\r` → `\\r` -/// -/// Newline (`\n`) and carriage-return (`\r`) are the InfluxDB line-protocol -/// record delimiters; a literal newline inside a measurement name would split -/// the line and corrupt the batch. 
-fn write_measurement(buf: &mut String, value: &str) { - for ch in value.chars() { - match ch { - '\\' => buf.push_str("\\\\"), - ',' => buf.push_str("\\,"), - ' ' => buf.push_str("\\ "), - '\n' => buf.push_str("\\n"), - '\r' => buf.push_str("\\r"), - _ => buf.push(ch), - } - } -} - -/// Write an escaped tag key/value into `buf`. -/// Escapes: `\` → `\\`, `,` → `\,`, `=` → `\=`, ` ` → `\ `, `\n` → `\\n`, `\r` → `\\r` -/// -/// Newline and carriage-return are escaped for the same reason as in -/// [`write_measurement`]: they are InfluxDB line-protocol record delimiters. -fn write_tag_value(buf: &mut String, value: &str) { - for ch in value.chars() { - match ch { - '\\' => buf.push_str("\\\\"), - ',' => buf.push_str("\\,"), - '=' => buf.push_str("\\="), - ' ' => buf.push_str("\\ "), - '\n' => buf.push_str("\\n"), - '\r' => buf.push_str("\\r"), - _ => buf.push(ch), - } - } -} - -/// Write an escaped string field value (without surrounding quotes) into `buf`. -/// Escapes: `\` → `\\`, `"` → `\"`, `\n` → `\\n`, `\r` → `\\r` -/// -/// Newline and carriage-return are the InfluxDB line-protocol record -/// delimiters; a literal newline inside a string field value (e.g. from a -/// multi-line text payload) would split the line and corrupt the batch. -fn write_field_string(buf: &mut String, value: &str) { - for ch in value.chars() { - match ch { - '\\' => buf.push_str("\\\\"), - '"' => buf.push_str("\\\""), - '\n' => buf.push_str("\\n"), - '\r' => buf.push_str("\\r"), - _ => buf.push(ch), + Some("text") | Some("utf8") => Self::Text, + Some("base64") | Some("raw") => Self::Base64, + _ => Self::Json, } } } -// --------------------------------------------------------------------------- -// InfluxDbSink implementation -// --------------------------------------------------------------------------- +// ── InfluxDbSink impl ───────────────────────────────────────────────────────── impl InfluxDbSink { pub fn new(id: u32, config: InfluxDbSinkConfig) -> Self { - let verbose = config.verbose_logging.unwrap_or(false); - let retry_delay = parse_duration(config.retry_delay.as_deref(), DEFAULT_RETRY_DELAY); - let payload_format = PayloadFormat::from_config(config.payload_format.as_deref()); - - // Build circuit breaker from config - let cb_threshold = config - .circuit_breaker_threshold - .unwrap_or(DEFAULT_CIRCUIT_BREAKER_THRESHOLD); - let cb_cool_down = parse_duration( - config.circuit_breaker_cool_down.as_deref(), - DEFAULT_CIRCUIT_COOL_DOWN, - ); - - InfluxDbSink { + let verbose = config.verbose_logging(); + let retry_delay = parse_duration(config.retry_delay(), DEFAULT_RETRY_DELAY); + let payload_format = PayloadFormat::from_config(config.payload_format()); + let circuit_breaker = Arc::new(CircuitBreaker::new( + config.circuit_breaker_threshold(), + parse_duration( + config.circuit_breaker_cool_down(), + DEFAULT_CIRCUIT_COOL_DOWN, + ), + )); + let measurement = config.measurement().unwrap_or("iggy_messages").to_string(); + let precision = config.precision().to_string(); + let include_metadata = config.include_metadata(); + let include_checksum = config.include_checksum(); + let include_origin_timestamp = config.include_origin_timestamp(); + let include_stream_tag = config.include_stream_tag(); + let include_topic_tag = config.include_topic_tag(); + let include_partition_tag = config.include_partition_tag(); + let batch_size_limit = config.batch_size().max(1) as usize; + + Self { id, config, client: None, write_url: None, + auth_header: None, + circuit_breaker, messages_attempted: AtomicU64::new(0), 
write_success: AtomicU64::new(0), write_errors: AtomicU64::new(0), verbose, retry_delay, payload_format, - circuit_breaker: Arc::new(CircuitBreaker::new(cb_threshold, cb_cool_down)), + measurement, + precision, + include_metadata, + include_checksum, + include_origin_timestamp, + include_stream_tag, + include_topic_tag, + include_partition_tag, + batch_size_limit, } } fn build_raw_client(&self) -> Result<reqwest::Client, Error> { - let timeout = parse_duration(self.config.timeout.as_deref(), DEFAULT_TIMEOUT); + let timeout = parse_duration(self.config.timeout(), DEFAULT_TIMEOUT); reqwest::Client::builder() .timeout(timeout) .build() .map_err(|e| Error::InitError(format!("Failed to create HTTP client: {e}"))) } - fn build_write_url(&self) -> Result<Url, Error> { - let base = self.config.url.trim_end_matches('/'); - let mut url = Url::parse(&format!("{base}/api/v2/write")) - .map_err(|e| Error::InvalidConfigValue(format!("Invalid InfluxDB URL: {e}")))?; - - let precision = self - .config - .precision - .as_deref() - .unwrap_or(DEFAULT_PRECISION); - url.query_pairs_mut() - .append_pair("org", &self.config.org) - .append_pair("bucket", &self.config.bucket) - .append_pair("precision", precision); - - Ok(url) - } - - fn build_health_url(&self) -> Result<Url, Error> { - let base = self.config.url.trim_end_matches('/'); - Url::parse(&format!("{base}/health")) - .map_err(|e| Error::InvalidConfigValue(format!("Invalid InfluxDB URL: {e}"))) - } - fn get_client(&self) -> Result<&ClientWithMiddleware, Error> { - self.client - .as_ref() - .ok_or_else(|| Error::Connection("InfluxDB client is not initialized".to_string())) - } - - fn measurement(&self) -> &str { - self.config - .measurement - .as_deref() - .unwrap_or("iggy_messages") - } - - fn payload_format(&self) -> PayloadFormat { - self.payload_format - } - - fn timestamp_precision(&self) -> &str { - self.config - .precision - .as_deref() - .unwrap_or(DEFAULT_PRECISION) - } - - fn get_max_retries(&self) -> u32 { - self.config - .max_retries - .unwrap_or(DEFAULT_MAX_RETRIES) - .max(1) + self.client.as_ref().ok_or_else(|| { + Error::Connection("InfluxDB client not initialized — call open() first".to_string()) + }) } + #[inline] fn to_precision_timestamp(&self, micros: u64) -> u64 { - match self.timestamp_precision() { + match self.precision.as_str() { "ns" => micros.saturating_mul(1_000), "us" => micros, "ms" => micros / 1_000, "s" => micros / 1_000_000, - _ => micros, + _ => micros, // unreachable if open() validated precision as the precision is validated in the config validation } } - /// Serialise one message as a line-protocol line, appending directly into - /// `buf` with no intermediate `Vec<String>` for tags or fields. - /// - /// # Allocation budget (per message, happy path) - /// - Zero `Vec` allocations for tags or fields. - /// - Zero per-tag/per-field `format!` allocations. - /// - One `Vec<u8>` for `payload_bytes` (unavoidable — payload must be - /// decoded/serialised before it can be escaped into the buffer). - /// - The caller's `buf` grows in place; if it was pre-allocated with - /// `with_capacity` it will not reallocate for typical message sizes. 
fn append_line( &self, buf: &mut String, topic_metadata: &TopicMetadata, messages_metadata: &MessagesMetadata, message: &ConsumedMessage, ) -> Result<(), Error> { - let include_metadata = self.config.include_metadata.unwrap_or(true); - let include_checksum = self.config.include_checksum.unwrap_or(true); - let include_origin_timestamp = self.config.include_origin_timestamp.unwrap_or(true); - let include_stream_tag = self.config.include_stream_tag.unwrap_or(true); - let include_topic_tag = self.config.include_topic_tag.unwrap_or(true); - let include_partition_tag = self.config.include_partition_tag.unwrap_or(true); - - // ── Measurement ────────────────────────────────────────────────────── - write_measurement(buf, self.measurement()); - - // ── Tag set ────────────────────────────────────────────────────────── - // Tags are written as ",key=value" pairs directly into buf. - // The offset tag is always present — it makes every point unique in - // InfluxDB's deduplication key (measurement + tag set + timestamp), - // regardless of precision or how many messages share a timestamp. - if include_metadata && include_stream_tag { + write_measurement(buf, &self.measurement); + + // Tag *key* strings below ("stream", "topic", "partition", "offset", etc.) are + // static ASCII literals — they contain no InfluxDB line-protocol special chars + // (comma, equals, space, backslash, newline) and therefore do not need escaping. + // Only the tag *values* (user-supplied stream/topic names) are escaped via + // `write_tag_value`. The user-supplied `measurement` is escaped via + // `write_measurement`. + if self.include_metadata && self.include_stream_tag { buf.push_str(",stream="); write_tag_value(buf, &topic_metadata.stream); } - if include_metadata && include_topic_tag { + if self.include_metadata && self.include_topic_tag { buf.push_str(",topic="); write_tag_value(buf, &topic_metadata.topic); } - if include_metadata && include_partition_tag { - use std::fmt::Write as _; - write!(buf, ",partition={}", messages_metadata.partition_id) - .expect("write to String is infallible"); - } - // offset tag — always written, ensures point uniqueness - { - use std::fmt::Write as _; - write!(buf, ",offset={}", message.offset).expect("write to String is infallible"); + if self.include_metadata && self.include_partition_tag { + write!(buf, ",partition={}", messages_metadata.partition_id).expect("infallible"); } + // `offset` is always written as a tag regardless of `include_metadata`. + // It forms the deduplication key for idempotent writes: without it, two + // messages at the same timestamp in the same measurement+tag-set would + // silently overwrite each other in InfluxDB's last-write-wins model. + write!(buf, ",offset={}", message.offset).expect("infallible"); - // ── Field set ──────────────────────────────────────────────────────── - // First field: no leading comma. All subsequent fields: leading comma. buf.push(' '); - buf.push_str("message_id=\""); - write_field_string(buf, &message.id.to_string()); + let _ = write!(buf, "{}", message.id); buf.push('"'); - // offset as a numeric field (queryable in Flux) in addition to the tag - { - use std::fmt::Write as _; - write!(buf, ",offset={}u", message.offset).expect("write to String is infallible"); - } - - // Optional metadata fields written when the corresponding tag is - // disabled (so the value is still queryable as a field). 
- if include_metadata && !include_stream_tag { + if self.include_metadata && !self.include_stream_tag { buf.push_str(",iggy_stream=\""); write_field_string(buf, &topic_metadata.stream); buf.push('"'); } - if include_metadata && !include_topic_tag { + if self.include_metadata && !self.include_topic_tag { buf.push_str(",iggy_topic=\""); write_field_string(buf, &topic_metadata.topic); buf.push('"'); } - if include_metadata && !include_partition_tag { - use std::fmt::Write as _; + if self.include_metadata && !self.include_partition_tag { write!( buf, ",iggy_partition={}u", messages_metadata.partition_id as u64 ) - .expect("write to String is infallible"); + .expect("infallible"); } - if include_checksum { - use std::fmt::Write as _; - write!(buf, ",iggy_checksum={}u", message.checksum) - .expect("write to String is infallible"); + if self.include_checksum { + write!(buf, ",iggy_checksum={}u", message.checksum).expect("infallible"); } - if include_origin_timestamp { - use std::fmt::Write as _; + if self.include_origin_timestamp { write!(buf, ",iggy_origin_timestamp={}u", message.origin_timestamp) - .expect("write to String is infallible"); + .expect("infallible"); } - // ── Payload field ──────────────────────────────────────────────────── - match self.payload_format() { + match self.payload_format { PayloadFormat::Json => { - // Fast path: if the payload is already a parsed simd_json value, - // serialise directly to a compact string — one pass, no bytes - // round-trip. Avoids: simd_json→bytes, bytes→serde_json::Value, - // serde_json::Value→string (three allocating passes per message). - // - // Fallback: any other Payload variant (Raw bytes that happen to - // contain JSON, Text, etc.) goes through try_to_bytes() first. + // simd_json is used here (not serde_json) because this is the hot path: Review Comment: the comment claims `simd_json` provides ~2x throughput on the sink emit path, but simd-json 0.17.0 has no SIMD on serialize - `simd-json/src/serde/se.rs:19-26,33-38` shows `to_vec`/`to_string` use a plain `Vec<u8>` writer through a serde Serializer trait via `BaseGenerator`. SIMD applies only to the parse path. additionally the fallback branch (lines 528-539) does `try_to_bytes` -> `serde_json::from_slice` -> `serde_json::to_string`, a parse+reserialize round-trip strictly slower than direct `serde_json::to_string`. drop the comment or replace with measured perf data, and switch the hot path to `serde_json::to_writer(&mut buf, value)` to avoid the intermediate `String` allocation. ########## Cargo.toml: ########## @@ -186,7 +186,7 @@ left-right = "0.11" lending-iterator = "0.1.7" libc = "0.2.186" log = "0.4.29" -lz4_flex = "0.13.0" +lz4_flex = "0.12.1" Review Comment: what is reason for this change? ########## core/connectors/sinks/quickwit_sink/src/lib.rs: ########## @@ -119,29 +119,60 @@ impl QuickwitSink { ); Review Comment: this PR is titled "InfluxDB v2 and v3 connector" but adds an unrelated retry-on-404 loop to the quickwit sink (5 attempts, 500ms backoff, `body.clone()` per attempt at line 132). PR description mentions only influxdb. repo policy is single-purpose PRs - split this into its own PR. ########## core/connectors/sources/influxdb_source/src/v3.rs: ########## @@ -0,0 +1,1298 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +//! InfluxDB V3 source — SQL queries, JSONL responses, Bearer auth. +//! +//! V3 uses strict `> cursor` semantics. DataFusion/Parquet does not guarantee +//! stable ordering for rows that share the same timestamp, so the V2 skip-N +//! approach is not safe here. If all rows in a batch share the same timestamp, +//! the cursor cannot advance — the effective batch size is doubled each poll +//! up to `stuck_batch_cap_factor × batch_size`. If the cap is reached, the +//! circuit breaker is tripped. + +use crate::common::{ + DEFAULT_V3_CURSOR_FIELD, PayloadFormat, Row, RowContext, V3SourceConfig, V3State, + apply_query_params, is_timestamp_after, parse_jsonl_rows, timestamps_equal, validate_cursor, +}; +use base64::{Engine as _, engine::general_purpose}; +use chrono::{DateTime, Utc}; +use iggy_connector_sdk::{Error, ProducedMessage, Schema}; +use reqwest::Url; +use reqwest_middleware::ClientWithMiddleware; +use serde_json::json; +use tracing::warn; +use uuid::Uuid; + +pub(crate) const DEFAULT_STUCK_CAP_FACTOR: u32 = 10; +/// Upper bound for `stuck_batch_cap_factor`. A value of 1000 with batch_size=1000 +/// would issue 1,000,000-row queries before tripping the circuit breaker. +pub(crate) const MAX_STUCK_CAP_FACTOR: u32 = 100; + +/// Hard cap on buffered JSONL response body size. +/// +/// `MAX_STUCK_CAP_FACTOR` can inflate the effective batch to 100 × `batch_size`, +/// making unbounded `response.text()` a real OOM vector under misconfiguration. +/// Streaming stops and returns an error once this many bytes have been read. +const MAX_RESPONSE_BODY_BYTES: usize = 256 * 1024 * 1024; // 256 MiB + +/// InfluxDB V3 query endpoint expects this exact string for JSONL response format. 
+const QUERY_FORMAT_JSONL: &str = "jsonl"; + +fn build_query(base: &str, query: &str, db: &str) -> Result<(Url, serde_json::Value), Error> { + let url = Url::parse(&format!("{base}/api/v3/query_sql")) + .map_err(|e| Error::InvalidConfigValue(format!("Invalid InfluxDB URL: {e}")))?; + let body = json!({ + "db": db, + "q": query, + "format": QUERY_FORMAT_JSONL + }); + Ok((url, body)) +} + +// ── Query execution ─────────────────────────────────────────────────────────── + +pub(crate) async fn run_query( + client: &ClientWithMiddleware, + config: &V3SourceConfig, + auth: &str, + cursor: &str, + effective_batch: u32, + offset: u64, +) -> Result<String, Error> { + validate_cursor(cursor)?; + let q = apply_query_params( + &config.query, + cursor, + &effective_batch.to_string(), + &offset.to_string(), /* &str */ + ); + let base = config.url.trim_end_matches('/'); + let (url, body) = build_query(base, &q, &config.db)?; + + let mut response = client + .post(url) + .header("Authorization", auth) + .header("Content-Type", "application/json") + .header("Accept", "application/json") + .json(&body) + .send() + .await + .map_err(|e| Error::Storage(format!("InfluxDB V3 query failed: {e}")))?; + + let status = response.status(); + if status.is_success() { + // Stream chunk-by-chunk with a hard byte cap to prevent OOM when + // MAX_STUCK_CAP_FACTOR inflates the effective batch to 100 × batch_size. + if response + .content_length() + .is_some_and(|n| n as usize > MAX_RESPONSE_BODY_BYTES) + { + return Err(Error::Storage(format!( + "InfluxDB V3 response body exceeds {MAX_RESPONSE_BODY_BYTES} byte cap; \ + reduce batch_size to avoid OOM" + ))); + } + let mut buf: Vec<u8> = Vec::new(); + while let Some(chunk) = response + .chunk() + .await + .map_err(|e| Error::Storage(format!("Failed to read V3 response: {e}")))? + { + buf.extend_from_slice(&chunk); + if buf.len() > MAX_RESPONSE_BODY_BYTES { + return Err(Error::Storage(format!( + "InfluxDB V3 response body exceeded {MAX_RESPONSE_BODY_BYTES} byte cap \ + while streaming; reduce batch_size to avoid OOM" + ))); + } + } + return String::from_utf8(buf) + .map_err(|e| Error::Storage(format!("V3 response body is not valid UTF-8: {e}"))); + } + + let body_text = response + .text() + .await + .unwrap_or_else(|_| "failed to read response body".to_string()); + + // 404 "database not found" means the namespace has not been written to yet; + // treat it as empty rather than a failure so the circuit breaker stays healthy. + // Any other 404 (e.g. "table not found") is a permanent error — don't swallow it. 
+ if status.as_u16() == 404 { + if body_text.to_lowercase().contains("database not found") { + return Ok(String::new()); + } + return Err(Error::PermanentHttpError(format!( + "InfluxDB V3 query failed with status {status}: {body_text}" + ))); + } + + if iggy_connector_sdk::retry::is_transient_status(status) { + Err(Error::Storage(format!( + "InfluxDB V3 query failed with status {status}: {body_text}" + ))) + } else { + Err(Error::PermanentHttpError(format!( + "InfluxDB V3 query failed with status {status}: {body_text}" + ))) + } +} + +// ── Message building ────────────────────────────────────────────────────────── + +fn build_payload( + row: &Row, + payload_column: Option<&str>, + payload_format: PayloadFormat, + include_metadata: bool, + cursor_field: &str, +) -> Result<Vec<u8>, Error> { + if let Some(col) = payload_column { + let raw = row + .get(col) + .cloned() + .ok_or_else(|| Error::InvalidRecordValue(format!("Missing payload column '{col}'")))?; + return match payload_format { + // raw is already a serde_json::Value — serialize directly, no re-parse. + PayloadFormat::Json => serde_json::to_vec(&raw) + .map_err(|e| Error::Serialization(format!("JSON serialization failed: {e}"))), + PayloadFormat::Text => match raw { + serde_json::Value::String(s) => Ok(s.into_bytes()), + other => serde_json::to_vec(&other) + .map_err(|e| Error::Serialization(format!("JSON serialization failed: {e}"))), + }, + PayloadFormat::Raw => { + let s = raw.as_str().ok_or_else(|| { + Error::InvalidRecordValue(format!( + "Payload column '{col}' must be a string value for Raw format" + )) + })?; + general_purpose::STANDARD.decode(s.as_bytes()).map_err(|e| { + Error::InvalidRecordValue(format!("Failed to decode payload as base64: {e}")) + }) + } + }; + } + + // V3 rows carry typed serde_json::Values — clone directly, no parse_scalar needed. + // When include_metadata=false, exclude the cursor column (timestamp). + let json_row: serde_json::Map<_, _> = row + .iter() + .filter(|(k, _)| include_metadata || k.as_str() != cursor_field) + .map(|(k, v)| (k.clone(), v.clone())) + .collect(); + serde_json::to_vec(&json_row) + .map_err(|e| Error::Serialization(format!("JSON serialization failed: {e}"))) +} + +/// Compute the next effective batch size when the batch is stuck. +/// Doubles until it reaches `cap`. Returns `None` if already at cap. +pub(crate) fn next_stuck_batch_size(current: u32, base: u32, cap_factor: u32) -> Option<u32> { + let cap = base.saturating_mul(cap_factor); + if current >= cap { + None + } else { + Some(current.saturating_mul(2).min(cap)) + } +} + +// ── Poll ────────────────────────────────────────────────────────────────────── + +pub(crate) struct PollResult { + pub messages: Vec<ProducedMessage>, + pub new_state: V3State, + pub schema: Schema, + /// Set to true when the stuck-timestamp cap was reached and the circuit + /// breaker should be tripped by the caller. + pub trip_circuit_breaker: bool, +} + +// ── Row processing (pure, testable without HTTP) ────────────────────────────── + +/// Normalize a raw timestamp from InfluxDB V3 JSONL into a cursor-safe RFC 3339 string. +/// +/// InfluxDB 3 Core returns timestamps without a timezone suffix and with nanosecond +/// precision (e.g. `"2026-04-26T02:32:20.526360865"`). The only required fix is +/// appending `"Z"` when no timezone suffix is present (InfluxDB always stores UTC). 
+/// +/// Full nanosecond precision is intentionally preserved — truncating to milliseconds +/// would place the cursor BEFORE the actual row timestamps within the same millisecond, +/// causing `WHERE time > '$cursor'` to re-deliver already-seen rows on subsequent polls. +/// InfluxDB 3's DataFusion SQL engine handles RFC 3339 strings with any number of +/// fractional digits in WHERE clause timestamp comparisons. +fn normalize_v3_timestamp(ts: &str) -> String { + // Fast path: already a valid RFC 3339 timestamp with timezone suffix. + if chrono::DateTime::parse_from_rfc3339(ts).is_ok() { + return ts.to_string(); + } + // Slow path: no timezone suffix — append "Z" to make it RFC 3339 compliant. + format!("{ts}Z") +} + +/// Result of processing a batch of V3 rows into Iggy messages. +#[derive(Debug)] +pub(crate) struct RowProcessingResult { + pub messages: Vec<ProducedMessage>, + pub max_cursor: Option<String>, + /// `true` when every row's `cursor_field` value equals `current_cursor`. + /// Combined with `rows.len() >= effective_batch`, this signals a stuck batch: + /// all returned rows are at the current cursor, meaning the cursor cannot + /// advance with `> cursor` semantics. + pub all_at_cursor: bool, + /// Count of rows whose cursor == max_cursor (for tiebreaker offset). + pub rows_at_max_cursor: u64, +} + +/// Convert a slice of V3 query rows into Iggy messages. +/// +/// Also detects whether all rows share the same cursor value as `ctx.current_cursor` +/// (the `all_at_cursor` flag). The caller uses this together with batch fullness +/// to decide whether to inflate the batch size for the next poll. +/// +/// Unlike V2, V3 uses strict `> cursor` semantics, so there is no row-skipping. +/// All rows in the slice are emitted as messages. +pub(crate) fn process_rows( + rows: &[Row], + ctx: &RowContext<'_>, +) -> Result<RowProcessingResult, Error> { + let mut messages = Vec::with_capacity(rows.len()); + let mut max_cursor: Option<String> = None; + let mut max_cursor_parsed: Option<DateTime<Utc>> = None; // cache parsed form + // Starts true for non-empty batches; flipped to false as soon as any row + // either has a different cursor value or has no cursor field at all. + let mut all_at_cursor = !rows.is_empty(); + // Generate the base UUID once per poll; derive per-message IDs by addition. + // This is O(1) PRNG calls per batch instead of O(n), measurable at batch ≥ 100. + let id_base = Uuid::new_v4().as_u128(); + for row in rows.iter() { + if let Some(raw_cv) = row.get(ctx.cursor_field).and_then(|v| v.as_str()) { + let cv_owned = normalize_v3_timestamp(raw_cv); + let cv = cv_owned.as_str(); + if !timestamps_equal(cv, ctx.current_cursor) { + all_at_cursor = false; + } + validate_cursor(cv)?; + let cv_parsed = cv.parse::<DateTime<Utc>>().ok(); + match (cv_parsed, max_cursor_parsed) { + (Some(new_dt), Some(cur_dt)) if new_dt > cur_dt => { + max_cursor = Some(cv.to_string()); + max_cursor_parsed = Some(new_dt); + } + (Some(new_dt), None) => { + max_cursor = Some(cv.to_string()); + max_cursor_parsed = Some(new_dt); + } + (None, _) if max_cursor_parsed.is_none() => { + // Unparsable cursor — still track it (string fallback) if no + // parsable cursor has been seen yet. 
+ max_cursor = Some(cv.to_string()); + } + _ => {} + } + } else { + all_at_cursor = false; + } + + let payload = build_payload( + row, + ctx.payload_col, + ctx.payload_format, + ctx.include_metadata, + ctx.cursor_field, + )?; + messages.push(ProducedMessage { + id: Some(id_base.wrapping_add(messages.len() as u128)), + checksum: None, + timestamp: Some(ctx.now_micros), + origin_timestamp: Some(ctx.now_micros), + headers: None, + payload, + }); + } + + let rows_at_max_cursor = rows Review Comment: ref to lines 328-337. second pass over `rows` to compute `rows_at_max_cursor`, after the main loop already iterated and tracked `max_cursor`. each row in the second pass calls `normalize_v3_timestamp` again, which on its fast path still allocates via `to_string()` (line 241). at default batch=500 that's 500 extra allocations per poll. fold the counter into the main loop: increment when a row's cursor equals the current max; reset it to 1 when the max advances (prior rows are strictly older, so there is nothing to recount).
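a rough sketch of the single-pass fold, written as a standalone function for clarity (untested; `max_cursor_with_count` is a made-up name, and in `process_rows` this logic would live inside the existing per-row loop rather than a separate helper):

```rust
use chrono::{DateTime, Utc};

// Sketch only: one pass over already-normalized cursor strings, tracking the
// maximum and how many rows share it. No second iteration over the rows and no
// extra normalize_v3_timestamp / to_string() allocations.
fn max_cursor_with_count(cursors: &[String]) -> (Option<String>, u64) {
    let mut max_str: Option<String> = None;
    let mut max_dt: Option<DateTime<Utc>> = None;
    let mut count: u64 = 0;
    for cv in cursors {
        // Unparsable cursors are skipped here; the real loop keeps its
        // existing string-fallback branch for them.
        let Ok(dt) = cv.parse::<DateTime<Utc>>() else { continue };
        match max_dt {
            Some(cur) if dt > cur => {
                max_str = Some(cv.clone());
                max_dt = Some(dt);
                count = 1; // new maximum: prior rows are strictly older
            }
            Some(cur) if dt == cur => count += 1, // another row at the maximum
            Some(_) => {}
            None => {
                max_str = Some(cv.clone());
                max_dt = Some(dt);
                count = 1;
            }
        }
    }
    (max_str, count)
}
```

returning both values keeps the tiebreaker-offset computation in the caller unchanged, just without the second scan.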
