C0urante commented on a change in pull request #10907:
URL: https://github.com/apache/kafka/pull/10907#discussion_r654976449



##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
##########
@@ -337,15 +353,29 @@ private void readToLogEnd() {
                 } else {
                     log.trace("Behind end offset {} for {}; last-read offset is {}",
                             endOffset, topicPartition, lastConsumedOffset);
-                    poll(Integer.MAX_VALUE);
+                    if (topicContainsTransactions) {
+                        // The consumer won't return from its poll method if a transaction is aborted, even though
+                        // its position will advance. So, we poll for at most one second, then give ourselves another
+                        // chance to check whether we've reached the end of the topic.
+                        poll(1000);

Review comment:
       This causes a new consumer fetch request to be issued every second while 
there's an active read-to-end request for the log.
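   To make the trade-off concrete, here is a simplified, self-contained model of the bounded-poll loop from the diff. It is a sketch under stated assumptions, not the `KafkaBasedLog` code: `SimulatedPartition`, `ReadToEndSketch`, and their methods are hypothetical stand-ins for `Consumer::position`, `Consumer::endOffsets`, and `Consumer::poll`. The simulation throws where a real unbounded poll would hang.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical in-memory stand-in for a read_committed consumer assigned one partition.
 * A non-null slot is a record the consumer returns; a null slot is an offset it skips
 * without returning anything (aborted-transaction records or a transaction marker).
 * This is a simulation, not the Kafka Consumer API.
 */
final class SimulatedPartition {
    private final List<String> log;
    private final int offsetsPerFetch;
    private long position = 0;
    final List<Long> requestedTimeouts = new ArrayList<>();

    SimulatedPartition(List<String> log, int offsetsPerFetch) {
        this.log = log;
        this.offsetsPerFetch = offsetsPerFetch;
    }

    long endOffset() { return log.size(); }

    long position() { return position; }

    /** One simulated fetch: advances past up to offsetsPerFetch offsets and returns the visible records. */
    List<String> poll(long timeoutMs) {
        requestedTimeouts.add(timeoutMs);
        List<String> visible = new ArrayList<>();
        long limit = Math.min(log.size(), position + offsetsPerFetch);
        for (; position < limit; position++) {
            String record = log.get((int) position);
            if (record != null) {
                visible.add(record);
            }
        }
        // Models the problem described in the diff: an effectively unbounded poll that only
        // skips aborted data never returns, even though the position has advanced.
        if (visible.isEmpty() && timeoutMs >= Integer.MAX_VALUE) {
            throw new IllegalStateException("poll would block indefinitely at offset " + position);
        }
        return visible;
    }
}

final class ReadToEndSketch {
    static final long TRANSACTIONAL_POLL_TIMEOUT_MS = 1000;

    /** Reads visible records until the position reaches the end offset captured at the start. */
    static List<String> readToLogEnd(SimulatedPartition partition, boolean topicContainsTransactions) {
        long endOffset = partition.endOffset();
        List<String> read = new ArrayList<>();
        while (partition.position() < endOffset) {
            // Bounded polls let the loop re-check the position after an abort; with a real
            // consumer, repeating this poll is what issues the extra fetch requests noted above.
            long timeoutMs = topicContainsTransactions ? TRANSACTIONAL_POLL_TIMEOUT_MS : Integer.MAX_VALUE;
            read.addAll(partition.poll(timeoutMs));
        }
        return read;
    }
}
```

   On a log with two committed records followed by three skipped offsets, and two offsets per simulated fetch, the bounded loop finishes after three polls; with `topicContainsTransactions` set to `false`, the simulated unbounded poll throws on its second call.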
   
   If this is too expensive and ends up causing issues (either on the worker or 
on the broker), we might be able to retain the existing 
`poll(Integer.MAX_VALUE)` behavior by first producing a sentinel message to 
each topic partition we want to read to the end of. That way, even if a 
transaction that was initially blocking our read to the end of the partition is 
aborted, the call to `Consumer::poll` will return, since there will be a 
non-transactional message after it.
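   The sentinel alternative can be modeled the same way. Again, this is a hypothetical in-memory sketch rather than Kafka client code: `SentinelPartition`, the `__sentinel__` value, and `readToEndWithSentinel` are illustrative names, and the sketch deliberately ignores the multi-worker question. It only shows why an unbounded poll returns once a non-transactional record sits after an aborted transaction.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical in-memory partition. Non-null slots are visible records; null slots are
 * offsets a read_committed consumer skips silently (aborted-transaction data and markers).
 */
final class SentinelPartition {
    static final String SENTINEL = "__sentinel__"; // hypothetical marker value

    private final List<String> slots = new ArrayList<>();
    private long position = 0;

    /** Stands in for a non-transactional produce. */
    void append(String value) { slots.add(value); }

    /** Stands in for an aborted transaction whose records and abort marker occupy count offsets. */
    void appendAborted(int count) {
        for (int i = 0; i < count; i++) {
            slots.add(null);
        }
    }

    long endOffset() { return slots.size(); }

    long position() { return position; }

    /** Models poll(Integer.MAX_VALUE): keeps skipping offsets until a visible record arrives. */
    List<String> pollUnbounded() {
        List<String> visible = new ArrayList<>();
        while (visible.isEmpty()) {
            if (position >= slots.size()) {
                throw new IllegalStateException("poll would block indefinitely at offset " + position);
            }
            String value = slots.get((int) position++);
            if (value != null) {
                visible.add(value);
            }
        }
        return visible;
    }
}

final class SentinelReadSketch {
    /** Produces a sentinel, then reads with unbounded polls up to and including it. */
    static List<String> readToEndWithSentinel(SentinelPartition partition) {
        partition.append(SentinelPartition.SENTINEL); // 1. write a non-transactional record at the tail
        long endOffset = partition.endOffset();        // 2. the end offset now covers the sentinel
        List<String> read = new ArrayList<>();
        while (partition.position() < endOffset) {
            for (String value : partition.pollUnbounded()) {
                if (!SentinelPartition.SENTINEL.equals(value)) { // 3. keep sentinels away from callers
                    read.add(value);
                }
            }
        }
        return read;
    }
}
```

   Without the sentinel, the second unbounded poll on a log holding one committed record followed by an aborted transaction has nothing visible to return; with it, the same poll skips the aborted offsets and returns the sentinel, so the loop terminates.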
   
   Existing, older workers would be able to handle these sentinel messages 
gracefully (the only downside would be some `ERROR`-level log messages from the 
`KafkaConfigBackingStore` class upon encountering unrecognized records in the 
config topic). However, we'd need to reason carefully about how this might work 
with multiple workers trying to read to the end of the same topic partitions at 
the same time.
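   The compatibility point for older workers boils down to "log and skip" handling of keys they don't recognize. The following is a hypothetical illustration of that pattern only: `ConfigRecordHandlerSketch`, its `connector-` prefix handling, and the error text are assumptions and do not reproduce `KafkaConfigBackingStore`.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical, heavily simplified config-topic record handler. The single key prefix
 * below is illustrative and is not the full set handled by KafkaConfigBackingStore.
 */
final class ConfigRecordHandlerSketch {
    final Map<String, String> connectorConfigs = new LinkedHashMap<>();
    final List<String> errorLog = new ArrayList<>(); // stands in for ERROR-level log output

    void onRecord(String key, String value) {
        if (key.startsWith("connector-")) {
            connectorConfigs.put(key.substring("connector-".length()), value);
        } else {
            // Unrecognized key (for example, a sentinel written by a newer worker):
            // record an error and keep consuming instead of failing the read.
            errorLog.add("Discarding config record with unrecognized key: " + key);
        }
    }
}
```

   Records after the unrecognized one are still applied, which is the sense in which an older worker would degrade gracefully, at the cost of noisy error logs.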




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

