Eduard Wirch created KAFKA-8905:
-----------------------------------

Summary: Stream DSL: tasks should take serdes from upstream tasks
Key: KAFKA-8905
URL: https://issues.apache.org/jira/browse/KAFKA-8905
Project: Kafka
Issue Type: Improvement
Components: streams
Affects Versions: 2.3.0
Reporter: Eduard Wirch
{code:java}
import java.util.Arrays;
import java.util.Locale;
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Produced;

final Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);

final StreamsBuilder builder = new StreamsBuilder();

// The source topic is read with explicit String serdes for key and value.
final KStream<String, String> source = builder.stream(
    "streams-plaintext-input",
    Consumed.with(Serdes.String(), Serdes.String())
);

final KTable<String, Long> counts = source
    .flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split(" ")))
    .groupBy(
        (key, value) -> value
    )
    .count();

// need to override value serde to Long type
counts.toStream().to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));

final KafkaStreams streams = new KafkaStreams(builder.build(), props);
// Start processing; the exception below is thrown once records are processed.
streams.start();
{code}

The original code is taken from the code sample [https://github.com/apache/kafka/blob/2.3/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java]. I removed the {{DEFAULT_KEY_SERDE_CLASS_CONFIG}} and {{DEFAULT_VALUE_SERDE_CLASS_CONFIG}} settings to make my point clear.

This application will fail:
{code:java}
Caused by: java.lang.ClassCastException: java.lang.String incompatible with [B
	at org.apache.kafka.common.serialization.ByteArraySerializer.serialize(ByteArraySerializer.java:19)
	at org.apache.kafka.common.serialization.Serializer.serialize(Serializer.java:62)
	at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:161)
	at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:102)
	at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:89)
{code}

Adjusting this part of the code as follows makes the application run properly:
{code:java}
.groupBy(
    (key, value) -> value,
    Grouped.with(Serdes.String(), Serdes.String())
)
{code}
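To illustrate the failure and the requested behaviour without a running broker, here is a minimal plain-Java sketch. It is a simplified model, not Kafka code: {{Ser}}, {{STRING}}, {{BYTE_ARRAY}}, {{currentResolution}} and {{proposedResolution}} are hypothetical names. It models only the value serde of the repartition topic that {{groupBy()}} creates ({{groupBy()}} leaves the value type unchanged), and it assumes, consistent with the {{ByteArraySerializer}} frame in the stack trace above, that the fallback is a byte-array serde when no default is configured. {{proposedResolution}} sketches the lookup order this issue asks for: explicit serde, then the upstream serde, then the configured default.

{code:java}
import java.nio.charset.StandardCharsets;

// Hypothetical, simplified model of how the repartition step of groupBy()
// could pick its value serde. None of these types belong to Kafka.
public class SerdeResolutionSketch {

    /** Minimal stand-in for a serializer: turns a value into bytes. */
    interface Ser<T> {
        byte[] serialize(T value);
    }

    /** Stand-in for Serdes.String(): UTF-8 encodes the value. */
    static final Ser<String> STRING = v -> v.getBytes(StandardCharsets.UTF_8);

    /** Stand-in for the byte-array default: expects the value to already be a byte[]. */
    static final Ser<byte[]> BYTE_ARRAY = v -> v;

    /** Current lookup: explicit serde if one is given, otherwise the configured default. */
    @SuppressWarnings("unchecked")
    static Ser<Object> currentResolution(Ser<?> explicit, Ser<?> configDefault) {
        return (Ser<Object>) (explicit != null ? explicit : configDefault);
    }

    /** Requested lookup: explicit serde, then the upstream serde, then the configured default. */
    @SuppressWarnings("unchecked")
    static Ser<Object> proposedResolution(Ser<?> explicit, Ser<?> upstream, Ser<?> configDefault) {
        if (explicit != null) {
            return (Ser<Object>) explicit;
        }
        if (upstream != null) {
            return (Ser<Object>) upstream;
        }
        return (Ser<Object>) configDefault;
    }

    public static void main(String[] args) {
        // No Grouped serde and no configured default, as in the report.
        Ser<Object> current = currentResolution(null, BYTE_ARRAY);
        try {
            current.serialize("hello");
        } catch (ClassCastException e) {
            System.out.println("current lookup: ClassCastException");
        }

        // Same situation, but the String serde from the upstream source is reused.
        Ser<Object> proposed = proposedResolution(null, STRING, BYTE_ARRAY);
        System.out.println("proposed lookup: " + proposed.serialize("hello").length + " bytes");
    }
}
{code}

Running {{main}} prints a {{ClassCastException}} line for the current lookup and a 5-byte result for the proposed one. The unchecked cast to {{Ser<Object>}} is what lets a {{String}} reach the byte-array serializer, roughly mirroring how erased generics allow the real exception.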
The explicit {{Grouped}} serde specification is unnecessary, since the serdes are already known from the upstream source task. Relying on default serdes (as the original example does) works in this simple example, but fails in more complex scenarios, where a single default cannot match every key and value type in the topology. Please make the DSL more usable by taking the serde configuration from upstream tasks.

--
This message was sent by Atlassian Jira
(v8.3.2#803003)