[ 
https://issues.apache.org/jira/browse/KAFKA-12963?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17406600#comment-17406600
 ] 

Rasmus Helbig Hansen commented on KAFKA-12963:
----------------------------------------------

[~ableegoldman] first of all, thanks for looking into this and providing a patch.

I do understand that the data flowing into a processor doesn't need to have 
originated from a topic. A punctuator could be sending messages, as an example.

However, in my particular case, where we (mostly) use the high-level API, I was 
naively thinking that e.g.
{code:java}
.selectKey((k, v) -> new MyKey(v.getLocation()))
.leftJoin(businessCentres, ...)
{code}
 

should be able to say something about the underlying topic, so that the 
programmer does not have to inspect the low-level topology to make a qualified 
guess at where the type misalignment might be.

The inclusion of the processor name is fine by me; I'm happy with that. But I 
can imagine that a class cast exception between types A and B for processor 
KSTREAM-TRANSFORM-0000000016 would be a bit of a head-scratcher for programmers 
getting started with Streams.
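
To illustrate what a processor name adds to such an error, here is a minimal sketch in plain Java. It does not use the Kafka Streams API: `ProcessorNameSketch` and `NamedNode` are hypothetical stand-ins for a processor node, and `IllegalStateException` stands in for the real `StreamsException`. It only shows a `ClassCastException`, caused by a value of the wrong type reaching a typed processor, being rethrown with the node name and the runtime key and value types attached.

```java
import java.util.function.BiConsumer;

public class ProcessorNameSketch {

    /** Hypothetical stand-in for a processor node; not a Kafka Streams class. */
    static final class NamedNode<K, V> {
        private final String name;
        private final BiConsumer<K, V> body;

        NamedNode(String name, BiConsumer<K, V> body) {
            this.name = name;
            this.body = body;
        }

        void process(K key, V value) {
            try {
                body.accept(key, value);
            } catch (ClassCastException e) {
                // Attach the node name and the runtime types to the failure.
                throw new IllegalStateException(
                    "ClassCastException in processor " + name
                        + " for key type " + key.getClass().getName()
                        + " and value type " + value.getClass().getName(), e);
            }
        }
    }

    /** Feeds a String value to a node that expects Integer values. */
    @SuppressWarnings({"unchecked", "rawtypes"})
    static String demo() {
        NamedNode<String, Integer> node = new NamedNode<>(
            "KSTREAM-TRANSFORM-0000000016",
            (k, v) -> System.out.println(k + " -> " + (v + 1)));
        try {
            // The raw-type call mimics a record deserialized with the wrong Serde.
            ((NamedNode) node).process("location-1", "not-an-integer");
            return "no error";
        } catch (IllegalStateException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Even with the name in the message, the reader still has to map a generated name such as KSTREAM-TRANSFORM-0000000016 back to the DSL call that created it, which is the head-scratcher I mean.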


> Improve error message for Class cast exception
> ----------------------------------------------
>
>                 Key: KAFKA-12963
>                 URL: https://issues.apache.org/jira/browse/KAFKA-12963
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>    Affects Versions: 2.7.0
>            Reporter: Rasmus Helbig Hansen
>            Assignee: Andrew Lapidas
>            Priority: Minor
>             Fix For: 3.1.0
>
>
> After a topology change and starting the application again, we got this type 
> of error message:
>  [g9z-StreamThread-1] ERROR 
> org.apache.kafka.streams.processor.internals.TaskManager  - stream-thread 
> [g9z-StreamThread-1] Failed to process stream task 1_12 due to the following 
> error:
>  org.apache.kafka.streams.errors.StreamsException: ClassCastException 
> invoking Processor. Do the Processor's input types match the deserialized 
> types? Check the Serde setup and change the default Serdes in StreamConfig or 
> provide correct Serdes via method parameters. Make sure the Processor can 
> accept the deserialized input of type key: org.acme.SomeKey, and value: 
> org.acme.SomeValue.
>  Note that although incorrect Serdes are a common cause of error, the cast 
> exception might have another cause (in user code, for example). For example, 
> if a processor wires in a store, but casts the generics incorrectly, a class 
> cast exception could be raised during processing, but the cause would not be 
> wrong Serdes.
>      at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:185)
>      at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:273)
>      at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:252)
>      at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219)
>      at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:172)
>      at 
> org.apache.kafka.streams.kstream.internals.KStreamJoinWindow$KStreamJoinWindowProcessor.process(KStreamJoinWindow.java:55)
>      at 
> org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71)
>      at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181)
>      at 
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:883)
>      at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181)
>      at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:273)
>      at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:252)
>      at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219)
>      at 
> org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:86)
>      at 
> org.apache.kafka.streams.processor.internals.StreamTask.lambda$process$1(StreamTask.java:703)
>      at 
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:883)
>      at 
> org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:703)
>      at 
> org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1105)
>      at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:647)
>      at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:553)
>      at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:512)
>  Caused by: java.lang.ClassCastException: class org.acme.SomeValue cannot be 
> cast to class org.acme.OtherValue (org.acme.SomeValue and org.acme.OtherValue 
> are in unnamed module of loader 'app')
>      at 
> org.apache.kafka.streams.kstream.internals.KStreamKStreamJoin$KStreamKStreamJoinProcessor.process(KStreamKStreamJoin.java:112)
>      at 
> org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71)
>      at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181)
>      at 
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:883)
>      at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181)
>      ... 20 more
>  [g9z-StreamThread-1] ERROR 
> org.apache.kafka.streams.processor.internals.StreamThread  - stream-thread 
> [g9z-StreamThread-1] Encountered the following exception during processing 
> and the thread is going to shut down: 
>  org.apache.kafka.streams.errors.StreamsException: ClassCastException 
> invoking Processor. Do the Processor's input types match the deserialized 
> types? Check the Serde setup and change the default Serdes in StreamConfig or 
> provide correct Serdes via method parameters. Make sure the Processor can 
> accept the deserialized input of type key: org.acme.SomeKey, and value: 
> org.acme.SomeValue.
>  Note that although incorrect Serdes are a common cause of error, the cast 
> exception might have another cause (in user code, for example). For example, 
> if a processor wires in a store, but casts the generics incorrectly, a class 
> cast exception could be raised during processing, but the cause would not be 
> wrong Serdes.
>  
>  It doesn't give enough context, like processor name and topic, which made 
> troubleshooting unnecessarily tricky.
>   
>  Very similar to KAFKA-8884 which was fixed in 2.4.0. It seems like a 
> regression.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
