Hi Vasia,

Thank you very much for your explanation :). When running with a small
maxIterations, the job graph that Flink executed was optimal. However, when
maxIterations was large, Flink took a very long time to generate the job
graph. The actual time to execute the jobs was very fast, but the time to
optimize and schedule the jobs was slow.

Regarding your suggestion, I didn't use iterate/iterateDelta because I need
to access the intermediate results within an iteration (the topDistance in
my pseudo-code). As you said before, Flink does not support that feature,
so I wondered if you have a workaround for iterate or iterateDelta?
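
For concreteness, this is roughly the shape I imagine such a workaround
would have to take, if topDistance can stay a broadcast DataSet inside the
step function instead of being pulled back to the driver. Point, the
distance logic, and all the functions below are simplified stand-ins for my
real ones:

import org.apache.flink.api.scala._
import org.apache.flink.api.common.functions.{RichFilterFunction, RichMapFunction}
import org.apache.flink.configuration.Configuration
import scala.collection.JavaConverters._

object IterateSketch {
  case class Point(x: Double, y: Double)

  def dist(a: Point, b: Point): Double = math.hypot(a.x - b.x, a.y - b.y)

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val points = env.fromElements(Point(0.0, 0.0), Point(1.0, 1.0), Point(9.0, 9.0))
    val initialSeeds = env.fromElements((0, Point(0.0, 0.0)), (1, Point(10.0, 10.0)))
    val maxIterations = 30

    val finalSeeds = initialSeeds.iterate(maxIterations) { seeds =>
      // Tag each point with (seedId, point, distance to nearest current seed).
      val assigned = points
        .map(new RichMapFunction[Point, (Int, Point, Double)] {
          private var s: Seq[(Int, Point)] = _
          override def open(c: Configuration): Unit =
            s = getRuntimeContext.getBroadcastVariable[(Int, Point)]("seeds").asScala
          override def map(p: Point): (Int, Point, Double) = {
            val (id, q) = s.minBy(t => dist(p, t._2))
            (id, p, dist(p, q))
          }
        })
        .withBroadcastSet(seeds, "seeds")

      // The topDistance equivalent: a one-element DataSet computed inside the
      // superstep and broadcast onward, never collect()ed on the driver.
      val topDistance = assigned.map(_._3).reduce((a, b) => math.max(a, b))

      // New seeds: centroid per seed id over the points passing the cut-off.
      assigned
        .filter(new RichFilterFunction[(Int, Point, Double)] {
          private var top: Double = _
          override def open(c: Configuration): Unit =
            top = getRuntimeContext.getBroadcastVariable[Double]("topDistance").asScala.head
          override def filter(t: (Int, Point, Double)): Boolean = t._3 <= top
        })
        .withBroadcastSet(topDistance, "topDistance")
        .map(t => (t._1, t._2.x, t._2.y, 1L))
        .groupBy(0)
        .reduce((a, b) => (a._1, a._2 + b._2, a._3 + b._3, a._4 + b._4))
        .map(t => (t._1, Point(t._2 / t._4, t._3 / t._4)))
    }

    finalSeeds.print()
  }
}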

Thanks,
Truong

On Tue, Jul 5, 2016 at 8:46 PM, Vasiliki Kalavri <vasilikikala...@gmail.com>
wrote:

> Hi Truong,
>
> I'm afraid what you're experiencing is to be expected. Currently, for
> loops do not perform well in Flink, since there is no support for caching
> intermediate results yet. This has been a frequently requested feature
> lately, so maybe it will be added soon :)
> Until then, I suggest you try implementing your logic using iterate or
> iterateDelta.
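>
> For illustration, a delta iteration has roughly this shape (a toy,
> self-contained sketch; the step logic is made up, only the wiring matters):
>
> import org.apache.flink.api.scala._
>
> val env = ExecutionEnvironment.getExecutionEnvironment
> val initial = env.fromElements((1, 5), (2, 7)) // (id, value) pairs
>
> val result = initial.iterateDelta(initial, 10, Array(0)) {
>   (solution, workset) =>
>     // Toy step: decrement each value; elements that reach 0 drop out of
>     // the workset, and the iteration stops once the workset is empty.
>     val delta = workset
>       .map(t => (t._1, math.max(t._2 - 1, 0)))
>       .filter(_._2 > 0)
>     (delta, delta) // (updates to the solution set, next workset)
> }
>
> result.print()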
>
> Cheers,
> -Vasia.
>
> On 5 July 2016 at 17:11, Nguyen Xuan Truong <truongn...@gmail.com> wrote:
>
>> Hi,
>>
>> I have a Flink program which is similar to the k-means algorithm. I use a
>> normal iteration (for loop) because Flink's iteration does not allow
>> computing intermediate results (in this case the topDistance) within one
>> iteration. The problem is that my program only runs when maxIterations is
>> small. When maxIterations is large, the Flink jobs inside the for loop are
>> not scheduled, deployed, or executed. The program hangs forever without any
>> exception, error, or log message.
>>
>> I ran the program in both local and cluster environments and hit the same
>> issue. I also tried smaller inputs (points and seeds), with the same
>> result.
>>
>> Does anybody have an idea what the problem is? (Maybe the for loop
>> creates many Flink jobs?)
>>
>> Here is the pseudo-code of my program:
>>
>> val points: DataSet[Point] = env.readTextFile(inputPoints)
>> var seeds: DataSet[Point] = env.readTextFile(inputSeeds) // reassigned in the loop, hence a var
>> val discardNumber: Int = 100
>> val maxIterations: Int = 20 // maxIterations = 30 hangs the program; no
>> Flink job inside the for loop is deployed
>>
>> for(iteration <- 1 to maxIterations) {
>>
>>       val intermediateSeeds = points
>>         .map()
>>         .withBroadcastSet(seeds, "seeds")
>>
>>      // topDistance contains only one double value.
>>       val topDistance = intermediateSeeds
>>         .mapPartition()
>>         .first(discardNumber)
>>         .groupBy()
>>         .reduceGroup()
>>
>>       val newSeeds = intermediateSeeds
>>         .map()
>>         .groupBy(0)
>>         .reduce().withBroadcastSet(topDistance, "topDistance")
>>         .map()
>>
>>       seeds = newSeeds
>> }
>>
>> val finalResult = seeds.collect()
>>
>>
>> Thanks,
>> Truong
>>
>
>
