Nested iterations are not supported by the "native iteration" operators, so there is currently no way to avoid the outer for loop.
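
As a rough illustration of what such a driver-side for loop could look like, here is a minimal sketch in plain Scala. It is not Flink code: `Seq[Double]` stands in for a `DataSet`, `determineSplit` is a hypothetical placeholder for the inner iteration, and always splitting the largest pending partition is only an assumption about how the counts are used.

```scala
import scala.collection.mutable

// Sketch only: Seq[Double] stands in for a Flink DataSet.
object SplitLoopSketch {

  // Hypothetical placeholder for the inner iteration:
  // label each element 1 if it lies above the mean, else 0.
  def determineSplit(data: Seq[Double]): Seq[(Double, Int)] = {
    val mean = data.sum / data.size
    data.map(x => (x, if (x > mean) 1 else 0))
  }

  // Outer loop, run in the driver program: each pass splits one
  // pending partition into two smaller ones.
  def splitInto(input: Seq[Double], numberOfSplits: Int): Seq[Seq[Double]] = {
    // Assumption: the stored sizes are used to pick the largest partition next.
    val bySize = Ordering.by[Seq[Double], Int](_.size)
    val pending = mutable.PriorityQueue.empty[Seq[Double]](bySize)
    pending.enqueue(input)

    for (_ <- 1 to numberOfSplits) {
      val current = pending.dequeue()
      val labelled = determineSplit(current)
      val part1 = labelled.collect { case (x, 1) => x } // filter(x == 1)
      val part0 = labelled.collect { case (x, 0) => x } // filter(x == 0)
      // Keep both halves for a later pass of the loop.
      pending.enqueue(part1, part0)
    }
    pending.toSeq
  }
}
```

Calling `SplitLoopSketch.splitInto(data, n)` yields `n + 1` partitions. With a real `DataSet`, each size lookup would be a `count()` call, and every such call triggers its own job execution.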
I don't think it is possible to tell from the code snippet why the results are wrong. How do you propagate the counts back?

In general, I expect this program to perform very badly on larger data sets, because there is no support for caching intermediate results yet.

On Mon, Jul 4, 2016 at 11:56 AM, Adrian Bartnik <bart...@campus.tu-berlin.de> wrote:
> Hi,
>
> I have a Flink program which outputs wrong results once I set the
> parallelism to a value larger than 1.
> If I run the program with parallelism 1, everything works fine.
>
> The algorithm works on one input dataset, which is iteratively split
> until the desired output split size is reached.
> How to split the cluster in each iteration is also determined
> iteratively.
>
> Pseudocode:
>
> val input = DataSet
>
> for (currentSplitNumber <- 1 to numberOfSplits) { // Split dataset until desired #splits was reached
>   // Iteratively compute best split
>   Dataset determinedSplit = Iteration involving input
>
>   // Split dataset to 2 smaller ones
>   val tmpDataSet1 = determinedSplit.filter(x ==1) ...
>   val tmpDataSet2 = determinedSplit.filter(x ==0) ...
>
>   tmpDataSet1.count() // These are necessary, to store the size of each split
>   tmpDataSet2.count()
>
>   // Store tmpDataSet1 and 2 as they are needed in one of the next loop executions (as dataset to be split)
>   ...
>
> }
>
> It all comes down to 2 nested loops, one of which can be replaced by an
> iteration.
> As nested iterations are not supported yet, I do not know how to avoid the
> outer loop.
>
> Is this a known problem, and if yes, what would be a solution?
>
> Best,
> Adrian