I just checked out your branch and ran it with a breakpoint set in the CollectHelper. If you look into the (list) accumulator, you can see that the same object is always added to it. Strangely enough, object reuse is disabled in the config. I don't have time to look further into it now, but it seems to be a problem with the object-reuse mode.
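When one object instance is appended repeatedly while its fields keep changing, every list slot ends up showing whatever values that shared instance last held. That is consistent with the runs of identical tuples in the output quoted below. The following minimal Java sketch reproduces the aliasing effect without Flink. `MutablePair` is a hypothetical stand-in for a reused `Tuple2`, and the method names are illustrative, not Flink API:

```java
import java.util.ArrayList;
import java.util.List;

public class ReuseAliasingDemo {

    // Hypothetical stand-in for a mutable record such as a reused Tuple2.
    static final class MutablePair {
        int f0;
        int f1;

        MutablePair(int f0, int f1) {
            this.f0 = f0;
            this.f1 = f1;
        }

        @Override
        public String toString() {
            return "(" + f0 + "," + f1 + ")";
        }
    }

    // Appends the SAME instance on every iteration and mutates it in between,
    // so all list slots end up referencing the last values written.
    static List<MutablePair> collectWithReuse(int n) {
        List<MutablePair> out = new ArrayList<>();
        MutablePair reused = new MutablePair(0, 0);
        for (int i = 1; i <= n; i++) {
            reused.f0 = i;
            reused.f1 = i * 10;
            out.add(reused);
        }
        return out;
    }

    // Appends a fresh copy on every iteration, so earlier entries stay intact.
    static List<MutablePair> collectWithCopy(int n) {
        List<MutablePair> out = new ArrayList<>();
        MutablePair reused = new MutablePair(0, 0);
        for (int i = 1; i <= n; i++) {
            reused.f0 = i;
            reused.f1 = i * 10;
            out.add(new MutablePair(reused.f0, reused.f1));
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(collectWithReuse(3)); // [(3,30), (3,30), (3,30)]
        System.out.println(collectWithCopy(3));  // [(1,10), (2,20), (3,30)]
    }
}
```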
– Ufuk

On 20 Jan 2015, at 20:53, Max Michels <m...@data-artisans.com> wrote:

> Hi everyone,
>
> I'm running into some problems implementing an Accumulator for
> returning a list of a DataSet.
>
> https://github.com/mxm/flink/tree/count/collect
>
> Basically, it works fine in this test case:
>
> ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>
> Integer[] input = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
>
> DataSet<Integer> data = env.fromElements(input);
>
> // count
> long numEntries = data.count();
> assertEquals(10, numEntries);
>
> // collect
> ArrayList<Integer> list = (ArrayList<Integer>) data.collect();
> assertArrayEquals(input, list.toArray());
>
> But with non-primitive objects, strange results occur:
>
> ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
> env.getConfig().disableObjectReuse();
>
> DataSet<Integer> data = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
> DataSet<Integer> data2 = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
>
> DataSet<Tuple2<Integer, Integer>> data3 = data.cross(data2);
>
> // count
> long numEntries = data3.count();
> assertEquals(100, numEntries);
>
> // collect
> ArrayList<Tuple2<Integer, Integer>> list =
>     (ArrayList<Tuple2<Integer, Integer>>) data3.collect();
>
> System.out.println(list);
>
> ....
>
> Output:
>
> [(10,10), (10,10), (10,10), (10,10), (10,10), (10,10), (10,10),
> (10,10), (10,10), (10,10), (10,10), (10,10), (10,10), (10,10),
> (10,10), (10,10), (10,10), (10,10), (10,10), (10,10), (10,6), (10,6),
> (10,6), (10,6), (10,6), (10,6), (10,6), (10,6), (10,6), (10,6),
> (10,6), (10,6), (10,6), (10,6), (10,6), (10,6), (10,6), (10,6),
> (10,6), (10,6), (10,7), (10,7), (10,7), (10,7), (10,7), (10,7),
> (10,7), (10,7), (10,7), (10,7), (10,7), (10,7), (10,7), (10,7),
> (10,7), (10,7), (10,7), (10,7), (10,7), (10,7), (10,9), (10,9),
> (10,9), (10,9), (10,9), (10,9), (10,9), (10,9), (10,9), (10,9),
> (10,9), (10,9), (10,9), (10,9), (10,9), (10,9), (10,9), (10,9),
> (10,9), (10,9), (10,8), (10,8), (10,8), (10,8), (10,8), (10,8),
> (10,8), (10,8), (10,8), (10,8), (10,8), (10,8), (10,8), (10,8),
> (10,8), (10,8), (10,8), (10,8), (10,8), (10,8)]
>
> I assume the problem is the clone() method of the ListAccumulator,
> where we just create a shallow copy. This is fine for accumulators
> that hold primitive values, like IntCounter, but here we have a real
> object.
>
> How do we work around this problem?
>
> Best regards,
> Max
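If the fix is to stop sharing element references, one option is to copy each element both when it is added and when the accumulator is cloned. The sketch below assumes that approach. `CopyingListAccumulator` and its `copier` function are hypothetical; the class only mimics the add/merge/clone shape discussed here and does not implement Flink's `Accumulator` interface. Inside Flink, a type-aware copy (for example a serializer's copy method) would presumably be needed; that wiring is not shown.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.UnaryOperator;

// Hypothetical list accumulator; NOT Flink's ListAccumulator or Accumulator API.
public class CopyingListAccumulator<T> {

    private final UnaryOperator<T> copier; // returns an independent copy of an element
    private final ArrayList<T> values = new ArrayList<>();

    public CopyingListAccumulator(UnaryOperator<T> copier) {
        this.copier = copier;
    }

    // Stores a copy, so a caller that mutates and re-adds the same instance
    // cannot overwrite elements that were already collected.
    public void add(T value) {
        values.add(copier.apply(value));
    }

    // Appends copies of the other accumulator's elements.
    public void merge(CopyingListAccumulator<T> other) {
        for (T v : other.values) {
            values.add(copier.apply(v));
        }
    }

    public List<T> getLocalValue() {
        return values;
    }

    // Deep copy: a new list AND copied elements. Copying only the list
    // (a shallow copy) would still share the element objects.
    @Override
    public CopyingListAccumulator<T> clone() {
        CopyingListAccumulator<T> copy = new CopyingListAccumulator<T>(copier);
        copy.merge(this); // merge() copies every element
        return copy;
    }

    public static void main(String[] args) {
        // int[] serves as a simple mutable element type for the demo.
        CopyingListAccumulator<int[]> acc = new CopyingListAccumulator<int[]>(a -> a.clone());
        int[] reused = new int[2];
        for (int i = 1; i <= 3; i++) {
            reused[0] = 10;
            reused[1] = i;
            acc.add(reused); // same instance passed each time, as with object reuse
        }
        CopyingListAccumulator<int[]> snapshot = acc.clone();
        reused[1] = 99;                   // does not affect stored elements
        acc.getLocalValue().get(0)[1] = 42; // does not affect the clone
        for (int[] pair : snapshot.getLocalValue()) {
            System.out.println(Arrays.toString(pair)); // [10, 1], [10, 2], [10, 3]
        }
    }
}
```

Copying in add() targets what Ufuk saw in the debugger (one instance added repeatedly), while the deep clone() targets the shallow-copy concern. The cost is one extra allocation per element.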