Your observation is correct, and it's expected behavior. As mentioned
originally, Kafka Streams follows a continuous update processing model,
i.e., each time an input record is processed, the aggregation result is
updated and emitted downstream. (Did you read the blog post?)
Thus, if you aggregate 5 records into an array, the `KTable#toStream()`
operation returns:

  input:  <k,a>, <k,b>, <k,c>, <k,d>, <k,e>
  output: <k,[a]>, <k,[a,b]>, <k,[a,b,c]>, <k,[a,b,c,d]>, <k,[a,b,c,d,e]>

You might not see all of these updates due to caching:
https://kafka.apache.org/21/documentation/streams/developer-guide/memory-mgmt.html

Since 2.1, Kafka Streams added a new `suppress()` operator that you can use
to get only one (final) result per window:
https://kafka.apache.org/21/documentation/streams/developer-guide/dsl-api.html#window-final-results
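As a rough sketch of what that could look like (not your exact topology: it
assumes String keys and values, topic names "input-topic" and "output-topic",
and joins the values into a comma-separated string instead of collecting them
into a list with a custom serde):

import java.time.Duration;
import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Suppressed;
import org.apache.kafka.streams.kstream.TimeWindows;

public class WindowFinalResultSketch {

    public static void main(final String[] args) {
        final StreamsBuilder builder = new StreamsBuilder();

        builder.<String, String>stream("input-topic")
            .groupByKey()
            // 500 ms tumbling windows with an explicit (here zero) grace
            // period so windows close, and final results are emitted, promptly.
            .windowedBy(TimeWindows.of(Duration.ofMillis(500)).grace(Duration.ZERO))
            // Simplified aggregate: concatenate values into one String so the
            // default String serdes are sufficient.
            .aggregate(
                () -> "",
                (key, value, agg) -> agg.isEmpty() ? value : agg + "," + value)
            // Forward only the final result per window instead of one update
            // per input record.
            .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
            .toStream((windowedKey, value) -> windowedKey.key())
            .to("output-topic");

        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "window-final-result-sketch");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

        new KafkaStreams(builder.build(), props).start();
    }
}

Keep in mind that `suppress()` works on event time: the final result of a
window is only emitted once newer records advance stream time past the window
end plus the grace period.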
-Matthias

On 1/27/19 3:40 PM, Vincenzo D'Amore wrote:
> Hi Matthias, thanks for your reply. Let me explain better what I'm trying
> to say. In the meantime I've played with this problem, and I think I now
> have a clearer view, though I still don't have a solution.
>
> I have an input topic A which is a stream of messages where each message
> contains just an ID. There can be thousands or even millions of those IDs
> (messages), but in my test (a proof of concept) they are all different.
>
> In order to process them, for each one I have to retrieve some data stored
> in a NoSQL database. As you can understand, querying one ID at a time is
> not a good solution for performance reasons, so I need to aggregate them,
> and here comes the problem.
>
> So from the source topic A I have created a new topic B, where each
> message now has a key which is a number that changes every X milliseconds
> (say 500 ms).
> Now I can do a groupByKey and an aggregate. I assume that each list
> returned by aggregate() does not contain duplicates.
> The output of this aggregation is saved in topic C.
> Topic C contains arrays of IDs of different sizes, and the key is the
> number created to group them.
>
> And here comes my big surprise: in topic C there are a lot of IDs that are
> present at the same time in different messages.
> Those messages have the same key but arrays of IDs of different sizes, and
> each array partially contains IDs present in other messages.
>
> I assumed this should be impossible.
>
> So, for example, if I have a stream with the following list of messages:
>
> key - value
> --------------
> 0 - 1
> 0 - 2
> 0 - 3
> 0 - 4
> 0 - 5
> 1 - 6
> 1 - 7
> 1 - 8
> 1 - 9
> 1 - 10
>
> I expected groupByKey() and aggregate() to return
>
> key - value
> ----------------
> 0 - [1,2,3,4,5]
> 1 - [6,7,8,9,10]
>
> But instead I found something like:
>
> key - value
> ----------------
> 0 - [1,2,3,4,5]
> 0 - [2,3,4,5]
> 1 - [6,7,8,9]
> 1 - [6,7,8,9,10]
>
> So the question is: did I do something wrong trying to aggregate them?
> How can I avoid those duplicates?
>
>
> On Sat, Jan 26, 2019 at 9:01 PM Matthias J. Sax <matth...@confluent.io>
> wrote:
>
>> I am not 100% sure what you mean by
>>
>>>> I have an input topic where I'm 100% sure there are no duplicate keys
>>>> or messages
>>
>> If this is the case (i.e., each key is unique), it would imply that each
>> window contains exactly one record per key. Hence, why do you aggregate?
>> Each aggregate would consist of only one message, making an aggregation
>> step unnecessary.
>>
>> Can you be a little bit more specific and provide a sample input
>> (key, value, timestamp), observed output, and expected output?
>>
>> I suspect (but I am not sure) that you might "struggle" with Kafka
>> Streams' continuous output model. Maybe this blog post sheds some light:
>> https://www.confluent.io/blog/watermarks-tables-event-time-dataflow-model/
>>
>>
>> -Matthias
>>
>> On 1/25/19 9:31 AM, Vincenzo D'Amore wrote:
>>> Hi all,
>>>
>>> I'm writing here because I've been struggling for a couple of days
>>> trying to understand why I get so many duplicates while processing
>>> messages with Kafka Streams.
>>>
>>> I have an input topic where I'm 100% sure there are no duplicate keys
>>> or messages.
>>>
>>> During the process I have to aggregate the messages using groupByKey,
>>> windowedBy and aggregate:
>>>
>>> .map((v1, v2) -> {
>>>     Long currentSecond = System.currentTimeMillis() / 500;
>>>     return new KeyValue<>(currentSecond.toString(), v2);
>>> })
>>> .groupByKey(Serialized.with(Serdes.String(), new JsonSerde()))
>>> .windowedBy(TimeWindows.of(500))
>>> .aggregate(() -> new ArrayList<StreamEntry<String, JsonNode>>(),
>>>     (aggKey, newValue, aggValue) -> {
>>>         final StreamEntry<String, JsonNode> kvSimpleEntry =
>>>             new StreamEntry<>(aggKey, newValue);
>>>         aggValue.add(kvSimpleEntry);
>>>         return aggValue;
>>>     }, Materialized.with(Serdes.String(), new ListKVJsonNodeSerde()))
>>>
>>> Even during this process I'm 100% sure there are no duplicates, but
>>> surprisingly after this I see that mapValues can be called with the
>>> same messages more than once. Even hundreds of times.
>>>
>>> .mapValues(vv -> {
>>>     // here the list vv contains the many
>>>     ....
>>> })
>>>
>>> Looking around I've found this project that seems to reproduce the
>>> problem:
>>> https://github.com/westec/ks-aggregate-debug
>>>
>>> Given that I am using non-overlapping, gap-less windows in kstream,
>>> shouldn't the correct output contain NO duplicate messages between
>>> windows?
>>> Any ideas why the duplicates?
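If you want to see the continuous updates for yourself without running a
cluster, here is a rough, self-contained sketch (my own example, not your
code: it assumes topic names "input" and "output", String keys and values,
and again simplifies the aggregate to string concatenation) that pipes the
key-0 records from your example through a windowed aggregation using
TopologyTestDriver from kafka-streams-test-utils:

import java.time.Duration;
import java.util.Properties;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.test.ConsumerRecordFactory;

public class ContinuousUpdateDemo {

    public static void main(final String[] args) {
        final StreamsBuilder builder = new StreamsBuilder();
        builder.<String, String>stream("input")
            .groupByKey()
            .windowedBy(TimeWindows.of(Duration.ofMillis(500)))
            .aggregate(
                () -> "",
                (key, value, agg) -> agg.isEmpty() ? value : agg + "," + value)
            .toStream((windowedKey, value) -> windowedKey.key())
            .to("output");

        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "continuous-update-demo");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        // Disable caching so every intermediate update is forwarded downstream.
        props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);

        final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props);
        try {
            final ConsumerRecordFactory<String, String> factory =
                new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer());

            // All five records fall into the same 500 ms window for key "0".
            long ts = 0L;
            for (final String value : new String[] {"1", "2", "3", "4", "5"}) {
                driver.pipeInput(factory.create("input", "0", value, ts++));
            }

            // Prints one update per input record:
            // 0:1, 0:1,2, 0:1,2,3, 0:1,2,3,4, 0:1,2,3,4,5
            ProducerRecord<String, String> record;
            while ((record = driver.readOutput("output",
                    new StringDeserializer(), new StringDeserializer())) != null) {
                System.out.println(record.key() + ":" + record.value());
            }
        } finally {
            driver.close();
        }
    }
}

With caching enabled (the default), several of these intermediate updates can
be folded together before they are forwarded, which is why you observe
aggregates of different sizes rather than every single step.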