[ https://issues.apache.org/jira/browse/KAFKA-7112?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Rajini Sivaram resolved KAFKA-7112.
-----------------------------------
       Resolution: Fixed
    Fix Version/s: 2.0.0

> StreamThread does not check for state again after pollRequests()
> ----------------------------------------------------------------
>
>                 Key: KAFKA-7112
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7112
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>            Reporter: Guozhang Wang
>            Assignee: Guozhang Wang
>            Priority: Major
>             Fix For: 2.0.0
>
>
> In StreamThread's main loop, we have:
> {code}
> if (state == State.PARTITIONS_ASSIGNED) {
>     // try to fetch some records with zero poll millis
>     // to unblock the restoration as soon as possible
>     records = pollRequests(Duration.ZERO);
>     if (taskManager.updateNewAndRestoringTasks()) {
>         setState(State.RUNNING);
>     }
> }
> {code}
> in which we first check the state, and if it is {{PARTITIONS_ASSIGNED}} we
> call {{consumer.poll()}} and then call
> {{taskManager.updateNewAndRestoringTasks()}}. There is a race condition,
> though: if another rebalance gets triggered during the poll, then
> {{onPartitionsRevoked}} will be called, in which we call
> {{restoreConsumer.unsubscribe();}}. If we then call
> {{taskManager.updateNewAndRestoringTasks()}} right away, without checking
> the state again, we will see this:
> {code}
> Encountered the following error during processing: (org.apache.kafka.streams.processor.internals.StreamThread)
> java.lang.IllegalStateException: Consumer is not subscribed to any topics or assigned any partitions
>     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1159)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1150)
>     at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:83)
>     at org.apache.kafka.streams.processor.internals.TaskManager.updateNewAndRestoringTasks(TaskManager.java:317)
>     at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:808)
>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:767)
>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:736)
> {code}

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
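
For readers who want to see the race in isolation, here is a minimal sketch of the idea in the issue title: checking the state again after the poll before touching the restore consumer. It is not the patch that was committed for 2.0.0. The class {{StateRecheckSketch}}, the {{FakeRestoreConsumer}} stand-in, the {{rebalanceDuringPoll}} flag, and both {{runOnce...}} methods are hypothetical names made up for illustration, and the sketch assumes the revocation callback also moves the thread to {{PARTITIONS_REVOKED}}.

```java
/**
 * Hypothetical, heavily simplified model of the loop described in KAFKA-7112.
 * None of these types are Kafka classes; they only imitate the behavior
 * described in the issue text.
 */
class StateRecheckSketch {

    enum State { PARTITIONS_ASSIGNED, PARTITIONS_REVOKED, RUNNING }

    /** Stand-in for the restore consumer: poll() fails once unsubscribed. */
    static class FakeRestoreConsumer {
        private boolean subscribed = true;

        void unsubscribe() {
            subscribed = false;
        }

        void poll() {
            if (!subscribed) {
                throw new IllegalStateException(
                    "Consumer is not subscribed to any topics or assigned any partitions");
            }
        }
    }

    private final FakeRestoreConsumer restoreConsumer = new FakeRestoreConsumer();
    private final boolean rebalanceDuringPoll; // assumption: simulates a rebalance firing inside poll
    private State state = State.PARTITIONS_ASSIGNED;

    StateRecheckSketch(boolean rebalanceDuringPoll) {
        this.rebalanceDuringPoll = rebalanceDuringPoll;
    }

    /** Stand-in for pollRequests(): the revocation callback may run inside it. */
    private void pollRequests() {
        if (rebalanceDuringPoll) {
            // What the issue says the revocation callback does, plus an
            // assumed state transition to PARTITIONS_REVOKED.
            restoreConsumer.unsubscribe();
            state = State.PARTITIONS_REVOKED;
        }
    }

    /** Stand-in for taskManager.updateNewAndRestoringTasks(): polls the restore consumer. */
    private boolean updateNewAndRestoringTasks() {
        restoreConsumer.poll();
        return true;
    }

    /** Mirrors the snippet in the issue: the state is checked only before the poll. */
    State runOnceUnchecked() {
        if (state == State.PARTITIONS_ASSIGNED) {
            pollRequests();
            if (updateNewAndRestoringTasks()) {
                state = State.RUNNING;
            }
        }
        return state;
    }

    /** Sketch of the guard: check the state again after the poll returns. */
    State runOnceRechecked() {
        if (state == State.PARTITIONS_ASSIGNED) {
            pollRequests();
        }
        // The poll may have changed the state, so re-read it before restoring.
        if (state == State.PARTITIONS_ASSIGNED && updateNewAndRestoringTasks()) {
            state = State.RUNNING;
        }
        return state;
    }

    public static void main(String[] args) {
        System.out.println(new StateRecheckSketch(false).runOnceRechecked());
        System.out.println(new StateRecheckSketch(true).runOnceRechecked());
    }
}
```

In this model, {{runOnceUnchecked()}} with a simulated rebalance throws the same {{IllegalStateException}} message as the trace above, while {{runOnceRechecked()}} skips the restore step and leaves the state untouched for the next loop iteration.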