Hi Dan,

Where are you reading the 200 GB "data" from? How much memory is there per node? If the DataSet is read from a distributed filesystem, and if Flink must spill to disk when running with iterations, then I wouldn't expect much difference between the two versions.

About how many iterations are run in the 30 minutes? I don't know that this is reported explicitly, but if your convergence function only has one input record per iteration, then the reported total is the iteration count.
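To frame the memory question, here is a rough back-of-the-envelope sketch in plain Java (not Flink code). The 200 GB, 8 nodes, and 4 slots per node come from your message; the 16 GB of memory per node is purely a placeholder I made up, so substitute the real value. The class and method names are hypothetical, and the estimate assumes the data is spread evenly and takes about as much space in memory as on disk, which may not hold.

```java
// Back-of-the-envelope check: does each node's share of the input fit in memory?
// Hypothetical helper, not part of Flink. Assumes an even spread of the data.
final class SpillEstimate {

    // Input share per node in GB.
    static double perNodeGb(double totalGb, int nodes) {
        return totalGb / nodes;
    }

    // Input share per slot in GB.
    static double perSlotGb(double totalGb, int nodes, int slotsPerNode) {
        return totalGb / (nodes * slotsPerNode);
    }

    // True if a node's share of the input exceeds the memory available to hold it.
    static boolean likelyToSpill(double totalGb, int nodes, double memoryPerNodeGb) {
        return perNodeGb(totalGb, nodes) > memoryPerNodeGb;
    }

    public static void main(String[] args) {
        double totalGb = 200.0;        // from the original message
        int nodes = 8;                 // from the original message
        int slotsPerNode = 4;          // from the original message
        double memoryPerNodeGb = 16.0; // ASSUMPTION: placeholder, replace with the real value

        System.out.println("GB per node: " + perNodeGb(totalGb, nodes));
        System.out.println("GB per slot: " + perSlotGb(totalGb, nodes, slotsPerNode));
        System.out.println("Likely to spill: " + likelyToSpill(totalGb, nodes, memoryPerNodeGb));
    }
}
```

With these numbers each node holds 25 GB of input (6.25 GB per slot), so unless the nodes can keep roughly that much in memory, both versions would be going back to disk in every iteration.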
One other thought: we should soon have support for object reuse with arrays (FLINK-3695). This would be implemented as DoubleValueArray or ValueArray<DoubleValue> rather than double[], but it would be interesting to test for a change in performance.

Greg

On Fri, Sep 2, 2016 at 6:16 AM, Dan Drewes <dre...@campus.tu-berlin.de> wrote:

> Hi,
>
> for my bachelor thesis I'm testing an implementation of the L-BFGS algorithm
> with Flink Iterations against a version without Flink Iterations but a
> casual while loop instead. Both programs use the same Map and Reduce
> transformations in each iteration. It was expected, that the performance of
> the Flink Iterations would scale better with increasing size of the input
> data set. However, the measured results on an ibm-power-cluster are very
> similar for both versions, e.g. around 30 minutes for 200 GB data. The
> cluster has 8 nodes, was configured with 4 slots per node and I used a
> total parallelism of 32.
> In every Iteration of the while loop a new flink job is started and I
> thought, that also the data would be distributed over the network again in
> each iteration which should consume a significant and measurable amount of
> time. Is that thought wrong or what is the computational overhead of the
> flink iterations that is equalizing this disadvantage?
> I include the relevant part of both programs and also attach the generated
> execution plans.
> Thank you for any ideas as I could not find much about this issue in the
> flink docs.
>
> Best, Dan
>
> *Flink Iterations:*
>
> DataSet<double[]> data = ...
>
> State state = initialState(m, initweights, 0, new double[initweights.length]);
> DataSet<State> statedataset = env.fromElements(state);
> //start of iteration section
> IterativeDataSet<State> loop = statedataset.iterate(niter);
>
>
> DataSet<State> statewithnewlossgradient =
>     data.map(difffunction).withBroadcastSet(loop, "state")
>         .reduce(accumulate)
>         .map(new NormLossGradient(datasize))
>         .map(new SetLossGradient()).withBroadcastSet(loop, "state")
>         .map(new LBFGS());
>
>
> DataSet<State> converged = statewithnewlossgradient.filter(
>     new FilterFunction<State>() {
>         @Override
>         public boolean filter(State value) throws Exception {
>             if (value.getIflag()[0] == 0) {
>                 return false;
>             }
>             return true;
>         }
>     }
> );
>
> DataSet<State> finalstate =
>     loop.closeWith(statewithnewlossgradient, converged);
>
>
> *While loop:*
>
> DataSet<double[]> data = ...
> State state = initialState(m, initweights, 0, new double[initweights.length]);
> int cnt = 0;
> do {
>     LBFGS lbfgs = new LBFGS();
>     statedataset = data.map(difffunction).withBroadcastSet(statedataset, "state")
>         .reduce(accumulate)
>         .map(new NormLossGradient(datasize))
>         .map(new SetLossGradient()).withBroadcastSet(statedataset, "state")
>         .map(lbfgs);
>     cnt++;
> } while (cnt < niter && statedataset.collect().get(0).getIflag()[0] != 0);