Dear Flink Community,
is there a compact and efficient way to get parameters that are know at run-time, but not compile-time inside an iteration? I tried the following: >define an object with the parameters: object iterationVariables{ var numDataPoints = 1 var lambda = 0.2 var stepSize = 0.01 } ?>update it in the driver before starting the iteration: iterationVariables.numDataPoints = numDP iterationVariables.lambda = l iterationVariable.stepSize = s >and then use it inside the iteration - accessing it accordingly?: val resultingWeights = weightVector.iterate(numIterations) { weights => { val computeGradient = new RichMapFunction[LabeledPoint, DenseVector[Double]] { var originalW: DenseVector[Double] = _ override def open(parameters: Configuration): Unit = { originalW = getRuntimeContext.getBroadcastVariable(WEIGHT_VECTOR).get(0) } override def map(dp: LabeledPoint): DenseVector[Double] = { val learning_rate: Double = iterationVariables.s / Math.sqrt(getIterationRuntimeContext().getSuperstepNumber.toDouble) val sumElement = (dp.features.toDenseVector * (dp.label - mlutils.logisticFunction(originalW, (dp.features))) - (iterationVariables.lambda / iterationVariables.numDataPoints.toDouble) * originalW ) * learning_rate sumElement } } val newWeights : DataSet[DenseVector[Double]] = weights.union(data.map(computeGradient).withBroadcastSet(weights, WEIGHT_VECTOR).reduce{_ + _}).reduce{_ + _} newWeights } This did work perfectly fine in local mode, however once deployed on an actual cluster, iterationVariables inside the iteration actually returns the values set in the original object (e.g. numDataPoints = 1) and not the updated value that was set later in the driver, ultimately leading to wrong results in the computation. So once again, is there a way to get parameters the will only be known at run-time inside an iteration? Best regards, Christoph Boden