Hi Naveen,

you would have to apply an all reduce after you’ve aggregated the values by key.

metrics... timeWindowAll(Time.seconds(30)).fold(..., ...);

However, be aware that this all window fold operation will be executed by a single operator (parallelism 1). You could also do this all window operation right away without applying the keyed window.

Cheers,
Till

On Sat, Dec 10, 2016 at 3:46 AM, Naveen Tirupattur <ntirupat...@maprtech.com> wrote:

> Hi,
>
> I am trying to group messages by message name, timestamp and then perform
> aggregation on message value. My window function looks like below
>
> metrics.keyBy("metricName")
>     .keyBy("timeStamp")
>     .timeWindow(Time.seconds(30))
>     .trigger(ProcessingTimeTrigger.create())
>     .fold(new Tuple3<String,Long,Double>("",0L,0.0), new FoldFunction<Metric, Tuple3<String,Long,Double>>() {
>
>         @Override
>         public Tuple3<String, Long, Double> fold(Tuple3<String, Long, Double> arg0, Metric arg1) throws Exception {
>             double count = 0.0;
>             long timeStamp = arg1.getTimeStamp();
>             String metricName = arg1.getMetricName();
>             count =+ arg1.getValue();
>             return new Tuple3<String,Long,Double>(metricName,timeStamp,count);
>         }
>     }).print();
>
> My intention is to aggregate all the messages of a particular type that
> occurred at a timestamp t across all partitions to calculate running mean.
> I see that some aggregation is happening but for each partition the
> intermediate values are being printed. My question is how do I get one
> aggregated value across all partitions? Kindly help.
>
> P.S I am getting messages from Kafka with 10 partitions.
>
> Thanks,
> Naveen
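
For illustration only, the two-stage pattern described in the reply (aggregate per partition first, then combine everything in a single place) can be sketched in plain Java without any Flink classes. The class TwoStageMean, the Partial accumulator, and the methods foldPartition and combine are hypothetical names made up for this sketch; they are not part of the Flink API and not the code from this thread. The sketch assumes that a mean is wanted, so each partial result carries both a sum and a count, and the accumulator is carried forward instead of being reset for every element.

```java
import java.util.ArrayList;
import java.util.List;

public class TwoStageMean {

    /** Hypothetical partial aggregate: a sum and a count are enough to derive a mean. */
    static final class Partial {
        final double sum;
        final long count;

        Partial(double sum, long count) {
            this.sum = sum;
            this.count = count;
        }

        /** Adds one value to the accumulator and returns the updated accumulator. */
        Partial add(double value) {
            return new Partial(sum + value, count + 1);
        }

        /** Merges two partial aggregates, e.g. from two parallel instances. */
        Partial merge(Partial other) {
            return new Partial(sum + other.sum, count + other.count);
        }

        double mean() {
            return count == 0 ? 0.0 : sum / count;
        }
    }

    /** Stage 1: fold the values seen by one parallel instance into a partial aggregate. */
    static Partial foldPartition(double[] values) {
        Partial acc = new Partial(0.0, 0L);
        for (double v : values) {
            acc = acc.add(v); // carry the accumulator forward
        }
        return acc;
    }

    /** Stage 2: combine all partial aggregates in one place (the "parallelism 1" step). */
    static Partial combine(List<Partial> partials) {
        Partial total = new Partial(0.0, 0L);
        for (Partial p : partials) {
            total = total.merge(p);
        }
        return total;
    }

    public static void main(String[] args) {
        // Made-up sample values standing in for two partitions of the same metric.
        List<Partial> partials = new ArrayList<>();
        partials.add(foldPartition(new double[] {1.0, 2.0, 3.0}));
        partials.add(foldPartition(new double[] {5.0}));

        Partial global = combine(partials);
        System.out.println("count = " + global.count + ", mean = " + global.mean());
    }
}
```

In a Flink job the first stage would roughly correspond to the keyed window fold and the second stage to the all window fold, which is why the second stage cannot run in parallel.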