[ https://issues.apache.org/jira/browse/KAFKA-4593?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16177048#comment-16177048 ]

ASF GitHub Bot commented on KAFKA-4593:
---------------------------------------

GitHub user mjsax opened a pull request:

    https://github.com/apache/kafka/pull/3948

    KAFKA-4593: Don't throw IllegalStateException and die on task migration

    

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/mjsax/kafka kafka-4593-illegal-state-exception-in-restore

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/kafka/pull/3948.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #3948
    
----
commit 7001cf72781b71459f3dbf852c529f237edf804a
Author: Matthias J. Sax <matth...@confluent.io>
Date:   2017-09-22T20:12:33Z

    KAFKA-4593: Don't throw IllegalStateException and die on task migration

----


> Task migration during rebalance callback process could lead the obsoleted 
> task's IllegalStateException
> ------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-4593
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4593
>             Project: Kafka
>          Issue Type: Sub-task
>          Components: streams
>            Reporter: Guozhang Wang
>            Assignee: Matthias J. Sax
>              Labels: infrastructure
>
> 1. For simplicity, assume two running threads, A and B, and a single task t1. 
> Threads A and B are on different machines, so their local state directories 
> are not shared.
> 2. The first rebalance is triggered and task t1 is assigned to A (B has no 
> assigned task).
> 3. During the first rebalance callback, task t1's state store needs to be 
> restored on thread A; this is done in "restoreActiveState", which is reached 
> from "createStreamTask".
> 4. Now suppose thread A has a long GC pause that causes it to stall. A second 
> rebalance is then triggered and kicks A out of the group; B gets task t1 and 
> goes through the same restoration process. After restoring, thread B continues 
> to process data and update the state store, while also writing more messages 
> to the changelog (so its log end offset is incremented).
> 5. After a while A resumes from the long GC pause. Not knowing that it has 
> been kicked out of the group and that it no longer owns task t1, it continues 
> the restoration process but then finds that the log end offset has advanced. 
> When this happens, we see the following exception on thread A:
> {code}
> java.lang.IllegalStateException: task XXX Log end offset of YYY-table_stream-changelog-ZZ should not change while restoring: old end offset .., current offset ..
>         at org.apache.kafka.streams.processor.internals.ProcessorStateManager.restoreActiveState(ProcessorStateManager.java:248)
>         at org.apache.kafka.streams.processor.internals.ProcessorStateManager.register(ProcessorStateManager.java:201)
>         at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.register(ProcessorContextImpl.java:122)
>         at org.apache.kafka.streams.state.internals.RocksDBWindowStore.init(RocksDBWindowStore.java:200)
>         at org.apache.kafka.streams.state.internals.MeteredWindowStore.init(MeteredWindowStore.java:65)
>         at org.apache.kafka.streams.state.internals.CachingWindowStore.init(CachingWindowStore.java:65)
>         at org.apache.kafka.streams.processor.internals.AbstractTask.initializeStateStores(AbstractTask.java:86)
>         at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:120)
>         at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:794)
>         at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1222)
>         at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1195)
>         at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:897)
>         at org.apache.kafka.streams.processor.internals.StreamThread.access$500(StreamThread.java:71)
>         at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:240)
>         at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:230)
>         at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:314)
>         at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:278)
>         at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:261)
>         at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1039)
>         at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1004)
>         at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:570)
>         at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:359)
> {code}
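
The scenario in the quoted description can be replayed with a small, self-contained Java sketch. It is only an illustration and does not use any Kafka classes: `Changelog`, `TaskMigratedSignal`, `checkStrict`, `checkTolerant` and `replayScenario` are hypothetical names invented here, and the "tolerant" variant is one possible reading of the pull request title (treat a moved end offset as a sign of task migration rather than as a fatal error), not the actual patch.

```java
import java.util.ArrayList;
import java.util.List;

public class RestoreOffsetCheckSketch {

    /** Hypothetical recoverable signal: the task now belongs to another thread. */
    static class TaskMigratedSignal extends RuntimeException {
        TaskMigratedSignal(String message) {
            super(message);
        }
    }

    /** Minimal in-memory stand-in for one changelog partition (not a Kafka class). */
    static class Changelog {
        private final List<String> records = new ArrayList<>();

        long endOffset() {
            return records.size();
        }

        void append(String record) {
            records.add(record);
        }
    }

    /** Behaviour described in the report: a moved end offset is treated as fatal. */
    static void checkStrict(long endOffsetAtStart, Changelog log) {
        long current = log.endOffset();
        if (current != endOffsetAtStart) {
            throw new IllegalStateException("Log end offset should not change while restoring: "
                    + "old end offset " + endOffsetAtStart + ", current offset " + current);
        }
    }

    /** Assumed alternative: a moved end offset is taken as evidence of migration. */
    static void checkTolerant(long endOffsetAtStart, Changelog log) {
        long current = log.endOffset();
        if (current != endOffsetAtStart) {
            throw new TaskMigratedSignal("Another thread appears to own this task now: "
                    + "old end offset " + endOffsetAtStart + ", current offset " + current);
        }
    }

    /** Replays steps 3 to 5 and returns the name of the exception seen on thread A. */
    static String replayScenario(boolean tolerant) {
        Changelog log = new Changelog();
        log.append("k1=v1");
        log.append("k2=v2");

        // Step 3: thread A starts restoring and records the end offset it sees.
        long endOffsetSeenByA = log.endOffset();

        // Step 4: A stalls; B takes over t1, restores, processes and writes to the changelog.
        log.append("k1=v3");

        // Step 5: A resumes and re-checks the end offset.
        try {
            if (tolerant) {
                checkTolerant(endOffsetSeenByA, log);
            } else {
                checkStrict(endOffsetSeenByA, log);
            }
            return "none";
        } catch (TaskMigratedSignal e) {
            return "TaskMigratedSignal";
        } catch (IllegalStateException e) {
            return "IllegalStateException";
        }
    }

    public static void main(String[] args) {
        System.out.println(replayScenario(false));
        System.out.println(replayScenario(true));
    }
}
```

In the strict variant the stale thread dies with the same kind of error as in the stack trace; in the tolerant variant the caller gets a distinct signal it could catch in order to drop the task and rejoin the group.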



