C0urante commented on a change in pull request #10907:
URL: https://github.com/apache/kafka/pull/10907#discussion_r654976449
########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java ##########

```diff
@@ -337,15 +353,29 @@ private void readToLogEnd() {
             } else {
                 log.trace("Behind end offset {} for {}; last-read offset is {}", endOffset, topicPartition, lastConsumedOffset);
-                poll(Integer.MAX_VALUE);
+                if (topicContainsTransactions) {
+                    // The consumer won't return from its poll method if a transaction is aborted, even though
+                    // its position will advance. So, we poll for at most one second, then give ourselves another
+                    // chance to check whether we've reached the end of the topic.
+                    poll(1000);
```

Review comment:

This causes a new consumer fetch request to be issued every second while there's an active read-to-end request for the log. If that turns out to be too expensive and ends up causing issues (either on the worker or on the broker), we might be able to retain the existing `poll(Integer.MAX_VALUE)` behavior by first producing a sentinel message to each topic partition we want to read to the end of. That way, even if a transaction that was initially blocking our read to the end of the partition is aborted, the call to `Consumer::poll` will still return, since there will be a non-transactional message after it.

Existing, older workers would be able to handle these sentinel messages gracefully (the only downside would be some `ERROR`-level log messages from the `KafkaConfigBackingStore` class upon encountering unrecognized messages in the config topic). However, we'd need to reason carefully about how this might work with multiple workers trying to read to the end of the same topic partitions at the same time.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
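To make the trade-off concrete, here is a minimal sketch of the bounded-poll loop under discussion. The `PollableLog` interface and all names below are illustrative stand-ins, not the actual `KafkaBasedLog` internals; the key point is that `poll` may advance the position (e.g. past an aborted transaction) without returning any records, so the loop must re-check the position itself on a short timeout.

```java
/**
 * Sketch (hypothetical names) of the bounded-poll read-to-end loop.
 * The interface stands in for the consumer wrapper: poll() may advance
 * the position without returning records, as happens when a transaction
 * is aborted.
 */
public class ReadToEndSketch {
    public interface PollableLog {
        long position();            // current consumer position for the partition
        long endOffset();           // end offset captured when the read was requested
        void poll(long timeoutMs);  // may advance position without delivering records
    }

    public static void readToLogEnd(PollableLog log, boolean topicContainsTransactions) {
        while (log.position() < log.endOffset()) {
            if (topicContainsTransactions) {
                // Bounded poll: give ourselves a chance every second to notice
                // that the position advanced past an aborted transaction.
                log.poll(1000);
            } else {
                // Without transactions, poll() returns once records arrive,
                // so blocking indefinitely is safe.
                log.poll(Long.MAX_VALUE);
            }
        }
    }
}
```

The cost is the repeated fetch request per second noted in the comment; the benefit is that no extra records ever need to be written to the topic.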
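The sentinel alternative mentioned above could look roughly like the following sketch. Again, every name here (`LogPartition`, `produceSentinel`, etc.) is hypothetical, not a Kafka Connect API: the idea is simply that appending a non-transactional marker record to each partition guarantees there is something readable after any transaction, so an unbounded `poll` is guaranteed to return.

```java
/**
 * Hypothetical sketch of the sentinel approach: write a non-transactional
 * sentinel record to each partition before reading to the end, so that
 * poll() always returns even if the transaction blocking the read is aborted.
 */
public class SentinelSketch {
    public interface LogPartition {
        void produceSentinel();     // append a non-transactional marker record
        boolean reachedEnd();       // position >= end offset captured earlier
        void poll(long timeoutMs);  // returns once a readable record (e.g. the sentinel) arrives
    }

    public static void readToLogEnd(Iterable<LogPartition> partitions) {
        // Produce all sentinels first, then read, so each partition is
        // guaranteed to contain at least one readable record past the
        // point we want to reach.
        for (LogPartition p : partitions) {
            p.produceSentinel();
        }
        for (LogPartition p : partitions) {
            while (!p.reachedEnd()) {
                // Safe to block indefinitely: the sentinel ensures poll() returns.
                p.poll(Long.MAX_VALUE);
            }
        }
    }
}
```

As the comment notes, the open questions with this design are how older workers log the unrecognized sentinel records and how concurrent read-to-end calls from multiple workers interleave their sentinels.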