Hi,
I am trying to group messages by metric name and timestamp and then aggregate
the message values. My window function looks like this:
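import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger;

// Key by both fields at once; chaining a second keyBy would re-key the stream on timeStamp alone.
metrics.keyBy("metricName", "timeStamp")
    .timeWindow(Time.seconds(30))
    .trigger(ProcessingTimeTrigger.create())
    .fold(new Tuple3<String, Long, Double>("", 0L, 0.0),
        new FoldFunction<Metric, Tuple3<String, Long, Double>>() {
            @Override
            public Tuple3<String, Long, Double> fold(Tuple3<String, Long, Double> acc,
                    Metric metric) throws Exception {
                // Add this metric's value to the sum carried in the accumulator
                // (starting from 0.0 and using "=+" kept only the last value).
                double sum = acc.f2 + metric.getValue();
                return new Tuple3<String, Long, Double>(
                        metric.getMetricName(), metric.getTimeStamp(), sum);
            }
        })
    .print();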
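For reference, a fold threads one accumulator through every element of a window: it starts from the initial value, and each call's return value becomes the accumulator for the next call, so the step has to build on the accumulator it receives rather than start from zero. The plain-Java sketch below shows that behavior for a simple sum. It has no Flink dependency, and the class FoldSketch and its fold helper are hypothetical illustrations, not Flink API.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.BiFunction;

// Flink-free illustration of fold semantics (hypothetical helper, not Flink API):
// an initial accumulator is combined with each element in turn, and each
// step's result becomes the accumulator for the next step.
final class FoldSketch {

    static <T, A> A fold(List<T> elements, A initial, BiFunction<A, T, A> step) {
        A acc = initial;
        for (T element : elements) {
            acc = step.apply(acc, element); // the returned value replaces the accumulator
        }
        return acc;
    }

    public static void main(String[] args) {
        List<Double> windowValues = Arrays.asList(1.0, 2.0, 3.0);

        // Step that builds on the accumulator it was given: a real sum.
        double sum = fold(windowValues, 0.0, (acc, v) -> acc + v);

        // Step that ignores the accumulator: only the last element survives.
        double lastOnly = fold(windowValues, 0.0, (acc, v) -> 0.0 + v);

        System.out.println(sum);      // prints 6.0
        System.out.println(lastOnly); // prints 3.0
    }
}
```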
My intention is to aggregate all messages with a given metric name that
occurred at timestamp t, across all partitions, to calculate a running mean. I
can see that some aggregation is happening, but intermediate values are printed
separately for each partition. How do I get a single aggregated value across
all partitions? Kindly help.
P.S. I am reading the messages from a Kafka topic with 10 partitions.
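To make the intended result concrete, here is a Flink-free Java sketch under simplifying assumptions: windowing is ignored, the nested Metric class is a hypothetical stand-in for the Metric type used above, and the composite key is a plain string. Records from different Kafka partitions that share a metric name and timestamp land in one (sum, count) accumulator, and the mean is computed from it once at the end. Keeping the sum and count separately, rather than a running average, also means two partial results could be combined by adding their sums and counts.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Flink-free sketch: records arrive from several Kafka partitions, and grouping
// by the composite key (metricName, timeStamp) yields exactly one (sum, count)
// accumulator per key, regardless of which partition a record came from.
final class MeanSketch {

    // Hypothetical record type; field names mirror the getters used in the post.
    static final class Metric {
        final int partition; // source Kafka partition, deliberately not part of the key
        final String metricName;
        final long timeStamp;
        final double value;

        Metric(int partition, String metricName, long timeStamp, double value) {
            this.partition = partition;
            this.metricName = metricName;
            this.timeStamp = timeStamp;
            this.value = value;
        }
    }

    // Accumulator for a mean: keep sum and count, divide only when reading the result.
    static final class MeanAcc {
        double sum;
        long count;

        double mean() {
            return sum / count;
        }
    }

    static Map<String, Double> meanPerKey(List<Metric> metrics) {
        Map<String, MeanAcc> accs = new HashMap<>();
        for (Metric m : metrics) {
            String key = m.metricName + "@" + m.timeStamp; // composite key without the partition
            MeanAcc acc = accs.computeIfAbsent(key, k -> new MeanAcc());
            acc.sum += m.value;
            acc.count += 1;
        }
        Map<String, Double> means = new HashMap<>();
        for (Map.Entry<String, MeanAcc> e : accs.entrySet()) {
            means.put(e.getKey(), e.getValue().mean());
        }
        return means;
    }

    public static void main(String[] args) {
        List<Metric> metrics = Arrays.asList(
                new Metric(0, "cpu", 1000L, 10.0),
                new Metric(3, "cpu", 1000L, 20.0),
                new Metric(7, "cpu", 1000L, 30.0),
                new Metric(3, "mem", 1000L, 5.0));
        // One value per key: cpu@1000 -> 20.0 and mem@1000 -> 5.0 (map order may vary)
        System.out.println(meanPerKey(metrics));
    }
}
```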
Thanks,
Naveen