Thank you for your help, Ufuk and Stephan. I made some changes to
immediately serialize the stored objects.
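A minimal sketch of the idea Stephan proposes below. The accumulator serializes each element as soon as it is added, so a holder object that is reused and mutated afterwards cannot change what was already stored. The class names `SerializingListAccumulator`, `Pair` and `getValues()` are hypothetical stand-ins, not Flink's API. Plain `java.io` serialization is used here instead of Flink's own type serializers.

```java
import java.io.*;
import java.util.ArrayList;
import java.util.List;

public class SerializingListAccumulatorDemo {

    // Hypothetical mutable holder, standing in for a reused Tuple2.
    static final class Pair implements Serializable {
        int f0, f1;
        @Override public String toString() { return "(" + f0 + "," + f1 + ")"; }
    }

    // Hypothetical accumulator: keeps a serialized snapshot of each element
    // instead of a reference to the (possibly reused) object.
    static final class SerializingListAccumulator<T extends Serializable> {
        private final List<byte[]> serialized = new ArrayList<>();

        void add(T value) {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(value); // copy the current field values right now
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
            serialized.add(bytes.toByteArray());
        }

        // Deserializes every stored snapshot into a fresh object.
        @SuppressWarnings("unchecked")
        List<T> getValues() {
            List<T> result = new ArrayList<>(serialized.size());
            for (byte[] b : serialized) {
                try (ObjectInputStream in =
                         new ObjectInputStream(new ByteArrayInputStream(b))) {
                    result.add((T) in.readObject());
                } catch (IOException | ClassNotFoundException e) {
                    throw new IllegalStateException(e);
                }
            }
            return result;
        }
    }

    public static void main(String[] args) {
        SerializingListAccumulator<Pair> acc = new SerializingListAccumulator<>();
        Pair holder = new Pair(); // one holder, reused for every record
        for (int i = 1; i <= 3; i++) {
            holder.f0 = 10;
            holder.f1 = i;
            acc.add(holder);
        }
        System.out.println(acc.getValues()); // prints [(10,1), (10,2), (10,3)]
    }
}
```

Because the bytes are captured at `add()` time, the design does not depend on whether upstream code respects the non-reuse mode.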

On Thu, Jan 22, 2015 at 2:58 AM, Stephan Ewen <se...@apache.org> wrote:
> True, that is tricky. The user code does not necessarily respect the
> non-reuse mode, and that may be true for any user code. Can the list
> accumulator immediately serialize the objects and send over a byte array?
> That should solve it reliably without adding overhead (serialization will
> happen anyway).
> On 21.01.2015 11:04, "Ufuk Celebi" <u...@apache.org> wrote:
>
>> The thing is that the DefaultCrossFunction always uses the same holder
>> Tuple2 object, which is then handed over to the chained collect helper
>> flatMap(). I can see why it is OK for the default functions to reuse
>> "holder" objects, but when they are chained to an operator, it becomes
>> problematic.
>>
>> On 21 Jan 2015, at 17:12, Ufuk Celebi <u...@apache.org> wrote:
>>
>> > I just checked out your branch and ran it with a breakpoint set at the
>> > CollectHelper. If you look into the (list) accumulator, you can see that
>> > the same object is always added to it. Strangely enough, object re-use is
>> > disabled in the config. I don't have time to look further into it now,
>> > but it seems to be a problem with the object re-use mode.
>> >
>> > – Ufuk
>> >
>> > On 20 Jan 2015, at 20:53, Max Michels <m...@data-artisans.com> wrote:
>> >
>> >> Hi everyone,
>> >>
>> >> I'm running into some problems implementing an Accumulator for
>> >> returning the elements of a DataSet as a list.
>> >>
>> >> https://github.com/mxm/flink/tree/count/collect
>> >>
>> >> Basically, it works fine in this test case:
>> >>
>> >>
>> >>   ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>> >>
>> >>   Integer[] input = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
>> >>
>> >>   DataSet<Integer> data = env.fromElements(input);
>> >>
>> >>   // count
>> >>   long numEntries = data.count();
>> >>   assertEquals(10, numEntries);
>> >>
>> >>   // collect
>> >>   ArrayList<Integer> list = (ArrayList<Integer>) data.collect();
>> >>   assertArrayEquals(input, list.toArray());
>> >>
>> >>
>> >> But with non-primitive objects strange results occur:
>> >>
>> >> ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>> >> env.getConfig().disableObjectReuse();
>> >>
>> >> DataSet<Integer> data = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
>> >> DataSet<Integer> data2 = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
>> >>
>> >> DataSet<Tuple2<Integer, Integer>> data3 = data.cross(data2);
>> >>
>> >> // count
>> >> long numEntries = data3.count();
>> >> assertEquals(100, numEntries);
>> >>
>> >> // collect
>> >> ArrayList<Tuple2<Integer, Integer>> list =
>> >>     (ArrayList<Tuple2<Integer, Integer>>) data3.collect();
>> >>
>> >> System.out.println(list);
>> >>
>> >> ....
>> >>
>> >> Output:
>> >>
>> >> [(10,10), (10,10), (10,10), (10,10), (10,10), (10,10), (10,10),
>> >> (10,10), (10,10), (10,10), (10,10), (10,10), (10,10), (10,10),
>> >> (10,10), (10,10), (10,10), (10,10), (10,10), (10,10), (10,6), (10,6),
>> >> (10,6), (10,6), (10,6), (10,6), (10,6), (10,6), (10,6), (10,6),
>> >> (10,6), (10,6), (10,6), (10,6), (10,6), (10,6), (10,6), (10,6),
>> >> (10,6), (10,6), (10,7), (10,7), (10,7), (10,7), (10,7), (10,7),
>> >> (10,7), (10,7), (10,7), (10,7), (10,7), (10,7), (10,7), (10,7),
>> >> (10,7), (10,7), (10,7), (10,7), (10,7), (10,7), (10,9), (10,9),
>> >> (10,9), (10,9), (10,9), (10,9), (10,9), (10,9), (10,9), (10,9),
>> >> (10,9), (10,9), (10,9), (10,9), (10,9), (10,9), (10,9), (10,9),
>> >> (10,9), (10,9), (10,8), (10,8), (10,8), (10,8), (10,8), (10,8),
>> >> (10,8), (10,8), (10,8), (10,8), (10,8), (10,8), (10,8), (10,8),
>> >> (10,8), (10,8), (10,8), (10,8), (10,8), (10,8)]
>> >>
>> >> I assume the problem is the clone() method of the ListAccumulator,
>> >> which only creates a shallow copy. This is fine for accumulators that
>> >> hold primitive values, like IntCounter, but here we have a real
>> >> object.
>> >>
>> >> How do we work around this problem?
>> >>
>> >> Best regards,
>> >> Max
>> >
>>
>>

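The symptom Ufuk describes (the same object being added to the list again and again) can be reproduced without Flink. In this sketch, a hypothetical `Pair` class stands in for the `Tuple2` holder reused by DefaultCrossFunction, and a plain `ArrayList` stands in for a list accumulator that stores references. Every entry ends up showing the last values written into the holder, much like the repeated entries in Max's output. The helpers `crossWithReuse` and `deepCopy` are illustrative only. The sketch also suggests why a deep copy in `clone()` alone would not help: by the time `clone()` runs, the list already holds many references to a single object.

```java
import java.util.ArrayList;
import java.util.List;

public class HolderReuseDemo {

    // Hypothetical mutable holder, standing in for the reused Tuple2.
    static final class Pair {
        int f0, f1;
        Pair() {}
        Pair(Pair other) { this.f0 = other.f0; this.f1 = other.f1; }
        @Override public String toString() { return "(" + f0 + "," + f1 + ")"; }
    }

    // Emits the cross product of left and right into sink, reusing one holder.
    static void crossWithReuse(int[] left, int[] right, List<Pair> sink) {
        Pair holder = new Pair();
        for (int l : left) {
            for (int r : right) {
                holder.f0 = l;
                holder.f1 = r;
                sink.add(holder); // stores the reference, not the current values
            }
        }
    }

    // Deep-copies every element; the copies still see the last-written values.
    static List<Pair> deepCopy(List<Pair> list) {
        List<Pair> copy = new ArrayList<>(list.size());
        for (Pair p : list) copy.add(new Pair(p));
        return copy;
    }

    public static void main(String[] args) {
        List<Pair> collected = new ArrayList<>();
        crossWithReuse(new int[] {1, 2}, new int[] {1, 2}, collected);
        System.out.println(collected);           // prints [(2,2), (2,2), (2,2), (2,2)]
        System.out.println(deepCopy(collected)); // prints [(2,2), (2,2), (2,2), (2,2)]
    }
}
```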