Re: Implementing a list accumulator

2015-01-27 Thread Max Michels
Thank you for your help, Ufuk and Stephan. I made some changes to immediately serialize the stored objects.

On Thu, Jan 22, 2015 at 2:58 AM, Stephan Ewen wrote:
> True, that is tricky. The user code does not necessarily respect the
> non-reuse mode. That may be true for any user code. Can the lis

Re: Implementing a list accumulator

2015-01-21 Thread Stephan Ewen
True, that is tricky. The user code does not necessarily respect the non-reuse mode. That may be true for any user code. Can the list accumulator immediately serialize the objects and send over a byte array? That should fix it reliably without adding overhead (serialization will happen anyway).
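A minimal sketch of the idea Stephan describes, assuming a standalone class rather than Flink's real accumulator API: each element is written to its own byte array the moment it is added, so a holder object that user code mutates afterwards cannot change what was stored. `SerializedListAccumulator` is a hypothetical name, and plain Java serialization stands in for whatever serializer Flink would actually use.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: snapshot each element to bytes on add(), so later
// mutation of a reused object cannot alter the stored entries.
// Java serialization is an assumed stand-in for Flink's own serializers.
class SerializedListAccumulator<T extends Serializable> {
    private final ArrayList<byte[]> serialized = new ArrayList<>();

    void add(T value) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value); // copy happens here, not at shipping time
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        serialized.add(bytes.toByteArray());
    }

    // The byte arrays are what would be sent to the client.
    List<byte[]> getSerializedValues() {
        return serialized;
    }

    // Client side: turn the byte arrays back into objects.
    @SuppressWarnings("unchecked")
    List<T> deserialize() {
        List<T> result = new ArrayList<>(serialized.size());
        for (byte[] b : serialized) {
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(b))) {
                result.add((T) in.readObject());
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            } catch (ClassNotFoundException e) {
                throw new IllegalStateException(e);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        SerializedListAccumulator<int[]> acc = new SerializedListAccumulator<>();
        int[] holder = new int[1]; // one reused, mutable holder object
        for (int i = 0; i < 3; i++) {
            holder[0] = i;
            acc.add(holder);
        }
        for (int[] v : acc.deserialize()) {
            System.out.println(v[0]);
        }
    }
}
```

Running `main` prints 0, 1 and 2 on separate lines even though the same `int[]` is passed to `add()` three times. The extra cost is the serialization step, which has to happen before the results are shipped anyway.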

Re: Implementing a list accumulator

2015-01-21 Thread Ufuk Celebi
The thing is that the DefaultCrossFunction always uses the same holder Tuple2 object, which is then handed over to the chained collect helper flatMap(). I can see why it is OK for the default functions to reuse "holder" objects, but when they are chained to an operator it becomes problematic
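To make the failure mode concrete, here is a Flink-independent sketch under stated assumptions: `Pair` is a hypothetical mutable stand-in for the reused Tuple2 holder, and `cross()` plays both the cross function and the chained collector. Storing the reference aliases every list entry to the last record; copying on collect avoids it.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical demo: an upstream function reuses one mutable holder for every
// record, and a chained downstream step keeps references instead of copies.
class HolderReuseDemo {
    // Minimal mutable stand-in for a reused Tuple2 holder (not Flink's class).
    static final class Pair {
        int f0;
        int f1;

        Pair(int f0, int f1) { this.f0 = f0; this.f1 = f1; }

        Pair copy() { return new Pair(f0, f1); }

        @Override
        public String toString() { return "(" + f0 + "," + f1 + ")"; }
    }

    // Emits the cross product of a and b through a single reused holder.
    static List<Pair> cross(int[] a, int[] b, boolean copyOnCollect) {
        List<Pair> collected = new ArrayList<>();
        Pair holder = new Pair(0, 0);
        for (int x : a) {
            for (int y : b) {
                holder.f0 = x;
                holder.f1 = y;
                // The "chained collector": storing the reference aliases every entry.
                collected.add(copyOnCollect ? holder.copy() : holder);
            }
        }
        return collected;
    }

    public static void main(String[] args) {
        int[] a = {1, 2};
        int[] b = {10, 20};
        System.out.println(cross(a, b, false)); // [(2,20), (2,20), (2,20), (2,20)]
        System.out.println(cross(a, b, true));  // [(1,10), (1,20), (2,10), (2,20)]
    }
}
```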

Re: Implementing a list accumulator

2015-01-21 Thread Ufuk Celebi
I just checked out your branch and ran it with a breakpoint set at the CollectHelper. If you look into the (list) accumulator, you see that the same object is always added to it. Strangely enough, object re-use is disabled in the config. I don't have time to look further into it now, but it seem

Implementing a list accumulator

2015-01-20 Thread Max Michels
Hi everyone, I'm running into some problems implementing an Accumulator for returning a DataSet as a list.

https://github.com/mxm/flink/tree/count/collect

Basically, it works fine in this test case:

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
Integer[] in
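For readers unfamiliar with the pattern, a list accumulator boils down to local collection plus merging of partial results. The sketch below is hypothetical and does not reproduce Flink's `Accumulator` interface; its method names only loosely mirror an add/merge/reset shape, and the two instances simulate two parallel subtasks.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, Flink-independent sketch of a list accumulator. Each parallel
// instance collects locally; the partial lists are then merged into one.
class ListAccumulatorSketch<T> {
    private final ArrayList<T> local = new ArrayList<>();

    void add(T value) { local.add(value); }

    // Returns a snapshot so callers cannot mutate internal state.
    List<T> getLocalValue() { return new ArrayList<>(local); }

    void resetLocal() { local.clear(); }

    // Append another instance's partial result to this one.
    void merge(ListAccumulatorSketch<T> other) { local.addAll(other.local); }

    public static void main(String[] args) {
        Integer[] input = {1, 2, 3, 4, 5, 6};
        // Simulate two parallel subtasks, each seeing every other element.
        ListAccumulatorSketch<Integer> first = new ListAccumulatorSketch<>();
        ListAccumulatorSketch<Integer> second = new ListAccumulatorSketch<>();
        for (int i = 0; i < input.length; i++) {
            (i % 2 == 0 ? first : second).add(input[i]);
        }
        first.merge(second);
        System.out.println(first.getLocalValue()); // [1, 3, 5, 2, 4, 6]
    }
}
```

The merged order follows merge order, not input order, so a client that needs the original order would have to sort. Because `add()` stores references, this version is only safe when each record arrives as a fresh object, which is the assumption the replies in this thread found broken.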