Hi,
This is with Kafka Streams version 0.10.1.0 (the broker is running remotely
and the streams app is running locally on my laptop).



I have a Kafka Streams pipeline like this:

source topic (with 10 partitions) stream -> filter for null values -> map to
make it keyed by id -> custom processor to mystore (persistent)

I am getting the exception below. It happens when the state store is flushed.
If I restart the app, the data I sent is actually present in the RocksDB store.
I see that the message from the keyed stream went to partition 0, and I guess
the flush for that partition happened correctly, because the trace below shows
that it was the task for partition 9 that failed to flush. I am not sure what
the complaint about timestamp() means here.

Can somebody explain what this means?


I am not sure whether it has something to do with the timestamp extractor
property I am setting below, or with some other timestamp such as the producer
create time.

        props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
ConsumerRecordTimestampExtractor.class);


Regards
Sai


2016-10-25 14:31:29.822000 org.apache.kafka.streams.processor.internals.StreamThread StreamThread-1 ERROR stream-thread [StreamThread-1] Failed to commit StreamTask 0_9 state:

org.apache.kafka.streams.errors.ProcessorStateException: task [0_9] Failed to flush state store Products
    at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:331) ~[kafka-streams-0.10.1.0.jar!/:?]
    at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:275) ~[kafka-streams-0.10.1.0.jar!/:?]
    at org.apache.kafka.streams.processor.internals.StreamThread.commitOne(StreamThread.java:576) [kafka-streams-0.10.1.0.jar!/:?]
    at org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:562) [kafka-streams-0.10.1.0.jar!/:?]
    at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:538) [kafka-streams-0.10.1.0.jar!/:?]
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:456) [kafka-streams-0.10.1.0.jar!/:?]
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242) [kafka-streams-0.10.1.0.jar!/:?]
Caused by: java.lang.IllegalStateException: This should not happen as timestamp() should only be called while a record is processed
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.timestamp(ProcessorContextImpl.java:192) ~[kafka-streams-0.10.1.0.jar!/:?]
    at org.apache.kafka.streams.state.internals.StoreChangeLogger.logChange(StoreChangeLogger.java:112) ~[kafka-streams-0.10.1.0.jar!/:?]
    at org.apache.kafka.streams.state.internals.RocksDBStore.flush(RocksDBStore.java:375) ~[kafka-streams-0.10.1.0.jar!/:?]
    at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.flush(MeteredKeyValueStore.java:175) ~[kafka-streams-0.10.1.0.jar!/:?]
    at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:329) ~[kafka-streams-0.10.1.0.jar!/:?]
    ... 6 more
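
To make my reading of the "Caused by" frames concrete: it looks as if the flush
path (RocksDBStore.flush -> StoreChangeLogger.logChange) asks the context for
timestamp() at commit time, when no record is being processed. Below is a toy
Java model of that reading. It is only a sketch: FlushTimestampModel, ToyContext
and ToyStore are hypothetical names I made up for illustration, not the actual
Kafka Streams classes, and the real internals may well work differently.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model only: these classes are hypothetical and are NOT the Kafka Streams sources.
class FlushTimestampModel {

    /** Stands in for the processor context: a timestamp exists only while a record is in flight. */
    static class ToyContext {
        private Long currentTimestamp = null;

        void beginRecord(long timestamp) { currentTimestamp = timestamp; }

        void endRecord() { currentTimestamp = null; }

        long timestamp() {
            if (currentTimestamp == null) {
                throw new IllegalStateException(
                        "timestamp() should only be called while a record is processed");
            }
            return currentTimestamp;
        }
    }

    /** Stands in for a store that buffers writes and stamps them only when flushed. */
    static class ToyStore {
        private final ToyContext context;
        private final List<String> dirtyKeys = new ArrayList<>();
        final List<String> changelog = new ArrayList<>();

        ToyStore(ToyContext context) { this.context = context; }

        void put(String key) { dirtyKeys.add(key); }

        void flush() {
            for (String key : dirtyKeys) {
                // The timestamp is looked up at flush time, not at put time.
                changelog.add(key + "@" + context.timestamp());
            }
            dirtyKeys.clear();
        }
    }

    public static void main(String[] args) {
        ToyContext context = new ToyContext();
        ToyStore store = new ToyStore(context);

        // A record is processed and written to the store...
        context.beginRecord(42L);
        store.put("product-1");
        context.endRecord();

        // ...but the flush runs later, when no record is in flight.
        try {
            store.flush();
        } catch (IllegalStateException e) {
            System.out.println("flush failed: " + e.getMessage());
        }
    }
}
```

In this model the write itself succeeds and only the later flush fails, which
would at least be consistent with the data being present in the store after a
restart.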
