Hi Dan,

Flink currently allocates each task slot an equal portion of managed
memory. I don't know what the best number of task slots is for your setup.

https://ci.apache.org/projects/flink/flink-docs-master/concepts/index.html#workers-slots-resources

If you assign the TaskManagers less memory, Linux will use the remaining
memory to cache spill files.
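
For a ballpark figure (example values only, not a recommendation for your
cluster), with a configuration along these lines

  # flink-conf.yaml (excerpt, example values)
  taskmanager.heap.mb: 32768          # heap given to each TaskManager
  taskmanager.numberOfTaskSlots: 4    # slots per TaskManager
  taskmanager.memory.fraction: 0.7    # share of the heap used as managed memory

each slot would get very roughly 32768 MB * 0.7 / 4 ≈ 5.7 GB of managed
memory (the exact number is a bit lower because network buffers are
subtracted first), and any memory you don't give to the TaskManagers stays
available to the operating system's page cache.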

Greg

On Fri, Sep 2, 2016 at 11:30 AM, Dan Drewes <dre...@campus.tu-berlin.de>
wrote:

> Hi Greg,
>
> thanks for your response!
>
> I just had a look and realized that it's actually only about 85 GB of
> data. Sorry for the wrong information.
>
> It's read from a CSV file on the master node's local file system. The 8
> nodes have more than 40 GB of available memory each, and since the data is
> equally distributed I assume there should be no need to spill anything to
> disk.
>
> There are 9 iterations.
>
> Is it possible that the data is repeatedly distributed even with Flink
> Iterations? Or, the other way around: might it be that Flink somehow
> "remembers" that the data is already distributed, even for the while loop?
>
> -Dan
>
>
>
> On 02.09.2016 at 16:39, Greg Hogan wrote:
>
> Hi Dan,
>
> Where are you reading the 200 GB "data" from? How much memory per node? If
> the DataSet is read from a distributed filesystem and if with iterations
> Flink must spill to disk then I wouldn't expect much difference. About how
> many iterations are run in the 30 minutes? I don't know that this is
> reported explicitly, but if your convergence function only has one input
> record per iteration then the reported total is the iteration count.
>
> One other thought: we should soon have support for object reuse with
> arrays (FLINK-3695). This would be implemented as DoubleValueArray or
> ValueArray<DoubleValue> rather than double[], but it would be interesting
> to test for a change in performance.
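>
> Not related to FLINK-3695, but as a quick experiment you could already
> switch on object reuse for the current double[] version. A minimal sketch
> (untested, assuming the usual DataSet setup in your driver program):
>
>   import org.apache.flink.api.java.ExecutionEnvironment;
>
>   ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>   // let Flink reuse record objects instead of allocating a new one per
>   // record, which can reduce GC pressure in tight iteration loops
>   env.getConfig().enableObjectReuse();
>
> Once FLINK-3695 is in, the DataSet<double[]> could then be swapped for the
> array value types to compare the performance.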
>
> Greg
>
> On Fri, Sep 2, 2016 at 6:16 AM, Dan Drewes <dre...@campus.tu-berlin.de>
> wrote:
>
>> Hi,
>>
>> for my bachelor thesis I'm testing an implementation of the L-BFGS
>> algorithm with Flink Iterations against a version without Flink
>> Iterations that uses a plain while loop instead. Both programs use the
>> same Map and Reduce transformations in each iteration. It was expected
>> that the performance of the Flink Iterations version would scale better
>> with increasing size of the input data set. However, the measured results
>> on an IBM POWER cluster are very similar for both versions, e.g. around
>> 30 minutes for 200 GB of data. The cluster has 8 nodes, was configured
>> with 4 slots per node, and I used a total parallelism of 32.
>> In every iteration of the while loop a new Flink job is started, and I
>> thought that the data would also be distributed over the network again in
>> each iteration, which should consume a significant and measurable amount
>> of time. Is that assumption wrong, or what is the computational overhead
>> of the Flink Iterations that is cancelling out this disadvantage?
>> I have included the relevant parts of both programs and also attached the
>> generated execution plans.
>> Thank you for any ideas, as I could not find much about this issue in the
>> Flink docs.
>>
>> Best, Dan
>>
>> *Flink Iterations:*
>>
>> DataSet<double[]> data = ...
>>
>> State state = initialState(m, initweights,0,new double[initweights.length]);
>> DataSet<State> statedataset = env.fromElements(state);
>> // start of iteration section
>> IterativeDataSet<State> loop = statedataset.iterate(niter);
>>
>>
>> DataSet<State> statewithnewlossgradient = data
>>       .map(difffunction).withBroadcastSet(loop, "state")
>>       .reduce(accumulate)
>>       .map(new NormLossGradient(datasize))
>>       .map(new SetLossGradient()).withBroadcastSet(loop, "state")
>>       .map(new LBFGS());
>>
>>
>> DataSet<State> converged = statewithnewlossgradient.filter(
>>    new FilterFunction<State>() {
>>       @Override
>>       public boolean filter(State value) throws Exception {
>>          if (value.getIflag()[0] == 0) {
>>             return false;
>>          }
>>          return true;
>>       }
>>    }
>> );
>>
>> DataSet<State> finalstate = loop.closeWith(statewithnewlossgradient, converged);
>>
>> *While loop: *
>>
>> DataSet<double[]> data = ...
>> State state = initialState(m, initweights,0,new double[initweights.length]);
>> int cnt = 0;
>> do {
>>    LBFGS lbfgs = new LBFGS();
>>    statedataset = data.map(difffunction).withBroadcastSet(statedataset, "state")
>>       .reduce(accumulate)
>>       .map(new NormLossGradient(datasize))
>>       .map(new SetLossGradient()).withBroadcastSet(statedataset,"state")
>>       .map(lbfgs);
>>    cnt++;
>> } while (cnt < niter && statedataset.collect().get(0).getIflag()[0] != 0);
>>
>>
