[ https://issues.apache.org/jira/browse/KAFKA-8905?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Eduard Wirch resolved KAFKA-8905.
---------------------------------
    Resolution: Not A Problem

> Stream DSL: tasks should take serdes from upstream tasks
> --------------------------------------------------------
>
>                 Key: KAFKA-8905
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8905
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>    Affects Versions: 2.3.0
>            Reporter: Eduard Wirch
>            Priority: Major
>              Labels: usability
>
> {code:java}
> final Properties props = new Properties();
> props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount");
> props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
> props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
>
> final StreamsBuilder builder = new StreamsBuilder();
>
> final KStream<String, String> source = builder.stream(
>     "streams-plaintext-input",
>     Consumed.with(Serdes.String(), Serdes.String())
> );
>
> final KTable<String, Long> counts = source
>     .flatMapValues(value ->
>         Arrays.asList(value.toLowerCase(Locale.getDefault()).split(" ")))
>     .groupBy(
>         (key, value) -> value
>     )
>     .count();
>
> // need to override value serde to Long type
> counts.toStream().to("streams-wordcount-output",
>     Produced.with(Serdes.String(), Serdes.Long()));
>
> final KafkaStreams streams = new KafkaStreams(builder.build(), props);
> {code}
>
> Original code taken from the code sample
> [https://github.com/apache/kafka/blob/2.3/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java]
> I removed the {{DEFAULT_KEY_SERDE_CLASS_CONFIG}} and
> {{DEFAULT_VALUE_SERDE_CLASS_CONFIG}} settings to make my point clear.
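The following is a minimal, stdlib-only model of how this failure can arise. It assumes the 2.3 behaviour in which an operator without explicit serdes falls back to the configured default serde and, when none is configured, to the built-in byte-array serde. {{Serializer}}, {{ByteArraySerializer}} and {{StringSerializer}} here are simplified stand-ins, not the real {{org.apache.kafka.common.serialization}} classes, and {{resolve}}, {{send}} and the topic name are hypothetical. Because generic types are erased, the record reaches the serializer as a plain {{Object}}, so the mismatch only shows up at runtime as a failed cast to {{byte[]}}.

```java
import java.nio.charset.StandardCharsets;

// Simplified stand-ins for Kafka's serializer types (hypothetical, NOT the
// real org.apache.kafka.common.serialization classes).
interface Serializer<T> {
    byte[] serialize(String topic, T data);
}

// Pass-through serializer for byte[] values, modelling the byte-array default.
class ByteArraySerializer implements Serializer<byte[]> {
    @Override
    public byte[] serialize(String topic, byte[] data) {
        return data;
    }
}

// Encodes String values as UTF-8.
class StringSerializer implements Serializer<String> {
    @Override
    public byte[] serialize(String topic, String data) {
        return data == null ? null : data.getBytes(StandardCharsets.UTF_8);
    }
}

class SerdeFallbackDemo {
    // Hypothetical resolution order: operator-level serializer first, then
    // the configured default, then the built-in byte-array fallback.
    static Serializer<?> resolve(Serializer<?> explicit, Serializer<?> configuredDefault) {
        if (explicit != null) {
            return explicit;
        }
        if (configuredDefault != null) {
            return configuredDefault;
        }
        return new ByteArraySerializer();
    }

    // Only Object is known at runtime (type erasure), so the record goes
    // through a raw reference; a mismatched type surfaces as a
    // ClassCastException from the serializer's compiler-generated bridge method.
    @SuppressWarnings({"rawtypes", "unchecked"})
    static byte[] send(Serializer<?> serializer, Object record) {
        Serializer raw = serializer;
        return raw.serialize("hypothetical-repartition-topic", record);
    }

    public static void main(String[] args) {
        // No explicit serializer and no configured default: byte-array fallback.
        try {
            send(resolve(null, null), "hello");
        } catch (ClassCastException e) {
            System.out.println("ClassCastException: String passed to a byte[] serializer");
        }
        // An explicit String serializer succeeds.
        System.out.println(send(resolve(new StringSerializer(), null), "hello").length); // 5
    }
}
```

Passing a {{StringSerializer}} explicitly corresponds to the {{Grouped.with(Serdes.String(), Serdes.String())}} workaround described in the issue.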
>
> This application will fail:
> {code:java}
> Caused by: java.lang.ClassCastException: java.lang.String incompatible with [B
>     at org.apache.kafka.common.serialization.ByteArraySerializer.serialize(ByteArraySerializer.java:19)
>     at org.apache.kafka.common.serialization.Serializer.serialize(Serializer.java:62)
>     at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:161)
>     at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:102)
>     at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:89)
> {code}
>
> Adjusting this part of the code as follows:
> {code:java}
>     .groupBy(
>         (key, value) -> value,
>         Grouped.with(Serdes.String(), Serdes.String())
>     )
> {code}
> makes the application run properly.
>
> This explicit serde specification is unnecessary, since the serdes are
> already known from the upstream source task. Relying on the default serdes
> works in this simple example, but fails in more complex scenarios.
> Please make the DSL more usable by taking the serde configuration from
> upstream tasks.

--
This message was sent by Atlassian Jira
(v8.3.2#803003)
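The improvement requested in the issue can be sketched as a lookup order: the operator's own serde, else the nearest upstream operator's serde, else the configured default. This is a hypothetical model only: {{DslNode}} and {{InheritanceDemo}} are invented names, not Kafka Streams API, and serdes are represented by plain names to keep it short.

```java
import java.util.function.Function;

// Hypothetical model of a DSL operator; this is NOT Kafka Streams API.
// Serdes are represented by plain names to keep the sketch short.
final class DslNode {
    final String name;
    final DslNode upstream;   // null for a source
    final String keySerde;    // explicit key serde on this operator, or null
    final String valueSerde;  // explicit value serde on this operator, or null

    DslNode(String name, DslNode upstream, String keySerde, String valueSerde) {
        this.name = name;
        this.upstream = upstream;
        this.keySerde = keySerde;
        this.valueSerde = valueSerde;
    }

    // Proposed lookup: this operator's explicit serde, else the nearest
    // upstream operator that has one, else the configured default.
    private String resolve(Function<DslNode, String> slot, String configuredDefault) {
        for (DslNode node = this; node != null; node = node.upstream) {
            String serde = slot.apply(node);
            if (serde != null) {
                return serde;
            }
        }
        return configuredDefault;
    }

    String resolveKeySerde(String configuredDefault) {
        return resolve(node -> node.keySerde, configuredDefault);
    }

    String resolveValueSerde(String configuredDefault) {
        return resolve(node -> node.valueSerde, configuredDefault);
    }
}

class InheritanceDemo {
    public static void main(String[] args) {
        // Mirrors the issue's topology: a source with String serdes, then
        // flatMapValues and groupBy declared without serdes.
        DslNode source = new DslNode("source", null, "String", "String");
        DslNode flatMapValues = new DslNode("flatMapValues", source, null, null);
        DslNode groupBy = new DslNode("groupBy", flatMapValues, null, null);

        System.out.println(groupBy.resolveKeySerde("ByteArray"));   // String
        System.out.println(groupBy.resolveValueSerde("ByteArray")); // String
    }
}
```

The sketch assumes each operator keeps its input key and value types. Operators that change a type, such as {{count()}} turning String values into Long counts, would still need an explicit serde, as the {{Produced.with(Serdes.String(), Serdes.Long())}} override in the original snippet shows.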