[ 
https://issues.apache.org/jira/browse/KAFKA-8884?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Antony Stubbs updated KAFKA-8884:
---------------------------------
    Description: 
[https://github.com/apache/kafka/pull/7309]

 

If a processor causes a class cast exception, you currently get a fairly 
cryptic error if you're not used to them, with no context-sensitive suggestion 
as to what could be wrong. These are often caused by misconfigured Serdes 
(defaults).
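
To illustrate why the failure only shows up at processing time, here is a minimal plain-Java sketch. It does not use Kafka Streams; `ErasedCastDemo`, `LENGTH` and `invokeUnchecked` are hypothetical names made up for this illustration. It assumes the framework hands the deserialized object to a processor whose generic types have been erased, so a byte array reaches code that expects a String. `[B` is the JVM's name for `byte[]`.

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Function;

final class ErasedCastDemo {

    // Hypothetical stand-in for a processor that expects String values.
    static final Function<String, Integer> LENGTH = s -> s.length();

    // Simulates a framework passing along whatever the deserializer produced.
    // Generics are erased at runtime, so the compiler cannot stop a byte[] here.
    @SuppressWarnings("unchecked")
    static Object invokeUnchecked(Function<?, ?> processor, Object deserialized) {
        return ((Function<Object, Object>) processor).apply(deserialized);
    }

    public static void main(String[] args) {
        byte[] raw = "hello".getBytes(StandardCharsets.UTF_8);

        // Prints the JVM-internal class name of byte[].
        System.out.println(raw.getClass().getName());

        try {
            // The processor expects a String but receives a byte[].
            invokeUnchecked(LENGTH, raw);
        } catch (ClassCastException e) {
            System.out.println("failed: " + e.getMessage());
        }

        // With a matching type the same call succeeds.
        System.out.println(invokeUnchecked(LENGTH, "hello"));
    }
}
```

In a real topology the equivalent fix would be to make the Serdes match the processor's input types, either through the default Serde configuration or by passing Serdes explicitly to the operation that reads the topic.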

 

As an example of the improvement over the class cast exception:

org.apache.kafka.streams.errors.StreamsException: Exception caught in 
process. taskId=0_0, processor=KSTREAM-SOURCE-0000000000, 
topic=streams-plaintext-input, partition=0, offset=0, 
stacktrace=org.apache.kafka.streams.errors.StreamsException: ClassCastException 
invoking Processor. Do the Processor's input types match the deserialized 
types? Check the Serde setup and change the default Serdes in StreamConfig or 
provide correct Serdes via method parameters. Deserialized input types are: 
key: [B, value: [B
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:123)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133)
    at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:87)
    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:366)
    at org.apache.kafka.streams.TopologyTestDriver.pipeInput(TopologyTestDriver.java:419)
    at org.apache.kafka.streams.processor.internals.ProcessorNodeTest.testTopologyLevelClassCastException(ProcessorNodeTest.java:176)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
    at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
    at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
    at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
    at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:305)
    at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
    at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:365)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
    at org.junit.runners.ParentRunner$4.run(ParentRunner.java:330)
    at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:78)
    at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:328)
    at org.junit.runners.ParentRunner.access$100(ParentRunner.java:65)
    at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:292)
    at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:305)
    at org.junit.runners.ParentRunner.run(ParentRunner.java:412)
    at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
    at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
    at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:47)
    at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)
    at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)
Caused by: java.lang.ClassCastException: class [B cannot be cast to 
class java.lang.String ([B and java.lang.String are in module java.base of 
loader 'bootstrap')
    at org.apache.kafka.streams.kstream.internals.AbstractStream.lambda$withKey$1(AbstractStream.java:103)
    at org.apache.kafka.streams.kstream.internals.KStreamFlatMapValues$KStreamFlatMapValuesProcessor.process(KStreamFlatMapValues.java:40)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:119)
    ... 32 more
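
As a rough idea of how a hint like the one in this message could be produced, here is a plain-Java sketch. It is not the code from the pull request and does not use Kafka Streams classes; `CastHintDemo`, `SimpleProcessor`, `ProcessingException`, `typeName` and `invoke` are all hypothetical names. It assumes the wrapper simply catches the ClassCastException around the processor call and rethrows it with the runtime types of the key and value attached.

```java
final class CastHintDemo {

    // Hypothetical stand-in for a processor; not a Kafka Streams interface.
    interface SimpleProcessor<K, V> {
        void process(K key, V value);
    }

    // Hypothetical wrapper exception that carries the extra context.
    static final class ProcessingException extends RuntimeException {
        ProcessingException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    // Runtime class name of the deserialized object, or a marker for null.
    static String typeName(Object o) {
        return o == null ? "unknown because null" : o.getClass().getName();
    }

    // Calls the processor and adds a hint if the input types do not match.
    // Note: this also catches a ClassCastException thrown inside user code.
    @SuppressWarnings("unchecked")
    static void invoke(SimpleProcessor<?, ?> processor, Object key, Object value) {
        try {
            ((SimpleProcessor<Object, Object>) processor).process(key, value);
        } catch (ClassCastException e) {
            throw new ProcessingException(
                "ClassCastException invoking Processor. Do the Processor's input "
                    + "types match the deserialized types? Deserialized input types are: "
                    + "key: " + typeName(key) + ", value: " + typeName(value),
                e);
        }
    }

    public static void main(String[] args) {
        SimpleProcessor<String, String> upper =
            (k, v) -> System.out.println(k + "=" + v.toUpperCase());
        try {
            // Byte arrays arrive where Strings are expected.
            invoke(upper, new byte[0], new byte[0]);
        } catch (ProcessingException e) {
            System.out.println(e.getMessage());
            System.out.println("cause: " + e.getCause().getClass().getName());
        }
    }
}
```

Keeping the original exception as the cause preserves the underlying stack trace, so the hint adds context without hiding where the cast actually failed.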



> Improve streams errors on class cast exception in ProcessorsNodes
> -----------------------------------------------------------------
>
>                 Key: KAFKA-8884
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8884
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>    Affects Versions: 2.3.1
>            Reporter: Antony Stubbs
>            Priority: Minor



--
This message was sent by Atlassian Jira
(v8.3.2#803003)
