If it helps you with your task, then you can add it. The best thing is
probably to implement it similarly to the mapWithBcVariable.
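
For illustration, the difference between the two helpers can be modelled on
plain Scala collections. This is only a sketch under assumptions:
BroadcastSketch, the Seq-based signatures and the toy nearest-centroid example
are made up here and are not the FlinkML API. The shape of f follows the
(element: T, broadcastValues: List[B]) => O proposal quoted below.

```scala
// Toy model on local Scala collections; NOT the Flink DataSet API.
// All names in this object are hypothetical and exist only for illustration.
object BroadcastSketch {

  // Models mapWithBcVariable: only the first broadcast element is handed to f,
  // mirroring the `.get(0)` access in the replaced RichMapFunctions.
  def mapWithBcVariable[T, B, O](data: Seq[T], broadcast: Seq[B])(f: (T, B) => O): Seq[O] = {
    require(broadcast.nonEmpty, "broadcast set must contain at least one element")
    val single = broadcast.head
    data.map(element => f(element, single))
  }

  // Models the proposed mapWithBcSet: the whole broadcast set is handed to f.
  def mapWithBcSet[T, B, O](data: Seq[T], broadcast: Seq[B])(f: (T, List[B]) => O): Seq[O] = {
    val values = broadcast.toList
    data.map(element => f(element, values))
  }

  def main(args: Array[String]): Unit = {
    val points = Seq(1.0, 4.0, 9.0)
    val centroids = Seq(0.0, 5.0, 10.0)

    // k-means style use case: every point needs to see all centroids.
    val nearest = mapWithBcSet(points, centroids) { (point, allCentroids) =>
      allCentroids.minBy(c => math.abs(c - point))
    }
    println(nearest)

    // Single-value use case: scale every point by one broadcast factor.
    val scaled = mapWithBcVariable(points, Seq(2.0)) { (point, factor) => point * factor }
    println(scaled)
  }
}
```

In the real implementation the list would presumably come from
getRuntimeContext.getBroadcastVariable inside a RichMapFunction, as in the
mappers removed by the commit below.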

Cheers,
Till

On Tue, Jun 2, 2015 at 7:25 PM, Sachin Goel <sachingoel0...@gmail.com>
wrote:

> Should I go ahead and add this method then? The mapWithBcSet I mean.
>
> Regards
> Sachin Goel
>
> On Tue, Jun 2, 2015 at 10:43 PM, Till Rohrmann <till.rohrm...@gmail.com>
> wrote:
>
> > Yes, you’re right, Sachin. The mapWithBcVariable is only syntactic sugar if
> > you have a broadcast DataSet which contains only one element. If you have
> > multiple elements in your DataSet then you can’t use this method. But we
> > can define another method mapWithBcSet which takes a function
> > f: (element: T, broadcastValues: List[B]) => O, for example.
> >
> > If you have multiple DataSets which fulfil this condition, then you can
> > wrap them in a tuple as you’ve said.
> >
> > Cheers,
> > Till
> >
> > On Tue, Jun 2, 2015 at 7:10 PM, Sachin Goel <sachingoel0...@gmail.com>
> > wrote:
> >
> > > Further, I think we should just return
> > > broadcastVariable = getRuntimeContext.getBroadcastVariable[B]("broadcastVariable")
> > > in BroadcastSingleElementMapper.
> > > A user may wish to have a whole list broadcast, and not just access the
> > > first element. For example, this would make sense in the k-means
> > > algorithm.
> > >
> > > Regards
> > > Sachin Goel
> > >
> > > On Tue, Jun 2, 2015 at 9:03 PM, Sachin Goel <sachingoel0...@gmail.com>
> > > wrote:
> > >
> > > > Hi Till
> > > > This works only when there is only one variable to be broadcasted,
> > > > doesn't it? What about the case when we need to broadcast two? Is it
> > > > advisable to create a BroadcastDoubleElementMapper class or perhaps we
> > > > could just send a tuple of all the variables? Perhaps that is a better
> > > > idea.
> > > >
> > > > Regards
> > > > Sachin Goel
> > > >
> > > > On Tue, Jun 2, 2015 at 8:15 PM, <trohrm...@apache.org> wrote:
> > > >
> > > >> [ml] Replaces RichMapFunctions with mapWithBcVariable in FlinkML
> > > >>
> > > >>
> > > >> Project: http://git-wip-us.apache.org/repos/asf/flink/repo
> > > >> Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/950b79c5
> > > >> Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/950b79c5
> > > >> Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/950b79c5
> > > >>
> > > >> Branch: refs/heads/master
> > > >> Commit: 950b79c59327e96e3b1504616d26460cbff7fd4c
> > > >> Parents: 44dae0c
> > > >> Author: Till Rohrmann <trohrm...@apache.org>
> > > >> Authored: Tue Jun 2 14:45:12 2015 +0200
> > > >> Committer: Till Rohrmann <trohrm...@apache.org>
> > > >> Committed: Tue Jun 2 15:34:54 2015 +0200
> > > >>
> > > >>
> > > >> ----------------------------------------------------------------------
> > > >>  .../apache/flink/ml/classification/SVM.scala    | 73 ++++++--------------
> > > >>  .../flink/ml/preprocessing/StandardScaler.scala | 39 +++--------
> > > >>  2 files changed, 30 insertions(+), 82 deletions(-)
> > > >> ----------------------------------------------------------------------
> > > >>
> > > >>
> > > >>
> > > >>
> > > >> http://git-wip-us.apache.org/repos/asf/flink/blob/950b79c5/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/classification/SVM.scala
> > > >> ----------------------------------------------------------------------
> > > >> diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/classification/SVM.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/classification/SVM.scala
> > > >> index e01735f..c69b56a 100644
> > > >> --- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/classification/SVM.scala
> > > >> +++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/classification/SVM.scala
> > > >> @@ -26,6 +26,7 @@ import scala.util.Random
> > > >>  import org.apache.flink.api.common.functions.RichMapFunction
> > > >>  import org.apache.flink.api.scala._
> > > >>  import org.apache.flink.configuration.Configuration
> > > >> +import org.apache.flink.ml._
> > > >>  import org.apache.flink.ml.common.FlinkMLTools.ModuloKeyPartitioner
> > > >>  import org.apache.flink.ml.common._
> > > >>  import org.apache.flink.ml.math.Vector
> > > >> @@ -190,6 +191,7 @@ class SVM extends Predictor[SVM] {
> > > >>    * of the algorithm.
> > > >>    */
> > > >>  object SVM{
> > > >> +
> > > >>    val WEIGHT_VECTOR ="weightVector"
> > > >>
> > > >>    // ========================================== Parameters =========================================
> > > >> @@ -242,7 +244,13 @@ object SVM{
> > > >>
> > > >>          instance.weightsOption match {
> > > >>            case Some(weights) => {
> > > >> -            input.map(new PredictionMapper[T]).withBroadcastSet(weights, WEIGHT_VECTOR)
> > > >> +            input.mapWithBcVariable(weights){
> > > >> +              (vector, weights) => {
> > > >> +                val dotProduct = weights dot vector.asBreeze
> > > >> +
> > > >> +                LabeledVector(dotProduct, vector)
> > > >> +              }
> > > >> +            }
> > > >>            }
> > > >>
> > > >>            case None => {
> > > >> @@ -254,28 +262,6 @@ object SVM{
> > > >>      }
> > > >>    }
> > > >>
> > > >> -  /** Mapper to calculate the value of the prediction function. This is a RichMapFunction, because
> > > >> -    * we broadcast the weight vector to all mappers.
> > > >> -    */
> > > >> -  class PredictionMapper[T <: Vector] extends RichMapFunction[T, LabeledVector] {
> > > >> -
> > > >> -    var weights: BreezeDenseVector[Double] = _
> > > >> -
> > > >> -    @throws(classOf[Exception])
> > > >> -    override def open(configuration: Configuration): Unit = {
> > > >> -      // get current weights
> > > >> -      weights = getRuntimeContext.
> > > >> -        getBroadcastVariable[BreezeDenseVector[Double]](WEIGHT_VECTOR).get(0)
> > > >> -    }
> > > >> -
> > > >> -    override def map(vector: T): LabeledVector = {
> > > >> -      // calculate the prediction value (scaled distance from the separating hyperplane)
> > > >> -      val dotProduct = weights dot vector.asBreeze
> > > >> -
> > > >> -      LabeledVector(dotProduct, vector)
> > > >> -    }
> > > >> -  }
> > > >> -
> > > >>    /** [[org.apache.flink.ml.pipeline.PredictOperation]] for [[LabeledVector ]]types. The result type
> > > >>      * is a [[(Double, Double)]] tuple, corresponding to (truth, prediction)
> > > >>      *
> > > >> @@ -291,7 +277,14 @@ object SVM{
> > > >>
> > > >>          instance.weightsOption match {
> > > >>            case Some(weights) => {
> > > >> -            input.map(new LabeledPredictionMapper).withBroadcastSet(weights, WEIGHT_VECTOR)
> > > >> +            input.mapWithBcVariable(weights){
> > > >> +              (labeledVector, weights) => {
> > > >> +                val prediction = weights dot labeledVector.vector.asBreeze
> > > >> +                val truth = labeledVector.label
> > > >> +
> > > >> +                (truth, prediction)
> > > >> +              }
> > > >> +            }
> > > >>            }
> > > >>
> > > >>            case None => {
> > > >> @@ -303,30 +296,6 @@ object SVM{
> > > >>      }
> > > >>    }
> > > >>
> > > >> -  /** Mapper to calculate the value of the prediction function. This is a RichMapFunction, because
> > > >> -    * we broadcast the weight vector to all mappers.
> > > >> -    */
> > > >> -  class LabeledPredictionMapper extends RichMapFunction[LabeledVector, (Double, Double)] {
> > > >> -
> > > >> -    var weights: BreezeDenseVector[Double] = _
> > > >> -
> > > >> -    @throws(classOf[Exception])
> > > >> -    override def open(configuration: Configuration): Unit = {
> > > >> -      // get current weights
> > > >> -      weights = getRuntimeContext.
> > > >> -        getBroadcastVariable[BreezeDenseVector[Double]](WEIGHT_VECTOR).get(0)
> > > >> -    }
> > > >> -
> > > >> -    override def map(labeledVector: LabeledVector): (Double, Double) = {
> > > >> -      // calculate the prediction value (scaled distance from the separating hyperplane)
> > > >> -      val prediction = weights dot labeledVector.vector.asBreeze
> > > >> -      val truth = labeledVector.label
> > > >> -
> > > >> -      (truth, prediction)
> > > >> -    }
> > > >> -  }
> > > >> -
> > > >> -
> > > >>    /** [[FitOperation]] which trains a SVM with soft-margin based on the given training data set.
> > > >>      *
> > > >>      */
> > > >> @@ -540,17 +509,17 @@ object SVM{
> > > >>
> > > >>      // compute projected gradient
> > > >>      var proj_grad = if(alpha  <= 0.0){
> > > >> -      math.min(grad, 0)
> > > >> +      scala.math.min(grad, 0)
> > > >>      } else if(alpha >= 1.0) {
> > > >> -      math.max(grad, 0)
> > > >> +      scala.math.max(grad, 0)
> > > >>      } else {
> > > >>        grad
> > > >>      }
> > > >>
> > > >> -    if(math.abs(grad) != 0.0){
> > > >> +    if(scala.math.abs(grad) != 0.0){
> > > >>        val qii = x dot x
> > > >>        val newAlpha = if(qii != 0.0){
> > > >> -        math.min(math.max((alpha - (grad / qii)), 0.0), 1.0)
> > > >> +        scala.math.min(scala.math.max((alpha - (grad / qii)), 0.0), 1.0)
> > > >>        } else {
> > > >>          1.0
> > > >>        }
> > > >>
> > > >>
> > > >>
> > > >> http://git-wip-us.apache.org/repos/asf/flink/blob/950b79c5/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala
> > > >> ----------------------------------------------------------------------
> > > >> diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala
> > > >> index 2e3ed95..7992b02 100644
> > > >> --- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala
> > > >> +++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala
> > > >> @@ -25,6 +25,7 @@ import org.apache.flink.api.common.functions._
> > > >>  import org.apache.flink.api.common.typeinfo.TypeInformation
> > > >>  import org.apache.flink.api.scala._
> > > >>  import org.apache.flink.configuration.Configuration
> > > >> +import org.apache.flink.ml._
> > > >>  import org.apache.flink.ml.common.{LabeledVector, Parameter, ParameterMap}
> > > >>  import org.apache.flink.ml.math.Breeze._
> > > >>  import org.apache.flink.ml.math.{BreezeVectorConverter, Vector}
> > > >> @@ -209,20 +210,9 @@ object StandardScaler {
> > > >>
> > > >>          instance.metricsOption match {
> > > >>            case Some(metrics) => {
> > > >> -            input.map(new RichMapFunction[T, T]() {
> > > >> -
> > > >> -              var broadcastMean: linalg.Vector[Double] = null
> > > >> -              var broadcastStd: linalg.Vector[Double] = null
> > > >> -
> > > >> -              override def open(parameters: Configuration): Unit = {
> > > >> -                val broadcastedMetrics = getRuntimeContext().getBroadcastVariable[
> > > >> -                    (linalg.Vector[Double], linalg.Vector[Double])
> > > >> -                  ]("broadcastedMetrics").get(0)
> > > >> -                broadcastMean = broadcastedMetrics._1
> > > >> -                broadcastStd = broadcastedMetrics._2
> > > >> -              }
> > > >> -
> > > >> -              override def map(vector: T): T = {
> > > >> +            input.mapWithBcVariable(metrics){
> > > >> +              (vector, metrics) => {
> > > >> +                val (broadcastMean, broadcastStd) = metrics
> > > >>                  var myVector = vector.asBreeze
> > > >>
> > > >>                  myVector -= broadcastMean
> > > >> @@ -230,7 +220,7 @@ object StandardScaler {
> > > >>                  myVector = (myVector :* std) + mean
> > > >>                  myVector.fromBreeze
> > > >>                }
> > > >> -            }).withBroadcastSet(metrics, "broadcastedMetrics")
> > > >> +            }
> > > >>            }
> > > >>
> > > >>            case None =>
> > > >> @@ -251,20 +241,9 @@ object StandardScaler {
> > > >>
> > > >>          instance.metricsOption match {
> > > >>            case Some(metrics) => {
> > > >> -            input.map(new RichMapFunction[LabeledVector, LabeledVector]() {
> > > >> -
> > > >> -              var broadcastMean: linalg.Vector[Double] = null
> > > >> -              var broadcastStd: linalg.Vector[Double] = null
> > > >> -
> > > >> -              override def open(parameters: Configuration): Unit = {
> > > >> -                val broadcastedMetrics = getRuntimeContext().getBroadcastVariable[
> > > >> -                  (linalg.Vector[Double], linalg.Vector[Double])
> > > >> -                  ]("broadcastedMetrics").get(0)
> > > >> -                broadcastMean = broadcastedMetrics._1
> > > >> -                broadcastStd = broadcastedMetrics._2
> > > >> -              }
> > > >> -
> > > >> -              override def map(labeledVector: LabeledVector): LabeledVector = {
> > > >> +            input.mapWithBcVariable(metrics){
> > > >> +              (labeledVector, metrics) => {
> > > >> +                val (broadcastMean, broadcastStd) = metrics
> > > >>                  val LabeledVector(label, vector) = labeledVector
> > > >>                  var breezeVector = vector.asBreeze
> > > >>
> > > >> @@ -273,7 +252,7 @@ object StandardScaler {
> > > >>                  breezeVector = (breezeVector :* std) + mean
> > > >>                  LabeledVector(label, breezeVector.fromBreeze[Vector])
> > > >>                }
> > > >> -            }).withBroadcastSet(metrics, "broadcastedMetrics")
> > > >> +            }
> > > >>            }
> > > >>
> > > >>            case None =>
> > > >>
> > > >>
> > > >
> > >
> >
>
