Re: IllegalStateException with custom state store ..

2017-07-20 Thread Matthias J. Sax
https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-WhydoIgetanIllegalStateExceptionwhenaccessingrecordmetadata? -Matthias On 7/1/17 8:13 PM, Debasish Ghosh wrote: > Just to give some more information, the ProcessorContext that gets passed > to the init method of the custom store has a null

Re: IllegalStateException with custom state store ..

2017-07-03 Thread Debasish Ghosh
That exception is gone .. Thanks for the suggestion. I followed the example from https://github.com/confluentinc/examples/blob/3.2.x/kafka-streams/src/main/scala/io/confluent/examples/streams/algebird/CMSStore.scala#L258 .. regards. On Mon, Jul 3, 2017 at 3:23 PM, Damian Guy wrote: > Remove th

Re: IllegalStateException with custom state store ..

2017-07-03 Thread Damian Guy
Remove the` logChange` from `flush` and do it when you write to the store. i.e, in the BFStore + function On Mon, 3 Jul 2017 at 10:43, Debasish Ghosh wrote: > Ok, so I make the following change .. Is this the change that u suggested ? > > // remove commit from process(). So process now looks as

Re: IllegalStateException with custom state store ..

2017-07-03 Thread Debasish Ghosh
Ok, so I make the following change .. Is this the change that u suggested ? // remove commit from process(). So process now looks as follows: override def process(dummy: String, record: String): Unit = LogParseUtil.parseLine(record) match { case Success(r) => { bfStore + r.host bfStore.f

Re: IllegalStateException with custom state store ..

2017-07-03 Thread Damian Guy
`commit` is called by streams, you can see it in your stack trace above: > org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:280) `commit` will subsequently call `flush` on any stores. At this point, though, there will be no `RecordContext` as there are no records bei

Re: IllegalStateException with custom state store ..

2017-07-03 Thread Debasish Ghosh
The only place where I am doing commit is from Processor.process() .. Here it is .. class WeblogProcessor extends AbstractProcessor[String, String] { private var bfStore: BFStore[String] = _ override def init(context: ProcessorContext): Unit = { super.init(context) this.context.schedu

Re: IllegalStateException with custom state store ..

2017-07-03 Thread Damian Guy
Hi, It is because you are calling `context.timestamp` during `commit`. At this point there is no `RecordContext` associated with the `ProcessorContext`, hence the null pointer. The `RecordContext` is only set when streams is processing a record. You probably want to log the change when you write t

Re: IllegalStateException with custom state store ..

2017-07-01 Thread Debasish Ghosh
Just to give some more information, the ProcessorContext that gets passed to the init method of the custom store has a null RecordContext. Gave the following debug statement .. println(context.asInstanceOf[ProcessorContextImpl].recordContext) and got null. regards. On Sat, Jul 1, 2017 at 9:41 P

IllegalStateException with custom state store ..

2017-07-01 Thread Debasish Ghosh
Hi - I have implemented a custom state store named BFStore with a change logger as follows: class BFStoreChangeLogger[K, V](val storeName: String, val context: ProcessorContext, val partition: Int, val