Hi Biplob, Flink is a distributed, data parallel system which means that there are several instances of you ReduceFunction running in parallel, each with its own timestamp counter. If you want to have a unique timestamp, you have to set the parallelism of the reduce operator to 1, but then the program might become inefficient.
Maybe DataSetUtils.zipWithIndex() or DataSetUtils.zipWithUniqueId() are helpful for you use case. Best, Fabian 2016-04-26 17:12 GMT+02:00 Biplob Biswas <revolutioni...@gmail.com>: > Hi, > > I am using a groupreduce function to aggregate the content of the objects > but at the same time i need to return a unique counter from the function > but > my attempts are failing and the identifiers are somehow very random and > getting duplicated. > > Following is the part of my code which is supposed to generate a unique > counter and return it with out.collect. > > > public static class sumReducer implements > GroupReduceFunction<Tuple2<Integer, Point>, > Tuple5<Integer,Point, Point, > Long, Long>> { > > double sum[] = null; > double sumOfSquare[] = null; > long timestamp = 0; > @Override > public void reduce(Iterable<Tuple2<Integer, Point>> in, > Collector<Tuple5<Integer,Point, Point, Long, Long>> out) > throws Exception { > > int id = 0; > long count = 0; > boolean flag = true; > for(Tuple2<Integer, Point> i:in) > { > if(flag) > { > timestamp++; > System.out.println("uniqueid: " + > i.f0 + ", t: " + timestamp ); > sum = new double[i.f1.pt.length]; > sumOfSquare = new > double[sum.length]; > id = i.f0; > for(int j=0;j<sum.length;j++) > { > sum[j] = i.f1.pt[j]; > sumOfSquare[j] = i.f1.pt[j] > * i.f1.pt[j]; > } > flag = false; > } > else > { > int len = i.f1.pt.length; > for(int j=0;j<len;j++) > { > sum[j] += i.f1.pt[j]; > sumOfSquare[j] += (i.f1.pt[j] > * i.f1.pt[j]); > } > } > count++; > } > out.collect(new Tuple5<Integer,Point, Point, > Long, Long>(id,new > Point(sum), new Point(sumOfSquare),count, timestamp)); > } > > I want the timestamp to be unique, but even though the code > "System.out.println("uniqueid: " + i.f0 + ", t: " + timestamp );" executes > once for each of the identifier (given by i.f0) by which it is grouped and > then the groupReducce function is called still I get the following output > for the above println statement. > > uniqueid: 2, t: 1 > uniqueid: 1, t: 1 > uniqueid: 7, t: 2 > uniqueid: 9, t: 3 > uniqueid: 6, t: 2 > uniqueid: 3, t: 1 > uniqueid: 5, t: 2 > uniqueid: 8, t: 3 > > I dont really get why I am getting this discrepancy, probably I am missing > some Flink concept, I am relatively very new to the flink platform and any > help is appreciated. Thanks a lot. > > Thanks and Regards > > > > -- > View this message in context: > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Return-unique-counter-using-groupReduceFunction-tp6452.html > Sent from the Apache Flink User Mailing List archive. mailing list archive > at Nabble.com. >