Just to give some more information: the ProcessorContext that gets passed to the init method of the custom store has a null RecordContext. I added the following debug statement:

println(context.asInstanceOf[ProcessorContextImpl].recordContext)

and got null.

regards.

On Sat, Jul 1, 2017 at 9:41 PM, Debasish Ghosh <ghosh.debas...@gmail.com> wrote:

> Hi -
>
> I have implemented a custom state store named BFStore with a change
> logger as follows:
>
> class BFStoreChangeLogger[K, V](val storeName: String,
>                                 val context: ProcessorContext,
>                                 val partition: Int,
>                                 val serialization: StateSerdes[K, V]) {
>
>   private val topic =
>     ProcessorStateManager.storeChangelogTopic(context.applicationId, storeName)
>   private val collector =
>     context.asInstanceOf[RecordCollector.Supplier].recordCollector
>
>   def this(storeName: String, context: ProcessorContext,
>            serialization: StateSerdes[K, V]) {
>     this(storeName, context, context.taskId.partition, serialization)
>   }
>
>   def logChange(key: K, value: V): Unit = {
>     if (collector != null) {
>       val keySerializer = serialization.keySerializer
>       val valueSerializer = serialization.valueSerializer
>       collector.send(this.topic, key, value, this.partition,
>         context.timestamp, keySerializer, valueSerializer) //**//
>     }
>   }
> }
>
> In my driver program I build the topology and start the streams as follows:
>
> val builder: TopologyBuilder = new TopologyBuilder()
>
> builder.addSource("Source", config.fromTopic)
>   .addProcessor("Process", () => new WeblogProcessor(), "Source")
>   .addStateStore(new BFStoreSupplier[String](LOG_COUNT_STATE_STORE,
>     stringSerde, true, changelogConfig), "Process")
>   .addSink("Sink", "weblog-count-topic", "Process")
>
> val streams = new KafkaStreams(builder, streamingConfig)
> streams.start()
>
> When I run the program, immediately I get the following exception ..
>
> Exception in thread "StreamThread-1" org.apache.kafka.streams.errors.ProcessorStateException: task [0_0] Failed to flush state store log-counts
>   at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:337)
>   at org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:72)
>   at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
>   at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:280)
>   at org.apache.kafka.streams.processor.internals.StreamThread.commitOne(StreamThread.java:807)
>   at org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:794)
>   at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:769)
>   at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:647)
>   at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
> *Caused by: java.lang.IllegalStateException: This should not happen as timestamp() should only be called while a record is processed*
>   at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.timestamp(AbstractProcessorContext.java:150)
>   at com.lightbend.fdp.sample.kstream.processor.BFStoreChangeLogger.logChange(BFStoreChangeLogger.scala:24)
>   at com.lightbend.fdp.sample.kstream.processor.BFStore.flush(BFStore.scala:86)
>   at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:335)
>   ... 8 more
>
> Not sure I understand the whole trace but looks like this may be related
> to https://issues.apache.org/jira/browse/KAFKA-5528. It comes from the
> class BFStoreChangeLogger in the line I marked above with //**//.
>
> Any help / workaround will be appreciated ..
>
> regards.
> --
> Debasish Ghosh
> http://manning.com/ghosh2
> http://manning.com/ghosh
>
> Twttr: @debasishg
> Blog: http://debasishg.blogspot.com
> Code: http://github.com/debasishg
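
One possible stopgap, as a minimal sketch only: the trace shows timestamp() throwing IllegalStateException when logChange is reached from flush, i.e. while no record is being processed, so the call could be guarded with a fallback value. The names TimestampSource, SafeTimestamp and timestampOrElse below are hypothetical and not part of Kafka Streams; TimestampSource merely stands in for the single ProcessorContext method involved. Whether a wall-clock fallback is acceptable for changelog records is an assumption that would need checking.

```scala
// Hypothetical stand-in for the one ProcessorContext method used in logChange.
// It is NOT a Kafka Streams type; it only keeps this sketch self-contained.
trait TimestampSource {
  def timestamp: Long
}

object SafeTimestamp {
  // Returns the current record's timestamp when one is available.
  // If the source throws IllegalStateException (as timestamp() does in the
  // trace when called outside record processing), returns the fallback.
  // The fallback is by-name, so it is evaluated only when actually needed.
  def timestampOrElse(source: TimestampSource, fallback: => Long): Long =
    try source.timestamp
    catch { case _: IllegalStateException => fallback }
}
```

In logChange, the context.timestamp argument could then be replaced by something like SafeTimestamp.timestampOrElse(source, System.currentTimeMillis()), where source is a small adapter wrapping the real ProcessorContext. This only sidesteps the exception; it does not explain why the RecordContext is null at flush time.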