Hi Dan, first a general remark: I fear that your L-BFGS implementation is not well suited for large scale problems. You might wanna take a look at [1].
In the case of the while loop solution you're actually executing n jobs, with n being the number of iterations. Thus, you have to add the execution times for all jobs together. Did you do that?

[1] https://papers.nips.cc/paper/5333-large-scale-l-bfgs-using-mapreduce.pdf
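To make the two execution models concrete, here is a minimal, self-contained sketch; it is illustrative only and not code from this thread, and all class and variable names are made up. It contrasts a driver-side while loop, where every collect() submits a separate job that re-executes the whole plan built so far, with a native bulk iteration, which runs all supersteps inside a single job.

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.IterativeDataSet;

public class LoopVsIterationSketch {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        int niter = 3;

        // Driver-side while loop: each collect() submits a separate job, and
        // every job re-executes the whole plan built so far (source plus all
        // maps from the previous passes), so pass i pays again for passes 1..i-1.
        DataSet<Integer> state = env.fromElements(0);
        for (int i = 0; i < niter; i++) {
            state = state.map(new Increment());
            state.collect();  // one job per pass; nothing from earlier jobs is reused
        }

        // Native bulk iteration: a single job. The runtime feeds each
        // superstep's result back as the input of the next superstep instead
        // of resubmitting a new, longer plan for every pass.
        IterativeDataSet<Integer> loop = env.fromElements(0).iterate(niter);
        DataSet<Integer> next = loop.map(new Increment());
        DataSet<Integer> result = loop.closeWith(next);
        result.print();  // one job for all niter supersteps
    }

    private static class Increment implements MapFunction<Integer, Integer> {
        @Override
        public Integer map(Integer value) {
            return value + 1;
        }
    }
}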
On Wed, Sep 7, 2016 at 3:43 PM, Dan Drewes <dre...@campus.tu-berlin.de> wrote:

> Thank you for your replies so far!
>
> I've uploaded the files to GitHub:
>
> iterations: https://github.com/dan-drewes/thesis/blob/master/IterationsLBFGS.java
>
> while loop: https://github.com/dan-drewes/thesis/blob/master/flinkLBFGS.java
>
> The additional classes I wrote are there, too.
>
> I had 32 task slots in total, that's 4 on each node, so there should actually be enough memory.
>
> I don't know if there is a better way of profiling, but I have the timelines for both versions attached to this post. As far as I can see, there is not much difference in reading the data. However, maybe you can see anything I didn't.
>
> Thanks again for your help,
> Dan
>
> On 07.09.2016 at 10:23, Till Rohrmann wrote:
>
> Usually, the while loop solution should perform much worse since it will execute with each new iteration all previous iteration steps without persisting the intermediate results. Thus, it should have a quadratic complexity in terms of iteration step operations instead of a linear complexity. Additionally, the while loop will suffer from memory fragmentation because of the explicit DAG unrolling.
>
> I agree with Theo that access to the full code would help a lot to pinpoint the problem.
>
> Cheers,
> Till
>
> On Tue, Sep 6, 2016 at 6:50 PM, Theodore Vasiloudis <theodoros.vasilou...@gmail.com> wrote:
>
>> Have you tried profiling the application to see where most of the time is spent during the runs?
>>
>> If most of the time is spent reading in the data, maybe any difference between the two methods is being obscured.
>>
>> --
>> Sent from a mobile device. May contain autocorrect errors.
>>
>> On Sep 6, 2016 4:55 PM, "Greg Hogan" <c...@greghogan.com> wrote:
>>
>>> Hi Dan,
>>>
>>> Flink currently allocates each task slot an equal portion of managed memory. I don't know the best way to count task slots.
>>> https://ci.apache.org/projects/flink/flink-docs-master/concepts/index.html#workers-slots-resources
>>>
>>> If you assign TaskManagers less memory then Linux will use the memory to cache spill files.
>>>
>>> Greg
>>>
>>> On Fri, Sep 2, 2016 at 11:30 AM, Dan Drewes <dre...@campus.tu-berlin.de> wrote:
>>>
>>>> Hi Greg,
>>>>
>>>> thanks for your response!
>>>>
>>>> I just had a look and realized that it's just about 85 GB of data. Sorry about that wrong information.
>>>>
>>>> It's read from a CSV file on the master node's local file system. The 8 nodes have more than 40 GB of available memory each, and since the data is equally distributed I assume there should be no need to spill anything to disk.
>>>>
>>>> There are 9 iterations.
>>>>
>>>> Is it possible that also with Flink Iterations the data is repeatedly distributed? Or the other way around: might it be that Flink "remembers" somehow that the data is already distributed, even for the while loop?
>>>>
>>>> -Dan
>>>>
>>>> On 02.09.2016 at 16:39, Greg Hogan wrote:
>>>>
>>>> Hi Dan,
>>>>
>>>> Where are you reading the 200 GB "data" from? How much memory per node? If the DataSet is read from a distributed filesystem and if with iterations Flink must spill to disk then I wouldn't expect much difference. About how many iterations are run in the 30 minutes? I don't know that this is reported explicitly, but if your convergence function only has one input record per iteration then the reported total is the iteration count.
>>>>
>>>> One other thought: we should soon have support for object reuse with arrays (FLINK-3695). This would be implemented as DoubleValueArray or ValueArray<DoubleValue> rather than double[], but it would be interesting to test for a change in performance.
>>>>
>>>> Greg
>>>>
>>>> On Fri, Sep 2, 2016 at 6:16 AM, Dan Drewes <dre...@campus.tu-berlin.de> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> for my bachelor thesis I'm testing an implementation of the L-BFGS algorithm with Flink Iterations against a version without Flink Iterations but a casual while loop instead. Both programs use the same Map and Reduce transformations in each iteration. It was expected that the performance of the Flink Iterations would scale better with increasing size of the input data set. However, the measured results on an IBM Power cluster are very similar for both versions, e.g. around 30 minutes for 200 GB of data. The cluster has 8 nodes, was configured with 4 slots per node, and I used a total parallelism of 32.
>>>>> In every iteration of the while loop a new Flink job is started, and I thought that the data would also be distributed over the network again in each iteration, which should consume a significant and measurable amount of time. Is that thought wrong, or what is the computational overhead of the Flink Iterations that is equalizing this disadvantage?
>>>>> I include the relevant part of both programs and also attach the generated execution plans.
>>>>> Thank you for any ideas, as I could not find much about this issue in the Flink docs.
>>>>>
>>>>> Best, Dan
>>>>>
>>>>> *Flink Iterations:*
>>>>>
>>>>> DataSet<double[]> data = ...
>>>>>
>>>>> State state = initialState(m, initweights, 0, new double[initweights.length]);
>>>>> DataSet<State> statedataset = env.fromElements(state);
>>>>>
>>>>> // start of iteration section
>>>>> IterativeDataSet<State> loop = statedataset.iterate(niter);
>>>>>
>>>>> DataSet<State> statewithnewlossgradient = data.map(difffunction).withBroadcastSet(loop, "state")
>>>>>     .reduce(accumulate)
>>>>>     .map(new NormLossGradient(datasize))
>>>>>     .map(new SetLossGradient()).withBroadcastSet(loop, "state")
>>>>>     .map(new LBFGS());
>>>>>
>>>>> DataSet<State> converged = statewithnewlossgradient.filter(
>>>>>     new FilterFunction<State>() {
>>>>>         @Override
>>>>>         public boolean filter(State value) throws Exception {
>>>>>             if (value.getIflag()[0] == 0) {
>>>>>                 return false;
>>>>>             }
>>>>>             return true;
>>>>>         }
>>>>>     }
>>>>> );
>>>>>
>>>>> DataSet<State> finalstate = loop.closeWith(statewithnewlossgradient, converged);
>>>>>
>>>>> *While loop:*
>>>>>
>>>>> DataSet<double[]> data = ...
>>>>>
>>>>> State state = initialState(m, initweights, 0, new double[initweights.length]);
>>>>> int cnt = 0;
>>>>> do {
>>>>>     LBFGS lbfgs = new LBFGS();
>>>>>     statedataset = data.map(difffunction).withBroadcastSet(statedataset, "state")
>>>>>         .reduce(accumulate)
>>>>>         .map(new NormLossGradient(datasize))
>>>>>         .map(new SetLossGradient()).withBroadcastSet(statedataset, "state")
>>>>>         .map(lbfgs);
>>>>>     cnt++;
>>>>> } while (cnt < niter && statedataset.collect().get(0).getIflag()[0] != 0);
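Regarding the question at the top about adding the execution times of all jobs together: below is a small, self-contained sketch of one way to do that for the while-loop variant. It assumes ExecutionEnvironment#getLastJobExecutionResult() and JobExecutionResult#getNetRuntime() are available in the Flink release in use; the pipeline inside the loop is a trivial stand-in, not the actual L-BFGS job from this thread, and the iteration count of 9 is only taken from the numbers reported above.

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

public class SumWhileLoopRuntimes {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        int niter = 9;               // number of loop passes (Dan reported 9 iterations)
        long totalNetRuntimeMs = 0L; // sum of the per-job net runtimes

        for (int i = 0; i < niter; i++) {
            // Stand-in for one pass of the while loop: any eager action such as
            // collect() or count() submits and runs one complete job.
            DataSet<Long> pass = env.generateSequence(1, 1_000_000);
            pass.count();

            // Net runtime (in milliseconds) of the job that count() just executed.
            totalNetRuntimeMs += env.getLastJobExecutionResult().getNetRuntime();
        }

        System.out.println("Sum of per-job net runtimes: " + totalNetRuntimeMs + " ms");
    }
}

Summing these per-job runtimes gives a number that can be compared fairly against the single job of the Flink Iterations version, whose net runtime is reported once for the whole iterative job.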