[ 
https://issues.apache.org/jira/browse/KAFKA-7112?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16525774#comment-16525774
 ] 

ASF GitHub Bot commented on KAFKA-7112:
---------------------------------------

guozhangwang opened a new pull request #5306: KAFKA-7112: Only resume 
restoration if state is still PARTITIONS_ASSIGNED after poll
URL: https://github.com/apache/kafka/pull/5306
 
 
   Before KIP-266, `consumer.poll(0)` would call 
`updateAssignmentMetadataIfNeeded(Long.MAX_VALUE)`, which guarantees that the 
rebalance has completed, i.e. both `onPartitionsRevoked` and 
`onPartitionsAssigned` are called within this `poll(0)`. After KIP-266, however, 
it is possible that only `onPartitionsRevoked` is called if the timeout elapses 
first. Hence we need to double check that the state is still 
`PARTITIONS_ASSIGNED` after the `consumer.poll(duration)` call.
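
The re-check described above can be illustrated with a small stand-alone simulation. This is only a sketch and not the code in the pull request: `FakeStreamThread`, its fields, and the `rebalanceDuringPoll` flag are hypothetical stand-ins that imitate the state transitions, and no Kafka classes are used.

```java
public class StateRecheckSketch {

    enum State { PARTITIONS_REVOKED, PARTITIONS_ASSIGNED, RUNNING }

    // Hypothetical stand-in for StreamThread; this is not Kafka's real class.
    static final class FakeStreamThread {
        State state = State.PARTITIONS_ASSIGNED;
        boolean restoreConsumerSubscribed = true;
        private final boolean rebalanceDuringPoll;

        FakeStreamThread(boolean rebalanceDuringPoll) {
            this.rebalanceDuringPoll = rebalanceDuringPoll;
        }

        // Imitates a poll with a bounded timeout: if a rebalance starts,
        // only the revocation callback may have run when poll returns.
        void pollRequests() {
            if (rebalanceDuringPoll) {
                onPartitionsRevoked();
            }
        }

        void onPartitionsRevoked() {
            state = State.PARTITIONS_REVOKED;
            restoreConsumerSubscribed = false; // mirrors restoreConsumer.unsubscribe()
        }

        // Imitates the restore step, which fails on an unsubscribed consumer.
        boolean updateNewAndRestoringTasks() {
            if (!restoreConsumerSubscribed) {
                throw new IllegalStateException(
                    "Consumer is not subscribed to any topics or assigned any partitions");
            }
            return true;
        }

        void runOnce() {
            if (state == State.PARTITIONS_ASSIGNED) {
                pollRequests();
            }
            // Re-check the state: the poll above may have moved us out of
            // PARTITIONS_ASSIGNED, in which case restoration must not resume.
            if (state == State.PARTITIONS_ASSIGNED) {
                if (updateNewAndRestoringTasks()) {
                    state = State.RUNNING;
                }
            }
        }
    }

    public static void main(String[] args) {
        FakeStreamThread quiet = new FakeStreamThread(false);
        quiet.runOnce();
        System.out.println(quiet.state); // prints RUNNING

        FakeStreamThread interrupted = new FakeStreamThread(true);
        interrupted.runOnce(); // no exception: restoration is skipped
        System.out.println(interrupted.state); // prints PARTITIONS_REVOKED
    }
}
```

Without the second state check, the `interrupted` case would reach `updateNewAndRestoringTasks()` after the unsubscribe and throw the `IllegalStateException` quoted in the issue.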
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



> StreamThread does not check for state again after pollRequests()
> ----------------------------------------------------------------
>
>                 Key: KAFKA-7112
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7112
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>            Reporter: Guozhang Wang
>            Priority: Major
>
> In StreamThread's main loop, we have:
> {code}
>         if (state == State.PARTITIONS_ASSIGNED) {
>             // try to fetch some records with zero poll millis
>             // to unblock the restoration as soon as possible
>             records = pollRequests(Duration.ZERO);
>             if (taskManager.updateNewAndRestoringTasks()) {
>                 setState(State.RUNNING);
>             }
>         }
> {code}
> Here we first check the state, and if it is {{PARTITIONS_ASSIGNED}} we call 
> {{consumer.poll()}} and then 
> {{taskManager.updateNewAndRestoringTasks()}}. There is a race condition, 
> though: if another rebalance is triggered, {{onPartitionsRevoked}} will be 
> called, in which we call {{restoreConsumer.unsubscribe();}}. If we then 
> call {{taskManager.updateNewAndRestoringTasks()}} right away, we will see this:
> {code}
> Encountered the following error during processing: (org.apache.kafka.streams.processor.internals.StreamThread)
> java.lang.IllegalStateException: Consumer is not subscribed to any topics or assigned any partitions
>         at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1159)
>         at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1150)
>         at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:83)
>         at org.apache.kafka.streams.processor.internals.TaskManager.updateNewAndRestoringTasks(TaskManager.java:317)
>         at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:808)
>         at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:767)
>         at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:736)
> {code}



