Remove the `logChange` call from `flush` and do it when you write to the store.

i.e., in the BFStore `+` function.
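
To make the idea concrete, here is a rough sketch of a store that logs on write and leaves `flush` with nothing that needs a record timestamp. It is a simplified model rather than your actual code or the Kafka Streams API: `FakeContext`, `FakeChangeLogger`, `SketchStore` and `FlushSketch` are hypothetical stand-ins I made up so the snippet runs on its own, and the real `BFStore` would of course keep its bloom filter and serdes.

```scala
import scala.collection.mutable

// Hypothetical stand-in for ProcessorContext: a timestamp is only
// available while a record is being processed.
final class FakeContext {
  private var current: Option[Long] = None
  def startRecord(ts: Long): Unit = { current = Some(ts) }
  def endRecord(): Unit = { current = None }
  def timestamp: Long = current.getOrElse(
    throw new IllegalStateException(
      "timestamp() should only be called while a record is processed"))
}

// Hypothetical stand-in for the change logger: it just remembers what it
// would have sent to the changelog topic, as (key, timestamp) pairs.
final class FakeChangeLogger(context: FakeContext) {
  val sent = mutable.ArrayBuffer.empty[(String, Long)]
  def logChange(key: String): Unit = sent += ((key, context.timestamp))
}

// Hypothetical store: the change is logged at write time, inside `+`.
final class SketchStore(logger: FakeChangeLogger, loggingEnabled: Boolean) {
  private val items = mutable.Set.empty[String]

  def +(item: String): Unit = {
    items += item
    if (loggingEnabled) logger.logChange(item) // a record is in flight here
  }

  // Nothing here touches the record timestamp, so a commit-time flush is safe.
  def flush(): Unit = ()

  def contains(item: String): Boolean = items.contains(item)
}

object FlushSketch {
  def run(): Seq[(String, Long)] = {
    val context = new FakeContext
    val logger  = new FakeChangeLogger(context)
    val store   = new SketchStore(logger, loggingEnabled = true)

    context.startRecord(1000L) // a record arrives
    store + "10.0.0.1"         // write and log while the timestamp exists
    context.endRecord()

    store.flush()              // commit-time flush, no record being processed
    logger.sent.toSeq
  }
}
```

In this model, moving the `logChange` call back into `flush` would throw the same `IllegalStateException` as soon as `flush` runs outside of a record.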

> Ok, so I made the following change. Is this the change that you suggested?
> // remove commit from process(). So process now looks as follows:
> override def process(dummy: String, record: String): Unit =
>   LogParseUtil.parseLine(record) match {
>     case Success(r) => {
>       bfStore +
>       bfStore.flush()
>     }
>     case Failure(ex) => throw ex
>   }
> Still I get the same exception. Just as a test, I removed the flush as
> well from process() ..
> override def process(dummy: String, record: String): Unit =
>   LogParseUtil.parseLine(record) match {
>     case Success(r) => {
>       bfStore +
>     }
>     case Failure(ex) => throw ex
>   }
> and still get the same exception as it does call flush after commit from
> within .. here's the trace ..
> Exception in thread "StreamThread-1" org.apache.kafka.streams.errors.ProcessorStateException: task [0_0] Failed to flush state store log-counts
> at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(
> at org.apache.kafka.streams.processor.internals.StreamTask$
> at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(
> at org.apache.kafka.streams.processor.internals.StreamTask.commit(
> at org.apache.kafka.streams.processor.internals.StreamThread.commitOne(
> at org.apache.kafka.streams.processor.internals.StreamThread.commitAll(
> at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(
> at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(
> at
> Caused by: java.lang.IllegalStateException: This should not happen as timestamp() should only be called while a record is processed
> at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.timestamp(
> at com.lightbend.fdp.sample.kstream.processor.BFStoreChangeLogger.logChange(BFStoreChangeLogger.scala:25)
> at com.lightbend.fdp.sample.kstream.processor.BFStore.flush(BFStore.scala:89)
> at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(
>> `commit` is called by streams; you can see it in your stack trace above:
>> > org.apache.kafka.streams.processor.internals.StreamTask.commit(
>> `commit` will subsequently call `flush` on any stores. At this point,
>> though, there will be no `RecordContext` as there are no records being
>> processed. Note that calling `context.commit()` from your Processor isn't
>> actually performing the commit, it is just signalling that a commit is
>> necessary after this record has been processed. You may not want to do that
>> as it probably will impact throughput.
>> You should log the change when you write to the store, i.e., I think when
>> you do:
>> bfStore +
>> Does that help?
>>> The only place where I am doing commit is from Processor.process() ..
>>> Here it is ..
>>> class WeblogProcessor extends AbstractProcessor[String, String] {
>>>   private var bfStore: BFStore[String] = _
>>>   override def init(context: ProcessorContext): Unit = {
>>>     super.init(context)
>>>     this.context.schedule(1000)
>>>     bfStore = this.context.getStateStore(WeblogDriver.LOG_COUNT_STATE_STORE)
>>>       .asInstanceOf[BFStore[String]]
>>>   }
>>>   override def process(dummy: String, record: String): Unit =
>>>     LogParseUtil.parseLine(record) match {
>>>       case Success(r) => {
>>>         bfStore +
>>>         context.commit()
>>>         context.forward(dummy,
>>>       }
>>>       case Failure(ex) => throw ex
>>>     }
>>>   override def punctuate(timestamp: Long): Unit =
>>>     super.punctuate(timestamp)
>>>   override def close(): Unit = {}
>>> }
>>> The commit invokes the flush() of my Store. Here is the flush() method
>>> of my store ..
>>> override def flush(): Unit = {
>>>   if (loggingEnabled) {
>>>     changeLogger.logChange(changelogKey, bf
>>>   }
>>> }
>>> which in turn calls logChange that gives the error.
>>> Am I missing something ?
>>>> Hi,
>>>> It is because you are calling `context.timestamp` during `commit`. At
>>>> this point there is no `RecordContext` associated with the
>>>> `ProcessorContext`, hence the null pointer. The `RecordContext` is only set
>>>> when streams is processing a record. You probably want to log the change
>>>> when you write to the store.
>>>>> Just to give some more information, the ProcessorContext that gets
>>>>> passed to the init method of the custom store has a null RecordContext.
>>>>> Gave the following debug statement ..
>>>>> println(context.asInstanceOf[ProcessorContextImpl].recordContext)
>>>>> and got null.
>>>>> > Hi -
>>>>> >
>>>>> > I have implemented a custom state store named BFStore with a change
>>>>> > logger as follows:
>>>>> >
>>>>> > class BFStoreChangeLogger[K, V](val storeName: String,
>>>>> >                                 val context: ProcessorContext,
>>>>> >                                 val partition: Int,
>>>>> >                                 val serialization: StateSerdes[K, V]) {
>>>>> >
>>>>> >   private val topic =
>>>>> >     ProcessorStateManager.storeChangelogTopic(context.applicationId, storeName)
>>>>> >   private val collector =
>>>>> >     context.asInstanceOf[RecordCollector.Supplier].recordCollector
>>>>> >
>>>>> >   def this(storeName: String, context: ProcessorContext,
>>>>> >            serialization: StateSerdes[K, V]) {
>>>>> >     this(storeName, context, context.taskId.partition, serialization)
>>>>> >   }
>>>>> >
>>>>> >   def logChange(key: K, value: V): Unit = {
>>>>> >     if (collector != null) {
>>>>> >       val keySerializer = serialization.keySerializer
>>>>> >       val valueSerializer = serialization.valueSerializer
>>>>> >       collector.send(this.topic, key, value, this.partition,
>>>>> >         context.timestamp, keySerializer, valueSerializer)  //**//
>>>>> >     }
>>>>> >   }
>>>>> > }
>>>>> >
>>>>> > In my driver program I build the topology and start the streams as
>>>>> > follows:
>>>>> >
>>>>> > val builder: TopologyBuilder = new TopologyBuilder()
>>>>> >
>>>>> > builder.addSource("Source", config.fromTopic)
>>>>> >        .addProcessor("Process", () => new WeblogProcessor(), "Source")
>>>>> >        .addStateStore(new BFStoreSupplier[String](LOG_COUNT_STATE_STORE,
>>>>> >          stringSerde, true, changelogConfig), "Process")
>>>>> >        .addSink("Sink", "weblog-count-topic", "Process")
>>>>> >
>>>>> > val streams = new KafkaStreams(builder, streamingConfig)
>>>>> > streams.start()
>>>>> >
>>>>> > When I run the program, immediately I get the following exception ..
>>>>> >
>>>>> > Exception in thread "StreamThread-1" org.apache.kafka.streams.errors.ProcessorStateException: task [0_0] Failed to flush state store log-counts
>>>>> > at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(
>>>>> > at org.apache.kafka.streams.processor.internals.StreamTask$
>>>>> > at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(
>>>>> > at org.apache.kafka.streams.processor.internals.StreamTask.commit(
>>>>> > at org.apache.kafka.streams.processor.internals.StreamThread.commitOne(
>>>>> > at org.apache.kafka.streams.processor.internals.StreamThread.commitAll(
>>>>> > at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(
>>>>> > at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(
>>>>> > at org.apache.kafka.streams.processor.internals.
>>>>> >
>>>>> > *Caused by: java.lang.IllegalStateException: This should not happen as timestamp() should only be called while a record is processed*
>>>>> > at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.timestamp(
>>>>> > at com.lightbend.fdp.sample.kstream.processor.BFStoreChangeLogger.logChange(BFStoreChangeLogger.scala:24)
>>>>> > at com.lightbend.fdp.sample.kstream.processor.BFStore.flush(BFStore.scala:86)
>>>>> > at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(
>>>>> > ... 8 more
>>>>> >
>>>>> > Not sure I understand the whole trace but looks like this may be related
>>>>> > to It comes from the class BFStoreChangeLogger in the line I marked
>>>>> > above with //**//.
>>>>> >
>>>>> > Any help / workaround will be appreciated ..
>>>>> >
