Hi,

I have a Flink program which outputs wrong results once I set the parallelism to a value larger than 1.
If I run the program with parallelism 1, everything works fine.

The algorithm works on one input dataset, which is split iteratively until the desired number of output splits is reached. How to split the cluster in each iteration is itself determined iteratively.

Pseudocode:

val input = DataSet

for (currentSplitNumber <- 1 to numberOfSplits) { // split the dataset until the desired number of splits is reached
    // Iteratively compute the best split
    val determinedSplit = <iteration involving input>

    // Split the dataset into 2 smaller ones
    val tmpDataSet1 = determinedSplit.filter(x == 1) ...
    val tmpDataSet2 = determinedSplit.filter(x == 0) ...

    // These counts are necessary to store the size of each split
    tmpDataSet1.count()
    tmpDataSet2.count()

    // Store tmpDataSet1 and tmpDataSet2, as they are needed in one of the
    // next loop passes (as the dataset to be split)
    ...
}
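
For reference, here is a more concrete, self-contained sketch of the outer loop in Flink's Scala DataSet API. The element type (Int label, Double value), the placeholder input, and the map that assigns the 0/1 labels are invented only to make the example compile; in the real program the label assignment is the result of the inner iteration:

import org.apache.flink.api.scala._

object IterativeSplitSketch {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    // Placeholder input: (label, value) pairs, everything starts in split 0
    var toSplit: List[DataSet[(Int, Double)]] =
      List(env.fromElements((0, 1.0), (0, 2.0), (0, 3.0), (0, 4.0)))

    val numberOfSplits = 4
    for (currentSplitNumber <- 1 to numberOfSplits) {
      val current = toSplit.head

      // Stand-in for the iterative split computation: assign a 0/1 label
      val determinedSplit = current.map(x => (if (x._2 > 2.0) 1 else 0, x._2))

      // Split the dataset into 2 smaller ones
      val tmpDataSet1 = determinedSplit.filter(_._1 == 1)
      val tmpDataSet2 = determinedSplit.filter(_._1 == 0)

      // Each count() triggers its own job execution and gives the split sizes
      val size1 = tmpDataSet1.count()
      val size2 = tmpDataSet2.count()

      // Keep both splits around for later loop passes
      toSplit = tmpDataSet1 :: tmpDataSet2 :: toSplit.tail
    }
  }
}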

It all comes down to 2 nested loops, one of which can be replaced by a Flink iteration. Since nested iterations are not supported yet, I do not know how to avoid the outer loop.
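
To make the "iteration" part concrete: building on the sketch above, I would express the inner computation as a bulk iteration on the current split, roughly like this (the step function is only a dummy standing in for the real split refinement):

      val determinedSplit = current.iterate(10) { ds =>
        ds.map(x => (x._1, x._2 * 0.9)) // one refinement step per superstep
      }

The outer for loop is the part I cannot express as an iteration, because it would have to contain this one.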

Is this a known problem, and if so, what would be a solution?

Best,
Adrian
