Hi Kohki,

As you mentioned, this is expected behavior. However, if you are willing to
tolerate some extra latency, you can improve the chance that a later update
overwrites an earlier one for the same key by increasing the commit interval.
By default it is 30 seconds (30000 ms), but you can increase it:

streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 40000);

This makes the deduplication cache more effective (for documentation see
http://docs.confluent.io/3.1.2/streams/developer-guide.html#memory-management).
However, it does not guarantee that duplicates will never occur.
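
If you also want to give the cache itself more room, the relevant setting is
cache.max.bytes.buffering, described on that same page. As a rough sketch of
using the two settings together (the 10 MB value below is only an
illustration, not a recommendation):

Properties streamsConfiguration = new Properties();
// Flush the cache (and emit downstream records) less frequently:
streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 40000);
// A larger cache lets more updates to the same key be collapsed
// into a single record before the next flush:
streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 * 1024L);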

Thanks
Eno


> On 24 Feb 2017, at 15:20, Kohki Nishio <tarop...@gmail.com> wrote:
> 
> Hello Kafka experts
> 
> I'm trying to do windowed aggregation with Kafka Streams, however I'm
> getting multiple messages for the same time window. I know this is
> expected behavior, but I really want to have a single message per
> time window.
> 
> My test code looks like this:
> 
>    builder.stream("test-stream")
>      .groupByKey()
>      .aggregate(
>        new DataPointsInitializer,
>        new DataPointsAggregator,
>        TimeWindows.of(60000).until(60000),
>        new DataPointsSerde,
>        "test-stream")
>      .toStream()
>      .print()
> 
> But if data arrives like this (it has its own time field)
> 
> 01:38:20,Metric1,10
> 01:38:21,Metric1,10
> 
> < long pause >
> 
> 01:38:22,Metric1,10
> 
> Then I get output like this
> 
> [KTABLE-TOSTREAM-0000000002]: [Metric1@1487813880000] , Map(10.0 -> 2)
> [KTABLE-TOSTREAM-0000000002]: [Metric1@1487813880000] , Map(10.0 -> 3)
> 
> I want to drop the last one so that I don't have duplicate messages. Thanks
> -- 
> Kohki Nishio
