I am not 100% sure what you mean by

>> I have an input topic where I'm 100% sure there are no duplicate keys
>> or messages

If that is the case (i.e., each key is unique), it would imply that each
window contains exactly one record per key. Why aggregate at all, then?
Each aggregate would consist of only one message, making the aggregation
step unnecessary.

Can you be a little bit more specific and provide a sample input
(key,value,timestamp), observed output, and expected output?

I suspect (but I am not sure) that you might be "struggling" with Kafka
Streams' continuous output model. Maybe this blog post sheds some light:
https://www.confluent.io/blog/watermarks-tables-event-time-dataflow-model/
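
To illustrate what I mean by the continuous output model: by default, a
windowed aggregation emits an updated result downstream for every input
record, not once per closed window. Here is a minimal plain-Java sketch
of that update-per-record behavior (no Kafka dependencies; all class and
variable names are illustrative, not actual Kafka Streams API):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ContinuousUpdateDemo {

    // Simulates the state store backing the windowed aggregation:
    // one growing list per key (one key per window in the OP's code).
    static final Map<String, List<String>> store = new HashMap<>();

    // Simulates the downstream mapValues(): records every update it receives.
    static final List<List<String>> downstreamCalls = new ArrayList<>();

    // Each input record updates the aggregate AND forwards the new aggregate
    // downstream -- this mirrors the default (non-suppressed) behavior.
    static void process(String key, String value) {
        List<String> agg = store.computeIfAbsent(key, k -> new ArrayList<>());
        agg.add(value);
        downstreamCalls.add(new ArrayList<>(agg)); // one downstream update per record
    }

    public static void main(String[] args) {
        // Three unique input records falling into the same window (same derived key):
        process("window-1", "a");
        process("window-1", "b");
        process("window-1", "c");

        // Downstream was invoked three times, and "a" appears in all three
        // updates. It looks like a duplicate, but each call is simply a
        // newer version of the same (still growing) aggregate.
        System.out.println(downstreamCalls);
        // [[a], [a, b], [a, b, c]]
    }
}
```

If you only want a single final result per window, Kafka Streams 2.1+
lets you add `suppress(Suppressed.untilWindowCloses(BufferConfig.unbounded()))`
on the windowed KTable before further processing; otherwise, downstream
operators must tolerate multiple updates per window.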


-Matthias

On 1/25/19 9:31 AM, Vincenzo D'Amore wrote:
> Hi all,
> 
> I'm writing here because I've been struggling for a couple of days
> trying to understand why I get so many duplicates when processing
> messages with Kafka Streams.
> 
> I have an input topic where I'm 100% sure there are no duplicate keys
> or messages.
> 
> During processing I have to aggregate the messages using groupByKey,
> windowedBy and aggregate:
> 
>     .map((v1, v2) -> {
>         Long currentSecond = System.currentTimeMillis() / 500;
>         return new KeyValue<>(currentSecond.toString(), v2);
>     })
>     .groupByKey(Serialized.with(Serdes.String(), new JsonSerde()))
>     .windowedBy(TimeWindows.of(500))
>     .aggregate(() -> new ArrayList<StreamEntry<String, JsonNode>>(),
>             (aggKey, newValue, aggValue) -> {
>                 final StreamEntry<String, JsonNode> kvSimpleEntry =
>                         new StreamEntry<>(aggKey, newValue);
>                 aggValue.add(kvSimpleEntry);
>                 return aggValue;
>             },
>             Materialized.with(Serdes.String(), new ListKVJsonNodeSerde()))
> 
> Even during this process I'm 100% sure there are no duplicates, but
> surprisingly after this I see that mapValues can be called with the
> same messages more than once, even hundreds of times.
> 
>     .mapValues(vv -> {
>         // here the list vv contains many duplicates
>         ...
>     })
> 
> Looking around I've found this project that seems to reproduce the problem:
> https://github.com/westec/ks-aggregate-debug
> 
> Given that I am using non-overlapping, gapless windows in Kafka
> Streams, shouldn't the correct output contain no duplicate messages
> across windows? Any idea why the duplicates?
> 
> 
