harangozop commented on code in PR #24654:
URL: https://github.com/apache/pulsar/pull/24654#discussion_r2292976598
##########
pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/AbstractKafkaConnectSource.java:
##########
@@ -179,20 +245,21 @@ public synchronized Record<T> read() throws Exception {
AbstractKafkaSourceRecord<T> processRecord =
processSourceRecord(currentBatch.next());
if (processRecord == null || processRecord.isEmpty()) {
outstandingRecords.decrementAndGet();
- continue;
+ // If the entire batch is filtered, flush offsets now so it won't block later
+ triggerOffsetsFlushIfNeeded();
} else {
return processRecord;
}
} else {
- // there is no records any more, then waiting for the batch to complete writing
- // to sink and the offsets are committed as well, then do next round read.
+ // No more records in this batch: wait for offsets to be committed before next batch
+ triggerOffsetsFlushIfNeeded();
Review Comment:
In most cases, calling `triggerOffsetsFlushIfNeeded` here is a no-op: the flush
only executes once the outstanding-record count reaches zero, no matter how many
times this method is called, and the call itself does not decrease that count.
The count can only drop on two paths:
- when a record is filtered (never emitted)
- when an emitted record is acked
I'll remove the invocation here and add more tests to cover:
- partially acked -> shouldn't flush
- all acked -> should flush
- no ack -> shouldn't flush
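To illustrate the gating behavior described above, here is a minimal sketch (not the actual Pulsar implementation; the class and method names `OffsetFlushSketch`, `emit`, and `complete` are invented for illustration) of a flush that is triggered only when the outstanding-record counter reaches zero, so calling the trigger while records are still in flight does nothing:

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of the flush gating: the flush runs only when the
// outstanding-record count is zero, so redundant trigger calls while
// records are still in flight are no-ops.
public class OffsetFlushSketch {
    private final AtomicInteger outstandingRecords = new AtomicInteger();
    int flushCount = 0; // stand-in for the real offset commit side effect

    // A record was emitted to the sink and is now in flight.
    void emit() {
        outstandingRecords.incrementAndGet();
    }

    // A record completed: either filtered (never emitted) or acked after emit.
    void complete() {
        outstandingRecords.decrementAndGet();
        triggerOffsetsFlushIfNeeded();
    }

    // Flush offsets only when nothing is outstanding; otherwise do nothing.
    void triggerOffsetsFlushIfNeeded() {
        if (outstandingRecords.get() == 0) {
            flushCount++;
        }
    }

    public static void main(String[] args) {
        OffsetFlushSketch s = new OffsetFlushSketch();
        s.emit();
        s.emit();
        s.complete();                     // partially acked -> no flush
        System.out.println(s.flushCount); // 0
        s.complete();                     // all acked -> flush
        System.out.println(s.flushCount); // 1
    }
}
```

This mirrors the test matrix above: with one of two records completed the trigger is a no-op, and only when the last record completes does the flush fire.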
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]