[ https://issues.apache.org/jira/browse/KAFKA-4593?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15981970#comment-15981970 ]

Matthias J. Sax commented on KAFKA-4593:
----------------------------------------

This JIRA seems to be related to
https://issues.apache.org/jira/browse/KAFKA-3941
Any thoughts on whether we can close this as a duplicate? [~guozhang] [~damianguy]

> Task migration during rebalance callback process could lead the obsoleted
> task to throw IllegalStateException
> ------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-4593
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4593
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>            Reporter: Guozhang Wang
>              Labels: infrastructure
>
> 1. Assume two running threads A and B, and just one task t1 for simplicity.
> 2. The first rebalance is triggered, and task t1 is assigned to A (B has no
> assigned task).
> 3. During the first rebalance callback, task t1's state store needs to be
> restored on thread A; this happens in "restoreActiveState", called from
> "createStreamTask".
> 4. Now suppose thread A has a long GC pause that stalls it. A second
> rebalance is then triggered and kicks A out of the group; B gets task t1 and
> goes through the same restoration process. Afterwards, thread B continues to
> process data and update the state store, and at the same time writes more
> messages to the changelog (so the changelog's log end offset advances).
> 5. After a while, A resumes from the long GC pause. Not knowing that it has
> been kicked out of the group and that task t1 is no longer owned by it, A
> continues the restoration process but then finds that the log end offset
> has advanced. When this happens, we see the following exception on
> thread A:
> {code}
> java.lang.IllegalStateException: task XXX Log end offset of YYY-table_stream-changelog-ZZ should not change while restoring: old end offset .., current offset ..
>         at org.apache.kafka.streams.processor.internals.ProcessorStateManager.restoreActiveState(ProcessorStateManager.java:248)
>         at org.apache.kafka.streams.processor.internals.ProcessorStateManager.register(ProcessorStateManager.java:201)
>         at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.register(ProcessorContextImpl.java:122)
>         at org.apache.kafka.streams.state.internals.RocksDBWindowStore.init(RocksDBWindowStore.java:200)
>         at org.apache.kafka.streams.state.internals.MeteredWindowStore.init(MeteredWindowStore.java:65)
>         at org.apache.kafka.streams.state.internals.CachingWindowStore.init(CachingWindowStore.java:65)
>         at org.apache.kafka.streams.processor.internals.AbstractTask.initializeStateStores(AbstractTask.java:86)
>         at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:120)
>         at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:794)
>         at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1222)
>         at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1195)
>         at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:897)
>         at org.apache.kafka.streams.processor.internals.StreamThread.access$500(StreamThread.java:71)
>         at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:240)
>         at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:230)
>         at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:314)
>         at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:278)
>         at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:261)
>         at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1039)
>         at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1004)
>         at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:570)
>         at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:359)
> {code}
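The race in steps 3 to 5 can be reproduced in a small, self-contained model. This is a hypothetical sketch, not Kafka Streams code. `RestoreRaceSketch`, `FakeChangelog`, the `stall` hook and the changelog name are illustrative inventions, and the method only borrows the name `restoreActiveState`. The sketch assumes, based on the exception message, that restoration snapshots the changelog's log end offset before restoring and compares it with the end offset seen afterwards.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical, simplified model of the race described in KAFKA-4593; NOT Kafka Streams code.
class RestoreRaceSketch {

    // Hypothetical stand-in for a changelog partition: it only tracks a log end offset.
    static final class FakeChangelog {
        final String name;
        private final AtomicLong logEndOffset = new AtomicLong();

        FakeChangelog(String name) { this.name = name; }

        long endOffset() { return logEndOffset.get(); }

        // Each append advances the log end offset, like thread B's writes in step 4.
        void append() { logEndOffset.incrementAndGet(); }
    }

    // Assumed invariant (inferred from the exception message): snapshot the end offset,
    // restore up to it, then verify that the end offset has not moved.
    // 'stall' is a hypothetical hook modelling thread A's long GC pause.
    static long restoreActiveState(String taskId, FakeChangelog changelog, Runnable stall) {
        long oldEnd = changelog.endOffset();          // snapshot before restoring
        long restored = 0;
        for (long offset = 0; offset < oldEnd; offset++) {
            restored++;                               // "apply" one changelog record to the store
        }
        stall.run();                                  // the task may migrate while A is paused
        long currentEnd = changelog.endOffset();
        if (currentEnd != oldEnd) {
            throw new IllegalStateException(String.format(
                "task %s Log end offset of %s should not change while restoring: "
                    + "old end offset %d, current offset %d",
                taskId, changelog.name, oldEnd, currentEnd));
        }
        return restored;
    }

    public static void main(String[] args) {
        FakeChangelog changelog = new FakeChangelog("store-changelog-0");
        for (int i = 0; i < 5; i++) {
            changelog.append();
        }

        // No concurrent writer: restoration completes normally.
        System.out.println("restored " + restoreActiveState("t1", changelog, () -> { })); // prints "restored 5"

        // Thread A stalls while thread B (now the owner) appends 3 more records.
        try {
            restoreActiveState("t1", changelog, () -> {
                for (int i = 0; i < 3; i++) {
                    changelog.append();
                }
            });
        } catch (IllegalStateException e) {
            // prints a message ending in "old end offset 5, current offset 8"
            System.out.println(e.getMessage());
        }
    }
}
```

In this model the exception is the first signal thread A receives that the task has moved. The check itself compares only offsets and says nothing about ownership.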



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
