Hi everyone,

I'm running into some problems implementing an Accumulator that returns the contents of a DataSet as a list.
https://github.com/mxm/flink/tree/count/collect

Basically, it works fine in this test case:

    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    Integer[] input = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
    DataSet<Integer> data = env.fromElements(input);

    // count
    long numEntries = data.count();
    assertEquals(10, numEntries);

    // collect
    ArrayList<Integer> list = (ArrayList<Integer>) data.collect();
    assertArrayEquals(input, list.toArray());

But with non-primitive objects strange results occur:

    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    env.getConfig().disableObjectReuse();

    DataSet<Integer> data = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
    DataSet<Integer> data2 = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
    DataSet<Tuple2<Integer, Integer>> data3 = data.cross(data2);

    // count
    long numEntries = data3.count();
    assertEquals(100, numEntries);

    // collect
    ArrayList<Tuple2<Integer, Integer>> list = (ArrayList<Tuple2<Integer, Integer>>) data3.collect();
    System.out.println(list);

    ....

Output:

    [(10,10), (10,10), (10,10), (10,10), (10,10), (10,10), (10,10), (10,10), (10,10), (10,10),
     (10,10), (10,10), (10,10), (10,10), (10,10), (10,10), (10,10), (10,10), (10,10), (10,10),
     (10,6), (10,6), (10,6), (10,6), (10,6), (10,6), (10,6), (10,6), (10,6), (10,6),
     (10,6), (10,6), (10,6), (10,6), (10,6), (10,6), (10,6), (10,6), (10,6), (10,6),
     (10,7), (10,7), (10,7), (10,7), (10,7), (10,7), (10,7), (10,7), (10,7), (10,7),
     (10,7), (10,7), (10,7), (10,7), (10,7), (10,7), (10,7), (10,7), (10,7), (10,7),
     (10,9), (10,9), (10,9), (10,9), (10,9), (10,9), (10,9), (10,9), (10,9), (10,9),
     (10,9), (10,9), (10,9), (10,9), (10,9), (10,9), (10,9), (10,9), (10,9), (10,9),
     (10,8), (10,8), (10,8), (10,8), (10,8), (10,8), (10,8), (10,8), (10,8), (10,8),
     (10,8), (10,8), (10,8), (10,8), (10,8), (10,8), (10,8), (10,8), (10,8), (10,8)]

I assume the problem is the clone() method of the ListAccumulator, where we just create a shallow copy.
This is fine for accumulators that hold primitive values, like IntCounter, but here we have a real object. How do we work around this problem?

Best regards,
Max
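
To make the shallow-copy suspicion concrete, here is a minimal plain-Java sketch with no Flink dependency. It is only an illustration of the suspected mechanism, not the actual ListAccumulator code: `ShallowCopyDemo`, `Pair`, `shallowCopy` and `deepCopy` are hypothetical names, and `Pair` merely stands in for a mutable `Tuple2<Integer, Integer>` that gets overwritten in place after it was added to the list.

```java
import java.util.ArrayList;
import java.util.List;

public class ShallowCopyDemo {

    /** Hypothetical mutable pair, standing in for a Tuple2<Integer, Integer>. */
    static final class Pair {
        int f0;
        int f1;

        Pair(int f0, int f1) {
            this.f0 = f0;
            this.f1 = f1;
        }

        Pair copy() {
            return new Pair(f0, f1);
        }

        @Override
        public String toString() {
            return "(" + f0 + "," + f1 + ")";
        }
    }

    /** Shallow copy: a new list that still holds the same Pair instances. */
    static List<Pair> shallowCopy(List<Pair> source) {
        return new ArrayList<>(source);
    }

    /** Deep copy: a new list holding a fresh copy of every Pair. */
    static List<Pair> deepCopy(List<Pair> source) {
        List<Pair> result = new ArrayList<>(source.size());
        for (Pair p : source) {
            result.add(p.copy());
        }
        return result;
    }

    public static void main(String[] args) {
        List<Pair> original = new ArrayList<>();
        original.add(new Pair(1, 1));

        List<Pair> shallow = shallowCopy(original);
        List<Pair> deep = deepCopy(original);

        // Overwrite the element in place, as could happen if the same
        // object instance is reused for the next record.
        original.get(0).f0 = 10;
        original.get(0).f1 = 10;

        System.out.println("shallow: " + shallow); // shallow: [(10,10)]
        System.out.println("deep:    " + deep);    // deep:    [(1,1)]
    }
}
```

If this is indeed what happens, copying each element when it is added to the accumulator (or when the accumulator is cloned) would avoid the aliasing, at the cost of one extra copy per record.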