Hi Sachin,

This should not happen. The previous owner of the task should have stopped
processing before the restoration begins. So if this is happening, then
that signals a bug. Do you have more logs?

Thanks,
Damian

On Sat, 25 Mar 2017 at 08:20 Sachin Mittal <sjmit...@gmail.com> wrote:

> Hi,
> So recently we fixed the deadlock issue and also rebuilt the streams jar
> with the RocksDB configs copied from trunk.
> We no longer hit the deadlock issue, and the wait time of the CPU cores
> now stays around 5% (down from 50% earlier).
>
> However, we now get a new exception which is not handled by the streams
> application and causes the instance to shut down.
>
> org.apache.kafka.streams.errors.StreamsException: stream-thread
> [StreamThread-2] Failed to rebalance
>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:622) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:378) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> Caused by: java.lang.IllegalStateException: task [0_9] Log end offset of
> new-part-advice-key-table-changelog-9 should not change while restoring:
> old end offset 647352, current offset 647632
>     at org.apache.kafka.streams.processor.internals.ProcessorStateManager.restoreActiveState(ProcessorStateManager.java:240) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>     at org.apache.kafka.streams.processor.internals.ProcessorStateManager.register(ProcessorStateManager.java:193) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>     at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.register(AbstractProcessorContext.java:99) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>     at org.apache.kafka.streams.state.internals.RocksDBSegmentedBytesStore.init(RocksDBSegmentedBytesStore.java:101) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>     at org.apache.kafka.streams.state.internals.ChangeLoggingSegmentedBytesStore.init(ChangeLoggingSegmentedBytesStore.java:68) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>     at org.apache.kafka.streams.state.internals.MeteredSegmentedBytesStore.init(MeteredSegmentedBytesStore.java:66) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>     at org.apache.kafka.streams.state.internals.RocksDBWindowStore.init(RocksDBWindowStore.java:76) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>     at org.apache.kafka.streams.processor.internals.AbstractTask.initializeStateStores(AbstractTask.java:86) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>     at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:141) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>
> What I can see from the logs is this:
> DEBUG 2017-03-25 02:07:24,499 [StreamThread-2]:
> org.apache.kafka.streams.processor.internals.StreamThread - stream-thread
> [StreamThread-2] creating new task 0_9
> So it creates the task at this time.
>
> To create the local state store from the changelog topic, it starts at:
>
> DEBUG 2017-03-25 02:07:24,550 [StreamThread-2]:
> org.apache.kafka.clients.consumer.KafkaConsumer - Subscribed to
> partition(s): new-part-advice-key-table-changelog-9
> DEBUG 2017-03-25 02:07:24,550 [StreamThread-2]:
> org.apache.kafka.clients.consumer.KafkaConsumer - Seeking to end of
> partition new-part-advice-key-table-changelog-9
> DEBUG 2017-03-25 02:07:24,550 [StreamThread-2]:
> org.apache.kafka.clients.consumer.internals.Fetcher - Resetting offset for
> partition new-part-advice-key-table-changelog-9 to latest offset.
> DEBUG 2017-03-25 02:07:24,552 [StreamThread-2]:
> org.apache.kafka.clients.consumer.internals.Fetcher - Handling
> ListOffsetResponse response for new-part-advice-key-table-changelog-9.
> Fetched offset 647352, timestamp -1
> DEBUG 2017-03-25 02:07:24,552 [StreamThread-2]:
> org.apache.kafka.clients.consumer.KafkaConsumer - Seeking to beginning of
> partition new-part-advice-key-table-changelog-9
> DEBUG 2017-03-25 02:07:24,552 [StreamThread-2]:
> org.apache.kafka.clients.consumer.internals.Fetcher - Resetting offset for
> partition new-part-advice-key-table-changelog-9 to earliest offset.
>
> and the process is over at:
> DEBUG 2017-03-25 02:10:21,225 [StreamThread-2]:
> org.apache.kafka.clients.consumer.internals.Fetcher - Sending fetch for
> partitions [new-part-advice-key-table-changelog-9] to broker
> 192.168.73.199:9092 (id: 5 rack: null)
> DEBUG 2017-03-25 02:10:21,230 [StreamThread-2]:
> org.apache.kafka.clients.consumer.KafkaConsumer - Unsubscribed all topics
> or patterns and assigned partitions
>
> And the exception is thrown at:
> ERROR 2017-03-25 02:10:21,232 [StreamThread-2]:
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - User
> provided listener
> org.apache.kafka.streams.processor.internals.StreamThread$1 for group
> new-part-advice failed on partition assignment
> java.lang.IllegalStateException: task [0_9] Log end offset of
> new-part-advice-key-table-changelog-9 should not change while restoring:
> old end offset 647352, current offset 647632
>
> So you can clearly see that while the state was being restored, some other
> thread (on the same or another instance) committed more records to this
> changelog partition, so at the end of the process the two offsets did not
> match. I think this is a fairly reasonable scenario: while restoring the
> state, the restore logic should also account for newly appended offsets
> and not assume that this is the only thread writing to that partition.
> Some other instance may have committed more records while this thread was
> trying to restore the state.
>
> So I feel this exception should be handled rather than thrown all the way
> up to the streams application.
>
> What do you all think?
>
> Thanks
> Sachin
>
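
For reference, the check that fails here can be illustrated with a small,
self-contained Java snippet. This is a simplified stand-in, not the actual
ProcessorStateManager code: `restoreActiveState` and the offset supplier are
hypothetical names, and the offsets 647352 / 647632 simply mirror the ones in
the logs above. It captures the changelog's log-end offset before replaying,
then fails if that offset moved during the replay.

```java
import java.util.function.LongSupplier;

public class RestoreCheckSketch {

    // Simplified stand-in for the restore path: capture the changelog's
    // log-end offset, replay records from startOffset up to it, then verify
    // the log-end offset did not move while we were restoring.
    static void restoreActiveState(long startOffset, LongSupplier logEndOffset) {
        long capturedEnd = logEndOffset.getAsLong();
        long restored = startOffset;
        while (restored < capturedEnd) {
            restored++; // stand-in for applying one changelog record
        }
        long currentEnd = logEndOffset.getAsLong();
        if (currentEnd != capturedEnd) {
            throw new IllegalStateException(
                "Log end offset should not change while restoring: "
                    + "old end offset " + capturedEnd
                    + ", current offset " + currentEnd);
        }
    }

    public static void main(String[] args) {
        // The log-end offset moves from 647352 to 647632 while this thread
        // is restoring, as in the logs above, so the check throws.
        long[] ends = {647352L, 647632L};
        int[] call = {0};
        try {
            restoreActiveState(0L, () -> ends[Math.min(call[0]++, 1)]);
        } catch (IllegalStateException e) {
            System.out.println("caught: " + e.getMessage());
        }
    }
}
```

Sachin's suggestion would amount to re-reading the end offset (or tolerating
growth of the changelog) at this point instead of throwing, since another
writer may legitimately still be appending to the partition.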
