harangozop commented on code in PR #24654:
URL: https://github.com/apache/pulsar/pull/24654#discussion_r2292976598


##########
pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/AbstractKafkaConnectSource.java:
##########
@@ -179,20 +245,21 @@ public synchronized Record<T> read() throws Exception {
                 AbstractKafkaSourceRecord<T> processRecord = processSourceRecord(currentBatch.next());
                 if (processRecord == null || processRecord.isEmpty()) {
                     outstandingRecords.decrementAndGet();
-                    continue;
+                    // If the entire batch is filtered, flush offsets now so it won't block later
+                    triggerOffsetsFlushIfNeeded();
                 } else {
                     return processRecord;
                 }
             } else {
-                // there is no records any more, then waiting for the batch to complete writing
-                // to sink and the offsets are committed as well, then do next round read.
+                // No more records in this batch: wait for offsets to be committed before next batch
+                triggerOffsetsFlushIfNeeded();

Review Comment:
   In most cases, calling `triggerOffsetsFlushIfNeeded` here is a no-op: the 
flush is only executed once the number of outstanding records reaches zero, no 
matter how many times the function is called, and the call by itself does not 
decrease the number of outstanding records.
   
   The outstanding count can only decrease on two paths:
   - when a record is filtered (nothing is emitted)
   - when an emitted record is acked
    
   I'll remove the invocation here, and add more tests to cover:
   - partially acked -> shouldn't flush
   - all acked -> should flush
   - no ack -> shouldn't flush
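
To make the three cases concrete, here is a minimal sketch of the counter-gated flush described above. It is not the actual `AbstractKafkaConnectSource` code: the class `OutstandingFlushSketch`, the methods `startBatch`, `onFiltered`, `onAck`, and the `flushCount` field are hypothetical names used only for illustration, and the real flush is replaced by a counter increment.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical model of a flush gated on an outstanding-record counter (not Pulsar code). */
final class OutstandingFlushSketch {
    private final AtomicInteger outstandingRecords = new AtomicInteger();
    private int flushCount = 0; // stands in for the real offset flush

    /** Assumption: a freshly polled batch sets the number of outstanding records. */
    void startBatch(int size) {
        outstandingRecords.set(size);
    }

    /** Path 1: the record was filtered, so nothing is emitted. */
    void onFiltered() {
        outstandingRecords.decrementAndGet();
        triggerOffsetsFlushIfNeeded();
    }

    /** Path 2: a previously emitted record was acked. */
    void onAck() {
        outstandingRecords.decrementAndGet();
        triggerOffsetsFlushIfNeeded();
    }

    /** Does nothing unless the outstanding count is zero; it never changes the count itself. */
    void triggerOffsetsFlushIfNeeded() {
        if (outstandingRecords.get() == 0) {
            flushCount++;
        }
    }

    int flushCount() {
        return flushCount;
    }
}
```

In this model, calling `triggerOffsetsFlushIfNeeded()` any number of times after a partial ack or with no ack leaves `flushCount` at zero, while acking (or filtering) the last outstanding record flushes exactly once.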
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

