[ https://issues.apache.org/jira/browse/KAFKA-12396?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Matthias J. Sax reassigned KAFKA-12396: --------------------------------------- Assignee: Valery Kokorev > Dedicated exception for kstreams when null key received > ------------------------------------------------------- > > Key: KAFKA-12396 > URL: https://issues.apache.org/jira/browse/KAFKA-12396 > Project: Kafka > Issue Type: Improvement > Components: streams > Affects Versions: 2.6.0 > Reporter: Veniamin Kalegin > Assignee: Valery Kokorev > Priority: Trivial > Labels: beginner, newbie > > If kstreams application received null as a key (thanks to QA), kstream app > gives long and confusing stack trace, it would be nice to have shorter and > specific exception instead of > {{org.apache.kafka.streams.errors.StreamsException: Exception caught in > process. taskId=0_0, processor=KSTREAM-SOURCE-0000000000, topic=(hidden), > partition=0, offset=3722, stacktrace=java.lang.NullPointerException}} > at > org.apache.kafka.streams.state.internals.RocksDBStore.get(RocksDBStore.java:286) > at > org.apache.kafka.streams.state.internals.RocksDBStore.get(RocksDBStore.java:74) > at > org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.get(ChangeLoggingKeyValueBytesStore.java:94) > at > org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.get(ChangeLoggingKeyValueBytesStore.java:29) > at > org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$get$2(MeteredKeyValueStore.java:133) > at > org.apache.kafka.streams.state.internals.MeteredKeyValueStore$$Lambda$1048/0x0000000060630fd0.get(Unknown > Source) > at > org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:851) > at > org.apache.kafka.streams.state.internals.MeteredKeyValueStore.get(MeteredKeyValueStore.java:133) > at > org.apache.kafka.streams.processor.internals.AbstractReadWriteDecorator$KeyValueStoreReadWriteDecorator.get(AbstractReadWriteDecorator.java:78) > at > org.apache.kafka.streams.kstream.internals.KStreamTransformValues$KStreamTransformValuesProcessor.process(KStreamTransformValues.java:64) > at > org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142) > at > org.apache.kafka.streams.processor.internals.ProcessorNode$$Lambda$1047/0x0000000060630b10.run(Unknown > Source) > at > org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836) > at > org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142) > at > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:236) > at > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:216) > at > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:168) > at > org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:96) > at > org.apache.kafka.streams.processor.internals.StreamTask.lambda$process$1(StreamTask.java:679) > at > org.apache.kafka.streams.processor.internals.StreamTask$$Lambda$1046/0x00000000605250f0.run(Unknown > Source) > at > org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836) > at > org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:679) > at > org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1033) > at > org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:690) > at > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:551) > at > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:510) > at > org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:696) > at > org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1033) > at > org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:690) > at > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:551) > at > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:510) > Caused by: java.lang.NullPointerException > at > org.apache.kafka.streams.state.internals.RocksDBStore.get(RocksDBStore.java:286) > at > org.apache.kafka.streams.state.internals.RocksDBStore.get(RocksDBStore.java:74) > at > org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.get(ChangeLoggingKeyValueBytesStore.java:94) > at > org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.get(ChangeLoggingKeyValueBytesStore.java:29) > at > org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$get$2(MeteredKeyValueStore.java:133) > at > org.apache.kafka.streams.state.internals.MeteredKeyValueStore$$Lambda$1048/0x0000000060630fd0.get(Unknown > Source) > at > org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:851) > at > org.apache.kafka.streams.state.internals.MeteredKeyValueStore.get(MeteredKeyValueStore.java:133) > at > org.apache.kafka.streams.processor.internals.AbstractReadWriteDecorator$KeyValueStoreReadWriteDecorator.get(AbstractReadWriteDecorator.java:78) > at > HPX.Utilities.Java.Streams.MessageProcessing.ErrorHandlingAggregateTransformer.transform(ErrorHandlingAggregateTransformer.java:103) > at > HPX.Utilities.Java.Streams.MessageProcessing.ErrorHandlingAggregateTransformer.transform(ErrorHandlingAggregateTransformer.java:29) > at > org.apache.kafka.streams.kstream.internals.KStreamTransformValues$KStreamTransformValuesProcessor.process(KStreamTransformValues.java:64) > at > org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142) > at > org.apache.kafka.streams.processor.internals.ProcessorNode$$Lambda$1047/0x0000000060630b10.run(Unknown > Source) > at > org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836) > at > org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142) > at > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:236) > at > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:216) > at > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:168) > at > org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:96) > at > org.apache.kafka.streams.processor.internals.StreamTask.lambda$process$1(StreamTask.java:679) > at > org.apache.kafka.streams.processor.internals.StreamTask$$Lambda$1046/0x00000000605250f0.run(Unknown > Source) > at > org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836) > at > org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:679) > ... 4 more -- This message was sent by Atlassian Jira (v8.3.4#803005)