Peter Pringle created KAFKA-10246: ------------------------------------- Summary: AbstractProcessorContext topic() throws NullPointerException when modifying a state store within the DSL from a punctuator Key: KAFKA-10246 URL: https://issues.apache.org/jira/browse/KAFKA-10246 Project: Kafka Issue Type: Bug Components: streams Affects Versions: 2.5.0 Environment: linux, windows, java 11 Reporter: Peter Pringle
It seems valid for AbstractProcessorContext.topic() to return null; however the check below returns a NullPointerException before a null can be returned. {quote}if (topic.equals(NONEXIST_TOPIC)) { {quote} Make a local fix to reverse the ordering of the check (i.e. avoid the null) and this appears to fix the issue and sends the change to the state stores change log topic. {quote}if (NONEXIST_TOPIC.equals(topic)) { {quote} Stacktrace below seen when deleting from a previously declared ktable materialized state store which is being called from a punctuator added to the topology using either process/valueTransform within the init method. {{2020-07-02 07:29:46,829 [ABC_aggregator-551a90c1-d7c3-4357-a608-3ea79951f4e8-StreamThread-5] ERROR [o.a.k.s.p.i.StreamThread]: stream-thread [ABC_aggregator-5}} {{51a90c1-d7c3-4357-a608-3ea79951f4e8-StreamThread-5] Encountered the following error during processing:}} {{java.lang.NullPointerException: null}} {{ at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.topic(AbstractProcessorContext.java:115)}} {{ at org.apache.kafka.streams.state.internals.CachingKeyValueStore.putInternal(CachingKeyValueStore.java:141)}} {{ at org.apache.kafka.streams.state.internals.CachingKeyValueStore.put(CachingKeyValueStore.java:123)}} {{ at org.apache.kafka.streams.state.internals.CachingKeyValueStore.put(CachingKeyValueStore.java:36)}} {{ at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$put$3(MeteredKeyValueStore.java:144)}} {{ at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:806)}} {{ at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.put(MeteredKeyValueStore.java:144)}} {{ at org.apache.kafka.streams.processor.internals.ProcessorContextImpl$KeyValueStoreReadWriteDecorator.put(ProcessorContextImpl.java:487)}} {{ at org.apache.kafka.streams.kstream.internals.KTableKTableJoinMerger$KTableKTableJoinMergeProcessor.process(KTableKTableJoinMerger.java:118)}} {{ at org.apache.kafka.streams.kstream.internals.KTableKTableJoinMerger$KTableKTableJoinMergeProcessor.process(KTableKTableJoinMerger.java:97)}} {{ at org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142)}} {{ at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:806)}} {{ at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)}} {{ at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)}} {{ at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)}} {{ at org.apache.kafka.streams.kstream.internals.KTableKTableOuterJoin$KTableKTableOuterJoinProcessor.process(KTableKTableOuterJoin.java:118)}} {{ at org.apache.kafka.streams.kstream.internals.KTableKTableOuterJoin$KTableKTableOuterJoinProcessor.process(KTableKTableOuterJoin.java:65)}} {{ at org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142)}} {{ at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:806)}} {{ at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)}} {{ at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)}} {{ at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)}} {{ at org.apache.kafka.streams.kstream.internals.TimestampedCacheFlushListener.apply(TimestampedCacheFlushListener.java:45)}} {{ at org.apache.kafka.streams.kstream.internals.TimestampedCacheFlushListener.apply(TimestampedCacheFlushListener.java:28)}} {{ at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$setFlushListener$1(MeteredKeyValueStore.java:119)}} {{ at org.apache.kafka.streams.state.internals.CachingKeyValueStore.putAndMaybeForward(CachingKeyValueStore.java:92)}} {{ at org.apache.kafka.streams.state.internals.CachingKeyValueStore.lambda$initInternal$0(CachingKeyValueStore.java:72)}} {{ at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:151)}} {{ at org.apache.kafka.streams.state.internals.NamedCache.evict(NamedCache.java:244)}} {{ at org.apache.kafka.streams.state.internals.ThreadCache.maybeEvict(ThreadCache.java:240)}} {{ at org.apache.kafka.streams.state.internals.ThreadCache.put(ThreadCache.java:150)}} {{ at org.apache.kafka.streams.state.internals.CachingKeyValueStore.putInternal(CachingKeyValueStore.java:131)}} {{ at org.apache.kafka.streams.state.internals.CachingKeyValueStore.put(CachingKeyValueStore.java:123)}} {{ at org.apache.kafka.streams.state.internals.CachingKeyValueStore.put(CachingKeyValueStore.java:36)}} {{ at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$put$3(MeteredKeyValueStore.java:144)}} {{ at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:806)}} {{ at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.put(MeteredKeyValueStore.java:144)}} {{ at org.apache.kafka.streams.processor.internals.ProcessorContextImpl$KeyValueStoreReadWriteDecorator.put(ProcessorContextImpl.java:487)}} {{ at com.pjp1981.streambuilder.StreamsBuilderHelper$1.lambda$init$0(StreamsBuilderHelper.java:55) // punctuated lambda - user code}} {{ at java.base/java.util.Iterator.forEachRemaining(Iterator.java:133) //iterates over the state store and cleans up old items}} {{ at com.pjp1981.streambuilder.StreamsBuilderHelper$1.lambda$init$1(StreamsBuilderHelper.java:47)}} {{ at org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$punctuate$3(ProcessorNode.java:161)}} {{ at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:806)}} {{ at org.apache.kafka.streams.processor.internals.ProcessorNode.punctuate(ProcessorNode.java:161)}} {{ at org.apache.kafka.streams.processor.internals.StreamTask.lambda$punctuate$4(StreamTask.java:445)}} {{ at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:806)}} {{ at org.apache.kafka.streams.processor.internals.StreamTask.punctuate(StreamTask.java:445)}} {{ at org.apache.kafka.streams.processor.internals.PunctuationQueue.mayPunctuate(PunctuationQueue.java:54)}} {{ at org.apache.kafka.streams.processor.internals.StreamTask.maybePunctuateSystemTime(StreamTask.java:868)}} {{ at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.punctuate(AssignedStreamsTasks.java:502)}} {{ at org.apache.kafka.streams.processor.internals.TaskManager.punctuate(TaskManager.java:557)}} {{ at org.apache.kafka.streams.processor.internals.StreamThread.maybePunctuate(StreamThread.java:951)}} {{ at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:823)}} {{ at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:697)}} {{ at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:670)}} -- This message was sent by Atlassian Jira (v8.3.4#803005)