Hi,
I want to implement the word count example using the reduce function on a KTable.
However, I get the following error:
Exception in thread "StreamThread-1"
org.apache.kafka.common.errors.SerializationException: Size of data
received by LongDeserializer is not 8
Here is my code:
// topic1 holds the word ID as key and the word itself as value
KTable<Long, String> source = builder.table("topic1");
KTable<String, Long> counts = source.reduce(
    new Reducer<Long>() {
        @Override
        public Long apply(Long value1, Long value2) {
            // Adder: applied when a record is added for a word
            return value1 + value2;
        }
    },
    new Reducer<Long>() {
        @Override
        public Long apply(Long value1, Long value2) {
            // Subtractor: applied when a record is removed for a word
            return value1 - value2;
        }
    },
    new KeyValueMapper<Long, String, KeyValue<String, Long>>() {
        @Override
        public KeyValue<String, Long> apply(Long key, String value) {
            // Re-key each record by the word itself, with a count of 1
            return new KeyValue<>(value, 1L);
        }
    },
    stringSerde, longSerde, "count");
counts.to(Serdes.String(), Serdes.Long(), "topic2");
KafkaStreams streams = new KafkaStreams(builder, props);
streams.start();
Also, I think the error message should be more informative, so that
situations like this are easier to diagnose.
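
To illustrate what I mean, here is a minimal sketch in plain Java with no
Kafka dependency. It assumes a long is encoded as 8 big-endian bytes. The
class LongDecodeSketch and the method decodeLong are hypothetical names of
my own, not Kafka APIs. The only point is that the message could name the
topic and the actual payload size:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class LongDecodeSketch {

    // Hypothetical helper, not part of Kafka: decodes an 8-byte big-endian
    // long and reports the topic and the actual size when the length differs.
    static Long decodeLong(String topic, byte[] data) {
        if (data == null) {
            return null;
        }
        if (data.length != 8) {
            throw new IllegalArgumentException(
                "Expected 8 bytes for a long from topic '" + topic
                + "' but received " + data.length
                + " bytes; was the record written with a different serializer?");
        }
        return ByteBuffer.wrap(data).getLong();
    }

    public static void main(String[] args) {
        // A value that really was written as a long decodes fine.
        byte[] asLong = ByteBuffer.allocate(8).putLong(42L).array();
        System.out.println(decodeLong("topic1", asLong));

        // A value written as a UTF-8 string is rejected with a descriptive message.
        byte[] asString = "hello".getBytes(StandardCharsets.UTF_8);
        try {
            decodeLong("topic1", asString);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

With a message like this I could have seen right away which topic and which
payload size were involved.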
- Adrienne