Hi Dan,

Flink currently allocates each task slot an equal portion of managed memory, so with 4 slots per TaskManager each slot only works with a quarter of it. I don't know the best way to count task slots here.
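To give a rough sense of scale (the numbers here are assumptions, not your actual configuration): with, say, a 20 GB TaskManager heap, 4 slots per TaskManager and the default managed memory fraction of 0.7, each slot would end up with roughly 0.7 * 20 GB / 4 = 3.5 GB of managed memory for sorting, hashing and caching.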
https://ci.apache.org/projects/flink/flink-docs-master/concepts/index.html#workers-slots-resources

If you assign the TaskManagers less memory, Linux will use the remaining memory to cache spill files.

Greg

On Fri, Sep 2, 2016 at 11:30 AM, Dan Drewes <dre...@campus.tu-berlin.de> wrote:

> Hi Greg,
>
> thanks for your response!
>
> I just had a look and realized that it's actually only about 85 GB of data.
> Sorry about the wrong information.
>
> It's read from a CSV file on the master node's local file system. The 8
> nodes have more than 40 GB of available memory each, and since the data is
> equally distributed I assume there should be no need to spill anything to
> disk.
>
> There are 9 iterations.
>
> Is it possible that the data is also repeatedly distributed with Flink
> Iterations? Or the other way around: might it be that Flink somehow
> "remembers" that the data is already distributed, even for the while loop?
>
> -Dan
>
>
> On 02.09.2016 at 16:39, Greg Hogan wrote:
>
> Hi Dan,
>
> Where are you reading the 200 GB of "data" from? How much memory per node?
> If the DataSet is read from a distributed filesystem, and if with
> iterations Flink must spill to disk, then I wouldn't expect much
> difference. About how many iterations run in the 30 minutes? I don't know
> that this is reported explicitly, but if your convergence function only
> has one input record per iteration then the reported total is the
> iteration count.
>
> One other thought: we should soon have support for object reuse with
> arrays (FLINK-3695). This would be implemented as DoubleValueArray or
> ValueArray<DoubleValue> rather than double[], but it would be interesting
> to test for a change in performance.
>
> Greg
>
> On Fri, Sep 2, 2016 at 6:16 AM, Dan Drewes <dre...@campus.tu-berlin.de>
> wrote:
>
>> Hi,
>>
>> for my bachelor thesis I'm testing an implementation of the L-BFGS
>> algorithm with Flink Iterations against a version without Flink
>> Iterations that uses a plain while loop instead. Both programs use the
>> same Map and Reduce transformations in each iteration. It was expected
>> that the performance of the Flink Iterations would scale better with
>> increasing size of the input data set. However, the measured results on
>> an ibm-power-cluster are very similar for both versions, e.g. around 30
>> minutes for 200 GB of data. The cluster has 8 nodes, was configured with
>> 4 slots per node, and I used a total parallelism of 32.
>> In every iteration of the while loop a new Flink job is started, and I
>> thought that the data would also be distributed over the network again
>> in each iteration, which should consume a significant and measurable
>> amount of time. Is that assumption wrong, or what is the computational
>> overhead of the Flink Iterations that cancels out this disadvantage?
>> I include the relevant part of both programs and also attach the
>> generated execution plans.
>> Thank you for any ideas, as I could not find much about this issue in
>> the Flink docs.
>>
>> Best, Dan
>>
>> *Flink Iterations:*
>>
>> DataSet<double[]> data = ...
>>
>> State state = initialState(m, initweights, 0, new double[initweights.length]);
>> DataSet<State> statedataset = env.fromElements(state);
>>
>> // start of iteration section
>> IterativeDataSet<State> loop = statedataset.iterate(niter);
>>
>> DataSet<State> statewithnewlossgradient = data
>>     .map(difffunction).withBroadcastSet(loop, "state")
>>     .reduce(accumulate)
>>     .map(new NormLossGradient(datasize))
>>     .map(new SetLossGradient()).withBroadcastSet(loop, "state")
>>     .map(new LBFGS());
>>
>> DataSet<State> converged = statewithnewlossgradient.filter(
>>     new FilterFunction<State>() {
>>         @Override
>>         public boolean filter(State value) throws Exception {
>>             // keep iterating while the L-BFGS iflag is non-zero
>>             return value.getIflag()[0] != 0;
>>         }
>>     }
>> );
>>
>> DataSet<State> finalstate = loop.closeWith(statewithnewlossgradient, converged);
>>
>> *While loop:*
>>
>> DataSet<double[]> data = ...
>> State state = initialState(m, initweights, 0, new double[initweights.length]);
>> DataSet<State> statedataset = env.fromElements(state);
>> int cnt = 0;
>> do {
>>     LBFGS lbfgs = new LBFGS();
>>     statedataset = data
>>         .map(difffunction).withBroadcastSet(statedataset, "state")
>>         .reduce(accumulate)
>>         .map(new NormLossGradient(datasize))
>>         .map(new SetLossGradient()).withBroadcastSet(statedataset, "state")
>>         .map(lbfgs);
>>     cnt++;
>> } while (cnt < niter && statedataset.collect().get(0).getIflag()[0] != 0);
>>
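
PS: on the question of how many iterations actually run in the Flink Iterations version, the current superstep number is available from the iteration runtime context of any rich function used inside the loop. A minimal sketch of what logging it could look like (the SuperstepLogger class below is illustrative only and not part of the original code):

import org.apache.flink.api.common.functions.RichMapFunction;

// Illustrative pass-through map that prints the current superstep of the
// enclosing Flink iteration.
public class SuperstepLogger extends RichMapFunction<State, State> {
    @Override
    public State map(State value) throws Exception {
        // getIterationRuntimeContext() is only valid inside an iteration
        int superstep = getIterationRuntimeContext().getSuperstepNumber();
        System.out.println("superstep = " + superstep);
        return value;
    }
}

Chaining .map(new SuperstepLogger()) anywhere inside the iteration would print the superstep for each record passing through; since the state DataSet holds a single record per iteration, that amounts to one line per superstep. It's only meant as a quick sanity check, not something to keep in the final program.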