lhotari commented on code in PR #24654:
URL: https://github.com/apache/pulsar/pull/24654#discussion_r2290572664
##########
pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/AbstractKafkaConnectSource.java:
##########
@@ -179,20 +245,21 @@ public synchronized Record<T> read() throws Exception {
AbstractKafkaSourceRecord<T> processRecord =
processSourceRecord(currentBatch.next());
if (processRecord == null || processRecord.isEmpty()) {
outstandingRecords.decrementAndGet();
- continue;
+ // If the entire batch is filtered, flush offsets now so it won't block later
+ triggerOffsetsFlushIfNeeded();
} else {
return processRecord;
}
} else {
- // there is no records any more, then waiting for the batch to complete writing
- // to sink and the offsets are committed as well, then do next round read.
+ // No more records in this batch: wait for offsets to be committed before next batch
+ triggerOffsetsFlushIfNeeded();
Review Comment:
It seems that this changes the previous logic, which would only trigger the
flush once `ack` had been called. I guess the existing tests don't cover
this case. I think the ack-gated behavior is essential so that records aren't
lost if the processing isn't acknowledged by calling `ack`.
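
To illustrate the concern, here is a minimal, hypothetical sketch of the
ack-gated flush described above (the class and method names are placeholders,
not the actual `AbstractKafkaConnectSource` code): offsets are only flushed
once every outstanding record has been acked, so an eager flush cannot commit
offsets for records that were never acknowledged.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: flush offsets only when no records are outstanding,
// i.e. every dispatched record has been acknowledged via ack().
public class AckGatedFlush {
    private final AtomicInteger outstandingRecords = new AtomicInteger(0);
    private volatile boolean offsetsFlushed = false;

    // Called when a record is handed to the sink for processing.
    public void recordDispatched() {
        outstandingRecords.incrementAndGet();
    }

    // Called from the record's ack(); the last ack triggers the flush.
    public void ack() {
        if (outstandingRecords.decrementAndGet() == 0) {
            triggerOffsetsFlushIfNeeded();
        }
    }

    // Guarded flush: a no-op while any record is still unacked.
    private void triggerOffsetsFlushIfNeeded() {
        if (outstandingRecords.get() == 0) {
            offsetsFlushed = true; // placeholder for the real offset commit
        }
    }

    public boolean offsetsFlushed() {
        return offsetsFlushed;
    }

    public static void main(String[] args) {
        AckGatedFlush source = new AckGatedFlush();
        source.recordDispatched();
        source.recordDispatched();
        source.ack(); // one record still outstanding: no flush yet
        if (source.offsetsFlushed()) {
            throw new AssertionError("must not flush with unacked records");
        }
        source.ack(); // last ack: now the flush may run
        if (!source.offsetsFlushed()) {
            throw new AssertionError("flush expected after all acks");
        }
        System.out.println("flushed=" + source.offsetsFlushed());
    }
}
```

Calling the flush eagerly from `read()` instead, as the diff does, would skip
this guard's intent: offsets could be committed while records are still
unacked, which is exactly the data-loss window the comment points out.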
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]