Nested iterations are not supported via a "native iteration" operator,
so there is currently no way to avoid the for loop.

I don't think it's possible to tell from the code snippet why the
results are wrong. How do you propagate the counts back? In general I
expect this program to perform very badly on larger data sets, because
there is no support for caching intermediate results yet.
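To make the recomputation point concrete: DataSet pipelines are lazy, so each action (e.g. count()) re-executes the transformations that produce its input, and without caching every pass of the outer loop pays for all previous work again. The sketch below is a plain-Scala analogy using a collection view, not Flink code; the names are made up for illustration.

```scala
object RecomputationDemo {
  def main(args: Array[String]): Unit = {
    var evaluations = 0

    // A lazy view re-runs the map on every traversal, analogous to a
    // DataSet pipeline being re-executed for every count()/sink.
    val lazyPipeline = (1 to 5).view.map { x => evaluations += 1; x * 2 }

    lazyPipeline.sum
    lazyPipeline.sum
    println(evaluations) // the map ran twice over 5 elements: 10

    // Materializing once avoids the recomputation on later traversals.
    evaluations = 0
    val cached = lazyPipeline.toVector
    cached.sum
    cached.sum
    println(evaluations) // the map ran only once: 5
  }
}
```

In Flink terms, "materializing" would mean explicitly writing each intermediate split out (e.g. to a file) and reading it back in the next loop pass, since there is no built-in cache operator yet.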



On Mon, Jul 4, 2016 at 11:56 AM, Adrian Bartnik
<bart...@campus.tu-berlin.de> wrote:
> Hi,
>
> I have a Flink program which outputs wrong results once I set the
> parallelism to a value larger than 1.
> If I run the program with parallelism 1, everything works fine.
>
> The algorithm works on one input dataset, which is iteratively split
> until the desired number of output splits is reached.
> How to split the cluster in each iteration is also determined
> iteratively.
>
> Pseudocode:
>
> val input: DataSet = ...
>
> for (currentSplitNumber <- 1 to numberOfSplits) { // Split dataset until
>                                                   // the desired #splits is reached
>     // Iteratively compute the best split
>     val determinedSplit: DataSet = ... // iteration involving input
>
>     // Split the dataset into 2 smaller ones
>     val tmpDataSet1 = determinedSplit.filter(x => x == 1) ...
>     val tmpDataSet2 = determinedSplit.filter(x => x == 0) ...
>
>     // These counts are necessary to store the size of each split
>     tmpDataSet1.count()
>     tmpDataSet2.count()
>
>     // Store tmpDataSet1 and tmpDataSet2, as they are needed in one of the
>     // next loop iterations (as the dataset to be split)
>     ...
> }
>
> It all comes down to 2 nested loops, one of which can be replaced by an
> iteration.
> As nested iterations are not supported yet, I do not know how to avoid
> the outer loop.
>
> Is this a known problem, and if so, what would be a solution?
>
> Best,
> Adrian
