Hi Naveen,

You would have to apply an all-window reduce after you've aggregated the
values by key:

metrics...
 .timeWindowAll(Time.seconds(30)).fold(..., ...);

However, be aware that this all-window fold will be executed by a single
operator instance (parallelism 1). You could also apply the all-window
operation directly, without the preceding keyed window.
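To make the two-step idea concrete, here is a minimal plain-Java simulation. It uses no Flink classes, and `Metric`, `SumCount`, `keyedFold` and `allReduce` are hypothetical names chosen for illustration. Phase one stands in for the parallel keyed window fold, and phase two stands in for the single-instance timeWindowAll step. The sketch carries a sum and a count rather than a mean, because partial means cannot be merged exactly but sums and counts can. It assumes Java 16+ for records.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class TwoPhaseAggregation {

    // Partial aggregate: keeping sum and count (not a mean) lets partials be merged exactly.
    record SumCount(double sum, long count) {
        SumCount merge(SumCount other) {
            return new SumCount(sum + other.sum, count + other.count);
        }

        double mean() {
            return count == 0 ? 0.0 : sum / count;
        }
    }

    // Hypothetical input record standing in for the Metric class from the question.
    record Metric(String name, double value) {}

    // Phase 1, run once per parallel instance: fold that instance's records by key,
    // analogous to the keyed window fold.
    static Map<String, SumCount> keyedFold(List<Metric> records) {
        Map<String, SumCount> perKey = new HashMap<>();
        for (Metric m : records) {
            perKey.merge(m.name(), new SumCount(m.value(), 1), SumCount::merge);
        }
        return perKey;
    }

    // Phase 2, run by a single instance: reduce every per-key partial into one value,
    // analogous to the non-parallel timeWindowAll(...) step.
    static SumCount allReduce(List<Map<String, SumCount>> partials) {
        SumCount total = new SumCount(0.0, 0);
        for (Map<String, SumCount> perKey : partials) {
            for (SumCount sc : perKey.values()) {
                total = total.merge(sc);
            }
        }
        return total;
    }

    public static void main(String[] args) {
        // Two parallel instances; keyBy routes all records of one key to the same instance.
        List<Map<String, SumCount>> partials = List.of(
                keyedFold(List.of(new Metric("cpu", 1.0), new Metric("cpu", 3.0))),
                keyedFold(List.of(new Metric("mem", 5.0), new Metric("mem", 7.0))));
        SumCount total = allReduce(partials);
        System.out.println(total.count() + " values, mean " + total.mean());
    }
}
```

Pre-aggregating by key in parallel means the parallelism-1 step only sees one partial result per key instead of every raw record.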

Cheers,
Till

On Sat, Dec 10, 2016 at 3:46 AM, Naveen Tirupattur <ntirupat...@maprtech.com> wrote:

> Hi,
>
> I am trying to group messages by message name and timestamp and then
> aggregate the message values. My window function looks like this:
>
> metrics.keyBy("metricName", "timeStamp")
>     .timeWindow(Time.seconds(30))
>     .trigger(ProcessingTimeTrigger.create())
>     .fold(new Tuple3<String, Long, Double>("", 0L, 0.0),
>         new FoldFunction<Metric, Tuple3<String, Long, Double>>() {
>
>       @Override
>       public Tuple3<String, Long, Double> fold(
>           Tuple3<String, Long, Double> acc, Metric metric) throws Exception {
>         // Add this metric's value to the sum carried in the accumulator (field f2).
>         double sum = acc.f2 + metric.getValue();
>         return new Tuple3<String, Long, Double>(
>             metric.getMetricName(), metric.getTimeStamp(), sum);
>       }
>     }).print();
>
> My intention is to aggregate all messages of a particular type that
> occurred at a timestamp t across all partitions to calculate a running
> mean. I see that some aggregation is happening, but the intermediate
> values are printed for each partition. My question is: how do I get one
> aggregated value across all partitions? Kindly help.
>
> P.S. I am reading messages from Kafka with 10 partitions.
>
> Thanks,
> Naveen
