Hi, I have a Flink program which is similar to Kmeans algorithm. I use normal iteration(for loop) because Flink iteration does not allow to compute the intermediate results(in this case the topDistance) within one iteration. The problem is that my program only runs when maxIteration is small. When the maxIterations is big, Flink jobs inside the forloop are not scheduled, deployed or executed. The program hangs forever without any exception, error or log message.
I ran the program on both local and cluster environments, having the same issue. I tried with smaller inputs (points and seeds), having the same issue. Does anybody have an idea about what is the problem? (Maybe the forloop creates many Flink jobs?) Here is the pseudo-code of my program: DataSet[Point] points = env.readTextFile(inputPoints) DataSet[Point] seeds = env.readTextFile(inputSeeds) discardNumber: Int = 100 maxIterations: Int = 20 // maxIteration = 30 will hang the program and no Flink job inside the forloop jobs is deployed) for(iteration <- 1 to maxIterations) { val intermediateSeeds = points .map() .withBroadcastSet(seeds, "seeds") //topDistance contains only only double value. var topDistance = intermediateSeeds .mapPartition() .first(discardNumber) .groupBy() .reduceGroup() val newSeeds = intermediateSeeds .map() .groupBy(0) .reduce ().withBroadcastSet(topDistance, "topDistance") .map() seeds = newSeeds } val finalResult = seeds.collect() Thanks, Truong