Hi Matthias, thanks for your reply. Let me explain better what I'm
trying to say. In the meantime I've played with this problem and I think
I now have a clearer view, though I still don't have a solution.

I have an input topic A which is a stream of messages where each message
contains just an ID. Those IDs (messages) can number in the thousands or
even millions, but in my test (proof of concept) they are all different.

In order to process them, for each one I have to retrieve a few pieces of
data stored in a NoSQL database. As you can understand, querying one ID
at a time is not a good solution for performance reasons, so I need to
aggregate the IDs into batches, and here comes the problem.
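
Just to make the goal concrete, here is a minimal sketch of the kind of
bulk lookup I'm after; NoSqlClient and multiGet() are placeholders for my
real client, not an actual API:

    import java.util.List;
    import java.util.Map;

    public class BatchLookup {

        // Placeholder for my real NoSQL client: multiGet() is assumed to
        // fetch a whole batch of IDs in one round trip instead of issuing
        // one query per ID.
        interface NoSqlClient {
            Map<String, String> multiGet(List<String> ids);
        }

        // Enrich a whole aggregated batch of IDs with a single query.
        static Map<String, String> enrich(NoSqlClient db, List<String> batch) {
            return db.multiGet(batch);
        }
    }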

So from the source topic A I have created a new topic B where each
message now has a key: a number that changes every X milliseconds (say
500 ms).
Now I can do a groupByKey() and an aggregate(). I expect each list
returned by aggregate() to contain no duplicates.
The output of this aggregation is saved to topic C.
Topic C contains arrays of IDs of different sizes, and the key is the
number created to group them.
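
In code, the A -> B -> C pipeline looks roughly like this. It's a
simplified sketch: String values stand in for my real JSON payload,
builder is a StreamsBuilder, and ArrayListSerde is a placeholder for
however the list gets serialized:

    KStream<String, String> source = builder.stream("topic-A");

    source
        // re-key each message with a bucket number that changes every 500 ms
        .map((key, id) -> {
            long bucket = System.currentTimeMillis() / 500;
            return new KeyValue<>(Long.toString(bucket), id);
        })
        .groupByKey(Serialized.with(Serdes.String(), Serdes.String()))
        .windowedBy(TimeWindows.of(500))
        // collect all the IDs that fall into the same bucket into one list
        .aggregate(
            ArrayList::new,
            (bucketKey, id, list) -> { list.add(id); return list; },
            Materialized.with(Serdes.String(), new ArrayListSerde()))
        .toStream((windowedKey, list) -> windowedKey.key())
        .to("topic-C", Produced.with(Serdes.String(), new ArrayListSerde()));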

And here comes my big surprise: in topic C there are a lot of IDs that
appear in several different messages at the same time.
Those messages have the same key but arrays of IDs of different sizes,
and each array partially overlaps with the IDs in other messages.

I would have supposed this to be impossible.

So, for example, if I have a stream with the following list of messages:

key - value
--------------
0 - 1
0 - 2
0 - 3
0 - 4
0 - 5
1 - 6
1 - 7
1 - 8
1 - 9
1 - 10

I would expect groupByKey() and aggregate() to return:

key - value
----------------
0 - [1,2,3,4,5]
1 - [6,7,8,9,10]

But instead I found something like:

key - value
----------------
0 - [1,2,3,4,5]
0 - [2,3,4,5]
1 - [6,7,8,9]
1 - [6,7,8,9,10]

So the question is: did I do something wrong trying to aggregate them?
How can I avoid those duplicates?
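
For what it's worth, while reading the 2.1 docs I came across the new
suppress() operator, which is supposed to hold back a windowed aggregate
until the window closes. If the extra messages are just intermediate
updates of the same window, would something like this (untested sketch,
same placeholders as above, Duration is java.time.Duration) give me
exactly one final list per bucket?

        // ... same map() / groupByKey() as above, then:
        .windowedBy(TimeWindows.of(500).grace(Duration.ZERO))
        .aggregate(
            ArrayList::new,
            (bucketKey, id, list) -> { list.add(id); return list; },
            Materialized.with(Serdes.String(), new ArrayListSerde()))
        // emit only the final aggregate per window, once the window closes
        .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
        .toStream((windowedKey, list) -> windowedKey.key())
        .to("topic-C", Produced.with(Serdes.String(), new ArrayListSerde()));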


On Sat, Jan 26, 2019 at 9:01 PM Matthias J. Sax <matth...@confluent.io>
wrote:

> I am not 100% sure what you mean by
>
> >> I've an input topic where I'm 100% sure there are no duplicate keys or
> >> messages
>
> If this is the case (i.e., each key is unique), it would imply that each
> window contains exactly one record per key. Hence, why do you aggregate?
> Each aggregate would consist of only one message, making an aggregation
> step unnecessary.
>
> Can you be a little bit more specific and provide a sample input
> (key,value,timestamp), observed output, and expected output?
>
> I suspect (but I am not sure), that you might "struggle" with Kafka
> Streams' continuous output model. Maybe this blog post sheds some light:
> https://www.confluent.io/blog/watermarks-tables-event-time-dataflow-model/
>
>
> -Matthias
>
> On 1/25/19 9:31 AM, Vincenzo D'Amore wrote:
> > Hi all,
> >
> > I write here because I've been struggling for a couple of days trying to
> > understand why I get so many duplicates when processing messages with
> > Kafka Streams.
> >
> > I've an input topic where I'm 100% sure there are no duplicate keys or
> > messages.
> >
> > During the process I have to aggregate the messages using
> > groupByKey, windowedBy and aggregate:
> >
> >                 .map((v1, v2) -> {
> >                     Long currentSecond = System.currentTimeMillis() / 500;
> >                     return new KeyValue<>(currentSecond.toString(), v2);
> >                 })
> >                 .groupByKey(Serialized.with(Serdes.String(), new JsonSerde()))
> >                 .windowedBy(TimeWindows.of(500))
> >                 .aggregate(() -> new ArrayList<StreamEntry<String, JsonNode>>(),
> >                         (aggKey, newValue, aggValue) -> {
> >                             final StreamEntry<String, JsonNode> kvSimpleEntry =
> >                                     new StreamEntry<>(aggKey, newValue);
> >                             aggValue.add(kvSimpleEntry);
> >                             return aggValue;
> >                         },
> >                         Materialized.with(Serdes.String(), new ListKVJsonNodeSerde()))
> >
> > Even during this process I'm 100% sure there are no duplicates, but
> > surprisingly after this I see that mapValues can be called with the same
> > messages more than once. Even hundreds of times.
> >
> >                .mapValues(vv -> {
> >                    // here the list vv contains the many
> >                    ....
> >                })
> >
> > Looking around I've found this project that seems to reproduce the problem:
> > https://github.com/westec/ks-aggregate-debug
> >
> > Given that I am using non-overlapping, gap-less windows in Kafka Streams,
> > shouldn't the correct output contain no duplicate messages between windows?
> > Any ideas why the duplicates?
> >
> >
>
>

-- 
Vincenzo D'Amore
