ryerraguntla commented on code in PR #3140:
URL: https://github.com/apache/iggy/pull/3140#discussion_r3232895765


##########
core/connectors/sources/influxdb_source/src/lib.rs:
##########
@@ -678,70 +195,116 @@ impl Source for InfluxDbSource {
             return Err(Error::InvalidState);
         }
 
+        let ver = self.config.version_label();
         info!(
-            "Opening InfluxDB source connector with ID: {}. Org: {}",
-            self.id, self.config.org
+            "Opening {CONNECTOR_NAME} with ID: {} (version={ver})",
+            self.id
         );
 
-        // Build the raw client first and use it for the startup connectivity
-        // check. The connectivity retry loop uses separate delay bounds
-        // (open_retry_max_delay) from the per-query middleware retries, so
-        // we keep them independent.
-        let raw_client = self.build_raw_client()?;
+        validate_cursor_field(self.config.cursor_field(), self.config.version_label())?;
+        if let Some(offset) = self.config.initial_offset() {
+            validate_cursor(offset)?;
+        }
+
+        if let InfluxDbSourceConfig::V3(cfg) = &self.config
+            && let Some(cap) = cfg.stuck_batch_cap_factor
+            && cap > v3::MAX_STUCK_CAP_FACTOR
+        {
+            return Err(Error::InvalidConfigValue(format!(
+                "stuck_batch_cap_factor {cap} exceeds maximum of {}; \
+                 reduce it to avoid querying up to {}×batch_size rows per poll.",
+                v3::MAX_STUCK_CAP_FACTOR,
+                v3::MAX_STUCK_CAP_FACTOR
+            )));
+        }
+
+        if let InfluxDbSourceConfig::V3(_) = &self.config
+            && self.config.batch_size() == 0
+        {
+            return Err(Error::InvalidConfigValue(
+                "batch_size must be >= 1; got 0. \
+         A LIMIT 0 query would return no rows and stall the connector."
+                    .into(),
+            ));
+        }
 
-        // Validate cursor_field before touching the network: string comparison
-        // is only safe for timestamp columns. See validate_cursor_field for details.
-        Self::validate_cursor_field(self.cursor_field())?;
+        // Skip-N dedup for V2 requires rows to arrive sorted by time. If the Flux
+        // query lacks an explicit sort, InfluxDB may return rows in storage order,
+        // causing the dedup to skip the wrong rows silently.
+        if let InfluxDbSourceConfig::V2(cfg) = &self.config
+            && !query_has_sort_call(&cfg.query)
+        {
+            warn!(

Review Comment:
   Done!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to