Hi all,
I'm writing here because I've been struggling for a couple of days to
understand why I get so many duplicates when processing messages with
Kafka Streams.
I have an input topic where I'm 100% sure there are no duplicate keys or
messages.
During processing I have to aggregate the messages using
groupByKey, windowedBy and aggregate:
        .map((key, value) -> {
            // Re-key each record by its 500 ms time bucket
            // (millis / 500, matching the window size below)
            Long timeBucket = System.currentTimeMillis() / 500;
            return new KeyValue<>(timeBucket.toString(), value);
        })
        .groupByKey(Serialized.with(Serdes.String(), new JsonSerde()))
        .windowedBy(TimeWindows.of(500))
        .aggregate(
            () -> new ArrayList<StreamEntry<String, JsonNode>>(),
            (aggKey, newValue, aggValue) -> {
                aggValue.add(new StreamEntry<>(aggKey, newValue));
                return aggValue;
            },
            Materialized.with(Serdes.String(), new ListKVJsonNodeSerde()))
Even during this step I'm 100% sure no duplicates are introduced, but
surprisingly, downstream I see that mapValues can be called with the same
messages more than once, even hundreds of times.
        .mapValues(vv -> {
            // here the list vv contains the same messages repeated many times
            ....
        })
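For reference, here is a minimal self-contained sketch of the topology
shape I'm describing. The topic name, application id and bootstrap servers
are placeholders, and I've replaced my JsonNode values and custom serdes
with plain strings (concatenation instead of an ArrayList) so it compiles
with the built-in serdes only:

        import java.util.Properties;

        import org.apache.kafka.common.serialization.Serdes;
        import org.apache.kafka.streams.KafkaStreams;
        import org.apache.kafka.streams.KeyValue;
        import org.apache.kafka.streams.StreamsBuilder;
        import org.apache.kafka.streams.StreamsConfig;
        import org.apache.kafka.streams.kstream.Materialized;
        import org.apache.kafka.streams.kstream.Serialized;
        import org.apache.kafka.streams.kstream.TimeWindows;

        public class AggregateDuplicatesSketch {

            public static void main(String[] args) {
                StreamsBuilder builder = new StreamsBuilder();

                builder.<String, String>stream("input-topic") // placeholder
                    .map((key, value) -> {
                        // Re-key each record by its 500 ms time bucket
                        Long timeBucket = System.currentTimeMillis() / 500;
                        return new KeyValue<>(timeBucket.toString(), value);
                    })
                    .groupByKey(Serialized.with(Serdes.String(), Serdes.String()))
                    .windowedBy(TimeWindows.of(500))
                    .aggregate(
                        () -> "",
                        (aggKey, newValue, aggValue) -> aggValue + "|" + newValue,
                        Materialized.with(Serdes.String(), Serdes.String()))
                    .toStream()
                    // Every line printed here corresponds to one invocation of
                    // the downstream mapValues in my real topology.
                    .peek((windowedKey, agg) ->
                        System.out.printf("window [%d,%d) key=%s value=%s%n",
                            windowedKey.window().start(),
                            windowedKey.window().end(),
                            windowedKey.key(),
                            agg));

                Properties props = new Properties();
                props.put(StreamsConfig.APPLICATION_ID_CONFIG, "agg-dup-sketch");
                props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
                new KafkaStreams(builder.build(), props).start();
            }
        }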
Looking around, I found this project that seems to reproduce the problem:
https://github.com/westec/ks-aggregate-debug
Given that I am using non-overlapping, gap-less (tumbling) windows in
Kafka Streams, shouldn't the correct output contain NO duplicate messages
across windows?
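For what it's worth, this is my understanding of how TimeWindows.of(500)
assigns records to windows (my own worked example, so correct me if I've
got this wrong): each timestamp falls into exactly one window
[start, start + 500) with start = timestamp - (timestamp % 500).

        public class WindowAssignmentExample {
            public static void main(String[] args) {
                // Tumbling 500 ms windows are aligned to the epoch,
                // non-overlapping and gap-less: every timestamp maps to
                // exactly one window.
                long sizeMs = 500L;
                for (long ts : new long[]{0L, 499L, 500L, 1234L}) {
                    long start = ts - (ts % sizeMs);
                    System.out.printf("ts=%d -> window [%d,%d)%n",
                        ts, start, start + sizeMs);
                }
            }
        }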
Any ideas why the duplicates?
--
Vincenzo D'Amore