ryerraguntla commented on code in PR #3140:
URL: https://github.com/apache/iggy/pull/3140#discussion_r3142578309
##########
core/connectors/sources/influxdb_source/src/lib.rs:
##########
@@ -752,118 +293,235 @@ impl Source for InfluxDbSource {
});
}
- match self.poll_messages().await {
- Ok((messages, max_cursor, rows_at_max_cursor, skipped)) => {
- // Successful poll — reset circuit breaker
- self.circuit_breaker.record_success();
-
- let mut state = self.state.lock().await;
- state.last_poll_time = Utc::now();
- state.processed_rows += messages.len() as u64;
- match max_cursor {
- Some(ref new_cursor)
- if state.last_timestamp.as_deref() !=
Some(new_cursor.as_str()) =>
- {
- // Cursor advanced to a new timestamp — reset the row
counter.
- state.last_timestamp = max_cursor.clone();
- state.cursor_row_count = rows_at_max_cursor;
- }
- Some(_) => {
- // Cursor stayed at the same timestamp — accumulate so
the
- // next poll skips all already-delivered rows at this
timestamp.
- state.cursor_row_count =
-
state.cursor_row_count.saturating_add(rows_at_max_cursor);
- }
- None => {
- // No rows delivered. If we skipped some rows it means
- // every row in the result was at the current cursor
- // timestamp and had already been seen. `skipped` is
- // therefore the true row count at that timestamp for
- // this query result, so we correct cursor_row_count to
- // that value. This prevents a permanently-inflated
- // counter (e.g. after rows are deleted or compacted in
- // InfluxDB) from causing the skip logic to over-skip
on
- // every subsequent poll and stall the connector.
- if skipped > 0 {
- state.cursor_row_count = skipped;
+ let client = self.get_client()?;
+ let auth = self.auth_header.as_deref().ok_or_else(|| {
+ Error::Connection("auth_header not initialised — was open()
called?".to_string())
+ })?;
+
+ match &self.version_state {
+ VersionState::V2(state_mu) => {
+ let InfluxDbSourceConfig::V2(cfg) = &self.config else {
Review Comment:
Fixed
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]