Hi,

I am trying to group messages by message name and timestamp and then aggregate
the message values. My window function looks like this:

// Key on both fields at once; a second keyBy() call re-partitions by its own key only.
metrics.keyBy("metricName", "timeStamp")
    .timeWindow(Time.seconds(30))
    .trigger(ProcessingTimeTrigger.create())
    .fold(new Tuple3<String, Long, Double>("", 0L, 0.0),
          new FoldFunction<Metric, Tuple3<String, Long, Double>>() {

      @Override
      public Tuple3<String, Long, Double> fold(Tuple3<String, Long, Double> acc,
                                               Metric metric) throws Exception {
        // Add this record's value to the running sum carried in acc.f2.
        double sum = acc.f2 + metric.getValue();
        return new Tuple3<String, Long, Double>(metric.getMetricName(),
                                                metric.getTimeStamp(), sum);
      }
    }).print();
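To make the intended per-window semantics concrete, here is a plain-Java sketch with no Flink dependency. It groups one window's records by the pair (metricName, timeStamp) and folds each record into an accumulator that holds a running sum and count, from which the mean follows. The Metric class, the "name@timestamp" key string and the Acc accumulator are hypothetical stand-ins for illustration, not Flink API.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java illustration (no Flink): what a keyed fold over one window
// computes when the key is the pair (metricName, timeStamp).
public class KeyedFoldSketch {

    // Hypothetical stand-in for the Metric class in the question.
    static final class Metric {
        final String metricName;
        final long timeStamp;
        final double value;

        Metric(String metricName, long timeStamp, double value) {
            this.metricName = metricName;
            this.timeStamp = timeStamp;
            this.value = value;
        }
    }

    // Hypothetical accumulator: running sum and count, so the mean can be derived.
    static final class Acc {
        double sum = 0.0;
        long count = 0L;

        double mean() {
            return count == 0 ? 0.0 : sum / count;
        }
    }

    // Groups by the composite key and folds each record into its accumulator,
    // adding to the previous state instead of resetting it.
    static Map<String, Acc> foldByKey(List<Metric> window) {
        Map<String, Acc> result = new LinkedHashMap<>();
        for (Metric m : window) {
            String key = m.metricName + "@" + m.timeStamp;
            Acc acc = result.computeIfAbsent(key, k -> new Acc());
            acc.sum += m.value;
            acc.count += 1;
        }
        return result;
    }

    public static void main(String[] args) {
        List<Metric> window = List.of(
            new Metric("cpu", 1000L, 2.0),
            new Metric("cpu", 1000L, 4.0),
            new Metric("mem", 1000L, 10.0));
        Map<String, Acc> out = foldByKey(window);
        // Prints "cpu@1000 mean=3.0" then "mem@1000 mean=10.0"
        out.forEach((k, acc) -> System.out.println(k + " mean=" + acc.mean()));
    }
}
```

In Flink terms, one such result should be emitted per key per window, regardless of which Kafka partition each record came from.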

My intention is to aggregate all messages of a particular type that occurred
at timestamp t, across all partitions, to calculate a running mean. I can see
that some aggregation is happening, but the intermediate values are printed
separately for each partition. How do I get one aggregated value across all
partitions? Kindly help.

P.S. I am consuming messages from a Kafka topic with 10 partitions.

Thanks,
Naveen
