Remove the `logChange` call from `flush` and do it when you write to the store, i.e., in the BFStore `+` function.
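A minimal sketch of that change follows. It does not use the real Kafka Streams API. `FakeContext` stands in for `ProcessorContext`, and its `timestamp` only works inside `processing`. `ChangeLogger` keeps entries in memory instead of sending them through a `RecordCollector`. An immutable `Set` stands in for the Bloom filter. All of these names are hypothetical. The point is only the ordering: log inside `+`, while a record is being processed, and keep `flush` free of `timestamp` calls.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for ProcessorContext: a timestamp exists only while a record is processed.
final class FakeContext {
  private var current: Option[Long] = None

  // Runs `body` as if Streams were processing a record with timestamp `ts`.
  def processing[A](ts: Long)(body: => A): A = {
    current = Some(ts)
    try body
    finally current = None
  }

  // Mirrors the check that produced the IllegalStateException in this thread.
  def timestamp: Long =
    current.getOrElse(throw new IllegalStateException(
      "timestamp() should only be called while a record is processed"))
}

// Hypothetical stand-in for BFStoreChangeLogger: keeps (key, value, timestamp) in memory.
final class ChangeLogger[K, V](context: FakeContext) {
  val sent: ArrayBuffer[(K, V, Long)] = ArrayBuffer.empty

  def logChange(key: K, value: V): Unit = {
    sent += ((key, value, context.timestamp))
    ()
  }
}

// Sketch of the store; an immutable Set stands in for the Bloom filter (assumption).
final class SketchBFStore(context: FakeContext, loggingEnabled: Boolean) {
  private val changelogKey = "bf" // hypothetical changelog key
  private var bf: Set[String] = Set.empty
  val changeLogger = new ChangeLogger[String, Set[String]](context)

  // Write path: called from process(), so context.timestamp is valid here.
  def +(item: String): Unit = {
    bf += item
    if (loggingEnabled) changeLogger.logChange(changelogKey, bf)
  }

  // Called by Streams from commit(), when no record is being processed,
  // so nothing in here may call context.timestamp.
  def flush(): Unit = ()

  def contains(item: String): Boolean = bf.contains(item)
}
```

To use it, wrap each write in `ctx.processing(ts) { store + host }` and then call `store.flush()` outside `processing`. The second step mirrors what `StreamTask.commit` does. Because the sketch's `flush` no longer asks for a timestamp, it completes without the exception. One design caveat: logging the whole filter on every `+` produces one changelog record per write, which may be costly for a large filter.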
On Mon, 3 Jul 2017 at 10:43, Debasish Ghosh <ghosh.debas...@gmail.com> wrote:

> Ok, so I made the following change. Is this the change you suggested?
>
> // remove commit from process(). So process now looks as follows:
> override def process(dummy: String, record: String): Unit =
>   LogParseUtil.parseLine(record) match {
>     case Success(r) => {
>       bfStore + r.host
>       bfStore.flush()
>     }
>     case Failure(ex) => throw ex
>   }
>
> I still get the same exception. Just as a test, I removed the flush from
> process() as well:
>
> override def process(dummy: String, record: String): Unit =
>   LogParseUtil.parseLine(record) match {
>     case Success(r) => {
>       bfStore + r.host
>     }
>     case Failure(ex) => throw ex
>   }
>
> and I still get the same exception, because flush is still called after
> commit from within Streams. Here's the trace:
>
> Exception in thread "StreamThread-1"
> org.apache.kafka.streams.errors.ProcessorStateException: task [0_0] Failed
> to flush state store log-counts
>   at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:337)
>   at org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:72)
>   at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
>   at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:280)
>   at org.apache.kafka.streams.processor.internals.StreamThread.commitOne(StreamThread.java:807)
>   at org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:794)
>   at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:769)
>   at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:647)
>   at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
> Caused by: java.lang.IllegalStateException: This should not happen as
> timestamp() should only be called while a record is processed
>   at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.timestamp(AbstractProcessorContext.java:150)
>   at com.lightbend.fdp.sample.kstream.processor.BFStoreChangeLogger.logChange(BFStoreChangeLogger.scala:25)
>   at com.lightbend.fdp.sample.kstream.processor.BFStore.flush(BFStore.scala:89)
>   at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:335)
>
> regards.
>
> On Mon, Jul 3, 2017 at 2:55 PM, Damian Guy <damian....@gmail.com> wrote:
>
>> `commit` is called by Streams; you can see it in your stack trace above:
>>
>> > org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:280)
>>
>> `commit` will subsequently call `flush` on any stores. At this point,
>> though, there will be no `RecordContext`, as no records are being
>> processed. Note that calling `context.commit()` from your Processor doesn't
>> actually perform the commit; it just signals that a commit is necessary
>> after this record has been processed. You may not want to do that, as it
>> will probably impact throughput.
>>
>> You should log the change when you write to the store, i.e., I think when
>> you do:
>>   bfStore + r.host
>>
>> Does that help?
>>
>> Thanks,
>> Damian
>>
>> On Mon, 3 Jul 2017 at 10:12 Debasish Ghosh <ghosh.debas...@gmail.com> wrote:
>>
>>> The only place where I am doing commit is from Processor.process().
>>> Here it is:
>>>
>>> class WeblogProcessor extends AbstractProcessor[String, String] {
>>>   private var bfStore: BFStore[String] = _
>>>
>>>   override def init(context: ProcessorContext): Unit = {
>>>     super.init(context)
>>>     this.context.schedule(1000)
>>>     bfStore =
>>>       this.context.getStateStore(WeblogDriver.LOG_COUNT_STATE_STORE).asInstanceOf[BFStore[String]]
>>>   }
>>>
>>>   override def process(dummy: String, record: String): Unit =
>>>     LogParseUtil.parseLine(record) match {
>>>       case Success(r) => {
>>>         bfStore + r.host
>>>         context.commit()
>>>         context.forward(dummy, r.host)
>>>       }
>>>       case Failure(ex) => throw ex
>>>     }
>>>
>>>   override def punctuate(timestamp: Long): Unit = super.punctuate(timestamp)
>>>   override def close(): Unit = {}
>>> }
>>>
>>> The commit invokes the flush() of my store. Here is the flush() method
>>> of my store:
>>>
>>>   override def flush(): Unit = {
>>>     if (loggingEnabled) {
>>>       changeLogger.logChange(changelogKey, bf)
>>>     }
>>>   }
>>>
>>> which in turn calls logChange, which throws the error.
>>>
>>> Am I missing something?
>>>
>>> regards.
>>>
>>> On Mon, Jul 3, 2017 at 2:27 PM, Damian Guy <damian....@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> It is because you are calling `context.timestamp` during `commit`. At
>>>> this point there is no `RecordContext` associated with the
>>>> `ProcessorContext`, hence the null pointer. The `RecordContext` is only set
>>>> when Streams is processing a record. You probably want to log the change
>>>> when you write to the store.
>>>>
>>>> Thanks,
>>>> Damian
>>>>
>>>> On Sat, 1 Jul 2017 at 19:14 Debasish Ghosh <ghosh.debas...@gmail.com> wrote:
>>>>
>>>>> Just to give some more information, the ProcessorContext that gets
>>>>> passed to the init method of the custom store has a null RecordContext.
>>>>> I added the following debug statement:
>>>>>
>>>>>   println(context.asInstanceOf[ProcessorContextImpl].recordContext)
>>>>>
>>>>> and got null.
>>>>>
>>>>> regards.
>>>>>
>>>>> On Sat, Jul 1, 2017 at 9:41 PM, Debasish Ghosh <ghosh.debas...@gmail.com> wrote:
>>>>>
>>>>> > Hi -
>>>>> >
>>>>> > I have implemented a custom state store named BFStore with a change
>>>>> > logger as follows:
>>>>> >
>>>>> > class BFStoreChangeLogger[K, V](val storeName: String,
>>>>> >                                 val context: ProcessorContext,
>>>>> >                                 val partition: Int,
>>>>> >                                 val serialization: StateSerdes[K, V]) {
>>>>> >
>>>>> >   private val topic =
>>>>> >     ProcessorStateManager.storeChangelogTopic(context.applicationId, storeName)
>>>>> >   private val collector =
>>>>> >     context.asInstanceOf[RecordCollector.Supplier].recordCollector
>>>>> >
>>>>> >   def this(storeName: String, context: ProcessorContext, serialization: StateSerdes[K, V]) {
>>>>> >     this(storeName, context, context.taskId.partition, serialization)
>>>>> >   }
>>>>> >
>>>>> >   def logChange(key: K, value: V): Unit = {
>>>>> >     if (collector != null) {
>>>>> >       val keySerializer = serialization.keySerializer
>>>>> >       val valueSerializer = serialization.valueSerializer
>>>>> >       collector.send(this.topic, key, value, this.partition,
>>>>> >         context.timestamp, keySerializer, valueSerializer) //**//
>>>>> >     }
>>>>> >   }
>>>>> > }
>>>>> >
>>>>> > In my driver program I build the topology and start the streams as follows:
>>>>> >
>>>>> > val builder: TopologyBuilder = new TopologyBuilder()
>>>>> >
>>>>> > builder.addSource("Source", config.fromTopic)
>>>>> >   .addProcessor("Process", () => new WeblogProcessor(), "Source")
>>>>> >   .addStateStore(new BFStoreSupplier[String](LOG_COUNT_STATE_STORE,
>>>>> >     stringSerde, true, changelogConfig), "Process")
>>>>> >   .addSink("Sink", "weblog-count-topic", "Process")
>>>>> >
>>>>> > val streams = new KafkaStreams(builder, streamingConfig)
>>>>> > streams.start()
>>>>> >
>>>>> > When I run the program, I immediately get the following exception:
>>>>> >
>>>>> > Exception in thread "StreamThread-1" org.apache.kafka.streams.errors.ProcessorStateException:
>>>>> > task [0_0] Failed to flush state store log-counts
>>>>> >   at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:337)
>>>>> >   at org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:72)
>>>>> >   at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
>>>>> >   at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:280)
>>>>> >   at org.apache.kafka.streams.processor.internals.StreamThread.commitOne(StreamThread.java:807)
>>>>> >   at org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:794)
>>>>> >   at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:769)
>>>>> >   at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:647)
>>>>> >   at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
>>>>> > *Caused by: java.lang.IllegalStateException: This should not happen as
>>>>> > timestamp() should only be called while a record is processed*
>>>>> >   at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.timestamp(AbstractProcessorContext.java:150)
>>>>> >   at com.lightbend.fdp.sample.kstream.processor.BFStoreChangeLogger.logChange(BFStoreChangeLogger.scala:24)
>>>>> >   at com.lightbend.fdp.sample.kstream.processor.BFStore.flush(BFStore.scala:86)
>>>>> >   at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:335)
>>>>> >   ... 8 more
>>>>> >
>>>>> > I'm not sure I understand the whole trace, but it looks like this may be
>>>>> > related to https://issues.apache.org/jira/browse/KAFKA-5528. It comes from
>>>>> > the class BFStoreChangeLogger, in the line I marked above with //**//.
>>>>> >
>>>>> > Any help or workaround will be appreciated.
>>>>> >
>>>>> > regards.
>>>>> > --
>>>>> > Debasish Ghosh
>>>>> > http://manning.com/ghosh2
>>>>> > http://manning.com/ghosh
>>>>> >
>>>>> > Twttr: @debasishg
>>>>> > Blog: http://debasishg.blogspot.com
>>>>> > Code: http://github.com/debasishg