Thank you for your help, Ufuk and Stephan. I made some changes to immediately serialize the stored objects.
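
A minimal sketch of that idea, assuming plain Java serialization in place of Flink's own type serializers. `SerializingListAccumulator` and `CollectDemo` are hypothetical names for illustration, not the classes in the branch. Each element is turned into a byte array at the moment it is added, so a holder object that is mutated later cannot change what was stored.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the list accumulator; not Flink's actual class.
final class SerializingListAccumulator<T extends Serializable> {
    // Every element is kept as its own byte array, snapshotted at add() time.
    private final List<byte[]> serialized = new ArrayList<>();

    void add(T value) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        }
        serialized.add(bytes.toByteArray());
    }

    // Deserializes the stored bytes into fresh, independent objects.
    @SuppressWarnings("unchecked")
    List<T> getLocalValue() throws IOException, ClassNotFoundException {
        List<T> result = new ArrayList<>(serialized.size());
        for (byte[] element : serialized) {
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(element))) {
                result.add((T) in.readObject());
            }
        }
        return result;
    }
}

final class CollectDemo {
    public static void main(String[] args) throws Exception {
        SerializingListAccumulator<int[]> acc = new SerializingListAccumulator<>();
        int[] holder = new int[2]; // reused for every record, like the holder Tuple2
        for (int i = 1; i <= 3; i++) {
            holder[0] = i;
            holder[1] = i * 10;
            acc.add(holder);
        }
        for (int[] pair : acc.getLocalValue()) {
            System.out.println(pair[0] + "," + pair[1]); // 1,10 then 2,20 then 3,30
        }
    }
}
```

Since the elements have to be serialized anyway before they are shipped, doing it at add() time should not add work, as Stephan pointed out.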

On Thu, Jan 22, 2015 at 2:58 AM, Stephan Ewen <se...@apache.org> wrote:

> True, that is tricky. The user code does not necessarily respect the
> non-reuse mode. That may be true for any user code. Can the list
> accumulator immediately serialize the objects and send over a byte array?
> That should fix it reliably without adding overhead (serialization will
> happen anyway).
>
> On 21.01.2015 11:04, "Ufuk Celebi" <u...@apache.org> wrote:
>
>> The thing is that the DefaultCrossFunction always uses the same holder
>> Tuple2 object, which is then handed over to the chained collect helper
>> flatMap(). I can see why it is OK for the default functions to reuse
>> "holder" objects, but when they are chained to an operator it becomes
>> problematic.
>>
>> On 21 Jan 2015, at 17:12, Ufuk Celebi <u...@apache.org> wrote:
>>
>> > I just checked out your branch and ran it with a breakpoint set at the
>> > CollectHelper. If you look into the (list) accumulator you see that the
>> > same object is always added to it. Strangely enough, object re-use is
>> > disabled in the config. I don't have time to look further into it now,
>> > but it seems to be a problem with the object re-use mode.
>> >
>> > – Ufuk
>> >
>> > On 20 Jan 2015, at 20:53, Max Michels <m...@data-artisans.com> wrote:
>> >
>> >> Hi everyone,
>> >>
>> >> I'm running into some problems implementing an Accumulator for
>> >> returning the elements of a DataSet as a list.
>> >>
>> >> https://github.com/mxm/flink/tree/count/collect
>> >>
>> >> Basically, it works fine in this test case:
>> >>
>> >>     ExecutionEnvironment env =
>> >>         ExecutionEnvironment.getExecutionEnvironment();
>> >>
>> >>     Integer[] input = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
>> >>
>> >>     DataSet<Integer> data = env.fromElements(input);
>> >>
>> >>     // count
>> >>     long numEntries = data.count();
>> >>     assertEquals(10, numEntries);
>> >>
>> >>     // collect
>> >>     ArrayList<Integer> list = (ArrayList<Integer>) data.collect();
>> >>     assertArrayEquals(input, list.toArray());
>> >>
>> >> But with non-primitive objects strange results occur:
>> >>
>> >>     ExecutionEnvironment env =
>> >>         ExecutionEnvironment.getExecutionEnvironment();
>> >>     env.getConfig().disableObjectReuse();
>> >>
>> >>     DataSet<Integer> data = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
>> >>     DataSet<Integer> data2 = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
>> >>
>> >>     DataSet<Tuple2<Integer, Integer>> data3 = data.cross(data2);
>> >>
>> >>     // count
>> >>     long numEntries = data3.count();
>> >>     assertEquals(100, numEntries);
>> >>
>> >>     // collect
>> >>     ArrayList<Tuple2<Integer, Integer>> list =
>> >>         (ArrayList<Tuple2<Integer, Integer>>) data3.collect();
>> >>
>> >>     System.out.println(list);
>> >>
>> >>     ....
>> >>
>> >> Output:
>> >>
>> >> [(10,10), (10,10), (10,10), (10,10), (10,10), (10,10), (10,10),
>> >> (10,10), (10,10), (10,10), (10,10), (10,10), (10,10), (10,10),
>> >> (10,10), (10,10), (10,10), (10,10), (10,10), (10,10), (10,6), (10,6),
>> >> (10,6), (10,6), (10,6), (10,6), (10,6), (10,6), (10,6), (10,6),
>> >> (10,6), (10,6), (10,6), (10,6), (10,6), (10,6), (10,6), (10,6),
>> >> (10,6), (10,6), (10,7), (10,7), (10,7), (10,7), (10,7), (10,7),
>> >> (10,7), (10,7), (10,7), (10,7), (10,7), (10,7), (10,7), (10,7),
>> >> (10,7), (10,7), (10,7), (10,7), (10,7), (10,7), (10,9), (10,9),
>> >> (10,9), (10,9), (10,9), (10,9), (10,9), (10,9), (10,9), (10,9),
>> >> (10,9), (10,9), (10,9), (10,9), (10,9), (10,9), (10,9), (10,9),
>> >> (10,9), (10,9), (10,8), (10,8), (10,8), (10,8), (10,8), (10,8),
>> >> (10,8), (10,8), (10,8), (10,8), (10,8), (10,8), (10,8), (10,8),
>> >> (10,8), (10,8), (10,8), (10,8), (10,8), (10,8)]
>> >>
>> >> I assume the problem is the clone() method of the ListAccumulator,
>> >> where we just create a shallow copy. This is fine for accumulators
>> >> that use primitive values, like IntCounter, but here we have a real
>> >> object.
>> >>
>> >> How do we work around this problem?
>> >>
>> >> Best regards,
>> >> Max
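
For reference, the symptom in the quoted output can be reproduced without Flink. The sketch below is only an illustration under that assumption: `SharedHolderDemo` and `collectWithReusedHolder` are hypothetical names, and an `int[]` stands in for the reused holder Tuple2. It shows that a list of references to one mutable holder, even after a shallow copy, ends up showing only the holder's last state.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical reproduction of the reused-holder effect; not Flink code.
final class SharedHolderDemo {
    // Mimics a function that reuses one mutable holder for every record it emits.
    static List<int[]> collectWithReusedHolder(int n) {
        List<int[]> collected = new ArrayList<>();
        int[] holder = new int[2];
        for (int i = 1; i <= n; i++) {
            holder[0] = i;
            holder[1] = i;
            collected.add(holder); // stores the reference, not a snapshot
        }
        // A shallow copy duplicates the list but still shares the same holder.
        return new ArrayList<>(collected);
    }

    public static void main(String[] args) {
        for (int[] pair : collectWithReusedHolder(3)) {
            System.out.println(pair[0] + "," + pair[1]); // prints 3,3 three times
        }
    }
}
```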