https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-WhydoIgetanIllegalStateExceptionwhenaccessingrecordmetadata?
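Going by the trace quoted below, flush() is reached from StreamTask.commit, so no record is being processed when logChange calls context.timestamp. Here is a minimal sketch of one possible guard, assuming a wall-clock fallback is acceptable for changelog records written at flush time. TimestampGuard and timestampOrElse are hypothetical names, not part of Kafka Streams:

```scala
// Hypothetical helper; not part of the Kafka Streams API.
object TimestampGuard {
  /** Evaluates `current` (e.g. context.timestamp) and returns its value.
    * If it throws IllegalStateException, as it does in the trace when no
    * record is being processed, evaluates and returns `fallback` instead.
    */
  def timestampOrElse(current: => Long, fallback: => Long): Long =
    try current
    catch { case _: IllegalStateException => fallback }
}
```

In logChange this could be used as TimestampGuard.timestampOrElse(context.timestamp, System.currentTimeMillis()). Calling logChange from the store's write path, while a record is in flight, rather than from flush() may avoid the need for a fallback altogether.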
-Matthias

On 7/1/17 8:13 PM, Debasish Ghosh wrote:
> Just to give some more information, the ProcessorContext that gets passed
> to the init method of the custom store has a null RecordContext. Gave the
> following debug statement ..
>
> println(context.asInstanceOf[ProcessorContextImpl].recordContext)
>
> and got null.
>
> regards.
>
> On Sat, Jul 1, 2017 at 9:41 PM, Debasish Ghosh <ghosh.debas...@gmail.com>
> wrote:
>
>> Hi -
>>
>> I have implemented a custom state store named BFStore with a change
>> logger as follows:
>>
>> class BFStoreChangeLogger[K, V](val storeName: String,
>>                                 val context: ProcessorContext,
>>                                 val partition: Int,
>>                                 val serialization: StateSerdes[K, V]) {
>>
>>   private val topic =
>>     ProcessorStateManager.storeChangelogTopic(context.applicationId, storeName)
>>   private val collector =
>>     context.asInstanceOf[RecordCollector.Supplier].recordCollector
>>
>>   def this(storeName: String, context: ProcessorContext,
>>            serialization: StateSerdes[K, V]) {
>>     this(storeName, context, context.taskId.partition, serialization)
>>   }
>>
>>   def logChange(key: K, value: V): Unit = {
>>     if (collector != null) {
>>       val keySerializer = serialization.keySerializer
>>       val valueSerializer = serialization.valueSerializer
>>       collector.send(this.topic, key, value, this.partition,
>>         context.timestamp, keySerializer, valueSerializer) //**//
>>     }
>>   }
>> }
>>
>> In my driver program I build the topology and start the streams as follows:
>>
>> val builder: TopologyBuilder = new TopologyBuilder()
>>
>> builder.addSource("Source", config.fromTopic)
>>   .addProcessor("Process", () => new WeblogProcessor(), "Source")
>>   .addStateStore(new BFStoreSupplier[String](LOG_COUNT_STATE_STORE,
>>     stringSerde, true, changelogConfig), "Process")
>>   .addSink("Sink", "weblog-count-topic", "Process")
>>
>> val streams = new KafkaStreams(builder, streamingConfig)
>> streams.start()
>>
>> When I run the program, immediately I get the following exception ..
>>
>> Exception in thread "StreamThread-1"
>> org.apache.kafka.streams.errors.ProcessorStateException:
>> task [0_0] Failed to flush state store log-counts
>>   at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:337)
>>   at org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:72)
>>   at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
>>   at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:280)
>>   at org.apache.kafka.streams.processor.internals.StreamThread.commitOne(StreamThread.java:807)
>>   at org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:794)
>>   at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:769)
>>   at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:647)
>>   at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
>> *Caused by: java.lang.IllegalStateException: This should not happen as
>> timestamp() should only be called while a record is processed*
>>   at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.timestamp(AbstractProcessorContext.java:150)
>>   at com.lightbend.fdp.sample.kstream.processor.BFStoreChangeLogger.logChange(BFStoreChangeLogger.scala:24)
>>   at com.lightbend.fdp.sample.kstream.processor.BFStore.flush(BFStore.scala:86)
>>   at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:335)
>>   ... 8 more
>>
>> Not sure I understand the whole trace but looks like this may be related
>> to https://issues.apache.org/jira/browse/KAFKA-5528. It comes from the
>> class BFStoreChangeLogger in the line I marked above with //**//.
>>
>> Any help / workaround will be appreciated ..
>>
>> regards.
>> --
>> Debasish Ghosh
>> http://manning.com/ghosh2
>> http://manning.com/ghosh
>>
>> Twttr: @debasishg
>> Blog: http://debasishg.blogspot.com
>> Code: http://github.com/debasishg