True, that is tricky. The user code does not necessarily respect the
non-reuse mode. That may be true for any user code. Can the list
accumulator immediately serialize the objects and send over a byte array?
That should since it reliably without adding overhead (serialization will
happen anyways).
Am 21.01.2015 11:04 schrieb "Ufuk Celebi" <u...@apache.org>:

> The thing is that the DefaultCrossFunction always uses the same holder
> Tuple2 object, which is then handed over to the chained collect helper
> flatMap(). I can see why it is OK to keep the default functions to reuse
> "holder" objects, but when they are chained to an operator it becomes
> problematic.
>
> On 21 Jan 2015, at 17:12, Ufuk Celebi <u...@apache.org> wrote:
>
> > I just checked out your branch and ran it with a break point set at the
> CollectHelper. If you look into the (list) accumulator you see that always
> the same object is added to it. Strangely enough, object re-use is disabled
> in the config. I don't have time to look further into it now, but it seems
> to be a problem with the object re-use mode.
> >
> > – Ufuk
> >
> > On 20 Jan 2015, at 20:53, Max Michels <m...@data-artisans.com> wrote:
> >
> >> Hi everyone,
> >>
> >> I'm running into some problems implementing a Accumulator for
> >> returning a list of a DataSet.
> >>
> >> https://github.com/mxm/flink/tree/count/collect
> >>
> >> Basically, it works fine in this test case:
> >>
> >>
> >>   ExecutionEnvironment env =
> ExecutionEnvironment.getExecutionEnvironment();
> >>
> >>   Integer[] input = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
> >>
> >>   DataSet<Integer> data = env.fromElements(input);
> >>
> >>   // count
> >>   long numEntries = data.count();
> >>   assertEquals(10, numEntries);
> >>
> >>   // collect
> >>   ArrayList<Integer> list = (ArrayList<Integer>) data.collect();
> >>   assertArrayEquals(input, list.toArray());
> >>
> >>
> >> But with non-primitive objects strange results occur:
> >>
> >> ExecutionEnvironment env =
> ExecutionEnvironment.getExecutionEnvironment();
> >> env.getConfig().disableObjectReuse();
> >>
> >> DataSet<Integer> data = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
> >> DataSet<Integer> data2 = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9,
> 10);
> >>
> >> DataSet<Tuple2<Integer, Integer>> data3 = data.cross(data2);
> >>
> >> // count
> >> long numEntries = data3.count();
> >> assertEquals(100, numEntries);
> >>
> >> // collect
> >> ArrayList<Tuple2<Integer, Integer>> list = (ArrayList<Tuple2<Integer,
> >> Integer>>) data3.collect();
> >>
> >> System.out.println(list)
> >>
> >> ....
> >>
> >> Output:
> >>
> >> [(10,10), (10,10), (10,10), (10,10), (10,10), (10,10), (10,10),
> >> (10,10), (10,10), (10,10), (10,10), (10,10), (10,10), (10,10),
> >> (10,10), (10,10), (10,10), (10,10), (10,10), (10,10), (10,6), (10,6),
> >> (10,6), (10,6), (10,6), (10,6), (10,6), (10,6), (10,6), (10,6),
> >> (10,6), (10,6), (10,6), (10,6), (10,6), (10,6), (10,6), (10,6),
> >> (10,6), (10,6), (10,7), (10,7), (10,7), (10,7), (10,7), (10,7),
> >> (10,7), (10,7), (10,7), (10,7), (10,7), (10,7), (10,7), (10,7),
> >> (10,7), (10,7), (10,7), (10,7), (10,7), (10,7), (10,9), (10,9),
> >> (10,9), (10,9), (10,9), (10,9), (10,9), (10,9), (10,9), (10,9),
> >> (10,9), (10,9), (10,9), (10,9), (10,9), (10,9), (10,9), (10,9),
> >> (10,9), (10,9), (10,8), (10,8), (10,8), (10,8), (10,8), (10,8),
> >> (10,8), (10,8), (10,8), (10,8), (10,8), (10,8), (10,8), (10,8),
> >> (10,8), (10,8), (10,8), (10,8), (10,8), (10,8)]
> >>
> >> I assume, the problem is the clone() method of the ListAccumulator
> >> where we just create a shallow copy. This is fine for accumulators
> >> which use primitive objects, like IntCounter but here we have a real
> >> object.
> >>
> >> How do we work around this problem?
> >>
> >> Best regards,
> >> Max
> >
>
>

Reply via email to