[ 
https://issues.apache.org/jira/browse/KAFKA-7066?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16517550#comment-16517550
 ] 

ASF GitHub Bot commented on KAFKA-7066:
---------------------------------------

guozhangwang closed pull request #5239: KAFKA-7066 added better logging in case 
of Serialisation issue
URL: https://github.com/apache/kafka/pull/5239
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/StateSerdes.java 
b/streams/src/main/java/org/apache/kafka/streams/state/StateSerdes.java
index f1de82fd199..ec7803a7a07 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/StateSerdes.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/StateSerdes.java
@@ -20,6 +20,7 @@
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.streams.errors.StreamsException;
 
 import java.util.Objects;
 
@@ -165,7 +166,18 @@ public V valueFrom(byte[] rawValue) {
      * @return     the serialized key
      */
     public byte[] rawKey(K key) {
-        return keySerde.serializer().serialize(topic, key);
+        try {
+            return keySerde.serializer().serialize(topic, key);
+        } catch (final ClassCastException e) {
+            final String keyClass = key == null ? "unknown because key is 
null" : key.getClass().getName();
+            throw new StreamsException(
+                    String.format("A serializer (key: %s) is not compatible to 
the actual key type " +
+                                    "(key type: %s). Change the default Serdes 
in StreamConfig or " +
+                                    "provide correct Serdes via method 
parameters.",
+                            keySerializer().getClass().getName(),
+                            keyClass),
+                    e);
+        }
     }
 
     /**
@@ -175,6 +187,17 @@ public V valueFrom(byte[] rawValue) {
      * @return       the serialized value
      */
     public byte[] rawValue(V value) {
-        return valueSerde.serializer().serialize(topic, value);
+        try {
+            return valueSerde.serializer().serialize(topic, value);
+        } catch (final ClassCastException e) {
+            final String valueClass = value == null ? "unknown because value 
is null" : value.getClass().getName();
+            throw new StreamsException(
+                    String.format("A serializer (value: %s) is not compatible 
to the actual value type " +
+                                    "(value type: %s). Change the default 
Serdes in StreamConfig or " +
+                                    "provide correct Serdes via method 
parameters.",
+                            valueSerializer().getClass().getName(),
+                            valueClass),
+                    e);
+        }
     }
 }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/StateSerdesTest.java 
b/streams/src/test/java/org/apache/kafka/streams/state/StateSerdesTest.java
index 6f298886bc7..714ce187500 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/StateSerdesTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/StateSerdesTest.java
@@ -18,6 +18,7 @@
 
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.errors.StreamsException;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -86,4 +87,20 @@ public void shouldThrowIfValueClassIsNull() {
         new StateSerdes<>("anyName", Serdes.ByteArray(), null);
     }
 
+    @Test(expected = StreamsException.class)
+    public void shouldThrowIfIncompatibleSerdeForValue() throws 
ClassNotFoundException {
+        Class myClass = Class.forName("java.lang.String");
+        StateSerdes<Object, Object> stateSerdes = new StateSerdes<Object, 
Object>("anyName", Serdes.serdeFrom(myClass), Serdes.serdeFrom(myClass));
+        Integer myInt = 123;
+        stateSerdes.rawValue(myInt);
+    }
+
+    @Test(expected = StreamsException.class)
+    public void shouldThrowIfIncompatibleSerdeForKey() throws 
ClassNotFoundException {
+        Class myClass = Class.forName("java.lang.String");
+        StateSerdes<Object, Object> stateSerdes = new StateSerdes<Object, 
Object>("anyName", Serdes.serdeFrom(myClass), Serdes.serdeFrom(myClass));
+        Integer myInt = 123;
+        stateSerdes.rawKey(myInt);
+    }
+
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Make Streams Runtime Error User Friendly in Case of Serialisation exception
> ---------------------------------------------------------------------------
>
>                 Key: KAFKA-7066
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7066
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>    Affects Versions: 1.1.0
>            Reporter: Stephane Maarek
>            Assignee: Stephane Maarek
>            Priority: Major
>             Fix For: 2.0.0
>
>
> This kind of exception can be cryptic for the beginner:
> {code:java}
> ERROR stream-thread 
> [favourite-colors-application-a336770d-6ba6-4bbb-8681-3c8ea91bd12e-StreamThread-1]
>  Failed to process stream task 2_0 due to the following error: 
> (org.apache.kafka.streams.processor.internals.AssignedStreamsTasks:105)
> java.lang.ClassCastException: java.lang.Long cannot be cast to 
> java.lang.String
> at 
> org.apache.kafka.common.serialization.StringSerializer.serialize(StringSerializer.java:28)
> at org.apache.kafka.streams.state.StateSerdes.rawValue(StateSerdes.java:178)
> at 
> org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore$1.innerValue(MeteredKeyValueBytesStore.java:66)
> at 
> org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore$1.innerValue(MeteredKeyValueBytesStore.java:57)
> at 
> org.apache.kafka.streams.state.internals.InnerMeteredKeyValueStore.put(InnerMeteredKeyValueStore.java:198)
> at 
> org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore.put(MeteredKeyValueBytesStore.java:117)
> at 
> org.apache.kafka.streams.kstream.internals.KTableAggregate$KTableAggregateProcessor.process(KTableAggregate.java:95)
> at 
> org.apache.kafka.streams.kstream.internals.KTableAggregate$KTableAggregateProcessor.process(KTableAggregate.java:56)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46)
> at 
> org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:124)
> at 
> org.apache.kafka.streams.processor.internals.AbstractProcessorContext.forward(AbstractProcessorContext.java:174)
> at 
> org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:80)
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:224)
> at 
> org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:94)
> at 
> org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:411)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.processAndMaybeCommit(StreamThread.java:918)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:798)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:750)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:720){code}
> We should add the more detailed logging already present in SinkNode to assist 
> the user in solving this issue



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to