Yes, this is similar: the problem was that KafkaStreams was not started correctly in my Spring app, and it is NOT a bug in KafkaStreams. In the comments on the JIRA I have mentioned what I was doing wrong.
Exceptions of this type largely indicate that Kafka Streams was not started correctly.

Thanks for your valuable time on this.

Regards
Sai

On Wed, Oct 26, 2016 at 2:34 PM, Guozhang Wang <wangg...@gmail.com> wrote:

> Is it a similar report as https://issues.apache.org/jira/browse/KAFKA-4344 ?
>
> On Tue, Oct 25, 2016 at 2:43 PM, saiprasad mishra <saiprasadmis...@gmail.com> wrote:
>
> > Hi
> >
> > This is with Kafka Streams version 0.10.1.0 (server running remotely and
> > the streams app running locally on my laptop).
> >
> > I have a Kafka Streams pipeline like this:
> >
> > source topic (with 10 partitions) stream -> filter for null value -> map to
> > make it keyed by id -> custom processor to mystore (persistent)
> >
> > I am getting the exception below. It happens when the flush happens.
> > If I restart the app, the data I sent is actually present in the RocksDB store.
> > I see that the message of the keyed stream went to partition 0, on which the
> > flush happened correctly, I guess, since, as shown below, it is the partition 9
> > task that failed to flush. I am not sure about the complaint about timestamp() here.
> >
> > Can somebody explain what this means?
> >
> > Not sure if it has something to do with the timestamp extractor property
> > below that I am setting, or with any other time, like the producer create time.
> >
> > props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, ConsumerRecordTimestampExtractor.class);
> >
> > Regards
> > Sai
> >
> > 2016-10-25 14:31:29.822000 org.apache.kafka.streams.processor.internals.StreamThread StreamThread-1 ERROR stream-thread [StreamThread-1] Failed to commit StreamTask 0_9 state:
> >
> > org.apache.kafka.streams.errors.ProcessorStateException: task [0_9] Failed to flush state store Products
> >     at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:331) ~[kafka-streams-0.10.1.0.jar!/:?]
> >     at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:275) ~[kafka-streams-0.10.1.0.jar!/:?]
> >     at org.apache.kafka.streams.processor.internals.StreamThread.commitOne(StreamThread.java:576) [kafka-streams-0.10.1.0.jar!/:?]
> >     at org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:562) [kafka-streams-0.10.1.0.jar!/:?]
> >     at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:538) [kafka-streams-0.10.1.0.jar!/:?]
> >     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:456) [kafka-streams-0.10.1.0.jar!/:?]
> >     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242) [kafka-streams-0.10.1.0.jar!/:?]
> > Caused by: java.lang.IllegalStateException: This should not happen as timestamp() should only be called while a record is processed
> >     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.timestamp(ProcessorContextImpl.java:192) ~[kafka-streams-0.10.1.0.jar!/:?]
> >     at org.apache.kafka.streams.state.internals.StoreChangeLogger.logChange(StoreChangeLogger.java:112) ~[kafka-streams-0.10.1.0.jar!/:?]
> >     at org.apache.kafka.streams.state.internals.RocksDBStore.flush(RocksDBStore.java:375) ~[kafka-streams-0.10.1.0.jar!/:?]
> >     at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.flush(MeteredKeyValueStore.java:175) ~[kafka-streams-0.10.1.0.jar!/:?]
> >     at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:329) ~[kafka-streams-0.10.1.0.jar!/:?]
> >     ... 6 more
>
> --
> -- Guozhang