lhotari commented on code in PR #24654:
URL: https://github.com/apache/pulsar/pull/24654#discussion_r2290572664


##########
pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/AbstractKafkaConnectSource.java:
##########
@@ -179,20 +245,21 @@ public synchronized Record<T> read() throws Exception {
                 AbstractKafkaSourceRecord<T> processRecord = 
processSourceRecord(currentBatch.next());
                 if (processRecord == null || processRecord.isEmpty()) {
                     outstandingRecords.decrementAndGet();
-                    continue;
+                    // If the entire batch is filtered, flush offsets now so 
it won't block later
+                    triggerOffsetsFlushIfNeeded();
                 } else {
                     return processRecord;
                 }
             } else {
-                // there is no records any more, then waiting for the batch to 
complete writing
-                // to sink and the offsets are committed as well, then do next 
round read.
+                // No more records in this batch: wait for offsets to be 
committed before next batch
+                triggerOffsetsFlushIfNeeded();

Review Comment:
   It seems that this changes the previous logic since it would only trigger 
the flush when "ack" has been called. I guess the existing tests don't cover 
this. I think it's essential so that records aren't lost if the processing 
isn't acknowledged by calling `ack`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to