https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-WhydoIgetanIllegalStateExceptionwhenaccessingrecordmetadata?
-Matthias
On 7/1/17 8:13 PM, Debasish Ghosh wrote:
> Just to give some more information, the ProcessorContext that gets passed
> to the init method of the custom store has a null
That exception is gone .. Thanks for the suggestion.
I followed the example from
https://github.com/confluentinc/examples/blob/3.2.x/kafka-streams/src/main/scala/io/confluent/examples/streams/algebird/CMSStore.scala#L258
..
regards.
On Mon, Jul 3, 2017 at 3:23 PM, Damian Guy wrote:
> Remove th
Remove the `logChange` from `flush` and do it when you write to the store,
i.e., in the BFStore `+` function.
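To illustrate the suggestion above, here is a minimal, self-contained sketch. It does not use the Kafka Streams classes: `SketchContext`, `SketchChangeLogger`, and `SketchStore` are hypothetical stand-ins, and the only assumption carried over from this thread is that record metadata (the timestamp) is available while a record is being processed and not during `commit`/`flush`.

```scala
import scala.collection.mutable

// Hypothetical stand-ins for illustration only; these are NOT Kafka Streams classes.
final case class RecordContext(timestamp: Long)

final class SketchContext {
  // Assumption from the thread: set only while a record is being processed.
  var recordContext: Option[RecordContext] = None

  def timestamp: Long =
    recordContext.map(_.timestamp).getOrElse(
      throw new IllegalStateException("no record is being processed"))
}

final class SketchChangeLogger {
  // Collects (key, timestamp) pairs instead of writing to a changelog topic.
  val sent = mutable.ArrayBuffer.empty[(String, Long)]

  def logChange(key: String, timestamp: Long): Unit =
    sent += ((key, timestamp))
}

final class SketchStore(context: SketchContext, logger: SketchChangeLogger) {
  private val seen = mutable.Set.empty[String]

  // Write path: a record is in flight here, so reading the timestamp is safe.
  def +(item: String): Unit = {
    seen += item
    logger.logChange(item, context.timestamp)
  }

  // Called as part of commit: deliberately does not touch record metadata.
  def flush(): Unit = ()

  def contains(item: String): Boolean = seen.contains(item)
}

object SketchDemo {
  def main(args: Array[String]): Unit = {
    val ctx = new SketchContext
    val logger = new SketchChangeLogger
    val store = new SketchStore(ctx, logger)

    ctx.recordContext = Some(RecordContext(42L)) // a record arrives
    store + "host-a"                             // change is logged on write
    ctx.recordContext = None                     // processing of the record ends
    store.flush()                                // commit-time flush: no metadata access

    println(logger.sent.toList)
  }
}
```

In the sketch, moving the `logChange` call into `flush` would make `flush` read `context.timestamp` when no record is in flight, which is the failure discussed in this thread.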
On Mon, 3 Jul 2017 at 10:43, Debasish Ghosh
wrote:
> OK, so I made the following change. Is this the change that you suggested?
>
> // remove commit from process(). So process now looks as
OK, so I made the following change. Is this the change that you suggested?
// remove commit from process(). So process now looks as follows:
override def process(dummy: String, record: String): Unit =
  LogParseUtil.parseLine(record) match {
    case Success(r) => {
      bfStore + r.host
      bfStore.f
`commit` is called by streams, you can see it in your stack trace above:
>
org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:280)
`commit` will subsequently call `flush` on any stores. At this point,
though, there will be no `RecordContext` as there are no records bei
The only place where I am doing commit is from Processor.process() .. Here
it is ..
class WeblogProcessor extends AbstractProcessor[String, String] {
  private var bfStore: BFStore[String] = _
  override def init(context: ProcessorContext): Unit = {
    super.init(context)
    this.context.schedu
Hi,
It is because you are calling `context.timestamp` during `commit`. At this
point there is no `RecordContext` associated with the `ProcessorContext`,
hence the null pointer. The `RecordContext` is only set when streams is
processing a record. You probably want to log the change when you write t
Just to give some more information, the ProcessorContext that gets passed
to the init method of the custom store has a null RecordContext. I added the
following debug statement:
println(context.asInstanceOf[ProcessorContextImpl].recordContext)
and got null.
regards.
On Sat, Jul 1, 2017 at 9:41 P
Hi -
I have implemented a custom state store named BFStore with a change logger
as follows:
class BFStoreChangeLogger[K, V](val storeName: String,
                                val context: ProcessorContext,
                                val partition: Int,
                                val