The problem is that DefaultCrossFunction always returns the same holder Tuple2 object, which is then handed over to the flatMap() of the chained collect helper. I can see why it is OK for the default functions to reuse "holder" objects on their own, but it becomes problematic when they are chained to an operator that keeps references to the records it receives.
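To illustrate the effect, here is a minimal standalone sketch without any Flink dependency. Pair, ReusingCross and collect() are hypothetical stand-ins for Tuple2, DefaultCrossFunction and the collect helper; this is not Flink's actual code. It runs single-threaded, so with reuse every stored element shows the last pair, whereas a parallel run can end up with several different "last" values.

```java
import java.util.ArrayList;
import java.util.List;

public class HolderReuseDemo {

    /** Minimal mutable pair; a hypothetical stand-in for Flink's Tuple2. */
    public static final class Pair {
        int f0;
        int f1;

        Pair(int f0, int f1) {
            this.f0 = f0;
            this.f1 = f1;
        }

        /** Returns a new, independent Pair with the same field values. */
        Pair copy() {
            return new Pair(f0, f1);
        }

        @Override
        public String toString() {
            return "(" + f0 + "," + f1 + ")";
        }
    }

    /** Hypothetical cross function that overwrites and returns one holder object. */
    static final class ReusingCross {
        private final Pair holder = new Pair(0, 0);

        Pair cross(int a, int b) {
            holder.f0 = a;
            holder.f1 = b;
            return holder; // same instance on every call
        }
    }

    /**
     * Crosses 1..n with 1..n and stores the results in a list.
     * With copyBeforeAdd == false the list stores the shared holder reference.
     */
    public static List<Pair> collect(int n, boolean copyBeforeAdd) {
        ReusingCross cross = new ReusingCross();
        List<Pair> out = new ArrayList<>();
        for (int a = 1; a <= n; a++) {
            for (int b = 1; b <= n; b++) {
                Pair p = cross.cross(a, b);
                out.add(copyBeforeAdd ? p.copy() : p);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(collect(2, false)); // [(2,2), (2,2), (2,2), (2,2)]
        System.out.println(collect(2, true));  // [(1,1), (1,2), (2,1), (2,2)]
    }
}
```

Copying at the point where the record is stored is one possible workaround; whether the copy belongs in the collect helper or in the accumulator is a separate design question.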
On 21 Jan 2015, at 17:12, Ufuk Celebi <u...@apache.org> wrote:

> I just checked out your branch and ran it with a break point set at the
> CollectHelper. If you look into the (list) accumulator you see that always
> the same object is added to it. Strangely enough, object re-use is disabled
> in the config. I don't have time to look further into it now, but it seems to
> be a problem with the object re-use mode.
>
> – Ufuk
>
> On 20 Jan 2015, at 20:53, Max Michels <m...@data-artisans.com> wrote:
>
>> Hi everyone,
>>
>> I'm running into some problems implementing a Accumulator for
>> returning a list of a DataSet.
>>
>> https://github.com/mxm/flink/tree/count/collect
>>
>> Basically, it works fine in this test case:
>>
>>
>> ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>>
>> Integer[] input = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
>>
>> DataSet<Integer> data = env.fromElements(input);
>>
>> // count
>> long numEntries = data.count();
>> assertEquals(10, numEntries);
>>
>> // collect
>> ArrayList<Integer> list = (ArrayList<Integer>) data.collect();
>> assertArrayEquals(input, list.toArray());
>>
>>
>> But with non-primitive objects strange results occur:
>>
>> ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>> env.getConfig().disableObjectReuse();
>>
>> DataSet<Integer> data = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
>> DataSet<Integer> data2 = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
>>
>> DataSet<Tuple2<Integer, Integer>> data3 = data.cross(data2);
>>
>> // count
>> long numEntries = data3.count();
>> assertEquals(100, numEntries);
>>
>> // collect
>> ArrayList<Tuple2<Integer, Integer>> list = (ArrayList<Tuple2<Integer,
>> Integer>>) data3.collect();
>>
>> System.out.println(list)
>>
>> ....
>>
>> Output:
>>
>> [(10,10), (10,10), (10,10), (10,10), (10,10), (10,10), (10,10),
>> (10,10), (10,10), (10,10), (10,10), (10,10), (10,10), (10,10),
>> (10,10), (10,10), (10,10), (10,10), (10,10), (10,10), (10,6), (10,6),
>> (10,6), (10,6), (10,6), (10,6), (10,6), (10,6), (10,6), (10,6),
>> (10,6), (10,6), (10,6), (10,6), (10,6), (10,6), (10,6), (10,6),
>> (10,6), (10,6), (10,7), (10,7), (10,7), (10,7), (10,7), (10,7),
>> (10,7), (10,7), (10,7), (10,7), (10,7), (10,7), (10,7), (10,7),
>> (10,7), (10,7), (10,7), (10,7), (10,7), (10,7), (10,9), (10,9),
>> (10,9), (10,9), (10,9), (10,9), (10,9), (10,9), (10,9), (10,9),
>> (10,9), (10,9), (10,9), (10,9), (10,9), (10,9), (10,9), (10,9),
>> (10,9), (10,9), (10,8), (10,8), (10,8), (10,8), (10,8), (10,8),
>> (10,8), (10,8), (10,8), (10,8), (10,8), (10,8), (10,8), (10,8),
>> (10,8), (10,8), (10,8), (10,8), (10,8), (10,8)]
>>
>> I assume, the problem is the clone() method of the ListAccumulator
>> where we just create a shallow copy. This is fine for accumulators
>> which use primitive objects, like IntCounter but here we have a real
>> object.
>>
>> How do we work around this problem?
>>
>> Best regards,
>> Max