That exception is gone. Thanks for the suggestion. I followed the example from
https://github.com/confluentinc/examples/blob/3.2.x/kafka-streams/src/main/scala/io/confluent/examples/streams/algebird/CMSStore.scala#L258.
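For the record, the working pattern looks roughly like the following. This is a
minimal sketch, assuming BFStore keeps its Bloom filter in a mutable field bf
and has the loggingEnabled, changeLogger and changelogKey members shown further
down this thread:

    // Fragment of BFStore: the write path now owns the changelog update.
    def +(item: String): Unit = {
      bf = bf + item                               // update the Bloom filter
      if (loggingEnabled) {
        // Safe here: this runs inside process(), while a record is being
        // processed, so context.timestamp() is defined.
        changeLogger.logChange(changelogKey, bf)
      }
    }

    // flush() no longer writes to the changelog, so the commit-triggered
    // flush that happens outside of record processing cannot fail any more.
    override def flush(): Unit = ()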
regards.

On Mon, Jul 3, 2017 at 3:23 PM, Damian Guy <damian....@gmail.com> wrote:

Remove the `logChange` from `flush` and do it when you write to the store,
i.e. in the BFStore `+` function.

On Mon, 3 Jul 2017 at 10:43, Debasish Ghosh <ghosh.debas...@gmail.com> wrote:

Ok, so I made the following change. Is this the change that you suggested?

// removed commit from process(), so process() now looks as follows:
override def process(dummy: String, record: String): Unit =
  LogParseUtil.parseLine(record) match {
    case Success(r) => {
      bfStore + r.host
      bfStore.flush()
    }
    case Failure(ex) => throw ex
  }

Still I get the same exception. Just as a test, I removed the flush() as well
from process():

override def process(dummy: String, record: String): Unit =
  LogParseUtil.parseLine(record) match {
    case Success(r) => {
      bfStore + r.host
    }
    case Failure(ex) => throw ex
  }

and still get the same exception, as streams calls flush after commit from
within. Here's the trace:

Exception in thread "StreamThread-1" org.apache.kafka.streams.errors.ProcessorStateException: task [0_0] Failed to flush state store log-counts
  at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:337)
  at org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:72)
  at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
  at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:280)
  at org.apache.kafka.streams.processor.internals.StreamThread.commitOne(StreamThread.java:807)
  at org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:794)
  at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:769)
  at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:647)
  at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
Caused by: java.lang.IllegalStateException: This should not happen as timestamp() should only be called while a record is processed
  at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.timestamp(AbstractProcessorContext.java:150)
  at com.lightbend.fdp.sample.kstream.processor.BFStoreChangeLogger.logChange(BFStoreChangeLogger.scala:25)
  at com.lightbend.fdp.sample.kstream.processor.BFStore.flush(BFStore.scala:89)
  at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:335)

regards.

On Mon, Jul 3, 2017 at 2:55 PM, Damian Guy <damian....@gmail.com> wrote:

`commit` is called by streams; you can see it in your stack trace above:

  org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:280)

`commit` will subsequently call `flush` on any stores. At this point, though,
there will be no `RecordContext` as there are no records being processed.
Note that calling `context.commit()` from your Processor isn't actually
performing the commit; it is just signalling that a commit is necessary after
this record has been processed. You may not want to do that per record, as it
will probably impact throughput.
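To illustrate that last point, here is a minimal sketch of a process() that
leaves committing entirely to streams (the commit cadence is controlled by the
commit.interval.ms setting in StreamsConfig); the names match the processor
code quoted below:

    override def process(dummy: String, record: String): Unit =
      LogParseUtil.parseLine(record) match {
        case Success(r) =>
          // write to the store; no per-record context.commit() here --
          // streams commits (and flushes the stores) on its own schedule
          bfStore + r.host
          context.forward(dummy, r.host)
        case Failure(ex) => throw ex
      }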
You should log the change when you write to the store, i.e. I think when you do:

  bfStore + r.host

Does that help?

Thanks,
Damian

On Mon, 3 Jul 2017 at 10:12 Debasish Ghosh <ghosh.debas...@gmail.com> wrote:

The only place where I am doing commit is from Processor.process(). Here it is:

class WeblogProcessor extends AbstractProcessor[String, String] {
  private var bfStore: BFStore[String] = _

  override def init(context: ProcessorContext): Unit = {
    super.init(context)
    this.context.schedule(1000)
    bfStore = this.context.getStateStore(WeblogDriver.LOG_COUNT_STATE_STORE)
      .asInstanceOf[BFStore[String]]
  }

  override def process(dummy: String, record: String): Unit =
    LogParseUtil.parseLine(record) match {
      case Success(r) => {
        bfStore + r.host
        context.commit()
        context.forward(dummy, r.host)
      }
      case Failure(ex) => throw ex
    }

  override def punctuate(timestamp: Long): Unit = super.punctuate(timestamp)

  override def close(): Unit = {}
}

The commit invokes the flush() of my store. Here is the flush() method of my
store:

override def flush(): Unit = {
  if (loggingEnabled) {
    changeLogger.logChange(changelogKey, bf)
  }
}

which in turn calls logChange, which gives the error.

Am I missing something?

regards.

On Mon, Jul 3, 2017 at 2:27 PM, Damian Guy <damian....@gmail.com> wrote:

Hi,

It is because you are calling `context.timestamp` during `commit`. At this
point there is no `RecordContext` associated with the `ProcessorContext`,
hence the null pointer. The `RecordContext` is only set when streams is
processing a record. You probably want to log the change when you write to
the store.

Thanks,
Damian

On Sat, 1 Jul 2017 at 19:14 Debasish Ghosh <ghosh.debas...@gmail.com> wrote:

Just to give some more information, the ProcessorContext that gets passed to
the init method of the custom store has a null RecordContext. I added the
following debug statement:

println(context.asInstanceOf[ProcessorContextImpl].recordContext)

and got null.

regards.

On Sat, Jul 1, 2017 at 9:41 PM, Debasish Ghosh <ghosh.debas...@gmail.com> wrote:

Hi -

I have implemented a custom state store named BFStore with a change logger as
follows:

class BFStoreChangeLogger[K, V](val storeName: String,
                                val context: ProcessorContext,
                                val partition: Int,
                                val serialization: StateSerdes[K, V]) {

  private val topic =
    ProcessorStateManager.storeChangelogTopic(context.applicationId, storeName)
  private val collector = context.asInstanceOf[RecordCollector.Supplier].
    recordCollector

  def this(storeName: String, context: ProcessorContext,
           serialization: StateSerdes[K, V]) {
    this(storeName, context, context.taskId.partition, serialization)
  }

  def logChange(key: K, value: V): Unit = {
    if (collector != null) {
      val keySerializer = serialization.keySerializer
      val valueSerializer = serialization.valueSerializer
      collector.send(this.topic, key, value, this.partition,
        context.timestamp, keySerializer, valueSerializer) //**//
    }
  }
}

In my driver program I build the topology and start the streams as follows:

val builder: TopologyBuilder = new TopologyBuilder()

builder.addSource("Source", config.fromTopic)
  .addProcessor("Process", () => new WeblogProcessor(), "Source")
  .addStateStore(new BFStoreSupplier[String](LOG_COUNT_STATE_STORE,
    stringSerde, true, changelogConfig), "Process")
  .addSink("Sink", "weblog-count-topic", "Process")

val streams = new KafkaStreams(builder, streamingConfig)
streams.start()

When I run the program, I immediately get the following exception:

Exception in thread "StreamThread-1" org.apache.kafka.streams.errors.ProcessorStateException: task [0_0] Failed to flush state store log-counts
  at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:337)
  at org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:72)
  at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
  at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:280)
  at org.apache.kafka.streams.processor.internals.StreamThread.commitOne(StreamThread.java:807)
  at org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:794)
  at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:769)
  at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:647)
  at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
Caused by: java.lang.IllegalStateException: This should not happen as timestamp() should only be called while a record is processed
  at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.timestamp(AbstractProcessorContext.java:150)
  at com.lightbend.fdp.sample.kstream.processor.BFStoreChangeLogger.logChange(BFStoreChangeLogger.scala:24)
  at com.lightbend.fdp.sample.kstream.processor.BFStore.flush(BFStore.scala:86)
  at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:335)
  ... 8 more

Not sure I understand the whole trace, but it looks like this may be related
to https://issues.apache.org/jira/browse/KAFKA-5528. It comes from the class
BFStoreChangeLogger in the line I marked above with //**//.
Any help / workaround will be appreciated.

regards.

--
Debasish Ghosh
http://manning.com/ghosh2
http://manning.com/ghosh

Twttr: @debasishg
Blog: http://debasishg.blogspot.com
Code: http://github.com/debasishg