Hi all,

I'm writing here because I've been struggling for a couple of days trying
to understand why I get so many duplicates when processing messages with
Kafka Streams.

I have an input topic where I'm 100% sure there are no duplicate keys or
messages.

During processing I have to aggregate the messages using groupByKey,
windowedBy and aggregate:

                // re-key every record to its 500 ms bucket (despite the
                // name, currentSecond is a half-second bucket index)
                .map((v1, v2) -> {
                    Long currentSecond = System.currentTimeMillis() / 500;
                    return new KeyValue<>(currentSecond.toString(), v2);
                })
                .groupByKey(Serialized.with(Serdes.String(), new JsonSerde()))
                // tumbling (non-overlapping, gap-less) 500 ms windows
                .windowedBy(TimeWindows.of(500))
                // collect all records of a window into a single list
                .aggregate(() -> new ArrayList<StreamEntry<String, JsonNode>>(),
                        (aggKey, newValue, aggValue) -> {
                            final StreamEntry<String, JsonNode> kvSimpleEntry =
                                    new StreamEntry<>(aggKey, newValue);
                            aggValue.add(kvSimpleEntry);
                            return aggValue;
                        },
                        Materialized.with(Serdes.String(), new ListKVJsonNodeSerde()))
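
(StreamEntry and ListKVJsonNodeSerde are my own classes; StreamEntry is
essentially a SimpleEntry-style key/value holder, something like this:)

                // minimal stand-in for my StreamEntry class
                public class StreamEntry<K, V> {
                    private final K key;
                    private final V value;

                    public StreamEntry(K key, V value) {
                        this.key = key;
                        this.value = value;
                    }

                    public K getKey() { return key; }
                    public V getValue() { return value; }
                }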

Even during this step I'm 100% sure there are no duplicates, but
surprisingly, after it, I see that mapValues can be called with the same
messages more than once. Even hundreds of times.

               .mapValues(vv -> {
                   // here the list vv contains the same messages many times
                   ....
               })
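
To check whether these are true duplicates or repeated emissions of the
same (growing) window, I'm thinking of logging every update with a
toStream/peek after the aggregate, along these lines (sketch only):

               .toStream()
               // the same window arriving repeatedly with a growing list
               // would mean intermediate results, not duplicated input
               .peek((windowedKey, list) -> System.out.printf(
                       "window [%d,%d) size=%d%n",
                       windowedKey.window().start(),
                       windowedKey.window().end(),
                       list.size()))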

Looking around, I found this project that seems to reproduce the problem:
https://github.com/westec/ks-aggregate-debug

Given that I am using non-overlapping, gap-less windows in Kafka Streams,
shouldn't the output be free of duplicate messages across windows? For
example, with TimeWindows.of(500), a record with timestamp 1234 should
fall only into the window [1000, 1500). Any ideas why I see these
duplicates?
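
One thing I'm wondering (just a guess, not verified): since the windowed
aggregate is a KTable, could these be intermediate aggregation results
flushed downstream by the record cache, rather than real duplicates of
the input? If so, I assume these two StreamsConfig settings control how
often mapValues fires (the values below are only the documented defaults,
not something I have tuned):

                import java.util.Properties;
                import org.apache.kafka.streams.StreamsConfig;

                Properties props = new Properties();
                // size of the record caches; 0 disables caching, so every
                // single update of the aggregate is forwarded downstream
                props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG,
                        10 * 1024 * 1024L);
                // how often caches are flushed and offsets committed
                props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 30000L);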


-- 
Vincenzo D'Amore
