Just to give some more information, the ProcessorContext that gets passed
to the init method of the custom store has a null RecordContext. Gave the
following debug statement ..

println(context.asInstanceOf[ProcessorContextImpl].recordContext)

and got null.

regards.

On Sat, Jul 1, 2017 at 9:41 PM, Debasish Ghosh <ghosh.debas...@gmail.com>
wrote:

> Hi -
>
> I have implemented a custom state store named BFStore with a change
> logger as follows:
>
> class BFStoreChangeLogger[K, V](val storeName: String,
>                                 val context: ProcessorContext,
>                                 val partition: Int,
>                                 val serialization: StateSerdes[K, V]) {
>
>   private val topic = 
> ProcessorStateManager.storeChangelogTopic(context.applicationId,
> storeName)
>   private val collector = context.asInstanceOf[RecordCollector.Supplier].
> recordCollector
>
>   def this(storeName: String, context: ProcessorContext, serialization:
> StateSerdes[K, V]) {
>     this(storeName, context, context.taskId.partition, serialization)
>   }
>
>   def logChange(key: K, value: V): Unit = {
>     if (collector != null) {
>       val keySerializer = serialization.keySerializer
>       val valueSerializer = serialization.valueSerializer
>       collector.send(this.topic, key, value, this.partition,
> context.timestamp, keySerializer, valueSerializer)  //**//
>     }
>   }
> }
>
> In my driver program I build the topology and start the streams as follows:
>
> val builder: TopologyBuilder = new TopologyBuilder()
>
> builder.addSource("Source", config.fromTopic)
>        .addProcessor("Process", () => new WeblogProcessor(), "Source")
>        .addStateStore(new BFStoreSupplier[String](LOG_COUNT_STATE_STORE,
> stringSerde, true, changelogConfig), "Process")
>        .addSink("Sink", "weblog-count-topic", "Process")
>
> val streams = new KafkaStreams(builder, streamingConfig)
> streams.start()
>
> When I run the program, immediately I get the following exception ..
>
> Exception in thread "StreamThread-1" 
> org.apache.kafka.streams.errors.ProcessorStateException:
> task [0_0] Failed to flush state store log-counts
> at org.apache.kafka.streams.processor.internals.
> ProcessorStateManager.flush(ProcessorStateManager.java:337)
> at org.apache.kafka.streams.processor.internals.
> StreamTask$1.run(StreamTask.java:72)
> at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.
> measureLatencyNs(StreamsMetricsImpl.java:188)
> at org.apache.kafka.streams.processor.internals.
> StreamTask.commit(StreamTask.java:280)
> at org.apache.kafka.streams.processor.internals.StreamThread.commitOne(
> StreamThread.java:807)
> at org.apache.kafka.streams.processor.internals.StreamThread.commitAll(
> StreamThread.java:794)
> at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(
> StreamThread.java:769)
> at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(
> StreamThread.java:647)
> at org.apache.kafka.streams.processor.internals.
> StreamThread.run(StreamThread.java:361)
> *Caused by: java.lang.IllegalStateException: This should not happen as
> timestamp() should only be called while a record is processed*
> at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.
> timestamp(AbstractProcessorContext.java:150)
> at com.lightbend.fdp.sample.kstream.processor.
> BFStoreChangeLogger.logChange(BFStoreChangeLogger.scala:24)
> at com.lightbend.fdp.sample.kstream.processor.BFStore.
> flush(BFStore.scala:86)
> at org.apache.kafka.streams.processor.internals.
> ProcessorStateManager.flush(ProcessorStateManager.java:335)
> ... 8 more
>
> Not sure I understand the whole trace but looks like this may be related
> to https://issues.apache.org/jira/browse/KAFKA-5528. It comes from the
> class BFStoreChangeLogger in the line I marked above with //**//.
>
> Any help / workaround will be appreciated ..
>
> regards.
> --
> Debasish Ghosh
> http://manning.com/ghosh2
> http://manning.com/ghosh
>
> Twttr: @debasishg
> Blog: http://debasishg.blogspot.com
> Code: http://github.com/debasishg
>



-- 
Debasish Ghosh
http://manning.com/ghosh2
http://manning.com/ghosh

Twttr: @debasishg
Blog: http://debasishg.blogspot.com
Code: http://github.com/debasishg

Reply via email to