Justinwins created KAFKA-14131:
----------------------------------

             Summary: KafkaBasedLog#readToLogEnd() may accidentally fall into an infinite loop
                 Key: KAFKA-14131
                 URL: https://issues.apache.org/jira/browse/KAFKA-14131
             Project: Kafka
          Issue Type: Bug
          Components: mirrormaker
            Reporter: Justinwins

When a herder starts, its KafkaOffsetBackingStore calls readToLogEnd() on the DistributedHerder.herderExecutor thread, whose name starts with "Distrubuted-connect-" (e.g. Distrubuted-connect-28-1). This read may take a few minutes.

If another thread tries to shut down this herder, it blocks for "task.shutdown.graceful.timeout.ms" before the DistributedHerder.herderExecutor thread is interrupted.

Once the thread in DistributedHerder.herderExecutor is interrupted, KafkaOffsetBackingStore.readToLogEnd() calls poll(Integer.MAX_VALUE), which logs "Error polling" because the read has been interrupted. consumer.position() then never advances, and readToLogEnd() falls into an infinite loop.

```
private void readToLogEnd() {
    Set<TopicPartition> assignment = consumer.assignment();
    Map<TopicPartition, Long> endOffsets = readEndOffsets(assignment);
    log.trace("Reading to end of log offsets {}", endOffsets);

    while (!endOffsets.isEmpty()) { // this loop never exits
        Iterator<Map.Entry<TopicPartition, Long>> it = endOffsets.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<TopicPartition, Long> entry = it.next();
            TopicPartition topicPartition = entry.getKey();
            long endOffset = entry.getValue();
            // when the thread is in interrupted status, consumer.position will not advance
            long lastConsumedOffset = consumer.position(topicPartition);
            if (lastConsumedOffset >= endOffset) {
                log.trace("Read to end offset {} for {}", endOffset, topicPartition);
                it.remove();
            } else {
                log.trace("Behind end offset {} for {}; last-read offset is {}", endOffset, topicPartition, lastConsumedOffset);
                // here, poll() catches the InterruptedException and logs it without rethrowing it
                poll(Integer.MAX_VALUE);
                break;
            }
        }
    }
}
```
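To illustrate the failure mode outside of Kafka, here is a minimal, self-contained sketch. It is not the Kafka code and not a proposed patch: the class InterruptedReadLoop, its fields, and its methods are hypothetical stand-ins. It assumes that a poll on an interrupted thread fails immediately, makes no progress, and leaves the interrupt flag set. Under that assumption, a loop that only compares position against the end offset can never finish, and one possible guard is to check the interrupt flag on each iteration.

```java
class InterruptedReadLoop {
    // Hypothetical stand-in for consumer.position(topicPartition).
    private long position = 0;
    // Hypothetical stand-in for the end offset read at startup.
    private final long endOffset;

    InterruptedReadLoop(long endOffset) {
        this.endOffset = endOffset;
    }

    // Stand-in for a poll() that swallows the interruption:
    // it logs nothing here, but like the reported code it does not rethrow.
    private void poll() {
        try {
            // Thread.sleep throws at once if the interrupt flag is already set.
            Thread.sleep(1);
            position++; // progress is made only when the call is not interrupted
        } catch (InterruptedException e) {
            // Assumption: the flag is restored, so every later poll fails the same way.
            Thread.currentThread().interrupt();
        }
    }

    // Returns true if the end offset was reached, false if the loop gave up
    // because the thread was interrupted.
    boolean readToEnd() {
        while (position < endOffset) {
            if (Thread.currentThread().isInterrupted()) {
                return false; // without this check the loop would spin forever
            }
            poll();
        }
        return true;
    }

    long position() {
        return position;
    }

    public static void main(String[] args) {
        Thread.currentThread().interrupt(); // simulate the shutdown interrupt
        InterruptedReadLoop log = new InterruptedReadLoop(3);
        System.out.println("reached end: " + log.readToEnd() + ", position: " + log.position());
    }
}
```

Whether the real fix should return early, rethrow, or check a dedicated stop flag is a design question for KafkaBasedLog; the sketch only shows why the position check alone is not a sufficient exit condition.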