Hi everyone,

I'm running into some problems implementing an Accumulator that
returns the elements of a DataSet as a list.

https://github.com/mxm/flink/tree/count/collect

Basically, it works fine in this test case:


    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    Integer[] input = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};

    DataSet<Integer> data = env.fromElements(input);

    // count
    long numEntries = data.count();
    assertEquals(10, numEntries);

    // collect
    ArrayList<Integer> list = (ArrayList<Integer>) data.collect();
    assertArrayEquals(input, list.toArray());


But with non-primitive element types such as tuples, strange results occur:

    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    env.getConfig().disableObjectReuse();

    DataSet<Integer> data = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
    DataSet<Integer> data2 = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);

    DataSet<Tuple2<Integer, Integer>> data3 = data.cross(data2);

    // count
    long numEntries = data3.count();
    assertEquals(100, numEntries);

    // collect
    ArrayList<Tuple2<Integer, Integer>> list = (ArrayList<Tuple2<Integer, Integer>>) data3.collect();

    System.out.println(list);

....

Output:

[(10,10), (10,10), (10,10), (10,10), (10,10), (10,10), (10,10),
(10,10), (10,10), (10,10), (10,10), (10,10), (10,10), (10,10),
(10,10), (10,10), (10,10), (10,10), (10,10), (10,10), (10,6), (10,6),
(10,6), (10,6), (10,6), (10,6), (10,6), (10,6), (10,6), (10,6),
(10,6), (10,6), (10,6), (10,6), (10,6), (10,6), (10,6), (10,6),
(10,6), (10,6), (10,7), (10,7), (10,7), (10,7), (10,7), (10,7),
(10,7), (10,7), (10,7), (10,7), (10,7), (10,7), (10,7), (10,7),
(10,7), (10,7), (10,7), (10,7), (10,7), (10,7), (10,9), (10,9),
(10,9), (10,9), (10,9), (10,9), (10,9), (10,9), (10,9), (10,9),
(10,9), (10,9), (10,9), (10,9), (10,9), (10,9), (10,9), (10,9),
(10,9), (10,9), (10,8), (10,8), (10,8), (10,8), (10,8), (10,8),
(10,8), (10,8), (10,8), (10,8), (10,8), (10,8), (10,8), (10,8),
(10,8), (10,8), (10,8), (10,8), (10,8), (10,8)]

I assume the problem is the clone() method of the ListAccumulator,
which only creates a shallow copy. That is fine for accumulators that
hold primitive values, like IntCounter, but here the list contains
real (mutable) objects.
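To illustrate the aliasing issue in plain Java, here is a minimal sketch. The classes and methods (Pair, accumulate, shallowClone, deepClone) are hypothetical stand-ins, not Flink's ListAccumulator or Tuple2. It shows that a list holding references to one reused, mutable record ends up with every entry showing the last values written. It also shows that a shallow clone of a list shares its element objects, while a deep clone copies them.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch, not Flink's API: shows how storing references to a
// reused, mutable record (or shallow-cloning a list of them) produces aliasing.
public class ListAccumulatorSketch {

    // Hypothetical stand-in for a mutable tuple type such as Tuple2.
    static final class Pair {
        int f0;
        int f1;

        Pair(int f0, int f1) {
            this.f0 = f0;
            this.f1 = f1;
        }

        // Deep copy: a new object with the same field values.
        Pair copy() {
            return new Pair(f0, f1);
        }

        @Override
        public String toString() {
            return "(" + f0 + "," + f1 + ")";
        }
    }

    // Simulates an operator that reuses one record object for every output
    // and hands each emitted record to a list accumulator.
    static List<Pair> accumulate(boolean copyOnAdd) {
        List<Pair> accumulated = new ArrayList<>();
        Pair reused = new Pair(0, 0);
        for (int i = 1; i <= 3; i++) {
            reused.f0 = i;
            reused.f1 = i * 10;
            // Storing the reference aliases the reused object; copying snapshots it.
            accumulated.add(copyOnAdd ? reused.copy() : reused);
        }
        return accumulated;
    }

    // Shallow copy: a new list, but the elements are the same objects.
    static List<Pair> shallowClone(List<Pair> source) {
        return new ArrayList<>(source);
    }

    // Deep copy: a new list holding a copy of every element.
    static List<Pair> deepClone(List<Pair> source) {
        List<Pair> result = new ArrayList<>(source.size());
        for (Pair p : source) {
            result.add(p.copy());
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(accumulate(false)); // [(3,30), (3,30), (3,30)]
        System.out.println(accumulate(true));  // [(1,10), (2,20), (3,30)]

        List<Pair> original = accumulate(true);
        List<Pair> shallow = shallowClone(original);
        List<Pair> deep = deepClone(original);
        original.get(0).f0 = 99;
        System.out.println(shallow.get(0)); // (99,10): shares the element object
        System.out.println(deep.get(0));    // (1,10): independent copy
    }
}
```

In a real accumulator, the copy step would need a type-aware deep copy of each element rather than a hand-written copy() method. Whether the Flink behaviour stems from clone() or from records being reused before they reach the accumulator is not settled by this sketch.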

How do we work around this problem?

Best regards,
Max
