[ 
https://issues.apache.org/jira/browse/FLINK-1992?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14550311#comment-14550311
 ] 

ASF GitHub Bot commented on FLINK-1992:
---------------------------------------

Github user thvasilo commented on a diff in the pull request:

    https://github.com/apache/flink/pull/692#discussion_r30591734
  
    --- Diff: 
flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/GradientDescent.scala
 ---
    @@ -76,86 +77,163 @@ class GradientDescent(runParameters: ParameterMap) 
extends IterativeSolver {
         }.withBroadcastSet(currentWeights, WEIGHTVECTOR_BROADCAST)
       }
     
    +
    +
       /** Provides a solution for the given optimization problem
         *
         * @param data A Dataset of LabeledVector (label, features) pairs
    -    * @param initWeights The initial weights that will be optimized
    +    * @param initialWeights The initial weights that will be optimized
         * @return The weights, optimized for the provided data.
         */
       override def optimize(
         data: DataSet[LabeledVector],
    -    initWeights: Option[DataSet[WeightVector]]): DataSet[WeightVector] = {
    -    // TODO: Faster way to do this?
    -    val dimensionsDS = data.map(_.vector.size).reduce((a, b) => b)
    -
    -    val numberOfIterations: Int = parameterMap(Iterations)
    +    initialWeights: Option[DataSet[WeightVector]]): DataSet[WeightVector] 
= {
    +    val numberOfIterations: Int = parameters(Iterations)
    +    // TODO(tvas): This looks out of place, why don't we get back an 
Option from
    +    // parameters(ConvergenceThreshold)?
    +    val convergenceThresholdOption = parameters.get(ConvergenceThreshold)
     
         // Initialize weights
    -    val initialWeightsDS: DataSet[WeightVector] = initWeights match {
    -      // Ensure provided weight vector is a DenseVector
    -      case Some(wvDS) => {
    -        wvDS.map{wv => {
    -          val denseWeights = wv.weights match {
    -            case dv: DenseVector => dv
    -            case sv: SparseVector => sv.toDenseVector
    +    val initialWeightsDS: DataSet[WeightVector] = 
createInitialWeightsDS(initialWeights, data)
    +
    +    // Perform the iterations
    +    val optimizedWeights = convergenceThresholdOption match {
    +      // No convergence criterion
    +      case None =>
    +        initialWeightsDS.iterate(numberOfIterations) {
    +          weightVectorDS => {
    +            SGDStep(data, weightVectorDS)
               }
    -          WeightVector(denseWeights, wv.intercept)
             }
    -
    +      case Some(convergence) =>
    +        /** Calculates the regularized loss, from the data and given 
weights **/
    +        def lossCalculation(data: DataSet[LabeledVector], weightDS: 
DataSet[WeightVector]):
    +        DataSet[Double] = {
    +          data.map {
    +            new LossCalculation
    +          }.withBroadcastSet(weightDS, WEIGHTVECTOR_BROADCAST)
    +            .reduce {
    +            (left, right) =>
    +              val (leftLoss, leftCount) = left
    +              val (rightLoss, rightCount) = right
    +              (leftLoss + rightLoss, rightCount + leftCount)
    +          }
    +            .map{new RegularizedLossCalculation}
    +            .withBroadcastSet(weightDS, WEIGHTVECTOR_BROADCAST)
             }
    -      }
    -      case None => createInitialWeightVector(dimensionsDS)
    -    }
    -
    -    // Perform the iterations
    -    // TODO: Enable convergence stopping criterion, as in Multiple Linear 
regression
    -    initialWeightsDS.iterate(numberOfIterations) {
    -      weightVector => {
    -        SGDStep(data, weightVector)
    -      }
    +        // We have to calculate for each weight vector the sum of squared 
residuals,
    +        // and then sum them and apply regularization
    +        val initialLossSumDS = lossCalculation(data, initialWeightsDS)
    +
    +        // Combine weight vector with the current loss
    +        val initialWeightsWithLossSum = initialWeightsDS.
    +          crossWithTiny(initialLossSumDS).setParallelism(1)
    +
    +        val resultWithLoss = initialWeightsWithLossSum.
    +          iterateWithTermination(numberOfIterations) {
    +          weightsWithLossSum =>
    +
    +            // Extract weight vector and loss
    +            val previousWeightsDS = weightsWithLossSum.map{_._1}
    +            val previousLossSumDS = weightsWithLossSum.map{_._2}
    +
    +            val currentWeightsDS = SGDStep(data, previousWeightsDS)
    +
    +            val currentLossSumDS = lossCalculation(data, currentWeightsDS)
    +
    +            // Check if the relative change in the loss is smaller than the
    +            // convergence threshold. If yes, then terminate i.e. return 
empty termination data set
    +            val termination = 
previousLossSumDS.crossWithTiny(currentLossSumDS).setParallelism(1).
    +              filter{
    +              pair => {
    +                val (previousLoss, currentLoss) = pair
    +
    +                if (previousLoss <= 0) {
    +                  false
    +                } else {
    +                  math.abs((previousLoss - currentLoss)/previousLoss) >= 
convergence
    +                }
    +              }
    +            }
    +
    +            // Result for new iteration
    +            (currentWeightsDS cross currentLossSumDS, termination)
    +        }
    +        // Return just the weights
    +        resultWithLoss.map{_._1}
         }
    +    optimizedWeights
       }
     
    -  /** Mapping function that calculates the weight gradients from the data.
    +  /** Calculates the loss value, given a labeled vector and the current 
weight vector
         *
    +    * The weight vector is received as a broadcast variable.
         */
    -  private class GradientCalculation extends
    -    RichMapFunction[LabeledVector, (WeightVector, Double, Int)] {
    +  private class LossCalculation extends RichMapFunction[LabeledVector, 
(Double, Int)] {
     
         var weightVector: WeightVector = null
     
    +
         @throws(classOf[Exception])
         override def open(configuration: Configuration): Unit = {
           val list = this.getRuntimeContext.
             getBroadcastVariable[WeightVector](WEIGHTVECTOR_BROADCAST)
     
           weightVector = list.get(0)
    -    }
     
    -    override def map(example: LabeledVector): (WeightVector, Double, Int) 
= {
    +    }
     
    -      val lossFunction = parameterMap(LossFunction)
    -      val regType = parameterMap(RegularizationType)
    -      val regParameter = parameterMap(RegularizationParameter)
    -      val predictionFunction = parameterMap(PredictionFunctionParameter)
    +    override def map(example: LabeledVector): (Double, Int) = {
    +      val lossFunction = parameters(LossFunctionParameter)
    +      val predictionFunction = parameters(PredictionFunctionParameter)
           val dimensions = example.vector.size
    -      // TODO(tvas): Any point in carrying the weightGradient vector for 
in-place replacement?
    -      // The idea in spark is to avoid object creation, but here we have 
to do it anyway
    +      // TODO(tvas): Avoid needless creation of WeightGradient object
    +      // Create a lossValue function in LossFunction?
           val weightGradient = new DenseVector(new Array[Double](dimensions))
    --- End diff --
    
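    I would probably like to change this, so that we don't create a 
weightGradient object when only the loss value is needed. Would it be worth 
adding another public function to LossFunction for that?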


> Add convergence criterion to SGD optimizer
> ------------------------------------------
>
>                 Key: FLINK-1992
>                 URL: https://issues.apache.org/jira/browse/FLINK-1992
>             Project: Flink
>          Issue Type: Improvement
>          Components: Machine Learning Library
>            Reporter: Till Rohrmann
>            Assignee: Theodore Vasiloudis
>              Labels: ML
>
> Currently, Flink's SGD optimizer runs for a fixed number of iterations. It 
> would be good to support a dynamic convergence criterion, too.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to