https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-WhydoIgetanIllegalStateExceptionwhenaccessingrecordmetadata?

-Matthias

On 7/1/17 8:13 PM, Debasish Ghosh wrote:
> Just to give some more information, the ProcessorContext that gets passed
> to the init method of the custom store has a null RecordContext. Gave the
> following debug statement ..
> 
> println(context.asInstanceOf[ProcessorContextImpl].recordContext)
> 
> and got null.
> 
> regards.
> 
> On Sat, Jul 1, 2017 at 9:41 PM, Debasish Ghosh <ghosh.debas...@gmail.com>
> wrote:
> 
>> Hi -
>>
>> I have implemented a custom state store named BFStore with a change
>> logger as follows:
>>
>> class BFStoreChangeLogger[K, V](val storeName: String,
>>                                 val context: ProcessorContext,
>>                                 val partition: Int,
>>                                 val serialization: StateSerdes[K, V]) {
>>
>>   private val topic = 
>> ProcessorStateManager.storeChangelogTopic(context.applicationId,
>> storeName)
>>   private val collector = context.asInstanceOf[RecordCollector.Supplier].
>> recordCollector
>>
>>   def this(storeName: String, context: ProcessorContext, serialization:
>> StateSerdes[K, V]) {
>>     this(storeName, context, context.taskId.partition, serialization)
>>   }
>>
>>   def logChange(key: K, value: V): Unit = {
>>     if (collector != null) {
>>       val keySerializer = serialization.keySerializer
>>       val valueSerializer = serialization.valueSerializer
>>       collector.send(this.topic, key, value, this.partition,
>> context.timestamp, keySerializer, valueSerializer)  //**//
>>     }
>>   }
>> }
>>
>> In my driver program I build the topology and start the streams as follows:
>>
>> val builder: TopologyBuilder = new TopologyBuilder()
>>
>> builder.addSource("Source", config.fromTopic)
>>        .addProcessor("Process", () => new WeblogProcessor(), "Source")
>>        .addStateStore(new BFStoreSupplier[String](LOG_COUNT_STATE_STORE,
>> stringSerde, true, changelogConfig), "Process")
>>        .addSink("Sink", "weblog-count-topic", "Process")
>>
>> val streams = new KafkaStreams(builder, streamingConfig)
>> streams.start()
>>
>> When I run the program, immediately I get the following exception ..
>>
>> Exception in thread "StreamThread-1" 
>> org.apache.kafka.streams.errors.ProcessorStateException:
>> task [0_0] Failed to flush state store log-counts
>> at org.apache.kafka.streams.processor.internals.
>> ProcessorStateManager.flush(ProcessorStateManager.java:337)
>> at org.apache.kafka.streams.processor.internals.
>> StreamTask$1.run(StreamTask.java:72)
>> at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.
>> measureLatencyNs(StreamsMetricsImpl.java:188)
>> at org.apache.kafka.streams.processor.internals.
>> StreamTask.commit(StreamTask.java:280)
>> at org.apache.kafka.streams.processor.internals.StreamThread.commitOne(
>> StreamThread.java:807)
>> at org.apache.kafka.streams.processor.internals.StreamThread.commitAll(
>> StreamThread.java:794)
>> at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(
>> StreamThread.java:769)
>> at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(
>> StreamThread.java:647)
>> at org.apache.kafka.streams.processor.internals.
>> StreamThread.run(StreamThread.java:361)
>> *Caused by: java.lang.IllegalStateException: This should not happen as
>> timestamp() should only be called while a record is processed*
>> at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.
>> timestamp(AbstractProcessorContext.java:150)
>> at com.lightbend.fdp.sample.kstream.processor.
>> BFStoreChangeLogger.logChange(BFStoreChangeLogger.scala:24)
>> at com.lightbend.fdp.sample.kstream.processor.BFStore.
>> flush(BFStore.scala:86)
>> at org.apache.kafka.streams.processor.internals.
>> ProcessorStateManager.flush(ProcessorStateManager.java:335)
>> ... 8 more
>>
>> Not sure I understand the whole trace but looks like this may be related
>> to https://issues.apache.org/jira/browse/KAFKA-5528. It comes from the
>> class BFStoreChangeLogger in the line I marked above with //**//.
>>
>> Any help / workaround will be appreciated ..
>>
>> regards.
>> --
>> Debasish Ghosh
>> http://manning.com/ghosh2
>> http://manning.com/ghosh
>>
>> Twttr: @debasishg
>> Blog: http://debasishg.blogspot.com
>> Code: http://github.com/debasishg
>>
> 
> 
> 

Attachment: signature.asc
Description: OpenPGP digital signature

Reply via email to