Hi Till, This works only when there is a single variable to broadcast, doesn't it? What about the case where we need to broadcast two? Is it advisable to create a BroadcastDoubleElementMapper class, or could we just send a tuple of all the variables? Perhaps that is the better idea.
Regards Sachin Goel On Tue, Jun 2, 2015 at 8:15 PM, <trohrm...@apache.org> wrote: > [ml] Replaces RichMapFunctions with mapWithBcVariable in FlinkML > > > Project: http://git-wip-us.apache.org/repos/asf/flink/repo > Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/950b79c5 > Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/950b79c5 > Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/950b79c5 > > Branch: refs/heads/master > Commit: 950b79c59327e96e3b1504616d26460cbff7fd4c > Parents: 44dae0c > Author: Till Rohrmann <trohrm...@apache.org> > Authored: Tue Jun 2 14:45:12 2015 +0200 > Committer: Till Rohrmann <trohrm...@apache.org> > Committed: Tue Jun 2 15:34:54 2015 +0200 > > ---------------------------------------------------------------------- > .../apache/flink/ml/classification/SVM.scala | 73 ++++++-------------- > .../flink/ml/preprocessing/StandardScaler.scala | 39 +++-------- > 2 files changed, 30 insertions(+), 82 deletions(-) > ---------------------------------------------------------------------- > > > > http://git-wip-us.apache.org/repos/asf/flink/blob/950b79c5/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/classification/SVM.scala > ---------------------------------------------------------------------- > diff --git > a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/classification/SVM.scala > b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/classification/SVM.scala > index e01735f..c69b56a 100644 > --- > a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/classification/SVM.scala > +++ > b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/classification/SVM.scala > @@ -26,6 +26,7 @@ import scala.util.Random > import org.apache.flink.api.common.functions.RichMapFunction > import org.apache.flink.api.scala._ > import org.apache.flink.configuration.Configuration > +import org.apache.flink.ml._ > import org.apache.flink.ml.common.FlinkMLTools.ModuloKeyPartitioner > import 
org.apache.flink.ml.common._ > import org.apache.flink.ml.math.Vector > @@ -190,6 +191,7 @@ class SVM extends Predictor[SVM] { > * of the algorithm. > */ > object SVM{ > + > val WEIGHT_VECTOR ="weightVector" > > // ========================================== Parameters > ========================================= > @@ -242,7 +244,13 @@ object SVM{ > > instance.weightsOption match { > case Some(weights) => { > - input.map(new PredictionMapper[T]).withBroadcastSet(weights, > WEIGHT_VECTOR) > + input.mapWithBcVariable(weights){ > + (vector, weights) => { > + val dotProduct = weights dot vector.asBreeze > + > + LabeledVector(dotProduct, vector) > + } > + } > } > > case None => { > @@ -254,28 +262,6 @@ object SVM{ > } > } > > - /** Mapper to calculate the value of the prediction function. This is a > RichMapFunction, because > - * we broadcast the weight vector to all mappers. > - */ > - class PredictionMapper[T <: Vector] extends RichMapFunction[T, > LabeledVector] { > - > - var weights: BreezeDenseVector[Double] = _ > - > - @throws(classOf[Exception]) > - override def open(configuration: Configuration): Unit = { > - // get current weights > - weights = getRuntimeContext. > - > getBroadcastVariable[BreezeDenseVector[Double]](WEIGHT_VECTOR).get(0) > - } > - > - override def map(vector: T): LabeledVector = { > - // calculate the prediction value (scaled distance from the > separating hyperplane) > - val dotProduct = weights dot vector.asBreeze > - > - LabeledVector(dotProduct, vector) > - } > - } > - > /** [[org.apache.flink.ml.pipeline.PredictOperation]] for > [[LabeledVector ]]types. 
The result type > * is a [[(Double, Double)]] tuple, corresponding to (truth, > prediction) > * > @@ -291,7 +277,14 @@ object SVM{ > > instance.weightsOption match { > case Some(weights) => { > - input.map(new > LabeledPredictionMapper).withBroadcastSet(weights, WEIGHT_VECTOR) > + input.mapWithBcVariable(weights){ > + (labeledVector, weights) => { > + val prediction = weights dot labeledVector.vector.asBreeze > + val truth = labeledVector.label > + > + (truth, prediction) > + } > + } > } > > case None => { > @@ -303,30 +296,6 @@ object SVM{ > } > } > > - /** Mapper to calculate the value of the prediction function. This is a > RichMapFunction, because > - * we broadcast the weight vector to all mappers. > - */ > - class LabeledPredictionMapper extends RichMapFunction[LabeledVector, > (Double, Double)] { > - > - var weights: BreezeDenseVector[Double] = _ > - > - @throws(classOf[Exception]) > - override def open(configuration: Configuration): Unit = { > - // get current weights > - weights = getRuntimeContext. > - > getBroadcastVariable[BreezeDenseVector[Double]](WEIGHT_VECTOR).get(0) > - } > - > - override def map(labeledVector: LabeledVector): (Double, Double) = { > - // calculate the prediction value (scaled distance from the > separating hyperplane) > - val prediction = weights dot labeledVector.vector.asBreeze > - val truth = labeledVector.label > - > - (truth, prediction) > - } > - } > - > - > /** [[FitOperation]] which trains a SVM with soft-margin based on the > given training data set. 
> * > */ > @@ -540,17 +509,17 @@ object SVM{ > > // compute projected gradient > var proj_grad = if(alpha <= 0.0){ > - math.min(grad, 0) > + scala.math.min(grad, 0) > } else if(alpha >= 1.0) { > - math.max(grad, 0) > + scala.math.max(grad, 0) > } else { > grad > } > > - if(math.abs(grad) != 0.0){ > + if(scala.math.abs(grad) != 0.0){ > val qii = x dot x > val newAlpha = if(qii != 0.0){ > - math.min(math.max((alpha - (grad / qii)), 0.0), 1.0) > + scala.math.min(scala.math.max((alpha - (grad / qii)), 0.0), 1.0) > } else { > 1.0 > } > > > http://git-wip-us.apache.org/repos/asf/flink/blob/950b79c5/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala > ---------------------------------------------------------------------- > diff --git > a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala > b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala > index 2e3ed95..7992b02 100644 > --- > a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala > +++ > b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala > @@ -25,6 +25,7 @@ import org.apache.flink.api.common.functions._ > import org.apache.flink.api.common.typeinfo.TypeInformation > import org.apache.flink.api.scala._ > import org.apache.flink.configuration.Configuration > +import org.apache.flink.ml._ > import org.apache.flink.ml.common.{LabeledVector, Parameter, ParameterMap} > import org.apache.flink.ml.math.Breeze._ > import org.apache.flink.ml.math.{BreezeVectorConverter, Vector} > @@ -209,20 +210,9 @@ object StandardScaler { > > instance.metricsOption match { > case Some(metrics) => { > - input.map(new RichMapFunction[T, T]() { > - > - var broadcastMean: linalg.Vector[Double] = null > - var broadcastStd: linalg.Vector[Double] = null > - > - override def open(parameters: Configuration): Unit = { > - val broadcastedMetrics 
= > getRuntimeContext().getBroadcastVariable[ > - (linalg.Vector[Double], linalg.Vector[Double]) > - ]("broadcastedMetrics").get(0) > - broadcastMean = broadcastedMetrics._1 > - broadcastStd = broadcastedMetrics._2 > - } > - > - override def map(vector: T): T = { > + input.mapWithBcVariable(metrics){ > + (vector, metrics) => { > + val (broadcastMean, broadcastStd) = metrics > var myVector = vector.asBreeze > > myVector -= broadcastMean > @@ -230,7 +220,7 @@ object StandardScaler { > myVector = (myVector :* std) + mean > myVector.fromBreeze > } > - }).withBroadcastSet(metrics, "broadcastedMetrics") > + } > } > > case None => > @@ -251,20 +241,9 @@ object StandardScaler { > > instance.metricsOption match { > case Some(metrics) => { > - input.map(new RichMapFunction[LabeledVector, LabeledVector]() > { > - > - var broadcastMean: linalg.Vector[Double] = null > - var broadcastStd: linalg.Vector[Double] = null > - > - override def open(parameters: Configuration): Unit = { > - val broadcastedMetrics = > getRuntimeContext().getBroadcastVariable[ > - (linalg.Vector[Double], linalg.Vector[Double]) > - ]("broadcastedMetrics").get(0) > - broadcastMean = broadcastedMetrics._1 > - broadcastStd = broadcastedMetrics._2 > - } > - > - override def map(labeledVector: LabeledVector): > LabeledVector = { > + input.mapWithBcVariable(metrics){ > + (labeledVector, metrics) => { > + val (broadcastMean, broadcastStd) = metrics > val LabeledVector(label, vector) = labeledVector > var breezeVector = vector.asBreeze > > @@ -273,7 +252,7 @@ object StandardScaler { > breezeVector = (breezeVector :* std) + mean > LabeledVector(label, breezeVector.fromBreeze[Vector]) > } > - }).withBroadcastSet(metrics, "broadcastedMetrics") > + } > } > > case None => > >