Hi Sachin,

This should not happen. The previous owner of the task should have stopped
processing before the restoration begins, so if this is happening it signals
a bug. Do you have more logs?
Thanks,
Damian

On Sat, 25 Mar 2017 at 08:20 Sachin Mittal <sjmit...@gmail.com> wrote:

> Hi,
> So recently we fixed the deadlock issue and also built the streams jar by
> copying the RocksDB configs from trunk.
> So we don't get any deadlock issue now, and we also see that the wait time
> of the CPU cores stays around 5% (down from 50% earlier).
>
> However, we now get a new exception which is not handled by the streams
> application and causes the instance to shut down:
>
> org.apache.kafka.streams.errors.StreamsException: stream-thread
> [StreamThread-2] Failed to rebalance
>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:622) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:378) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> Caused by: java.lang.IllegalStateException: task [0_9] Log end offset of
> new-part-advice-key-table-changelog-9 should not change while restoring:
> old end offset 647352, current offset 647632
>     at org.apache.kafka.streams.processor.internals.ProcessorStateManager.restoreActiveState(ProcessorStateManager.java:240) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>     at org.apache.kafka.streams.processor.internals.ProcessorStateManager.register(ProcessorStateManager.java:193) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>     at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.register(AbstractProcessorContext.java:99) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>     at org.apache.kafka.streams.state.internals.RocksDBSegmentedBytesStore.init(RocksDBSegmentedBytesStore.java:101) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>     at org.apache.kafka.streams.state.internals.ChangeLoggingSegmentedBytesStore.init(ChangeLoggingSegmentedBytesStore.java:68) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>     at org.apache.kafka.streams.state.internals.MeteredSegmentedBytesStore.init(MeteredSegmentedBytesStore.java:66) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>     at org.apache.kafka.streams.state.internals.RocksDBWindowStore.init(RocksDBWindowStore.java:76) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>     at org.apache.kafka.streams.processor.internals.AbstractTask.initializeStateStores(AbstractTask.java:86) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>     at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:141) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>
> What I can see from the logs is this:
>
> DEBUG 2017-03-25 02:07:24,499 [StreamThread-2]: org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [StreamThread-2] creating new task 0_9
>
> So it creates the task at this time.
>
> To create the local state store from the changelog topic, it starts at:
>
> DEBUG 2017-03-25 02:07:24,550 [StreamThread-2]: org.apache.kafka.clients.consumer.KafkaConsumer - Subscribed to partition(s): new-part-advice-key-table-changelog-9
> DEBUG 2017-03-25 02:07:24,550 [StreamThread-2]: org.apache.kafka.clients.consumer.KafkaConsumer - Seeking to end of partition new-part-advice-key-table-changelog-9
> DEBUG 2017-03-25 02:07:24,550 [StreamThread-2]: org.apache.kafka.clients.consumer.internals.Fetcher - Resetting offset for partition new-part-advice-key-table-changelog-9 to latest offset.
> DEBUG 2017-03-25 02:07:24,552 [StreamThread-2]: org.apache.kafka.clients.consumer.internals.Fetcher - Handling ListOffsetResponse response for new-part-advice-key-table-changelog-9. Fetched offset 647352, timestamp -1
> DEBUG 2017-03-25 02:07:24,552 [StreamThread-2]: org.apache.kafka.clients.consumer.KafkaConsumer - Seeking to beginning of partition new-part-advice-key-table-changelog-9
> DEBUG 2017-03-25 02:07:24,552 [StreamThread-2]: org.apache.kafka.clients.consumer.internals.Fetcher - Resetting offset for partition new-part-advice-key-table-changelog-9 to earliest offset.
>
> and the process is over at:
>
> DEBUG 2017-03-25 02:10:21,225 [StreamThread-2]: org.apache.kafka.clients.consumer.internals.Fetcher - Sending fetch for partitions [new-part-advice-key-table-changelog-9] to broker 192.168.73.199:9092 (id: 5 rack: null)
> DEBUG 2017-03-25 02:10:21,230 [StreamThread-2]: org.apache.kafka.clients.consumer.KafkaConsumer - Unsubscribed all topics or patterns and assigned partitions
>
> And the exception is thrown at:
>
> ERROR 2017-03-25 02:10:21,232 [StreamThread-2]: org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - User provided listener org.apache.kafka.streams.processor.internals.StreamThread$1 for group new-part-advice failed on partition assignment
> java.lang.IllegalStateException: task [0_9] Log end offset of new-part-advice-key-table-changelog-9 should not change while restoring: old end offset 647352, current offset 647632
>
> So you can clearly see that while the state was being restored, some other
> thread (on the same or another instance) committed more offsets to this
> changelog partition, so at the end of the process the two offsets did not
> match. I think this is a fairly reasonable scenario: while restoring the
> state, it should also account for any newly appended offsets and not assume
> that it is the only thread writing to that partition. Some other instance
> may have committed more offsets while this thread was trying to restore the
> state.
>
> So I feel this exception should be handled and not thrown all the way up to
> the streams application.
>
> What do you all think?
>
> Thanks
> Sachin
>
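For anyone following along, the check that throws here can be sketched roughly
like this. This is a simplified model, not the real ProcessorStateManager code:
the class below is made up for illustration, and a plain List stands in for the
changelog partition. It only shows the invariant being enforced, namely that
the log-end offset captured before replay must still be the log-end offset
after replay:

```java
import java.util.ArrayList;
import java.util.List;

// Simplified sketch (NOT the actual Kafka Streams implementation) of the
// invariant enforced during state restoration: capture the changelog's
// log-end offset, replay up to it, then verify the log-end offset has not
// moved. If another writer appended to the changelog while restoration was
// in flight, the two offsets differ and restoration fails with the same
// IllegalStateException seen in the stack trace above.
class ChangelogRestorer {
    private final List<String> changelog; // stands in for the changelog partition

    ChangelogRestorer(List<String> changelog) {
        this.changelog = changelog;
    }

    long endOffset() {
        // Log-end offset = offset of the next record to be appended.
        return changelog.size();
    }

    /** Replays the changelog into the local store; returns records restored. */
    long restore(List<String> localStore) {
        long endBefore = endOffset();   // "Seeking to end of partition"
        long offset = 0;                // "Seeking to beginning of partition"
        while (offset < endBefore) {
            localStore.add(changelog.get((int) offset));
            offset++;
        }
        long endAfter = endOffset();    // re-check once the replay finishes
        if (endAfter != endBefore) {
            throw new IllegalStateException(
                "Log end offset should not change while restoring: "
                + "old end offset " + endBefore
                + ", current offset " + endAfter);
        }
        return offset;
    }
}
```

In the logs above, the end offset captured at 02:07:24 was 647352, but by the
time the replay finished at 02:10:21 the changelog had grown to 647632; in
other words, something was still producing to the changelog partition during
restoration, which is the situation Damian says should not be possible.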