Your observation is correct and it's expected behavior.

As mentioned originally, Kafka Streams follows a continuous-update
processing model, i.e., each time an input record is processed, the
aggregation result is updated and emitted downstream. (Did you read the
blog post?)

Thus, if you aggregate 5 records into an array, the `KTable#toStream()`
operation returns:

input: <k,a>, <k,b>, <k,c>, <k,d>, <k,e>
output: <k,[a]>, <k,[a,b]>, <k,[a,b,c]>, <k,[a,b,c,d]>, <k,[a,b,c,d,e]>
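
To make the per-record updates concrete, here is a small plain-Java
simulation. It has no Kafka dependency, the class and method names are
made up for illustration, and it only mimics the behavior described
above; it is not how Kafka Streams is implemented internally:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ContinuousUpdateDemo {

    /**
     * Appends each value to the list stored for its key and records one
     * output entry per input record, i.e. every update is emitted.
     * Each input element is a {key, value} pair.
     */
    static List<String> aggregate(List<String[]> input) {
        Map<String, List<String>> store = new LinkedHashMap<>();
        List<String> emitted = new ArrayList<>();
        for (String[] record : input) {
            String key = record[0];
            String value = record[1];
            List<String> agg = store.computeIfAbsent(key, k -> new ArrayList<>());
            agg.add(value);
            // Snapshot of the current aggregate, sent "downstream".
            emitted.add("<" + key + "," + agg + ">");
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<String[]> input = new ArrayList<>();
        for (String v : new String[] {"a", "b", "c", "d", "e"}) {
            input.add(new String[] {"k", v});
        }
        aggregate(input).forEach(System.out::println);
    }
}
```

Five input records produce five output records, each one a longer
version of the previous list. A consumer of the output topic therefore
sees the same IDs repeated across messages with the same key.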

You might not see all updates due to caching:
https://kafka.apache.org/21/documentation/streams/developer-guide/memory-mgmt.html
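
If you want to observe every single update while debugging, one option
is to turn the record cache off. A minimal sketch, assuming the 2.1
config name `cache.max.bytes.buffering`; the application id and
bootstrap server are placeholders, and the class name is made up:

```java
import java.util.Properties;

public class NoCacheConfig {

    /** Builds Kafka Streams properties with the record cache disabled. */
    static Properties streamsProps() {
        Properties props = new Properties();
        // Placeholder values, replace with your own.
        props.setProperty("application.id", "aggregate-debug");
        props.setProperty("bootstrap.servers", "localhost:9092");
        // A cache size of 0 means updates are not compacted in memory
        // before being forwarded downstream.
        props.setProperty("cache.max.bytes.buffering", "0");
        return props;
    }
}
```

Note that a non-zero cache only reduces the number of intermediate
updates; it does not guarantee a single result per key or window.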

Kafka Streams 2.1 added a new `suppress()` operator that you can use to
get only one final result per window:
https://kafka.apache.org/21/documentation/streams/developer-guide/dsl-api.html#window-final-results
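
An untested sketch of how this could look for a windowed aggregation,
assuming Kafka Streams 2.1+ on the classpath. The topic names
"topic-b" and "topic-c" and the class name are placeholders, and to
keep the example free of custom serdes the IDs are joined into a
comma-separated String instead of a list:

```java
import java.time.Duration;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Suppressed;
import org.apache.kafka.streams.kstream.Suppressed.BufferConfig;
import org.apache.kafka.streams.kstream.TimeWindows;

public class FinalResultsSketch {

    static Topology build() {
        StreamsBuilder builder = new StreamsBuilder();

        builder.stream("topic-b", Consumed.with(Serdes.String(), Serdes.String()))
            .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
            // 500 ms tumbling windows; a zero grace period closes each
            // window as soon as stream time passes its end.
            .windowedBy(TimeWindows.of(Duration.ofMillis(500)).grace(Duration.ZERO))
            .aggregate(
                () -> "",
                (key, value, agg) -> agg.isEmpty() ? value : agg + "," + value,
                Materialized.with(Serdes.String(), Serdes.String()))
            // Hold back intermediate updates; emit once per closed window.
            .suppress(Suppressed.untilWindowCloses(BufferConfig.unbounded()))
            .toStream()
            // Drop the window information from the key before writing.
            .selectKey((windowedKey, ids) -> windowedKey.key())
            .to("topic-c", Produced.with(Serdes.String(), Serdes.String()));

        return builder.build();
    }
}
```

Two things to keep in mind. As far as I know, the grace period
defaults to roughly 24 hours in 2.1 if you do not set it, so without
`grace()` you would wait a long time for any output. Also, window
closing is driven by stream time, so the result of the last window is
only emitted after newer records arrive.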


-Matthias


On 1/27/19 3:40 PM, Vincenzo D'Amore wrote:
> Hi Matthias, thanks for your reply. Let me explain better what I'm
> trying to say. In the meantime I've played with this problem and I think
> I now have a clearer view, though I still don't have a solution.
> 
> I have an input topic A, which is a stream of messages where each message
> contains just an ID. Those IDs (messages) can number in the thousands or
> even millions, but in my test (proof of concept) they are all different.
> 
> In order to process them, for each one I have to retrieve some data that is
> stored in a NoSQL database. As you can understand, querying one ID at a
> time is not a good solution for performance reasons, so I need to
> aggregate them, and here comes the problem.
> 
> So from the source topic A I have created a new topic B, where each
> message now has a key that is a number which changes every X milliseconds
> (say 500ms).
> Now I can do a group-by-key and an aggregate. I assume that each list
> returned by aggregate() does not contain duplicates.
> The output of this aggregation is saved in topic C.
> Topic C contains arrays of IDs of different sizes, and the key is the
> number created to group them.
> 
> And here is my big surprise: in topic C there are a lot of IDs that
> are present at the same time in different messages.
> Those messages have the same key but arrays of IDs of different sizes, and
> each array partially contains IDs present in other messages.
> 
> I suppose this should be impossible.
> 
> So, for example, if I have a stream with the following list of messages:
> 
> key - value
> --------------
> 0 - 1
> 0 - 2
> 0 - 3
> 0 - 4
> 0 - 5
> 1 - 6
> 1 - 7
> 1 - 8
> 1 - 9
> 1 - 10
> 
> I suppose the groupByKey() and aggregate() should return
> 
> key - value
> ----------------
> 0 - [1,2,3,4,5]
> 1 - [6,7,8,9,10]
> 
> But instead I found something like:
> 
> key - value
> ----------------
> 0 - [1,2,3,4,5]
> 0 - [2,3,4,5]
> 1 - [6,7,8,9]
> 1 - [6,7,8,9,10]
> 
> So the question is: did I do something wrong when aggregating them? How
> can I avoid those duplicates?
> 
> 
> On Sat, Jan 26, 2019 at 9:01 PM Matthias J. Sax <matth...@confluent.io>
> wrote:
> 
>> I am not 100% sure what you mean by
>>
>>>> I've an input topic where I'm 100% sure there are no duplicate keys or
>>>> messages
>>
>> If this is the case (i.e., each key is unique), it would imply that each
>> window contains exactly one record per key. Hence, why do you aggregate?
>> Each aggregate would consist of only one message, making an aggregation
>> step unnecessary.
>>
>> Can you be a little bit more specific and provide a sample input
>> (key,value,timestamp), observed output, and expected output?
>>
>> I suspect (but I am not sure), that you might "struggle" with Kafka
>> Streams' continuous output model. Maybe this blog post sheds some light:
>> https://www.confluent.io/blog/watermarks-tables-event-time-dataflow-model/
>>
>>
>> -Matthias
>>
>> On 1/25/19 9:31 AM, Vincenzo D'Amore wrote:
>>> Hi all,
>>>
>>> I write here because for a couple of days I've been struggling to
>>> understand why I get so many duplicates during message processing
>>> with Kafka Streams.
>>>
>>> I've an input topic where I'm 100% sure there are no duplicate keys or
>>> messages.
>>>
>>> During processing I have to aggregate the messages using
>>> groupByKey, windowedBy and aggregate:
>>>
>>>                 .map((v1, v2) -> {
>>>                     Long currentSecond = System.currentTimeMillis() / 500;
>>>                     return new KeyValue<>(currentSecond.toString(), v2);
>>>                 })
>>>                 .groupByKey(Serialized.with(Serdes.String(), new JsonSerde()))
>>>                 .windowedBy(TimeWindows.of(500))
>>>                 .aggregate(() -> new ArrayList<StreamEntry<String, JsonNode>>(),
>>>                         (aggKey, newValue, aggValue) -> {
>>>                             final StreamEntry<String, JsonNode> kvSimpleEntry =
>>>                                     new StreamEntry<>(aggKey, newValue);
>>>                             aggValue.add(kvSimpleEntry);
>>>                             return aggValue;
>>>                         }, Materialized.with(Serdes.String(), new ListKVJsonNodeSerde()))
>>>
>>> Even during this step I'm 100% sure there are no duplicates, but
>>> surprisingly, after this I see that mapValues can be called with the same
>>> messages more than once, even hundreds of times.
>>>
>>>                .mapValues(vv -> {
>>>                    // here the list vv contains the many
>>>                    ....
>>>                })
>>>
>>> Looking around I've found this project that seems to reproduce the
>>> problem:
>>> https://github.com/westec/ks-aggregate-debug
>>>
>>> Given that I am using non-overlapping, gap-less windows in Kafka Streams,
>>> shouldn't the correct output contain NO duplicate messages between windows?
>>> Any ideas why I see the duplicates?
>>>
>>>
>>
>>
> 
