[ https://issues.apache.org/jira/browse/KAFKA-4593?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15981970#comment-15981970 ]
Matthias J. Sax commented on KAFKA-4593:
----------------------------------------

This JIRA seems to be related to this one: https://issues.apache.org/jira/browse/KAFKA-3941

Any thoughts on whether we can close this as a duplicate? [~guozhang] [~damianguy]

> Task migration during rebalance callback process could lead the obsoleted task's IllegalStateException
> ------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-4593
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4593
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>            Reporter: Guozhang Wang
>              Labels: infrastructure
>
> 1. Assume two running threads, A and B, and, for simplicity, just one task, t1.
> 2. The first rebalance is triggered and task t1 is assigned to A (B has no assigned task).
> 3. During the first rebalance callback, task t1's state store needs to be restored on thread A; this happens in "restoreActiveState", which is reached from "createStreamTask".
> 4. Now suppose thread A has a long GC pause that stalls it. A second rebalance is then triggered and kicks A out of the group; B gets task t1 and performs the same restoration. Once restoration finishes, thread B continues to process data and update the state store, and at the same time writes more messages to the changelog (so the changelog's log end offset advances).
> 5. After a while, A resumes from the long GC pause. Not knowing that it has been kicked out of the group and no longer owns task t1, it continues the restoration process, but then finds that the log end offset has advanced. When this happens, we see the following exception on thread A:
> {code}
> java.lang.IllegalStateException: task XXX Log end offset of YYY-table_stream-changelog-ZZ should not change while restoring: old end offset .., current offset ..
>     at org.apache.kafka.streams.processor.internals.ProcessorStateManager.restoreActiveState(ProcessorStateManager.java:248)
>     at org.apache.kafka.streams.processor.internals.ProcessorStateManager.register(ProcessorStateManager.java:201)
>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.register(ProcessorContextImpl.java:122)
>     at org.apache.kafka.streams.state.internals.RocksDBWindowStore.init(RocksDBWindowStore.java:200)
>     at org.apache.kafka.streams.state.internals.MeteredWindowStore.init(MeteredWindowStore.java:65)
>     at org.apache.kafka.streams.state.internals.CachingWindowStore.init(CachingWindowStore.java:65)
>     at org.apache.kafka.streams.processor.internals.AbstractTask.initializeStateStores(AbstractTask.java:86)
>     at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:120)
>     at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:794)
>     at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1222)
>     at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1195)
>     at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:897)
>     at org.apache.kafka.streams.processor.internals.StreamThread.access$500(StreamThread.java:71)
>     at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:240)
>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:230)
>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:314)
>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:278)
>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:261)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1039)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1004)
>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:570)
>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:359)
> {code}

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
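The invariant that thread A trips over in step 5 can be illustrated with a small, self-contained Java simulation. This is a hypothetical sketch, not the actual `ProcessorStateManager` code: `RestoreRaceSketch`, `Changelog`, `restore` and the `pauseBeforeCheck` hook are invented for illustration, and thread B's concurrent write is simulated deterministically inside the pause callback rather than with real threads.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simulation of the race in KAFKA-4593; not Kafka Streams code.
public class RestoreRaceSketch {

    // Hypothetical stand-in for a changelog partition: only its records and
    // log end offset (the number of records) matter here.
    static final class Changelog {
        private final List<String> records = new ArrayList<>();

        synchronized long endOffset() {
            return records.size();
        }

        synchronized void append(String record) {
            records.add(record);
        }

        synchronized String read(long offset) {
            return records.get((int) offset);
        }
    }

    // Hypothetical restorer: records the end offset up front, replays up to it,
    // then checks that the end offset did not move while restoring.
    static void restore(String taskId, Changelog changelog, Runnable pauseBeforeCheck) {
        long oldEndOffset = changelog.endOffset();
        for (long offset = 0; offset < oldEndOffset; offset++) {
            // Read each record as if replaying it into a local store
            // (the store itself is omitted in this sketch).
            changelog.read(offset);
        }
        // Stands in for thread A's long GC pause; anything the "new owner"
        // writes to the changelog during this window is simulated here.
        pauseBeforeCheck.run();
        long currentEndOffset = changelog.endOffset();
        if (currentEndOffset != oldEndOffset) {
            throw new IllegalStateException(String.format(
                "task %s Log end offset should not change while restoring: "
                    + "old end offset %d, current offset %d",
                taskId, oldEndOffset, currentEndOffset));
        }
    }

    public static void main(String[] args) {
        Changelog changelog = new Changelog();
        changelog.append("k1=v1");
        changelog.append("k2=v2");

        try {
            // Thread A restores t1 and stalls; meanwhile thread B, the owner
            // after the second rebalance, appends to the changelog.
            restore("t1", changelog, () -> changelog.append("k3=v3"));
            System.out.println("restored");
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Running `main` prints the exception message with old end offset 2 and current offset 3. Note that the check only observes that the changelog moved; it has no way to tell thread A that it has lost ownership of t1, which is why the stale thread surfaces the situation as an `IllegalStateException` rather than as a task migration.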