ryerraguntla commented on code in PR #3140:
URL: https://github.com/apache/iggy/pull/3140#discussion_r3142423388
##########
core/connectors/sources/influxdb_source/src/lib.rs:
##########
@@ -752,118 +293,235 @@ impl Source for InfluxDbSource {
});
}
- match self.poll_messages().await {
- Ok((messages, max_cursor, rows_at_max_cursor, skipped)) => {
- // Successful poll — reset circuit breaker
- self.circuit_breaker.record_success();
-
- let mut state = self.state.lock().await;
- state.last_poll_time = Utc::now();
- state.processed_rows += messages.len() as u64;
- match max_cursor {
- Some(ref new_cursor)
- if state.last_timestamp.as_deref() !=
Some(new_cursor.as_str()) =>
- {
- // Cursor advanced to a new timestamp — reset the row
counter.
- state.last_timestamp = max_cursor.clone();
- state.cursor_row_count = rows_at_max_cursor;
- }
- Some(_) => {
- // Cursor stayed at the same timestamp — accumulate so
the
- // next poll skips all already-delivered rows at this
timestamp.
- state.cursor_row_count =
-
state.cursor_row_count.saturating_add(rows_at_max_cursor);
- }
- None => {
- // No rows delivered. If we skipped some rows it means
- // every row in the result was at the current cursor
- // timestamp and had already been seen. `skipped` is
- // therefore the true row count at that timestamp for
- // this query result, so we correct cursor_row_count to
- // that value. This prevents a permanently-inflated
- // counter (e.g. after rows are deleted or compacted in
- // InfluxDB) from causing the skip logic to over-skip
on
- // every subsequent poll and stall the connector.
- if skipped > 0 {
- state.cursor_row_count = skipped;
+ let client = self.get_client()?;
+ let auth = self.auth_header.as_deref().ok_or_else(|| {
+ Error::Connection("auth_header not initialised — was open()
called?".to_string())
+ })?;
+
+ match &self.version_state {
+ VersionState::V2(state_mu) => {
+ let InfluxDbSourceConfig::V2(cfg) = &self.config else {
+ unreachable!("V2 state with non-V2 config")
+ };
+
+ let state_snap = state_mu.lock().await.clone();
+ match v2::poll(
+ client,
+ cfg,
+ auth,
+ &state_snap,
+ self.payload_format,
+ self.config.include_metadata(),
+ )
+ .await
+ {
+ Ok(result) => {
+ self.circuit_breaker.record_success();
+ let mut state = state_mu.lock().await;
+ state.processed_rows += result.messages.len() as u64;
+ apply_v2_cursor_advance(
+ &mut state,
+ result.max_cursor,
+ result.rows_at_max_cursor,
+ result.skipped,
+ );
+
+ if self.config.verbose_logging() {
+ info!(
+ "{CONNECTOR_NAME} ID: {} produced {} messages
(V2). \
+ Total: {}. Cursor: {:?}",
+ self.id,
+ result.messages.len(),
+ state.processed_rows,
+ state.last_timestamp
+ );
+ } else {
+ debug!(
+ "{CONNECTOR_NAME} ID: {} produced {} messages
(V2). \
+ Total: {}. Cursor: {:?}",
+ self.id,
+ result.messages.len(),
+ state.processed_rows,
+ state.last_timestamp
+ );
}
- }
- }
- if self.verbose {
- info!(
- "InfluxDB source ID: {} produced {} messages. \
- Total processed: {}. Cursor: {:?}",
- self.id,
- messages.len(),
- state.processed_rows,
- state.last_timestamp
- );
- } else {
- debug!(
- "InfluxDB source ID: {} produced {} messages. \
- Total processed: {}. Cursor: {:?}",
- self.id,
- messages.len(),
- state.processed_rows,
- state.last_timestamp
- );
+ let persisted = ConnectorState::serialize(
+ &PersistedState::V2(V2State {
+ last_timestamp: state.last_timestamp.clone(),
+ processed_rows: state.processed_rows,
+ cursor_row_count: state.cursor_row_count,
+ }),
+ CONNECTOR_NAME,
+ self.id,
+ );
+
+ Ok(ProducedMessages {
+ schema: result.schema,
+ messages: result.messages,
+ state: persisted,
+ })
+ }
+ Err(e) => self.handle_poll_error(e).await,
}
+ }
- let schema = if self.config.payload_column.is_some() {
- self.payload_format().schema()
- } else {
- Schema::Json
+ VersionState::V3(state_mu) => {
+ let InfluxDbSourceConfig::V3(cfg) = &self.config else {
+ unreachable!("V3 state with non-V3 config")
};
- let persisted_state = self.serialize_state(&state);
+ let state_snap = state_mu.lock().await.clone();
+ match v3::poll(
+ client,
+ cfg,
+ auth,
+ &state_snap,
+ self.payload_format,
+ self.config.include_metadata(),
+ )
+ .await
+ {
+ Ok(result) => {
+ if result.trip_circuit_breaker {
+ self.circuit_breaker.record_failure().await;
+ } else {
+ self.circuit_breaker.record_success();
+ }
+
+ let new = result.new_state;
+ let msg_count = result.messages.len();
+ let mut state = state_mu.lock().await;
+ *state = new;
+
+ if self.config.verbose_logging() {
+ info!(
+ "{CONNECTOR_NAME} ID: {} produced {} messages
(V3). \
+ Total: {}. Cursor: {:?}",
+ self.id, msg_count, state.processed_rows,
state.last_timestamp
+ );
+ } else {
+ debug!(
+ "{CONNECTOR_NAME} ID: {} produced {} messages
(V3). \
+ Total: {}. Cursor: {:?}",
+ self.id, msg_count, state.processed_rows,
state.last_timestamp
+ );
+ }
- Ok(ProducedMessages {
- schema,
- messages,
- state: persisted_state,
- })
- }
- Err(e) => {
- // Only count transient/connectivity failures toward the
- // circuit breaker. PermanentHttpError (400, 401, etc.) are
- // config/data issues that retrying will not fix.
- if !matches!(e, Error::PermanentHttpError(_)) {
- self.circuit_breaker.record_failure().await;
+ let persisted = ConnectorState::serialize(
+ &PersistedState::V3(V3State {
+ last_timestamp: state.last_timestamp.clone(),
+ processed_rows: state.processed_rows,
+ effective_batch_size:
state.effective_batch_size,
+ }),
+ CONNECTOR_NAME,
+ self.id,
+ );
+
+ Ok(ProducedMessages {
+ schema: result.schema,
+ messages: result.messages,
+ state: persisted,
+ })
+ }
+ Err(e) => self.handle_poll_error(e).await,
}
- error!(
- "InfluxDB source ID: {} poll failed: {e}. \
- Consecutive failures tracked by circuit breaker.",
- self.id
- );
- tokio::time::sleep(self.poll_interval).await;
- Err(e)
}
}
}
async fn close(&mut self) -> Result<(), Error> {
self.client = None;
- let state = self.state.lock().await;
+ let processed = match &self.version_state {
+ VersionState::V2(mu) => mu.lock().await.processed_rows,
+ VersionState::V3(mu) => mu.lock().await.processed_rows,
+ };
info!(
- "InfluxDB source connector ID: {} closed. Total rows processed:
{}",
- self.id, state.processed_rows
+ "{CONNECTOR_NAME} ID: {} closed. Total rows processed:
{processed}",
+ self.id
);
Ok(())
}
}
-// ---------------------------------------------------------------------------
-// Unit tests
-// ---------------------------------------------------------------------------
+impl InfluxDbSource {
+ async fn handle_poll_error(&self, e: Error) -> Result<ProducedMessages,
Error> {
+ if !matches!(e, Error::PermanentHttpError(_)) {
+ self.circuit_breaker.record_failure().await;
+ }
+ error!("{CONNECTOR_NAME} ID: {} poll failed: {e}", self.id);
+ tokio::time::sleep(self.poll_interval).await;
+ Err(e)
+ }
+}
+
+// ── Sort heuristic
────────────────────────────────────────────────────────────
+
/// Report whether `query` textually contains a `sort(` call.
///
/// An occurrence counts only when `sort` starts a word. The byte just before it, if any,
/// must not be an ASCII letter, digit, or underscore. So `mysort(` and `my_sort(` are
/// ignored, while `|> sort(`, `(sort(` and a `sort(` at the very start of the query count.
/// This is a plain substring heuristic. It does not distinguish code from string literals
/// or comments, and `sort (` (with a space) is not matched.
fn query_has_sort_call(query: &str) -> bool {
    let bytes = query.as_bytes();
    // `"sort("` cannot overlap itself, so the non-overlapping matches yielded here are every
    // occurrence in the query.
    query.match_indices("sort(").any(|(start, _)| {
        start
            .checked_sub(1)
            .and_then(|before| bytes.get(before).copied())
            .is_none_or(|b| b != b'_' && !b.is_ascii_alphanumeric())
    })
}
+
+// ── V2 cursor advance logic
───────────────────────────────────────────────────
+
+/// Update V2 polling state after a successful poll.
+///
+/// V2 uses `>= $cursor` semantics, so the first batch after a cursor advance
+/// will include rows already delivered at the previous max timestamp. The
+/// `cursor_row_count` tracks how many such rows to skip on the next poll.
+///
+/// - New cursor → store it with the count of rows that landed at that
timestamp.
+/// - Same cursor → accumulate: more rows at this timestamp were delivered.
+/// - No new cursor (all skipped) → correct `cursor_row_count` to `skipped`
+/// so the skip counter reflects reality rather than a stale inflated value.
+fn apply_v2_cursor_advance(
+ state: &mut V2State,
+ max_cursor: Option<String>,
+ rows_at_max_cursor: u64,
+ skipped: u64,
+) {
+ match max_cursor {
+ Some(ref new_cursor) if state.last_timestamp.as_deref() !=
Some(new_cursor.as_str()) => {
Review Comment:
Fixed
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]