Hi This is with version 10.1.0 kafka streams (server running in remote and streams app running local in my laptop).
I have a kafka stream pipeline like this source topic(with 10 partitions) stream -> filter for null value ->map to make it keyed by id ->custom processor to mystore(persistent) I am getting the below exception. This happens when the flush happens. If I restart the app the data i sent is actually present in rocksdb store. I see the message of the keyed stream went to partition 0 on which flush happened correctly i guess as I see below partition 9 task failed to flush not sure about the complain about timestamp() here. Can somebody explain what does this mean. Not sure if it has something to do with below timestamp extractor property i am setting or any other time like producer create time ??? props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, ConsumerRecordTimestampExtractor.class); Regards Sai 2016-10-25 14:31:29.822000 org.apache.kafka.streams.processor.internals.StreamThread StreamThread-1 ERROR stream-thread [StreamThread-1] Failed to commit StreamTask 0_9 state: org.apache.kafka.streams.errors.ProcessorStateException: task [0_9] Failed to flush state store Products at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:331) ~[kafka-streams-0.10.1.0.jar!/:?] at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:275) ~[kafka-streams-0.10.1.0.jar!/:?] at org.apache.kafka.streams.processor.internals.StreamThread.commitOne(StreamThread.java:576) [kafka-streams-0.10.1.0.jar!/:?] at org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:562) [kafka-streams-0.10.1.0.jar!/:?] at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:538) [kafka-streams-0.10.1.0.jar!/:?] at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:456) [kafka-streams-0.10.1.0.jar!/:?] at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242) [kafka-streams-0.10.1.0.jar!/:?] Caused by: java.lang.IllegalStateException: This should not happen as timestamp() should only be called while a record is processed at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.timestamp(ProcessorContextImpl.java:192) ~[kafka-streams-0.10.1.0.jar!/:?] at org.apache.kafka.streams.state.internals.StoreChangeLogger.logChange(StoreChangeLogger.java:112) ~[kafka-streams-0.10.1.0.jar!/:?] at org.apache.kafka.streams.state.internals.RocksDBStore.flush(RocksDBStore.java:375) ~[kafka-streams-0.10.1.0.jar!/:?] at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.flush(MeteredKeyValueStore.java:175) ~[kafka-streams-0.10.1.0.jar!/:?] at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:329) ~[kafka-streams-0.10.1.0.jar!/:?] ... 6 more