Hi experts,
I believe to understand there is the need to set the serde for the
Double type after/in the map function for a re-partition task.
I can't figure out where to specified. I've already tried to find the
answer on documentation and article but I failed.
The following code
KStream<String, Triplet<String, Double, String>> sum_data = ...
KStream<String, String> aggr_stream =
sum_data.selectKey((key,value) -> value.getValue0())
.mapValues((key,value) -> value.getValue1())
.groupByKey()
.windowedBy(TimeWindows.of(Duration.ofSeconds(10)))
.reduce((v1,v2) -> v1 + v2 )
.toStream()
.map((key,value) -> new
KeyValue<String,String>(key.toString(),key.toString()+"->"+value.toString()));
produces the a StreamException
Caused by: org.apache.kafka.streams.errors.StreamsException: A
serializer (key: org.apache.kafka.common.serialization.StringSerializer
/ value: org.apache.kafka.common.serialization.StringSerializer) is not
compatible to the actual key or value type (key type: java.lang.String /
value type: java.lang.Double). Change the default Serdes in StreamConfig
or provide correct Serdes via method parameters.
Since the default Serdes are both (key and value ) String
Of course if I force the String type, for example using the following code
KStream<String, String> aggr_stream = sum_data.selectKey((key,value) ->
value.getValue0())
.mapValues((key,value) -> value.getValue1()*.toString(*))
.groupByKey()
.windowedBy(TimeWindows.of(Duration.ofSeconds(10)))
.reduce((v1,v2) -> *new Double( Double.parseDouble(v1) +
Double.parseDouble(v2) ).toString()* )
.toStream()
.map((key,value) -> new
KeyValue<String,String>(key.toString(),key.toString()+"->"+value.toString()));
all works, but of course it's not the best way to do it.
Someone could help me?
Thanks in advance,
Gioacchino