If it helps you with your task, then go ahead and add it. It is probably best to implement it along the same lines as mapWithBcVariable.
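The following is an illustration of the proposal, not code from this thread. It is a minimal, Flink-free sketch of the difference between the single-element helper and the proposed mapWithBcSet with f: (T, List[B]) => O. LocalDataSet, mapWithBcSet and BroadcastSketchDemo are hypothetical names. Plain Scala collections stand in for a DataSet and its broadcast set, so the semantics can run locally. The mapWithBcVariable stand-in mirrors the behaviour described below, where only the first broadcast element is used.

```scala
// Hypothetical, Flink-free model of the two broadcast styles discussed in this
// thread. LocalDataSet is NOT a Flink class; a Seq stands in for a DataSet.
final case class LocalDataSet[T](elements: Seq[T]) {

  // Stand-in for mapWithBcVariable: only the first broadcast element is passed
  // to `fun`; any further elements are ignored.
  def mapWithBcVariable[B, O](bc: LocalDataSet[B])(fun: (T, B) => O): LocalDataSet[O] = {
    val first = bc.elements.head
    LocalDataSet(elements.map(e => fun(e, first)))
  }

  // Sketch of the proposed mapWithBcSet: the whole broadcast set is passed to
  // `fun` as a List, so multi-element sets (e.g. k-means centroids) work.
  def mapWithBcSet[B, O](bc: LocalDataSet[B])(fun: (T, List[B]) => O): LocalDataSet[O] = {
    val all = bc.elements.toList
    LocalDataSet(elements.map(e => fun(e, all)))
  }
}

object BroadcastSketchDemo {
  // Assigns each 1-D point the index of its nearest centroid, reading the
  // complete broadcast list of centroids inside the map function.
  def nearestCentroids(points: Seq[Double], centroids: Seq[Double]): Seq[Int] =
    LocalDataSet(points)
      .mapWithBcSet(LocalDataSet(centroids)) { (p, cs) =>
        cs.indices.minBy(i => math.abs(cs(i) - p))
      }
      .elements

  def main(args: Array[String]): Unit =
    println(nearestCentroids(Seq(0.1, 4.9, 10.2), Seq(0.0, 5.0, 10.0)))
}
```

In Flink itself, such a method would presumably be built the same way as mapWithBcVariable. It would use a RichMapFunction whose open() calls getRuntimeContext.getBroadcastVariable[B](name), which returns a java.util.List[B]. That list would be converted to a Scala List, and the broadcast DataSet would be attached with withBroadcastSet. The sketch omits this wiring because it needs a Flink runtime.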

Cheers,
Till

On Tue, Jun 2, 2015 at 7:25 PM, Sachin Goel <sachingoel0...@gmail.com> wrote:

> Should I go ahead and add this method then? The mapWithBcSet, I mean.
>
> Regards
> Sachin Goel
>
> On Tue, Jun 2, 2015 at 10:43 PM, Till Rohrmann <till.rohrm...@gmail.com>
> wrote:
>
> > Yes, you’re right, Sachin. The mapWithBcVariable is only syntactic sugar if
> > you have a broadcast DataSet which contains only one element. If you have
> > multiple elements in your DataSet, then you can’t use this method. But we
> > can define another method, mapWithBcSet, which takes a function
> > f: (element: T, broadcastValues: List[B]) => O, for example.
> >
> > If you have multiple DataSets that fulfil this condition, then you can
> > wrap them in a tuple, as you’ve said.
> >
> > Cheers,
> > Till
> >
> >
> > On Tue, Jun 2, 2015 at 7:10 PM, Sachin Goel <sachingoel0...@gmail.com>
> > wrote:
> >
> > > Further, I think we should return just
> > > broadcastVariable = getRuntimeContext.getBroadcastVariable[B]("broadcastVariable")
> > > in BroadcastSingleElementMapper.
> > > Users may wish to broadcast a list, and not just access the
> > > first element. For example, this would make sense in the k-means
> > > algorithm.
> > >
> > > Regards
> > > Sachin Goel
> > >
> > > On Tue, Jun 2, 2015 at 9:03 PM, Sachin Goel <sachingoel0...@gmail.com>
> > > wrote:
> > >
> > > > Hi Till,
> > > > This works only when there is only one variable to be broadcast, doesn't
> > > > it? What about the case when we need to broadcast two? Is it advisable to
> > > > create a BroadcastDoubleElementMapper class, or could we just send a
> > > > tuple of all the variables? Perhaps that is a better idea.
> > > >
> > > > Regards
> > > > Sachin Goel
> > > >
> > > > On Tue, Jun 2, 2015 at 8:15 PM, <trohrm...@apache.org> wrote:
> > > >
> > > >> [ml] Replaces RichMapFunctions with mapWithBcVariable in FlinkML
> > > >>
> > > >>
> > > >> Project: http://git-wip-us.apache.org/repos/asf/flink/repo
> > > >> Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/950b79c5
> > > >> Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/950b79c5
> > > >> Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/950b79c5
> > > >>
> > > >> Branch: refs/heads/master
> > > >> Commit: 950b79c59327e96e3b1504616d26460cbff7fd4c
> > > >> Parents: 44dae0c
> > > >> Author: Till Rohrmann <trohrm...@apache.org>
> > > >> Authored: Tue Jun 2 14:45:12 2015 +0200
> > > >> Committer: Till Rohrmann <trohrm...@apache.org>
> > > >> Committed: Tue Jun 2 15:34:54 2015 +0200
> > > >>
> > > >> ----------------------------------------------------------------------
> > > >> .../apache/flink/ml/classification/SVM.scala | 73 ++++++--------------
> > > >> .../flink/ml/preprocessing/StandardScaler.scala | 39 +++--------
> > > >> 2 files changed, 30 insertions(+), 82 deletions(-)
> > > >> ----------------------------------------------------------------------
> > > >>
> > > >>
> > > >> http://git-wip-us.apache.org/repos/asf/flink/blob/950b79c5/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/classification/SVM.scala
> > > >> ----------------------------------------------------------------------
> > > >> diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/classification/SVM.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/classification/SVM.scala
> > > >> index e01735f..c69b56a 100644
> > > >> --- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/classification/SVM.scala
> > > >> +++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/classification/SVM.scala
> > > >> @@ -26,6 +26,7 @@ import scala.util.Random
> > > >> import org.apache.flink.api.common.functions.RichMapFunction
> > > >> import org.apache.flink.api.scala._
> > > >> import org.apache.flink.configuration.Configuration
> > > >> +import org.apache.flink.ml._
> > > >> import org.apache.flink.ml.common.FlinkMLTools.ModuloKeyPartitioner
> > > >> import org.apache.flink.ml.common._
> > > >> import org.apache.flink.ml.math.Vector
> > > >> @@ -190,6 +191,7 @@ class SVM extends Predictor[SVM] {
> > > >> * of the algorithm.
> > > >> */
> > > >> object SVM{
> > > >> +
> > > >> val WEIGHT_VECTOR ="weightVector"
> > > >>
> > > >> // ========================================== Parameters =========================================
> > > >> @@ -242,7 +244,13 @@ object SVM{
> > > >>
> > > >> instance.weightsOption match {
> > > >> case Some(weights) => {
> > > >> - input.map(new PredictionMapper[T]).withBroadcastSet(weights, WEIGHT_VECTOR)
> > > >> + input.mapWithBcVariable(weights){
> > > >> + (vector, weights) => {
> > > >> + val dotProduct = weights dot vector.asBreeze
> > > >> +
> > > >> + LabeledVector(dotProduct, vector)
> > > >> + }
> > > >> + }
> > > >> }
> > > >>
> > > >> case None => {
> > > >> @@ -254,28 +262,6 @@ object SVM{
> > > >> }
> > > >> }
> > > >>
> > > >> - /** Mapper to calculate the value of the prediction function. This is a RichMapFunction, because
> > > >> - * we broadcast the weight vector to all mappers.
> > > >> - */
> > > >> - class PredictionMapper[T <: Vector] extends RichMapFunction[T, LabeledVector] {
> > > >> -
> > > >> - var weights: BreezeDenseVector[Double] = _
> > > >> -
> > > >> - @throws(classOf[Exception])
> > > >> - override def open(configuration: Configuration): Unit = {
> > > >> - // get current weights
> > > >> - weights = getRuntimeContext.
> > > >> - getBroadcastVariable[BreezeDenseVector[Double]](WEIGHT_VECTOR).get(0)
> > > >> - }
> > > >> -
> > > >> - override def map(vector: T): LabeledVector = {
> > > >> - // calculate the prediction value (scaled distance from the separating hyperplane)
> > > >> - val dotProduct = weights dot vector.asBreeze
> > > >> -
> > > >> - LabeledVector(dotProduct, vector)
> > > >> - }
> > > >> - }
> > > >> -
> > > >> /** [[org.apache.flink.ml.pipeline.PredictOperation]] for [[LabeledVector ]]types. The result type
> > > >> * is a [[(Double, Double)]] tuple, corresponding to (truth, prediction)
> > > >> *
> > > >> @@ -291,7 +277,14 @@ object SVM{
> > > >>
> > > >> instance.weightsOption match {
> > > >> case Some(weights) => {
> > > >> - input.map(new LabeledPredictionMapper).withBroadcastSet(weights, WEIGHT_VECTOR)
> > > >> + input.mapWithBcVariable(weights){
> > > >> + (labeledVector, weights) => {
> > > >> + val prediction = weights dot labeledVector.vector.asBreeze
> > > >> + val truth = labeledVector.label
> > > >> +
> > > >> + (truth, prediction)
> > > >> + }
> > > >> + }
> > > >> }
> > > >>
> > > >> case None => {
> > > >> @@ -303,30 +296,6 @@ object SVM{
> > > >> }
> > > >> }
> > > >>
> > > >> - /** Mapper to calculate the value of the prediction function. This is a RichMapFunction, because
> > > >> - * we broadcast the weight vector to all mappers.
> > > >> - */
> > > >> - class LabeledPredictionMapper extends RichMapFunction[LabeledVector, (Double, Double)] {
> > > >> -
> > > >> - var weights: BreezeDenseVector[Double] = _
> > > >> -
> > > >> - @throws(classOf[Exception])
> > > >> - override def open(configuration: Configuration): Unit = {
> > > >> - // get current weights
> > > >> - weights = getRuntimeContext.
> > > >> - getBroadcastVariable[BreezeDenseVector[Double]](WEIGHT_VECTOR).get(0)
> > > >> - }
> > > >> -
> > > >> - override def map(labeledVector: LabeledVector): (Double, Double) = {
> > > >> - // calculate the prediction value (scaled distance from the separating hyperplane)
> > > >> - val prediction = weights dot labeledVector.vector.asBreeze
> > > >> - val truth = labeledVector.label
> > > >> -
> > > >> - (truth, prediction)
> > > >> - }
> > > >> - }
> > > >> -
> > > >> -
> > > >> /** [[FitOperation]] which trains a SVM with soft-margin based on the given training data set.
> > > >> *
> > > >> */
> > > >> @@ -540,17 +509,17 @@ object SVM{
> > > >>
> > > >> // compute projected gradient
> > > >> var proj_grad = if(alpha <= 0.0){
> > > >> - math.min(grad, 0)
> > > >> + scala.math.min(grad, 0)
> > > >> } else if(alpha >= 1.0) {
> > > >> - math.max(grad, 0)
> > > >> + scala.math.max(grad, 0)
> > > >> } else {
> > > >> grad
> > > >> }
> > > >>
> > > >> - if(math.abs(grad) != 0.0){
> > > >> + if(scala.math.abs(grad) != 0.0){
> > > >> val qii = x dot x
> > > >> val newAlpha = if(qii != 0.0){
> > > >> - math.min(math.max((alpha - (grad / qii)), 0.0), 1.0)
> > > >> + scala.math.min(scala.math.max((alpha - (grad / qii)), 0.0), 1.0)
> > > >> } else {
> > > >> 1.0
> > > >> }
> > > >>
> > > >>
> > > >> http://git-wip-us.apache.org/repos/asf/flink/blob/950b79c5/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala
> > > >> ----------------------------------------------------------------------
> > > >> diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala
> > > >> index 2e3ed95..7992b02 100644
> > > >> --- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala
> > > >> +++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala
> > > >> @@ -25,6 +25,7 @@ import org.apache.flink.api.common.functions._
> > > >> import org.apache.flink.api.common.typeinfo.TypeInformation
> > > >> import org.apache.flink.api.scala._
> > > >> import org.apache.flink.configuration.Configuration
> > > >> +import org.apache.flink.ml._
> > > >> import org.apache.flink.ml.common.{LabeledVector, Parameter, ParameterMap}
> > > >> import org.apache.flink.ml.math.Breeze._
> > > >> import org.apache.flink.ml.math.{BreezeVectorConverter, Vector}
> > > >> @@ -209,20 +210,9 @@ object StandardScaler {
> > > >>
> > > >> instance.metricsOption match {
> > > >> case Some(metrics) => {
> > > >> - input.map(new RichMapFunction[T, T]() {
> > > >> -
> > > >> - var broadcastMean: linalg.Vector[Double] = null
> > > >> - var broadcastStd: linalg.Vector[Double] = null
> > > >> -
> > > >> - override def open(parameters: Configuration): Unit = {
> > > >> - val broadcastedMetrics = getRuntimeContext().getBroadcastVariable[
> > > >> - (linalg.Vector[Double], linalg.Vector[Double])
> > > >> - ]("broadcastedMetrics").get(0)
> > > >> - broadcastMean = broadcastedMetrics._1
> > > >> - broadcastStd = broadcastedMetrics._2
> > > >> - }
> > > >> -
> > > >> - override def map(vector: T): T = {
> > > >> + input.mapWithBcVariable(metrics){
> > > >> + (vector, metrics) => {
> > > >> + val (broadcastMean, broadcastStd) = metrics
> > > >> var myVector = vector.asBreeze
> > > >>
> > > >> myVector -= broadcastMean
> > > >> @@ -230,7 +220,7 @@ object StandardScaler {
> > > >> myVector = (myVector :* std) + mean
> > > >> myVector.fromBreeze
> > > >> }
> > > >> - }).withBroadcastSet(metrics, "broadcastedMetrics")
> > > >> + }
> > > >> }
> > > >>
> > > >> case None =>
> > > >> @@ -251,20 +241,9 @@ object StandardScaler {
> > > >>
> > > >> instance.metricsOption match {
> > > >> case Some(metrics) => {
> > > >> - input.map(new RichMapFunction[LabeledVector, LabeledVector]() {
> > > >> -
> > > >> - var broadcastMean: linalg.Vector[Double] = null
> > > >> - var broadcastStd: linalg.Vector[Double] = null
> > > >> -
> > > >> - override def open(parameters: Configuration): Unit = {
> > > >> - val broadcastedMetrics = getRuntimeContext().getBroadcastVariable[
> > > >> - (linalg.Vector[Double], linalg.Vector[Double])
> > > >> - ]("broadcastedMetrics").get(0)
> > > >> - broadcastMean = broadcastedMetrics._1
> > > >> - broadcastStd = broadcastedMetrics._2
> > > >> - }
> > > >> -
> > > >> - override def map(labeledVector: LabeledVector): LabeledVector = {
> > > >> + input.mapWithBcVariable(metrics){
> > > >> + (labeledVector, metrics) => {
> > > >> + val (broadcastMean, broadcastStd) = metrics
> > > >> val LabeledVector(label, vector) = labeledVector
> > > >> var breezeVector = vector.asBreeze
> > > >>
> > > >> @@ -273,7 +252,7 @@ object StandardScaler {
> > > >> breezeVector = (breezeVector :* std) + mean
> > > >> LabeledVector(label, breezeVector.fromBreeze[Vector])
> > > >> }
> > > >> - }).withBroadcastSet(metrics, "broadcastedMetrics")
> > > >> + }
> > > >> }
> > > >>
> > > >> case None =>
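The StandardScaler change in the commit quoted above also shows the tuple approach discussed in the thread. Both metrics travel as one broadcast element of type (mean, std), and the function destructures that element. Below is a minimal, Flink-free sketch of that pattern. TupleBroadcastSketch, mapWithSingle and standardize are hypothetical names, and Scala's immutable Vector[Double] stands in for the Breeze vectors. The diff shows only part of the transformation, so the scaling formula (x - mean) / std * targetStd + targetMean is assumed here as the usual standardization rather than copied from the commit.

```scala
// Hypothetical, Flink-free sketch of packing several broadcast values into ONE
// tuple element and destructuring it inside the map function, as the
// StandardScaler change does with `val (broadcastMean, broadcastStd) = metrics`.
object TupleBroadcastSketch {
  type Vec = Vector[Double] // scala.collection.immutable.Vector, not a Flink/Breeze type

  // Stand-in for mapWithBcVariable: the broadcast "set" must contain a single
  // element, which is handed to `fun` together with each input element.
  def mapWithSingle[T, B, O](input: Seq[T], broadcastSet: Seq[B])(fun: (T, B) => O): Seq[O] = {
    require(broadcastSet.size == 1, "expected exactly one broadcast element")
    val bc = broadcastSet.head
    input.map(x => fun(x, bc))
  }

  // Standardizes every vector with the broadcast (mean, std) pair, then scales
  // and shifts the result to the requested target standard deviation and mean.
  def standardize(
      input: Seq[Vec],
      metrics: (Vec, Vec),
      targetMean: Double = 0.0,
      targetStd: Double = 1.0): Seq[Vec] =
    mapWithSingle(input, Seq(metrics)) { (vector, m) =>
      val (broadcastMean, broadcastStd) = m
      vector.indices.toVector.map { i =>
        (vector(i) - broadcastMean(i)) / broadcastStd(i) * targetStd + targetMean
      }
    }

  def main(args: Array[String]): Unit = {
    val data = Seq(Vector(1.0, 10.0), Vector(3.0, 30.0))
    println(standardize(data, (Vector(2.0, 20.0), Vector(1.0, 10.0))))
  }
}
```

With this pattern, adding a third broadcast value only changes the tuple type. It does not require a new mapper class such as the BroadcastDoubleElementMapper mentioned in the thread.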