ryerraguntla commented on code in PR #3140:
URL: https://github.com/apache/iggy/pull/3140#discussion_r3232897508
##########
core/connectors/sources/influxdb_source/src/lib.rs:
##########
@@ -752,118 +315,250 @@ impl Source for InfluxDbSource {
});
}
- match self.poll_messages().await {
- Ok((messages, max_cursor, rows_at_max_cursor, skipped)) => {
- // Successful poll — reset circuit breaker
- self.circuit_breaker.record_success();
-
- let mut state = self.state.lock().await;
- state.last_poll_time = Utc::now();
- state.processed_rows += messages.len() as u64;
- match max_cursor {
- Some(ref new_cursor)
- if state.last_timestamp.as_deref() !=
Some(new_cursor.as_str()) =>
- {
- // Cursor advanced to a new timestamp — reset the row
counter.
- state.last_timestamp = max_cursor.clone();
- state.cursor_row_count = rows_at_max_cursor;
- }
- Some(_) => {
- // Cursor stayed at the same timestamp — accumulate so
the
- // next poll skips all already-delivered rows at this
timestamp.
- state.cursor_row_count =
-
state.cursor_row_count.saturating_add(rows_at_max_cursor);
- }
- None => {
- // No rows delivered. If we skipped some rows it means
- // every row in the result was at the current cursor
- // timestamp and had already been seen. `skipped` is
- // therefore the true row count at that timestamp for
- // this query result, so we correct cursor_row_count to
- // that value. This prevents a permanently-inflated
- // counter (e.g. after rows are deleted or compacted in
- // InfluxDB) from causing the skip logic to over-skip
on
- // every subsequent poll and stall the connector.
- if skipped > 0 {
- state.cursor_row_count = skipped;
+ let client = self.get_client()?;
+ let auth = self
+ .auth_header
+ .as_ref()
+ .map(|s| s.expose_secret().as_str())
+ .ok_or_else(|| {
+ Error::Connection("auth_header not initialised — was open()
called?".to_string())
+ })?;
+ match &self.version_state {
+ VersionState::V2(state_mu) => {
+ let InfluxDbSourceConfig::V2(cfg) = &self.config else {
+ return Err(Error::InvalidState);
+ };
+
+ let state_snap = state_mu.lock().await.clone();
+ match v2::poll(
+ client,
+ cfg,
+ auth,
+ &state_snap,
+ self.payload_format,
+ self.config.include_metadata(),
+ )
+ .await
+ {
+ Ok(result) => {
+ self.circuit_breaker.record_success();
+ let mut state = state_mu.lock().await;
+ state.processed_rows += result.messages.len() as u64;
+ apply_v2_cursor_advance(
+ &mut state,
+ result.max_cursor,
+ result.rows_at_max_cursor,
+ result.skipped,
+ );
+
+ if self.config.verbose_logging() {
+ info!(
+ "{CONNECTOR_NAME} ID: {} produced {} messages
(V2). \
+ Total: {}. Cursor: {:?}",
+ self.id,
+ result.messages.len(),
+ state.processed_rows,
+ state.last_timestamp
+ );
+ } else {
+ debug!(
+ "{CONNECTOR_NAME} ID: {} produced {} messages
(V2). \
+ Total: {}. Cursor: {:?}",
+ self.id,
+ result.messages.len(),
+ state.processed_rows,
+ state.last_timestamp
+ );
}
- }
- }
- if self.verbose {
- info!(
- "InfluxDB source ID: {} produced {} messages. \
- Total processed: {}. Cursor: {:?}",
- self.id,
- messages.len(),
- state.processed_rows,
- state.last_timestamp
- );
- } else {
- debug!(
- "InfluxDB source ID: {} produced {} messages. \
- Total processed: {}. Cursor: {:?}",
- self.id,
- messages.len(),
- state.processed_rows,
- state.last_timestamp
- );
+ let persisted = ConnectorState::serialize(
+ &PersistedState::V2(V2State {
+ last_timestamp: state.last_timestamp.clone(),
+ processed_rows: state.processed_rows,
+ cursor_row_count: state.cursor_row_count,
+ }),
+ CONNECTOR_NAME,
+ self.id,
+ );
+
+ Ok(ProducedMessages {
+ schema: result.schema,
+ messages: result.messages,
+ state: persisted,
+ })
+ }
+ Err(e) => self.handle_poll_error(e).await,
}
+ }
- let schema = if self.config.payload_column.is_some() {
- self.payload_format().schema()
- } else {
- Schema::Json
+ VersionState::V3(state_mu) => {
+ let InfluxDbSourceConfig::V3(cfg) = &self.config else {
+ return Err(Error::InvalidState);
};
- let persisted_state = self.serialize_state(&state);
+ let state_snap = state_mu.lock().await.clone();
+ match v3::poll(
+ client,
+ cfg,
+ auth,
+ &state_snap,
+ self.payload_format,
+ self.config.include_metadata(),
+ )
+ .await
+ {
+ Ok(result) => {
+ if result.trip_circuit_breaker {
+ self.circuit_breaker.record_failure().await;
+ } else {
+ self.circuit_breaker.record_success();
+ }
- Ok(ProducedMessages {
- schema,
- messages,
- state: persisted_state,
- })
- }
- Err(e) => {
- // Only count transient/connectivity failures toward the
- // circuit breaker. PermanentHttpError (400, 401, etc.) are
- // config/data issues that retrying will not fix.
- if !matches!(e, Error::PermanentHttpError(_)) {
- self.circuit_breaker.record_failure().await;
+ let new = result.new_state;
+ let msg_count = result.messages.len();
+ let mut state = state_mu.lock().await;
+ *state = new;
+
+ if self.config.verbose_logging() {
+ info!(
+ "{CONNECTOR_NAME} ID: {} produced {} messages
(V3). \
+ Total: {}. Cursor: {:?}",
+ self.id, msg_count, state.processed_rows,
state.last_timestamp
+ );
+ } else {
+ debug!(
+ "{CONNECTOR_NAME} ID: {} produced {} messages
(V3). \
+ Total: {}. Cursor: {:?}",
+ self.id, msg_count, state.processed_rows,
state.last_timestamp
+ );
+ }
+
+ let persisted = ConnectorState::serialize(
+ &PersistedState::V3(V3State {
+ last_timestamp: state.last_timestamp.clone(),
+ processed_rows: state.processed_rows,
+ effective_batch_size:
state.effective_batch_size,
+ last_timestamp_row_offset:
state.last_timestamp_row_offset,
+ }),
+ CONNECTOR_NAME,
+ self.id,
+ );
+
+ Ok(ProducedMessages {
+ schema: result.schema,
+ messages: result.messages,
+ state: persisted,
+ })
+ }
+ Err(e) => self.handle_poll_error(e).await,
}
- error!(
- "InfluxDB source ID: {} poll failed: {e}. \
- Consecutive failures tracked by circuit breaker.",
- self.id
- );
- tokio::time::sleep(self.poll_interval).await;
- Err(e)
}
}
}
async fn close(&mut self) -> Result<(), Error> {
self.client = None;
- let state = self.state.lock().await;
+ let processed = match &self.version_state {
+ VersionState::V2(mu) => mu.lock().await.processed_rows,
+ VersionState::V3(mu) => mu.lock().await.processed_rows,
+ };
info!(
- "InfluxDB source connector ID: {} closed. Total rows processed:
{}",
- self.id, state.processed_rows
+ "{CONNECTOR_NAME} ID: {} closed. Total rows processed:
{processed}",
+ self.id
);
Ok(())
}
}
-// ---------------------------------------------------------------------------
-// Unit tests
-// ---------------------------------------------------------------------------
+impl InfluxDbSource {
+ async fn handle_poll_error(&self, e: Error) -> Result<ProducedMessages,
Error> {
+ if !matches!(e, Error::PermanentHttpError(_)) {
+ self.circuit_breaker.record_failure().await;
+ }
+ error!("{CONNECTOR_NAME} ID: {} poll failed: {e}", self.id);
+ tokio::time::sleep(self.poll_interval).await;
+ Err(e)
+ }
+}
+
+// ── Sort heuristic
────────────────────────────────────────────────────────────
+
+/// Return `true` if `query` contains a `sort(` call that is not part of a
longer
+/// identifier (e.g. `mysort(` is excluded; `|> sort(` and bare `sort(` are
included).
+/// Best-effort check: warns if `sort(` does not appear outside
+/// line comments. Not a full parser; documents its limitations.
+fn query_has_sort_call(query: &str) -> bool {
+ query.lines().any(|line| {
+ // Strip line comments before checking for sort(
+ let code = match line.find("//") {
+ Some(pos) => &line[..pos],
+ None => line,
+ };
+ // Find all occurrences of "sort(" and verify none are preceded by a
+ // word character (letter, digit, underscore) — that would mean the
+ // "sort" is part of a longer identifier like "mysort" or "do_sort".
+ let mut search = code;
+ while let Some(pos) = search.find("sort(") {
+ let preceded_by_word_char = search[..pos]
+ .chars()
+ .last()
+ .is_some_and(|c| c.is_alphanumeric() || c == '_');
+ if !preceded_by_word_char {
+ return true;
+ }
+ search = &search[pos + 5..]; // skip past "sort("
+ }
+ false
+ })
+}
+
+// ── V2 cursor advance logic
───────────────────────────────────────────────────
+
+/// Update V2 polling state after a successful poll.
+///
+/// V2 uses `>= $cursor` semantics, so the first batch after a cursor advance
+/// will include rows already delivered at the previous max timestamp. The
+/// `cursor_row_count` tracks how many such rows to skip on the next poll.
+///
+/// - New cursor → store it with the count of rows that landed at that
timestamp.
+/// - Same cursor → accumulate: more rows at this timestamp were delivered.
+/// - No new cursor (all skipped) → correct `cursor_row_count` to `skipped`
+/// so the skip counter reflects reality rather than a stale inflated value.
+fn apply_v2_cursor_advance(
+ state: &mut V2State,
+ max_cursor: Option<String>,
+ rows_at_max_cursor: u64,
+ skipped: u64,
+) {
+ if let Some(ref new_cursor) = max_cursor {
+ let should_advance = match state.last_timestamp.as_deref() {
Review Comment:
Done!
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]