Hi Till,
This works only when there is a single variable to broadcast, doesn't it? What
about the case where we need to broadcast two? Would it be advisable to create
a BroadcastDoubleElementMapper class, or could we simply broadcast a tuple of
all the variables? The tuple seems like the better idea; StandardScaler in this
very commit already does that with its (mean, std) metrics tuple.

Regards
Sachin Goel

On Tue, Jun 2, 2015 at 8:15 PM, <trohrm...@apache.org> wrote:

> [ml] Replaces RichMapFunctions with mapWithBcVariable in FlinkML
>
>
> Project: http://git-wip-us.apache.org/repos/asf/flink/repo
> Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/950b79c5
> Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/950b79c5
> Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/950b79c5
>
> Branch: refs/heads/master
> Commit: 950b79c59327e96e3b1504616d26460cbff7fd4c
> Parents: 44dae0c
> Author: Till Rohrmann <trohrm...@apache.org>
> Authored: Tue Jun 2 14:45:12 2015 +0200
> Committer: Till Rohrmann <trohrm...@apache.org>
> Committed: Tue Jun 2 15:34:54 2015 +0200
>
> ----------------------------------------------------------------------
>  .../apache/flink/ml/classification/SVM.scala    | 73 ++++++--------------
>  .../flink/ml/preprocessing/StandardScaler.scala | 39 +++--------
>  2 files changed, 30 insertions(+), 82 deletions(-)
> ----------------------------------------------------------------------
>
>
>
> http://git-wip-us.apache.org/repos/asf/flink/blob/950b79c5/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/classification/SVM.scala
> ----------------------------------------------------------------------
> diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/classification/SVM.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/classification/SVM.scala
> index e01735f..c69b56a 100644
> --- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/classification/SVM.scala
> +++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/classification/SVM.scala
> @@ -26,6 +26,7 @@ import scala.util.Random
>  import org.apache.flink.api.common.functions.RichMapFunction
>  import org.apache.flink.api.scala._
>  import org.apache.flink.configuration.Configuration
> +import org.apache.flink.ml._
>  import org.apache.flink.ml.common.FlinkMLTools.ModuloKeyPartitioner
>  import org.apache.flink.ml.common._
>  import org.apache.flink.ml.math.Vector
> @@ -190,6 +191,7 @@ class SVM extends Predictor[SVM] {
>    * of the algorithm.
>    */
>  object SVM{
> +
>    val WEIGHT_VECTOR ="weightVector"
>
>    // ========================================== Parameters
> =========================================
> @@ -242,7 +244,13 @@ object SVM{
>
>          instance.weightsOption match {
>            case Some(weights) => {
> -            input.map(new PredictionMapper[T]).withBroadcastSet(weights,
> WEIGHT_VECTOR)
> +            input.mapWithBcVariable(weights){
> +              (vector, weights) => {
> +                val dotProduct = weights dot vector.asBreeze
> +
> +                LabeledVector(dotProduct, vector)
> +              }
> +            }
>            }
>
>            case None => {
> @@ -254,28 +262,6 @@ object SVM{
>      }
>    }
>
> -  /** Mapper to calculate the value of the prediction function. This is a
> RichMapFunction, because
> -    * we broadcast the weight vector to all mappers.
> -    */
> -  class PredictionMapper[T <: Vector] extends RichMapFunction[T,
> LabeledVector] {
> -
> -    var weights: BreezeDenseVector[Double] = _
> -
> -    @throws(classOf[Exception])
> -    override def open(configuration: Configuration): Unit = {
> -      // get current weights
> -      weights = getRuntimeContext.
> -
> getBroadcastVariable[BreezeDenseVector[Double]](WEIGHT_VECTOR).get(0)
> -    }
> -
> -    override def map(vector: T): LabeledVector = {
> -      // calculate the prediction value (scaled distance from the
> separating hyperplane)
> -      val dotProduct = weights dot vector.asBreeze
> -
> -      LabeledVector(dotProduct, vector)
> -    }
> -  }
> -
>    /** [[org.apache.flink.ml.pipeline.PredictOperation]] for
> [[LabeledVector ]]types. The result type
>      * is a [[(Double, Double)]] tuple, corresponding to (truth,
> prediction)
>      *
> @@ -291,7 +277,14 @@ object SVM{
>
>          instance.weightsOption match {
>            case Some(weights) => {
> -            input.map(new
> LabeledPredictionMapper).withBroadcastSet(weights, WEIGHT_VECTOR)
> +            input.mapWithBcVariable(weights){
> +              (labeledVector, weights) => {
> +                val prediction = weights dot labeledVector.vector.asBreeze
> +                val truth = labeledVector.label
> +
> +                (truth, prediction)
> +              }
> +            }
>            }
>
>            case None => {
> @@ -303,30 +296,6 @@ object SVM{
>      }
>    }
>
> -  /** Mapper to calculate the value of the prediction function. This is a
> RichMapFunction, because
> -    * we broadcast the weight vector to all mappers.
> -    */
> -  class LabeledPredictionMapper extends RichMapFunction[LabeledVector,
> (Double, Double)] {
> -
> -    var weights: BreezeDenseVector[Double] = _
> -
> -    @throws(classOf[Exception])
> -    override def open(configuration: Configuration): Unit = {
> -      // get current weights
> -      weights = getRuntimeContext.
> -
> getBroadcastVariable[BreezeDenseVector[Double]](WEIGHT_VECTOR).get(0)
> -    }
> -
> -    override def map(labeledVector: LabeledVector): (Double, Double) = {
> -      // calculate the prediction value (scaled distance from the
> separating hyperplane)
> -      val prediction = weights dot labeledVector.vector.asBreeze
> -      val truth = labeledVector.label
> -
> -      (truth, prediction)
> -    }
> -  }
> -
> -
>    /** [[FitOperation]] which trains a SVM with soft-margin based on the
> given training data set.
>      *
>      */
> @@ -540,17 +509,17 @@ object SVM{
>
>      // compute projected gradient
>      var proj_grad = if(alpha  <= 0.0){
> -      math.min(grad, 0)
> +      scala.math.min(grad, 0)
>      } else if(alpha >= 1.0) {
> -      math.max(grad, 0)
> +      scala.math.max(grad, 0)
>      } else {
>        grad
>      }
>
> -    if(math.abs(grad) != 0.0){
> +    if(scala.math.abs(grad) != 0.0){
>        val qii = x dot x
>        val newAlpha = if(qii != 0.0){
> -        math.min(math.max((alpha - (grad / qii)), 0.0), 1.0)
> +        scala.math.min(scala.math.max((alpha - (grad / qii)), 0.0), 1.0)
>        } else {
>          1.0
>        }
>
>
> http://git-wip-us.apache.org/repos/asf/flink/blob/950b79c5/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala
> ----------------------------------------------------------------------
> diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala
> index 2e3ed95..7992b02 100644
> --- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala
> +++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala
> @@ -25,6 +25,7 @@ import org.apache.flink.api.common.functions._
>  import org.apache.flink.api.common.typeinfo.TypeInformation
>  import org.apache.flink.api.scala._
>  import org.apache.flink.configuration.Configuration
> +import org.apache.flink.ml._
>  import org.apache.flink.ml.common.{LabeledVector, Parameter, ParameterMap}
>  import org.apache.flink.ml.math.Breeze._
>  import org.apache.flink.ml.math.{BreezeVectorConverter, Vector}
> @@ -209,20 +210,9 @@ object StandardScaler {
>
>          instance.metricsOption match {
>            case Some(metrics) => {
> -            input.map(new RichMapFunction[T, T]() {
> -
> -              var broadcastMean: linalg.Vector[Double] = null
> -              var broadcastStd: linalg.Vector[Double] = null
> -
> -              override def open(parameters: Configuration): Unit = {
> -                val broadcastedMetrics =
> getRuntimeContext().getBroadcastVariable[
> -                    (linalg.Vector[Double], linalg.Vector[Double])
> -                  ]("broadcastedMetrics").get(0)
> -                broadcastMean = broadcastedMetrics._1
> -                broadcastStd = broadcastedMetrics._2
> -              }
> -
> -              override def map(vector: T): T = {
> +            input.mapWithBcVariable(metrics){
> +              (vector, metrics) => {
> +                val (broadcastMean, broadcastStd) = metrics
>                  var myVector = vector.asBreeze
>
>                  myVector -= broadcastMean
> @@ -230,7 +220,7 @@ object StandardScaler {
>                  myVector = (myVector :* std) + mean
>                  myVector.fromBreeze
>                }
> -            }).withBroadcastSet(metrics, "broadcastedMetrics")
> +            }
>            }
>
>            case None =>
> @@ -251,20 +241,9 @@ object StandardScaler {
>
>          instance.metricsOption match {
>            case Some(metrics) => {
> -            input.map(new RichMapFunction[LabeledVector, LabeledVector]()
> {
> -
> -              var broadcastMean: linalg.Vector[Double] = null
> -              var broadcastStd: linalg.Vector[Double] = null
> -
> -              override def open(parameters: Configuration): Unit = {
> -                val broadcastedMetrics =
> getRuntimeContext().getBroadcastVariable[
> -                  (linalg.Vector[Double], linalg.Vector[Double])
> -                  ]("broadcastedMetrics").get(0)
> -                broadcastMean = broadcastedMetrics._1
> -                broadcastStd = broadcastedMetrics._2
> -              }
> -
> -              override def map(labeledVector: LabeledVector):
> LabeledVector = {
> +            input.mapWithBcVariable(metrics){
> +              (labeledVector, metrics) => {
> +                val (broadcastMean, broadcastStd) = metrics
>                  val LabeledVector(label, vector) = labeledVector
>                  var breezeVector = vector.asBreeze
>
> @@ -273,7 +252,7 @@ object StandardScaler {
>                  breezeVector = (breezeVector :* std) + mean
>                  LabeledVector(label, breezeVector.fromBreeze[Vector])
>                }
> -            }).withBroadcastSet(metrics, "broadcastedMetrics")
> +            }
>            }
>
>            case None =>
>
>

Reply via email to