Hi,

I am trying to group messages by message name and timestamp, and then aggregate the message values. My window function looks like this:

    metrics.keyBy("metricName")
        .keyBy("timeStamp")
        .timeWindow(Time.seconds(30))
        .trigger(ProcessingTimeTrigger.create())
        .fold(new Tuple3<String,Long,Double>("", 0L, 0.0),
            new FoldFunction<Metric, Tuple3<String,Long,Double>>() {
                @Override
                public Tuple3<String, Long, Double> fold(Tuple3<String, Long, Double> arg0, Metric arg1) throws Exception {
                    double count = 0.0;
                    long timeStamp = arg1.getTimeStamp();
                    String metricName = arg1.getMetricName();
                    count =+ arg1.getValue();
                    return new Tuple3<String,Long,Double>(metricName, timeStamp, count);
                }
            })
        .print();

My intention is to aggregate all the messages of a particular type that occurred at a timestamp t, across all partitions, in order to calculate a running mean.

I can see that some aggregation is happening, but the intermediate values are printed for each partition. My question is: how do I get a single aggregated value across all partitions?

Kindly help.

P.S. I am getting messages from Kafka with 10 partitions.

Thanks,
Naveen
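
For reference, here is a minimal sketch of the aggregation described above, written in plain Java without Flink so that it runs on its own. It is an illustration under assumptions, not a fix for the cross-partition behaviour: the `Metric` class is a hypothetical stand-in with the same getters used in the post, and `MeanAcc`, `meanByKey` and the `name@timestamp` key format are names invented for this example. Two points it tries to show: the grouping uses one composite key (metric name plus timestamp) rather than two separate keys, and the accumulator carries both a sum and a count forward. In the fold posted above, `count` is reset to 0.0 on every call and `count =+ x` assigns `+x` instead of adding, so `arg0` is never used and nothing accumulates.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class RunningMeanSketch {

    // Hypothetical stand-in for the Metric type used in the post.
    static final class Metric {
        private final String metricName;
        private final long timeStamp;
        private final double value;

        Metric(String metricName, long timeStamp, double value) {
            this.metricName = metricName;
            this.timeStamp = timeStamp;
            this.value = value;
        }

        String getMetricName() { return metricName; }
        long getTimeStamp() { return timeStamp; }
        double getValue() { return value; }
    }

    // Accumulator that keeps the running sum and count, so the mean
    // can be derived after any number of added values.
    static final class MeanAcc {
        double sum;
        long count;

        MeanAcc add(double v) {
            sum += v;   // add to the previous state instead of overwriting it
            count++;
            return this;
        }

        double mean() {
            return count == 0 ? 0.0 : sum / count;
        }
    }

    // Groups metrics by a composite key (name and timestamp) and
    // returns the mean value per key, in first-seen order.
    static Map<String, Double> meanByKey(List<Metric> metrics) {
        Map<String, MeanAcc> accs = new LinkedHashMap<>();
        for (Metric m : metrics) {
            String key = m.getMetricName() + "@" + m.getTimeStamp();
            accs.computeIfAbsent(key, k -> new MeanAcc()).add(m.getValue());
        }
        Map<String, Double> means = new LinkedHashMap<>();
        for (Map.Entry<String, MeanAcc> e : accs.entrySet()) {
            means.put(e.getKey(), e.getValue().mean());
        }
        return means;
    }

    public static void main(String[] args) {
        List<Metric> metrics = new ArrayList<>();
        metrics.add(new Metric("cpu", 100L, 2.0));
        metrics.add(new Metric("cpu", 100L, 4.0));
        metrics.add(new Metric("mem", 100L, 10.0));
        System.out.println(meanByKey(metrics));
    }
}
```

In a Flink job the same idea would presumably map to a fold whose accumulator holds the sum and count taken from `arg0`, applied to a stream keyed once on both fields; that mapping is an assumption here and is not tested against Flink.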