True, that is tricky. The user code does not necessarily respect the non-reuse mode, and that may be true for any user code. Can the list accumulator immediately serialize the objects and send them over as a byte array? That should solve it reliably without adding overhead (serialization will happen anyway).

On 21.01.2015 11:04, "Ufuk Celebi" <u...@apache.org> wrote:
> The thing is that the DefaultCrossFunction always uses the same holder
> Tuple2 object, which is then handed over to the chained collect helper
> flatMap(). I can see why it is OK for the default functions to reuse
> "holder" objects, but when they are chained to an operator it becomes
> problematic.
>
> On 21 Jan 2015, at 17:12, Ufuk Celebi <u...@apache.org> wrote:
>
> > I just checked out your branch and ran it with a breakpoint set at the
> > CollectHelper. If you look into the (list) accumulator, you see that the
> > same object is always added to it. Strangely enough, object re-use is
> > disabled in the config. I don't have time to look further into it now,
> > but it seems to be a problem with the object re-use mode.
> >
> > – Ufuk
> >
> > On 20 Jan 2015, at 20:53, Max Michels <m...@data-artisans.com> wrote:
> >
> >> Hi everyone,
> >>
> >> I'm running into some problems implementing an Accumulator for
> >> returning a DataSet as a list.
> >>
> >> https://github.com/mxm/flink/tree/count/collect
> >>
> >> Basically, it works fine in this test case:
> >>
> >> ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
> >>
> >> Integer[] input = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
> >>
> >> DataSet<Integer> data = env.fromElements(input);
> >>
> >> // count
> >> long numEntries = data.count();
> >> assertEquals(10, numEntries);
> >>
> >> // collect
> >> ArrayList<Integer> list = (ArrayList<Integer>) data.collect();
> >> assertArrayEquals(input, list.toArray());
> >>
> >> But with non-primitive objects, strange results occur:
> >>
> >> ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
> >> env.getConfig().disableObjectReuse();
> >>
> >> DataSet<Integer> data = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
> >> DataSet<Integer> data2 = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
> >>
> >> DataSet<Tuple2<Integer, Integer>> data3 = data.cross(data2);
> >>
> >> // count
> >> long numEntries = data3.count();
> >> assertEquals(100, numEntries);
> >>
> >> // collect
> >> ArrayList<Tuple2<Integer, Integer>> list =
> >>     (ArrayList<Tuple2<Integer, Integer>>) data3.collect();
> >>
> >> System.out.println(list);
> >>
> >> ....
> >>
> >> Output:
> >>
> >> [(10,10), (10,10), (10,10), (10,10), (10,10), (10,10), (10,10),
> >> (10,10), (10,10), (10,10), (10,10), (10,10), (10,10), (10,10),
> >> (10,10), (10,10), (10,10), (10,10), (10,10), (10,10), (10,6), (10,6),
> >> (10,6), (10,6), (10,6), (10,6), (10,6), (10,6), (10,6), (10,6),
> >> (10,6), (10,6), (10,6), (10,6), (10,6), (10,6), (10,6), (10,6),
> >> (10,6), (10,6), (10,7), (10,7), (10,7), (10,7), (10,7), (10,7),
> >> (10,7), (10,7), (10,7), (10,7), (10,7), (10,7), (10,7), (10,7),
> >> (10,7), (10,7), (10,7), (10,7), (10,7), (10,7), (10,9), (10,9),
> >> (10,9), (10,9), (10,9), (10,9), (10,9), (10,9), (10,9), (10,9),
> >> (10,9), (10,9), (10,9), (10,9), (10,9), (10,9), (10,9), (10,9),
> >> (10,9), (10,9), (10,8), (10,8), (10,8), (10,8), (10,8), (10,8),
> >> (10,8), (10,8), (10,8), (10,8), (10,8), (10,8), (10,8), (10,8),
> >> (10,8), (10,8), (10,8), (10,8), (10,8), (10,8)]
> >>
> >> I assume the problem is the clone() method of the ListAccumulator,
> >> where we just create a shallow copy. This is fine for accumulators
> >> that use primitive values, like IntCounter, but here we have a real
> >> object.
> >>
> >> How do we work around this problem?
> >>
> >> Best regards,
> >> Max
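A minimal, self-contained sketch of the failure mode described in the thread (one reused holder object handed to a list accumulator that stores references) and of the "serialize immediately on add" idea from the top reply. Everything here is hypothetical and not Flink API: `Pair` stands in for the reused `Tuple2`, `crossWithReusedHolder` stands in for a cross function that reuses its output object, and plain `java.io` serialization stands in for whatever serializer Flink would actually use for the element type.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class HolderReuseSketch {

    // Hypothetical mutable pair standing in for a reused Tuple2 holder.
    static final class Pair implements Serializable {
        private static final long serialVersionUID = 1L;
        int f0;
        int f1;

        @Override
        public String toString() {
            return "(" + f0 + "," + f1 + ")";
        }
    }

    // Hypothetical accumulator that stores references: every slot points at the same holder.
    static final class ReferenceListAccumulator<T> {
        private final List<T> values = new ArrayList<>();
        void add(T value) { values.add(value); }
        List<T> getLocalValue() { return values; }
    }

    // Hypothetical accumulator that serializes on add, so later mutation of the holder is harmless.
    static final class SerializedListAccumulator<T extends Serializable> {
        private final List<byte[]> values = new ArrayList<>();

        void add(T value) {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(value); // snapshot the current field values
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
            values.add(bytes.toByteArray());
        }

        @SuppressWarnings("unchecked")
        List<T> getLocalValue() {
            List<T> result = new ArrayList<>(values.size());
            for (byte[] b : values) {
                try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(b))) {
                    result.add((T) in.readObject()); // each element becomes a fresh object
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                } catch (ClassNotFoundException e) {
                    throw new IllegalStateException(e);
                }
            }
            return result;
        }
    }

    // Emits an n x n cross product through ONE holder instance, mutating it for each result.
    static void crossWithReusedHolder(int n, Consumer<Pair> out) {
        Pair holder = new Pair();
        for (int a = 1; a <= n; a++) {
            for (int b = 1; b <= n; b++) {
                holder.f0 = a;
                holder.f1 = b;
                out.accept(holder);
            }
        }
    }

    static String collectByReference() {
        ReferenceListAccumulator<Pair> acc = new ReferenceListAccumulator<>();
        crossWithReusedHolder(3, acc::add);
        return acc.getLocalValue().toString();
    }

    static String collectSerialized() {
        SerializedListAccumulator<Pair> acc = new SerializedListAccumulator<>();
        crossWithReusedHolder(3, acc::add);
        return acc.getLocalValue().toString();
    }

    public static void main(String[] args) {
        System.out.println(collectByReference()); // nine copies of the last value, (3,3)
        System.out.println(collectSerialized());  // all nine distinct pairs
    }
}
```

Because the snapshot happens inside `add`, the accumulator no longer depends on whether the upstream function honours the object re-use setting, which is the point of the proposal; the cost is one serialization per element, which the top reply notes would happen anyway when the result is shipped.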