Dear Flink Community,

Is there a compact and efficient way to access parameters that are known at
run-time, but not at compile-time, inside an iteration? I tried the following:


>define an object with the parameters:


object iterationVariables {
  var numDataPoints = 1
  var lambda = 0.2
  var stepSize = 0.01
}

>update it in the driver before starting the iteration:

iterationVariables.numDataPoints = numDP
iterationVariables.lambda = l
iterationVariables.stepSize = s


>and then use it inside the iteration, accessing it accordingly:


val resultingWeights = weightVector.iterate(numIterations) {

  weights => {

    val computeGradient = new RichMapFunction[LabeledPoint, DenseVector[Double]] {
      var originalW: DenseVector[Double] = _

      // fetch the current weight vector from the broadcast set
      override def open(parameters: Configuration): Unit = {
        originalW = getRuntimeContext.getBroadcastVariable(WEIGHT_VECTOR).get(0)
      }

      override def map(dp: LabeledPoint): DenseVector[Double] = {
        // step size decays with the square root of the superstep number
        val learning_rate: Double = iterationVariables.stepSize /
          Math.sqrt(getIterationRuntimeContext().getSuperstepNumber.toDouble)

        val sumElement = (dp.features.toDenseVector *
          (dp.label - mlutils.logisticFunction(originalW, dp.features))
          - (iterationVariables.lambda / iterationVariables.numDataPoints.toDouble) * originalW
        ) * learning_rate

        sumElement
      }
    }

    val newWeights: DataSet[DenseVector[Double]] =
      weights.union(
        data.map(computeGradient).withBroadcastSet(weights, WEIGHT_VECTOR).reduce{_ + _}
      ).reduce{_ + _}
    newWeights
  }
}


This worked perfectly fine in local mode. However, once deployed on an actual
cluster, iterationVariables inside the iteration returns the values set in the
original object definition (e.g. numDataPoints = 1) and not the updated values
that were set later in the driver, which ultimately leads to wrong results in
the computation.
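
The behaviour described above can be reproduced without Flink. The following
is only a minimal sketch, under the assumption that the cause is that a Scala
object exists once per JVM and its fields are not shipped along with a
serialized function. Java serialization stands in for sending the function to
a worker, and resetting the field stands in for the worker JVM initialising
the object with its defaults. The names Params, ReadsGlobal, CarriesParams and
ClosureDemo are hypothetical and not part of my job or of Flink.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Mutable singleton as in the code above: one instance per JVM.
object IterationVariables {
  var stepSize: Double = 0.01
}

// Hypothetical immutable parameter holder; case classes are Serializable.
final case class Params(numDataPoints: Int, lambda: Double, stepSize: Double)

// Reads the singleton at call time, so it sees whatever the local JVM holds.
class ReadsGlobal extends (Int => Double) with Serializable {
  def apply(superstep: Int): Double =
    IterationVariables.stepSize / math.sqrt(superstep.toDouble)
}

// Keeps its parameters in a field, so they travel with the serialized function.
class CarriesParams(p: Params) extends (Int => Double) with Serializable {
  def apply(superstep: Int): Double =
    p.stepSize / math.sqrt(superstep.toDouble)
}

object ClosureDemo {
  // Java-serialization round trip, standing in for shipping a function to a worker.
  def roundTrip[T <: AnyRef](value: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(value)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[T] finally in.close()
  }

  def run(): (Double, Double) = {
    IterationVariables.stepSize = 0.5 // update made in the "driver"
    val shippedGlobal = roundTrip(new ReadsGlobal)
    val shippedCarried =
      roundTrip(new CarriesParams(Params(1000, 0.2, IterationVariables.stepSize)))

    // Emulate a fresh worker JVM, where the object only has its initial value.
    IterationVariables.stepSize = 0.01

    (shippedGlobal(1), shippedCarried(1))
  }

  def main(args: Array[String]): Unit =
    println(run()) // prints (0.01,0.5)
}
```

If that assumption holds, copying the values into local vals or into the
constructor of the rich function before calling iterate might behave like
CarriesParams here, but I have not verified this on a cluster.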

So once again, is there a way to access parameters that will only be known at
run-time inside an iteration?

Best regards,
Christoph Boden

