Hi Kohki,

Note that Streams executes operations based on the "timestamp" of the record,
i.e. in your case it is the "event time", not the processing time. When you
received
00:00:00,metric,2

after the long pause, it is considered a "late-arriving record": one that
happened at 00:00:00 but was received late. Hence it is still aggregated under
the old window bucket of 1487894400000 based on its timestamp, and hence the
result containing two records.

Does that make sense?

Guozhang

On Fri, Feb 24, 2017 at 9:59 AM, Kohki Nishio <tarop...@gmail.com> wrote:
> Thanks for the info, however that is alarming behavior; duplicate messages
> are a tricky thing to manage. I thought the 'retention-period' could work
> for that purpose, however here's the result.
>
> My TimeWindow is
>
> TimeWindows.of(60000).until(60000)
>
> and here's the input:
>
> 00:00:00,metric,1
> 00:01:00,metric,1
> 00:03:00,metric,1
> 00:04:00,metric,1
> 00:05:00,metric,1
> 00:06:00,metric,1
>
> <long pause>
>
> 00:00:00,metric,2
>
> <long pause>
>
> 00:00:00,metric,3
>
> The output is below:
>
> [metric@1487894400000] , Map(1.0 -> 1)
> [metric@1487894460000] , Map(1.0 -> 1)
> [metric@1487894580000] , Map(1.0 -> 1)
> [metric@1487894640000] , Map(1.0 -> 1)
> [metric@1487894700000] , Map(1.0 -> 1)
> [metric@1487894760000] , Map(1.0 -> 1)
> [metric@1487894400000] , Map(2.0 -> 1, 1.0 -> 1) <======== ??
> [metric@1487894400000] , Map(2.0 -> 1, 1.0 -> 1, 3.0 -> 1) <====== ??
>
> I don't understand why the last two happen. I'm looking into the source
> code, but I wonder if I'm doing something wrong.
>
>
> On Fri, Feb 24, 2017 at 8:33 AM, Eno Thereska <eno.there...@gmail.com>
> wrote:
>
> > Hi Kohki,
> >
> > As you mentioned, this is expected behavior. However, if you are willing
> > to tolerate some more latency, you can improve the chance that a message
> > with the same key is overwritten by increasing the commit time. By
> > default it is 30 seconds, but you can increase it:
> >
> > streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 40000);
> >
> > This will make the dedup cache work better (for documentation see
> > http://docs.confluent.io/3.1.2/streams/developer-guide.html#memory-management).
> > However, this does not guarantee that duplicates do not happen.
> >
> > Thanks
> > Eno
> >
> >
> > > On 24 Feb 2017, at 15:20, Kohki Nishio <tarop...@gmail.com> wrote:
> > >
> > > Hello Kafka experts,
> > >
> > > I'm trying to do windowed aggregation with Kafka Streams, however I'm
> > > getting multiple messages for the same time window. I know this is
> > > expected behavior, but I really want to have a single message for a
> > > given time window.
> > >
> > > My test code looks like this:
> > >
> > > builder.stream("test-stream")
> > >   .groupByKey()
> > >   .aggregate(
> > >     new DataPointsInitializer,
> > >     new DataPointsAggregator,
> > >     TimeWindows.of(60000).until(60000),
> > >     new DataPointsSerde,
> > >     "test-stream")
> > >   .toStream()
> > >   .print()
> > >
> > > But if data arrives like this (it has its own time field):
> > >
> > > 01:38:20,Metric1,10
> > > 01:38:21,Metric1,10
> > >
> > > <long pause>
> > >
> > > 01:38:22,Metric1,10
> > >
> > > then I get output like this:
> > >
> > > [KTABLE-TOSTREAM-0000000002]: [Metric1@1487813880000] , Map(10.0 -> 2)
> > > [KTABLE-TOSTREAM-0000000002]: [Metric1@1487813880000] , Map(10.0 -> 3)
> > >
> > > I want to drop the last one so that I don't have duplicate messages.
> > > Thanks
> > > --
> > > Kohki Nishio
> >
> >
>
> --
> Kohki Nishio

--
-- Guozhang
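[Editor's note: for reference, below is a minimal, self-contained Java sketch,
not code from this thread, that combines the two points discussed above: a
longer commit interval plus a non-zero record cache, so repeated updates to the
same window are more likely to be collapsed before being forwarded downstream,
and a longer until() retention, so records arriving late (by event time) still
land in their original window. The class name, application id, bootstrap
address, store name, cache size, and one-hour retention are illustrative
assumptions, and count() stands in for Kohki's custom DataPoints aggregator.
It targets the 0.10.x Streams API used in the thread.]

import java.util.Properties;

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.TimeWindows;

public class WindowedCountSketch {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "windowed-count-sketch");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Commit (and flush cached results) less often, so repeated updates to
        // the same window key have a better chance of being collapsed in the
        // record cache before they are emitted downstream.
        props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 40000);
        // The record cache must be non-zero for that collapsing to happen at
        // all; 10 MB here is an arbitrary illustrative value.
        props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 * 1024L);

        KStreamBuilder builder = new KStreamBuilder();
        builder.stream("test-stream")
               .groupByKey()
               // 1-minute windows retained for 1 hour: records arriving up to
               // an hour late (by event time) are still added to their
               // original window instead of opening on top of expired state.
               .count(TimeWindows.of(60000).until(3600000), "window-counts")
               .toStream()
               .print();

        new KafkaStreams(builder, props).start();
    }
}

[As the thread notes, this only reduces the number of intermediate updates per
window; it does not guarantee exactly one output record per window.]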