Hi Till,

you're right, my implementation wouldn't scale well for a very large number of
features. Thank you for that hint! However, I'm not using that many features, so
this shouldn't be the cause of the strange behaviour.


Yes, the 30 minutes is the time for all jobs together. It's the time displayed
in the web dashboard. I also thought this might be only the time for the
last iteration, but it matches the time from starting the program
until it quits. I.e., all iterations/jobs must be included in this time.


-Dan


On 07.09.2016 at 17:25, Till Rohrmann wrote:
Hi Dan,

first a general remark: I fear that your L-BFGS implementation is not well
suited for large-scale problems. You might want to take a look at [1].

In the case of the while loop solution, you're actually executing n jobs, with n
being the number of iterations. Thus, you have to add up the execution times of
all jobs. Did you do that?

[1] https://papers.nips.cc/paper/5333-large-scale-l-bfgs-using-mapreduce.pdf
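To make the "add the execution times together" point concrete, here is a small plain-Java sketch. Each Runnable is a hypothetical stand-in for one job submitted by the while loop (for example one collect() call), not Flink code. The sketch times each job and sums the runtimes, which is the figure to compare against the single job of the Flink Iterations version.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java sketch (not Flink code): each Runnable is a hypothetical stand-in
// for one job submitted by the while loop, e.g. one collect() call.
class JobTimer {
    // Runs the jobs one after another and records each job's runtime in ns.
    static List<Long> runAndTime(List<Runnable> jobs) {
        List<Long> durations = new ArrayList<>();
        for (Runnable job : jobs) {
            long start = System.nanoTime();
            job.run();
            durations.add(System.nanoTime() - start);
        }
        return durations;
    }

    // Sum of all job runtimes: the figure to compare against the single job
    // of the Flink Iterations version.
    static long total(List<Long> durations) {
        long sum = 0;
        for (long d : durations) {
            sum += d;
        }
        return sum;
    }

    public static void main(String[] args) {
        List<Runnable> jobs = new ArrayList<>();
        for (int i = 0; i < 9; i++) {           // one job per iteration
            jobs.add(() -> {
                double acc = 0;                 // dummy work standing in for a job
                for (int k = 0; k < 1_000_000; k++) {
                    acc += Math.sqrt(k);
                }
                if (acc < 0) {
                    System.out.println(acc);    // never true; keeps the work observable
                }
            });
        }
        long wallStart = System.nanoTime();
        List<Long> durations = runAndTime(jobs);
        long wall = System.nanoTime() - wallStart;
        System.out.println("jobs run:              " + durations.size());
        System.out.println("sum of job times (ms): " + total(durations) / 1_000_000);
        System.out.println("wall-clock time (ms):  " + wall / 1_000_000);
    }
}
```

Because the jobs run sequentially, the sum of the job times can never exceed the wall-clock time. With only small gaps between jobs, the two figures end up close.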

On Wed, Sep 7, 2016 at 3:43 PM, Dan Drewes <dre...@campus.tu-berlin.de> wrote:

Thank you for your replies so far!

I've uploaded the files to github:

iterations: 
https://github.com/dan-drewes/thesis/blob/master/IterationsLBFGS.java

while loop: https://github.com/dan-drewes/thesis/blob/master/flinkLBFGS.java

The additional classes I wrote are there, too.

I had 32 task slots in total, 4 on each node, so there should actually
be enough memory.

I don't know if there is a better way of profiling, but I've attached the
timelines for both versions to this post. As far as I can see, there is not
much difference in reading the data. However, maybe you can see something I
didn't.

Thanks again for your help,

Dan



On 07.09.2016 at 10:23, Till Rohrmann wrote:
Usually, the while loop solution should perform much worse, since with each
new iteration it re-executes all previous iteration steps without persisting
the intermediate results. Thus, it should have quadratic complexity in terms of
iteration step operations instead of linear complexity. Additionally, the while
loop will suffer from memory fragmentation because of the explicit DAG
unrolling.
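The quadratic-versus-linear argument can be made concrete with a toy model in plain Java. It assumes, as described above, that nothing is cached between jobs, so every collect() in the while loop re-runs the whole unrolled chain of steps. The Supplier chain and the counter are illustrative only and are not part of Flink's API. For n iterations the while loop performs n(n+1)/2 step executions, while a native iteration performs n.

```java
import java.util.function.Supplier;

// Toy model (not Flink code): a lazily evaluated step recomputes its whole
// lineage every time it is evaluated, like an uncached DataSet plan that is
// re-executed by each new job.
class LineageCost {
    static int stepExecutions = 0;

    // One lazy iteration step: evaluating it first evaluates every step before
    // it, then applies this step (+1 is a placeholder for one L-BFGS update).
    static Supplier<Integer> step(Supplier<Integer> parent) {
        return () -> {
            int in = parent.get();
            stepExecutions++;
            return in + 1;
        };
    }

    // While-loop style: extend the plan by one step, then "collect" it, which
    // re-runs every step in the lineage.
    static int whileLoop(int niter) {
        stepExecutions = 0;
        Supplier<Integer> state = () -> 0;
        for (int i = 0; i < niter; i++) {
            state = step(state);
            state.get(); // like statedataset.collect() in the do-while condition
        }
        return stepExecutions;
    }

    // Native-iteration style: each step's result is kept and fed into the
    // next step, so every step runs exactly once.
    static int nativeIteration(int niter) {
        stepExecutions = 0;
        int value = 0;
        for (int i = 0; i < niter; i++) {
            final int current = value;
            value = step(() -> current).get();
        }
        return stepExecutions;
    }

    public static void main(String[] args) {
        int niter = 9; // the iteration count Dan mentions in this thread
        System.out.println("while loop: " + whileLoop(niter) + " step executions");       // 45
        System.out.println("iterations: " + nativeIteration(niter) + " step executions"); // 9
    }
}
```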

I agree with Theo that access to the full code would help a lot to pinpoint the 
problem.

Cheers,
Till

On Tue, Sep 6, 2016 at 6:50 PM, Theodore Vasiloudis <theodoros.vasilou...@gmail.com> wrote:

Have you tried profiling the application to see where most of the time is spent
during the runs?

If most of the time is spent reading in the data, any difference between the
two methods may be obscured.


On Sep 6, 2016 4:55 PM, "Greg Hogan" <c...@greghogan.com> wrote:
Hi Dan,

Flink currently allocates each task slot an equal portion of managed memory. I 
don't know the best way to count task slots.
  
https://ci.apache.org/projects/flink/flink-docs-master/concepts/index.html#workers-slots-resources

If you assign TaskManagers less memory, then Linux will use the remaining
memory to cache spill files.

Greg

On Fri, Sep 2, 2016 at 11:30 AM, Dan Drewes <dre...@campus.tu-berlin.de> wrote:

Hi Greg,

thanks for your response!

I just had a look and realized that it's only about 85 GB of data, not 200 GB.
Sorry for the wrong information.

It's read from a CSV file on the master node's local file system. The 8 nodes
have more than 40 GB of available memory each, and since the data is equally
distributed, I assume there should be no need to spill anything to disk.
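A quick back-of-the-envelope check of this assumption, using only the figures from this thread (85 GB of input, 8 nodes with about 40 GB each, 4 slots per node) and Greg's note that each slot gets an equal portion of managed memory. Two simplifications are assumptions, not measurements: the CSV size stands in for the in-memory size of the parsed double[] records (which may differ), and all 40 GB are treated as usable per node, which is optimistic since Flink's managed memory is only part of a TaskManager's memory.

```java
// Back-of-the-envelope sketch with the figures from this thread; the
// simplifications are assumptions, not measurements.
class MemoryBudget {
    // Evenly split a quantity in GB across a number of parts.
    static double share(double totalGb, int parts) {
        return totalGb / parts;
    }

    // True if one slot's share of the input fits into one slot's share of the
    // node memory (each slot is assumed to get an equal portion).
    static boolean fitsPerSlot(double dataGb, double memPerNodeGb, int nodes, int slotsPerNode) {
        double dataPerSlot = share(dataGb, nodes * slotsPerNode);
        double memPerSlot = share(memPerNodeGb, slotsPerNode);
        return dataPerSlot <= memPerSlot;
    }

    public static void main(String[] args) {
        double dataGb = 85.0;       // CSV size, used as a proxy for in-memory size
        double memPerNodeGb = 40.0; // optimistic: not all of it is Flink managed memory
        int nodes = 8;
        int slotsPerNode = 4;

        System.out.printf("data per node:   %.2f GB%n", share(dataGb, nodes));
        System.out.printf("data per slot:   %.2f GB%n", share(dataGb, nodes * slotsPerNode));
        System.out.printf("memory per slot: %.2f GB%n", share(memPerNodeGb, slotsPerNode));
        System.out.println("fits per slot:   " + fitsPerSlot(dataGb, memPerNodeGb, nodes, slotsPerNode));
    }
}
```

Under these assumptions each slot holds roughly a quarter of its memory share in input data. The real margin depends on the configured managed memory and the serialized record size.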

There are 9 iterations.

Is it possible that the data is repeatedly distributed with Flink Iterations as
well? Or the other way around: might Flink somehow "remember" that the data is
already distributed, even for the while loop?

-Dan


On 02.09.2016 at 16:39, Greg Hogan wrote:
Hi Dan,

Where are you reading the 200 GB of "data" from? How much memory is there per
node? If the DataSet is read from a distributed filesystem, and if Flink must
spill to disk with iterations, then I wouldn't expect much difference. About
how many iterations are run in the 30 minutes? I don't know whether this is
reported explicitly, but if your convergence function only has one input record
per iteration, then its reported record total is the iteration count.

One other thought: we should soon have support for object reuse with arrays
(FLINK-3695). This would be implemented as DoubleValueArray or
ValueArray<DoubleValue> rather than double[], but it would be interesting to
test whether it changes performance.
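To get a feel for what object reuse buys, here is a minimal plain-Java sketch of the idea. One mutable holder keeps a single backing array that each record overwrites, instead of allocating a new double[] per record. ReusableDoubleArray and its methods are hypothetical names for illustration. They are not the API of Flink's DoubleValueArray or ValueArray<DoubleValue>, and whether reuse helps this job would still need to be measured.

```java
// Hypothetical holder illustrating object reuse; not Flink's DoubleValueArray.
class ReusableDoubleArray {
    private double[] values = new double[0];
    private int size = 0;
    private int allocations = 0;

    // Prepares the holder for a record of n values. The backing array is only
    // reallocated when it is too small; old contents are not preserved then.
    void resize(int n) {
        if (values.length < n) {
            values = new double[n];
            allocations++;
        }
        size = n;
    }

    void set(int i, double v) {
        checkIndex(i);
        values[i] = v;
    }

    double get(int i) {
        checkIndex(i);
        return values[i];
    }

    int size() { return size; }

    // Number of backing arrays allocated so far.
    int allocations() { return allocations; }

    private void checkIndex(int i) {
        if (i < 0 || i >= size) {
            throw new IndexOutOfBoundsException("index " + i + ", size " + size);
        }
    }

    public static void main(String[] args) {
        ReusableDoubleArray record = new ReusableDoubleArray();
        double sum = 0;
        for (int r = 0; r < 1000; r++) {        // 1000 synthetic records of 4 features
            record.resize(4);
            for (int i = 0; i < 4; i++) {
                record.set(i, r + i);           // a parser would write values here
            }
            for (int i = 0; i < record.size(); i++) {
                sum += record.get(i);
            }
        }
        // Only one backing array is allocated for all 1000 records.
        System.out.println("sum = " + sum + ", allocations = " + record.allocations());
    }
}
```

The trade-off is that a reused record must not be held onto after the next one overwrites it, which is the same caveat that applies to object reuse in Flink user functions.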

Greg

On Fri, Sep 2, 2016 at 6:16 AM, Dan Drewes <dre...@campus.tu-berlin.de> wrote:
Hi,

for my bachelor thesis I'm testing an implementation of the L-BFGS algorithm
with Flink Iterations against a version without Flink Iterations that uses an
ordinary while loop instead. Both programs use the same Map and Reduce
transformations in each iteration. I expected the Flink Iterations version to
scale better with increasing size of the input data set. However, the measured
results on an IBM Power cluster are very similar for both versions, e.g.,
around 30 minutes for 200 GB of data. The cluster has 8 nodes, was configured
with 4 slots per node, and I used a total parallelism of 32.
In every iteration of the while loop a new Flink job is started, and I thought
that the data would also be distributed over the network again in each
iteration, which should consume a significant and measurable amount of time. Is
that thought wrong, or what computational overhead of the Flink Iterations
offsets this disadvantage?
I've included the relevant part of both programs and also attached the
generated execution plans.
Thank you for any ideas, as I could not find much about this issue in the Flink
docs.

Best, Dan

Flink Iterations:


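import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.operators.IterativeDataSet;

DataSet<double[]> data = ...

State state = initialState(m, initweights, 0, new double[initweights.length]);
DataSet<State> statedataset = env.fromElements(state);

// start of iteration section: a native bulk iteration with at most niter supersteps
IterativeDataSet<State> loop = statedataset.iterate(niter);

// one L-BFGS step: per-record gradients against the broadcast state,
// summed, normalized, and applied to the state
DataSet<State> statewithnewlossgradient =
   data.map(difffunction).withBroadcastSet(loop, "state")
       .reduce(accumulate)
       .map(new NormLossGradient(datasize))
       .map(new SetLossGradient()).withBroadcastSet(loop, "state")
       .map(new LBFGS());

// termination criterion: keeps only states that have NOT converged yet
// (iflag != 0); the iteration stops once this DataSet is empty
DataSet<State> converged = statewithnewlossgradient.filter(
   new FilterFunction<State>() {
      @Override
      public boolean filter(State value) throws Exception {
         return value.getIflag()[0] != 0;
      }
   }
);

DataSet<State> finalstate = loop.closeWith(statewithnewlossgradient, converged);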

While loop:

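DataSet<double[]> data = ...
State state = initialState(m, initweights, 0, new double[initweights.length]);
DataSet<State> statedataset = env.fromElements(state);

int cnt = 0;
do {
   LBFGS lbfgs = new LBFGS();
   // extend the plan by one L-BFGS step built on the previous state DataSet
   statedataset = data.map(difffunction).withBroadcastSet(statedataset, "state")
      .reduce(accumulate)
      .map(new NormLossGradient(datasize))
      .map(new SetLossGradient()).withBroadcastSet(statedataset, "state")
      .map(lbfgs);
   cnt++;
   // collect() submits a new job for the plan built so far, which includes
   // the operators of all previous loop iterations
} while (cnt < niter && statedataset.collect().get(0).getIflag()[0] != 0);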
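The two versions are only comparable if they run the same number of L-BFGS steps, so here is a plain-Java model of both loop conditions. It assumes Flink's documented semantics for closeWith(result, criterion): the bulk iteration ends once the termination-criterion DataSet is empty or after the maximum number of supersteps. Note that the filter keeps states with iflag != 0, so the DataSet named converged is non-empty while L-BFGS has not converged yet. The State class and the "converges after k steps" rule are hypothetical stand-ins, not the thesis code.

```java
// Plain-Java model of the two loop conditions (not Flink code).
class TerminationModel {
    // Hypothetical stand-in for the thesis' State class.
    static final class State {
        final int[] iflag = {1};   // like State.getIflag(); 0 means "finished"
        int steps = 0;
    }

    // Hypothetical L-BFGS step that reports convergence after convergeAt steps.
    static State lbfgsStep(State s, int convergeAt) {
        s.steps++;
        if (s.steps >= convergeAt) {
            s.iflag[0] = 0;
        }
        return s;
    }

    // Same predicate as the FilterFunction: keep states that are still running.
    static boolean keep(State s) {
        return s.iflag[0] != 0;
    }

    // Bulk iteration: stop after niter supersteps or once the termination
    // criterion (the filtered DataSet) would be empty.
    static int bulkIteration(int niter, int convergeAt) {
        State state = new State();
        int supersteps = 0;
        while (supersteps < niter) {
            state = lbfgsStep(state, convergeAt);
            supersteps++;
            boolean criterionEmpty = !keep(state);
            if (criterionEmpty) {
                break;
            }
        }
        return supersteps;
    }

    // The do-while version: cnt < niter && iflag != 0.
    static int whileLoop(int niter, int convergeAt) {
        State state = new State();
        int cnt = 0;
        do {
            state = lbfgsStep(state, convergeAt);
            cnt++;
        } while (cnt < niter && keep(state));
        return cnt;
    }

    public static void main(String[] args) {
        int niter = 9;
        for (int convergeAt : new int[] {3, 9, 20}) {
            System.out.println("converges after " + convergeAt + " steps: bulk = "
                + bulkIteration(niter, convergeAt) + ", while = " + whileLoop(niter, convergeAt));
        }
    }
}
```

In this model both versions run min(niter, k) steps for convergence after k steps. That suggests the two programs perform the same number of L-BFGS updates, so the timing difference has to come from how each step is executed.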

