Hi,

I have a Flink program similar to the k-means algorithm. I drive it with a
regular for loop because Flink's native iteration does not allow computing
intermediate results (in this case, the topDistance) within one iteration.
The problem is that my program only runs when maxIterations is small. When
maxIterations is large, the Flink jobs inside the for loop are never
scheduled, deployed, or executed. The program hangs forever without any
exception, error, or log message.
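For reference, the native bulk iteration API I mean looks roughly like this
(a sketch, not my actual code, and computeNextSeeds is a hypothetical
helper). As far as I can tell, the step function only maps one DataSet to
another, so there is no place to compute topDistance from the current
iteration's data and feed it back within the same pass:

```
// sketch of Flink's DataSet.iterate (Scala API), not my real program
val finalSeeds = initialSeeds.iterate(maxIterations) { currentSeeds =>
  // step function: DataSet[Point] => DataSet[Point];
  // topDistance would have to be computed here, from this
  // iteration's data, and used again within the same pass
  computeNextSeeds(currentSeeds) // hypothetical helper
}
```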

I ran the program in both local and cluster environments and hit the same
issue. I also tried smaller inputs (points and seeds), with the same
result.

Does anybody have an idea what the problem is? (Maybe the for loop
creates too many Flink jobs?)

Here is the pseudo-code of my program:

val points: DataSet[Point] = env.readTextFile(inputPoints) // parsing to Point elided
var seeds: DataSet[Point] = env.readTextFile(inputSeeds)   // parsing to Point elided
val discardNumber = 100
val maxIterations = 20 // maxIterations = 30 hangs the program, and no
// Flink job inside the for loop is deployed

for(iteration <- 1 to maxIterations) {

      val intermediateSeeds = points
        .map()
        .withBroadcastSet(seeds, "seeds")

      // topDistance contains only a single double value.
      val topDistance = intermediateSeeds
        .mapPartition()
        .first(discardNumber)
        .groupBy()
        .reduceGroup()

      val newSeeds = intermediateSeeds
        .map()
        .groupBy(0)
        .reduce().withBroadcastSet(topDistance, "topDistance")
        .map()

      seeds = newSeeds
}

val finalResult = seeds.collect()
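On reflection, since no action such as collect() runs inside the loop, each
pass should only append operators to one logical plan, which is then
submitted as a whole at the final collect(). Here is a toy model of how
that single plan grows with maxIterations (the operator count per pass is
my rough guess from the pseudo-code above, not a measured number):

```scala
// Toy model only: no action runs inside the loop, so every pass appends
// its operators to one logical plan, all submitted at the final collect().
object PlanGrowth {
  // map+broadcast, mapPartition, first, groupBy/reduceGroup,
  // map, groupBy/reduce+broadcast, map  ->  roughly 8 operators per pass
  val opsPerIteration = 8

  // 2 sources (points, seeds) plus one copy of the loop body per pass
  def planSize(maxIterations: Int): Int =
    2 + maxIterations * opsPerIteration

  def main(args: Array[String]): Unit =
    Seq(10, 20, 30, 40).foreach { n =>
      println(s"maxIterations=$n -> ~${planSize(n)} operators in one plan")
    }
}
```

So at maxIterations = 30 the submitted plan is already hundreds of
operators, which might explain why nothing is ever scheduled.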


Thanks,
Truong
