Hi Greg,
thanks for your response!
I just had a look and realized that it's actually only about 85 GB of data. Sorry
for the wrong information.
It's read from a CSV file on the master node's local file system. The 8
nodes have more than 40 GB of available memory each, and since the data is
equally distributed I assume there should be no need to spill anything
to disk.
There are 9 iterations.
Is it possible that the data is repeatedly distributed with Flink Iterations
as well? Or, the other way around: might it be that Flink somehow "remembers"
that the data is already distributed, even for the while loop?
-Dan
On 02.09.2016 at 16:39, Greg Hogan wrote:
Hi Dan,
Where are you reading the 200 GB "data" from? How much memory per
node? If the DataSet is read from a distributed filesystem and if with
iterations Flink must spill to disk then I wouldn't expect much
difference. About how many iterations are run in the 30 minutes? I
don't know that this is reported explicitly, but if your convergence
function only has one input record per iteration then the reported
total is the iteration count.
One other thought: we should soon have support for object reuse with
arrays (FLINK-3695). This would be implemented as DoubleValueArray or
ValueArray<DoubleValue> rather than double[], but it would be
interesting to test for a change in performance.
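Until then, the general object-reuse switch on the ExecutionConfig can already be tested. The following is only a minimal sketch, assuming a standard ExecutionEnvironment setup; it is not taken from the attached programs, and it does not use the array value types from FLINK-3695 (those are a separate, upcoming addition):

import org.apache.flink.api.java.ExecutionEnvironment;

public class ObjectReuseSketch {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // Let Flink hand the same object instances to user functions instead of
        // creating new ones for every record (general object-reuse switch; the
        // array-backed value types tracked in FLINK-3695 are a separate feature).
        env.getConfig().enableObjectReuse();
        // ... define and run the job as usual ...
    }
}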
Greg
On Fri, Sep 2, 2016 at 6:16 AM, Dan Drewes <dre...@campus.tu-berlin.de> wrote:
Hi,
for my bachelor thesis I'm testing an implementation of the L-BFGS
algorithm with Flink Iterations against a version without Flink
Iterations that uses a plain while loop instead. Both programs use the
same Map and Reduce transformations in each iteration. It was
expected that the performance of the Flink Iterations would scale
better with increasing size of the input data set. However, the
measured results on an IBM POWER cluster are very similar for both
versions, e.g. around 30 minutes for 200 GB of data. The cluster has
8 nodes, was configured with 4 slots per node, and I used a total
parallelism of 32.
In every iteration of the while loop a new Flink job is started,
and I thought that the data would also be distributed over the
network again in each iteration, which should consume a significant
and measurable amount of time. Is that assumption wrong, or what is
the computational overhead of the Flink Iterations that evens out
this disadvantage?
I've included the relevant parts of both programs and also attached the
generated execution plans.
Thank you for any ideas, as I could not find much about this issue
in the Flink docs.
Best, Dan
*Flink Iterations:*
DataSet<double[]> data = ...
State state = initialState(m, initweights, 0, new double[initweights.length]);
DataSet<State> statedataset = env.fromElements(state);

// start of iteration section
IterativeDataSet<State> loop = statedataset.iterate(niter);

DataSet<State> statewithnewlossgradient = data
        .map(difffunction).withBroadcastSet(loop, "state")
        .reduce(accumulate)
        .map(new NormLossGradient(datasize))
        .map(new SetLossGradient()).withBroadcastSet(loop, "state")
        .map(new LBFGS());

// termination criterion: the iteration stops once this DataSet becomes empty,
// i.e. once iflag[0] == 0
DataSet<State> converged = statewithnewlossgradient.filter(
        new FilterFunction<State>() {
            @Override
            public boolean filter(State value) throws Exception {
                return value.getIflag()[0] != 0;
            }
        }
);

DataSet<State> finalstate = loop.closeWith(statewithnewlossgradient, converged);
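(Side note on counting iterations: inside a Flink iteration the current superstep number is available from the iteration runtime context. The following is only a rough sketch to illustrate that; the class LoggingMapper and the println are made up for illustration and are not part of the program above.)

import org.apache.flink.api.common.functions.RichMapFunction;

// Illustrative sketch: a rich function that reports the superstep (iteration)
// it runs in; it could be chained into the iterate()/closeWith() section above.
public class LoggingMapper<T> extends RichMapFunction<T, T> {
    @Override
    public T map(T value) throws Exception {
        int superstep = getIterationRuntimeContext().getSuperstepNumber();
        System.out.println("processing record in superstep " + superstep);
        return value;
    }
}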
*While loop:*
DataSet<double[]> data = ...
State state = initialState(m, initweights, 0, new double[initweights.length]);
DataSet<State> statedataset = env.fromElements(state); // initial state, as in the Iterations version

int cnt = 0;
do {
    LBFGS lbfgs = new LBFGS();
    statedataset = data.map(difffunction).withBroadcastSet(statedataset, "state")
            .reduce(accumulate)
            .map(new NormLossGradient(datasize))
            .map(new SetLossGradient()).withBroadcastSet(statedataset, "state")
            .map(lbfgs);
    cnt++;
    // collect() triggers the execution of a new Flink job in every pass
} while (cnt < niter && statedataset.collect().get(0).getIflag()[0] != 0);
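One way to make the suspected redistribution cost visible would be to time each pass of the loop. This is only a sketch: it simply wraps the loop above with wall-clock measurements, and the timing/printing code is added for illustration, not part of the thesis implementation.

// Sketch: timing each pass of the while-loop version. Every collect() call
// submits a separate Flink job, so the measured time includes re-reading the
// CSV and re-distributing the data across the cluster.
int cnt = 0;
boolean keepGoing;
do {
    long start = System.currentTimeMillis();

    statedataset = data.map(difffunction).withBroadcastSet(statedataset, "state")
            .reduce(accumulate)
            .map(new NormLossGradient(datasize))
            .map(new SetLossGradient()).withBroadcastSet(statedataset, "state")
            .map(new LBFGS());

    keepGoing = statedataset.collect().get(0).getIflag()[0] != 0; // triggers job execution
    cnt++;

    System.out.println("pass " + cnt + " took "
            + (System.currentTimeMillis() - start) + " ms");
} while (cnt < niter && keepGoing);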