[ 
https://issues.apache.org/jira/browse/KAFKA-8905?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Eduard Wirch resolved KAFKA-8905.
---------------------------------
    Resolution: Not A Problem

> Stream DSL: tasks should take serdes from upstream tasks
> --------------------------------------------------------
>
>                 Key: KAFKA-8905
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8905
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>    Affects Versions: 2.3.0
>            Reporter: Eduard Wirch
>            Priority: Major
>              Labels: usability
>
> {code:java}
> final Properties props = new Properties();
> props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount");
> props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
> props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
> final StreamsBuilder builder = new StreamsBuilder();
> final KStream<String, String> source = builder.stream(
>       "streams-plaintext-input",
>       Consumed.with(Serdes.String(), Serdes.String())
> );
> final KTable<String, Long> counts = source
>       .flatMapValues(value ->
>             Arrays.asList(value.toLowerCase(Locale.getDefault()).split(" ")))
>       .groupBy(
>             (key, value) -> value
>       )
>       .count();
> // need to override value serde to Long type
> counts.toStream().to("streams-wordcount-output",
>       Produced.with(Serdes.String(), Serdes.Long()));
> final KafkaStreams streams = new KafkaStreams(builder.build(), props);{code}
> The original code is taken from the sample at
> [https://github.com/apache/kafka/blob/2.3/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java].
> I removed the {{DEFAULT_KEY_SERDE_CLASS_CONFIG}} and
> {{DEFAULT_VALUE_SERDE_CLASS_CONFIG}} settings to make my point clear. This
> application will fail:
> {code:java}
> Caused by: java.lang.ClassCastException: java.lang.String incompatible with [B
>  at org.apache.kafka.common.serialization.ByteArraySerializer.serialize(ByteArraySerializer.java:19)
>  at org.apache.kafka.common.serialization.Serializer.serialize(Serializer.java:62)
>  at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:161)
>  at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:102)
>  at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:89)
>  {code}
> Adjusting this part of the code makes the application run properly:
> {code:java}
> .groupBy(
>       (key, value) -> value,
>       Grouped.with(Serdes.String(), Serdes.String())
> ) {code}
> This explicit serde specification is unnecessary, since the serdes are
> already known from the upstream source task. Relying on the default serdes
> works in this simple example, but fails in more complex scenarios.
> Please make the DSL more usable by taking the serde configuration from 
> upstream tasks.
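
The failure mode in the stack trace can be reproduced without Kafka. The
sketch below is only an illustration under stated assumptions: the
Serializer interface and both serializer classes are simplified,
hypothetical stand-ins, not the real Kafka classes, and the topic name is
made up. It assumes that the sink created by groupBy() falls back to the
byte-array default serializer when no Grouped serdes are given, which is
what the ByteArraySerializer frame in the trace suggests.

```java
import java.nio.charset.StandardCharsets;

final class SerdeMismatchSketch {

    // Hypothetical, simplified stand-in for a serializer interface.
    interface Serializer<T> {
        byte[] serialize(String topic, T data);
    }

    // Stand-in for a byte-array default serializer: passes bytes through.
    static final class ByteArraySerializer implements Serializer<byte[]> {
        @Override
        public byte[] serialize(String topic, byte[] data) {
            return data;
        }
    }

    // Stand-in for the serializer an explicit Grouped.with(...) would supply.
    static final class StringSerializer implements Serializer<String> {
        @Override
        public byte[] serialize(String topic, String data) {
            return data.getBytes(StandardCharsets.UTF_8);
        }
    }

    // Simulates a sink that knows only the configured serializer, not the
    // static type of the records flowing through it.
    @SuppressWarnings("unchecked")
    static byte[] send(Serializer<?> configured, String topic, Object value) {
        return ((Serializer<Object>) configured).serialize(topic, value);
    }

    // Returns true if sending the value fails with a ClassCastException.
    static boolean failsWithClassCast(Serializer<?> serializer, Object value) {
        try {
            send(serializer, "hypothetical-repartition-topic", value);
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        // A String handed to the byte-array serializer cannot be cast to byte[].
        System.out.println(failsWithClassCast(new ByteArraySerializer(), "hello")); // prints true
        // With a matching serializer the same value is written without error.
        System.out.println(failsWithClassCast(new StringSerializer(), "hello"));    // prints false
    }
}
```

Because of type erasure, the mismatch only surfaces at runtime, when the
serializer casts the record. That would be consistent with the application
above building its topology without complaint and failing only once a
record reaches the sink.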



--
This message was sent by Atlassian Jira
(v8.3.2#803003)
