[ https://issues.apache.org/jira/browse/KAFKA-7066?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16517550#comment-16517550 ]
ASF GitHub Bot commented on KAFKA-7066: --------------------------------------- guozhangwang closed pull request #5239: KAFKA-7066 added better logging in case of Serialisation issue URL: https://github.com/apache/kafka/pull/5239 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/streams/src/main/java/org/apache/kafka/streams/state/StateSerdes.java b/streams/src/main/java/org/apache/kafka/streams/state/StateSerdes.java index f1de82fd199..ec7803a7a07 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/StateSerdes.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/StateSerdes.java @@ -20,6 +20,7 @@ import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.streams.errors.StreamsException; import java.util.Objects; @@ -165,7 +166,18 @@ public V valueFrom(byte[] rawValue) { * @return the serialized key */ public byte[] rawKey(K key) { - return keySerde.serializer().serialize(topic, key); + try { + return keySerde.serializer().serialize(topic, key); + } catch (final ClassCastException e) { + final String keyClass = key == null ? "unknown because key is null" : key.getClass().getName(); + throw new StreamsException( + String.format("A serializer (key: %s) is not compatible to the actual key type " + + "(key type: %s). Change the default Serdes in StreamConfig or " + + "provide correct Serdes via method parameters.", + keySerializer().getClass().getName(), + keyClass), + e); + } } /** @@ -175,6 +187,17 @@ public V valueFrom(byte[] rawValue) { * @return the serialized value */ public byte[] rawValue(V value) { - return valueSerde.serializer().serialize(topic, value); + try { + return valueSerde.serializer().serialize(topic, value); + } catch (final ClassCastException e) { + final String valueClass = value == null ? "unknown because value is null" : value.getClass().getName(); + throw new StreamsException( + String.format("A serializer (value: %s) is not compatible to the actual value type " + + "(value type: %s). Change the default Serdes in StreamConfig or " + + "provide correct Serdes via method parameters.", + valueSerializer().getClass().getName(), + valueClass), + e); + } } } diff --git a/streams/src/test/java/org/apache/kafka/streams/state/StateSerdesTest.java b/streams/src/test/java/org/apache/kafka/streams/state/StateSerdesTest.java index 6f298886bc7..714ce187500 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/StateSerdesTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/StateSerdesTest.java @@ -18,6 +18,7 @@ import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.errors.StreamsException; import org.junit.Assert; import org.junit.Test; @@ -86,4 +87,20 @@ public void shouldThrowIfValueClassIsNull() { new StateSerdes<>("anyName", Serdes.ByteArray(), null); } + @Test(expected = StreamsException.class) + public void shouldThrowIfIncompatibleSerdeForValue() throws ClassNotFoundException { + Class myClass = Class.forName("java.lang.String"); + StateSerdes<Object, Object> stateSerdes = new StateSerdes<Object, Object>("anyName", Serdes.serdeFrom(myClass), Serdes.serdeFrom(myClass)); + Integer myInt = 123; + stateSerdes.rawValue(myInt); + } + + @Test(expected = StreamsException.class) + public void shouldThrowIfIncompatibleSerdeForKey() throws ClassNotFoundException { + Class myClass = Class.forName("java.lang.String"); + StateSerdes<Object, Object> stateSerdes = new StateSerdes<Object, Object>("anyName", Serdes.serdeFrom(myClass), Serdes.serdeFrom(myClass)); + Integer myInt = 123; + stateSerdes.rawKey(myInt); + } + } ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Make Streams Runtime Error User Friendly in Case of Serialisation exception > --------------------------------------------------------------------------- > > Key: KAFKA-7066 > URL: https://issues.apache.org/jira/browse/KAFKA-7066 > Project: Kafka > Issue Type: Improvement > Components: streams > Affects Versions: 1.1.0 > Reporter: Stephane Maarek > Assignee: Stephane Maarek > Priority: Major > Fix For: 2.0.0 > > > This kind of exception can be cryptic for the beginner: > {code:java} > ERROR stream-thread > [favourite-colors-application-a336770d-6ba6-4bbb-8681-3c8ea91bd12e-StreamThread-1] > Failed to process stream task 2_0 due to the following error: > (org.apache.kafka.streams.processor.internals.AssignedStreamsTasks:105) > java.lang.ClassCastException: java.lang.Long cannot be cast to > java.lang.String > at > org.apache.kafka.common.serialization.StringSerializer.serialize(StringSerializer.java:28) > at org.apache.kafka.streams.state.StateSerdes.rawValue(StateSerdes.java:178) > at > org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore$1.innerValue(MeteredKeyValueBytesStore.java:66) > at > org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore$1.innerValue(MeteredKeyValueBytesStore.java:57) > at > org.apache.kafka.streams.state.internals.InnerMeteredKeyValueStore.put(InnerMeteredKeyValueStore.java:198) > at > org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore.put(MeteredKeyValueBytesStore.java:117) > at > org.apache.kafka.streams.kstream.internals.KTableAggregate$KTableAggregateProcessor.process(KTableAggregate.java:95) > at > org.apache.kafka.streams.kstream.internals.KTableAggregate$KTableAggregateProcessor.process(KTableAggregate.java:56) > at > org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46) > at > org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208) > at > org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:124) > at > org.apache.kafka.streams.processor.internals.AbstractProcessorContext.forward(AbstractProcessorContext.java:174) > at > org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:80) > at > org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:224) > at > org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:94) > at > org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:411) > at > org.apache.kafka.streams.processor.internals.StreamThread.processAndMaybeCommit(StreamThread.java:918) > at > org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:798) > at > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:750) > at > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:720){code} > We should add more detailed logging already present in SinkNode to assist the > user into solving this issue -- This message was sent by Atlassian JIRA (v7.6.3#76005)