[ https://issues.apache.org/jira/browse/FLINK-1992?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14553832#comment-14553832 ]
ASF GitHub Bot commented on FLINK-1992:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/692#discussion_r30780392

--- Diff: flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/GradientDescent.scala ---
@@ -76,86 +77,163 @@ class GradientDescent(runParameters: ParameterMap) extends IterativeSolver {
     }.withBroadcastSet(currentWeights, WEIGHTVECTOR_BROADCAST)
   }
+
+
   /** Provides a solution for the given optimization problem
    *
    * @param data A Dataset of LabeledVector (label, features) pairs
-   * @param initWeights The initial weights that will be optimized
+   * @param initialWeights The initial weights that will be optimized
    * @return The weights, optimized for the provided data.
    */
   override def optimize(
     data: DataSet[LabeledVector],
-    initWeights: Option[DataSet[WeightVector]]): DataSet[WeightVector] = {
-    // TODO: Faster way to do this?
-    val dimensionsDS = data.map(_.vector.size).reduce((a, b) => b)
-
-    val numberOfIterations: Int = parameterMap(Iterations)
+    initialWeights: Option[DataSet[WeightVector]]): DataSet[WeightVector] = {
+    val numberOfIterations: Int = parameters(Iterations)
+    // TODO(tvas): This looks out of place, why don't we get back an Option from
+    // parameters(ConvergenceThreshold)?
+    val convergenceThresholdOption = parameters.get(ConvergenceThreshold)

     // Initialize weights
-    val initialWeightsDS: DataSet[WeightVector] = initWeights match {
-      // Ensure provided weight vector is a DenseVector
-      case Some(wvDS) => {
-        wvDS.map{wv => {
-          val denseWeights = wv.weights match {
-            case dv: DenseVector => dv
-            case sv: SparseVector => sv.toDenseVector
+    val initialWeightsDS: DataSet[WeightVector] = createInitialWeightsDS(initialWeights, data)
+
+    // Perform the iterations
+    val optimizedWeights = convergenceThresholdOption match {
+      // No convergence criterion
+      case None =>
+        initialWeightsDS.iterate(numberOfIterations) {
+          weightVectorDS => {
+            SGDStep(data, weightVectorDS)
           }
-          WeightVector(denseWeights, wv.intercept)
         }
-
+      case Some(convergence) =>
+        /** Calculates the regularized loss, from the data and given weights **/
+        def lossCalculation(data: DataSet[LabeledVector], weightDS: DataSet[WeightVector]):
+        DataSet[Double] = {
+          data.map {
+            new LossCalculation
+          }.withBroadcastSet(weightDS, WEIGHTVECTOR_BROADCAST)
+            .reduce {
+              (left, right) =>
+                val (leftLoss, leftCount) = left
+                val (rightLoss, rightCount) = right
+                (leftLoss + rightLoss, rightCount + leftCount)
+            }
+            .map{new RegularizedLossCalculation}
+            .withBroadcastSet(weightDS, WEIGHTVECTOR_BROADCAST)
         }
-      }
-      case None => createInitialWeightVector(dimensionsDS)
-    }
-
-    // Perform the iterations
-    // TODO: Enable convergence stopping criterion, as in Multiple Linear regression
-    initialWeightsDS.iterate(numberOfIterations) {
-      weightVector => {
-        SGDStep(data, weightVector)
-      }
+        // We have to calculate for each weight vector the sum of squared residuals,
+        // and then sum them and apply regularization
+        val initialLossSumDS = lossCalculation(data, initialWeightsDS)
+
+        // Combine weight vector with the current loss
+        val initialWeightsWithLossSum = initialWeightsDS.
+          crossWithTiny(initialLossSumDS).setParallelism(1)
+
+        val resultWithLoss = initialWeightsWithLossSum.
+          iterateWithTermination(numberOfIterations) {
+            weightsWithLossSum =>
+
+              // Extract weight vector and loss
+              val previousWeightsDS = weightsWithLossSum.map{_._1}
+              val previousLossSumDS = weightsWithLossSum.map{_._2}
+
+              val currentWeightsDS = SGDStep(data, previousWeightsDS)
+
+              val currentLossSumDS = lossCalculation(data, currentWeightsDS)
+
+              // Check if the relative change in the loss is smaller than the
+              // convergence threshold. If yes, then terminate, i.e. return an empty termination data set
+              val termination = previousLossSumDS.crossWithTiny(currentLossSumDS).setParallelism(1).
+                filter{
+                  pair => {
+                    val (previousLoss, currentLoss) = pair
+
+                    if (previousLoss <= 0) {
+                      false
+                    } else {
+                      math.abs((previousLoss - currentLoss)/previousLoss) >= convergence
--- End diff --

Can we also support different convergence criteria? For example, the absolute loss?


> Add convergence criterion to SGD optimizer
> ------------------------------------------
>
>                 Key: FLINK-1992
>                 URL: https://issues.apache.org/jira/browse/FLINK-1992
>             Project: Flink
>          Issue Type: Improvement
>          Components: Machine Learning Library
>            Reporter: Till Rohrmann
>            Assignee: Theodore Vasiloudis
>            Priority: Minor
>              Labels: ML
>             Fix For: 0.9
>
>
> Currently, Flink's SGD optimizer runs for a fixed number of iterations.
> It would be good to support a dynamic convergence criterion, too.
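
To illustrate the review question above: one way to support multiple convergence criteria is to hide the termination test behind a small trait and pass the desired implementation to the solver. The sketch below is plain, self-contained Scala; the names ConvergenceCriterion, RelativeLossChange, and AbsoluteLossChange are hypothetical, not part of the pull request or the Flink ML API.

    /** Decides, from the loss before and after an iteration, whether to stop. */
    trait ConvergenceCriterion extends Serializable {
      def isConverged(previousLoss: Double, currentLoss: Double, threshold: Double): Boolean
    }

    /** The test used in the diff above: relative change in the regularized loss. */
    object RelativeLossChange extends ConvergenceCriterion {
      override def isConverged(previousLoss: Double, currentLoss: Double, threshold: Double): Boolean =
        // The previousLoss > 0 guard mirrors the previousLoss <= 0 check in the diff,
        // avoiding a division by zero
        previousLoss > 0 && math.abs((previousLoss - currentLoss) / previousLoss) < threshold
    }

    /** The alternative raised in the review comment: absolute change in the loss. */
    object AbsoluteLossChange extends ConvergenceCriterion {
      override def isConverged(previousLoss: Double, currentLoss: Double, threshold: Double): Boolean =
        math.abs(previousLoss - currentLoss) < threshold
    }

Inside iterateWithTermination, the filter would then keep the loss pairs for which !criterion.isConverged(previousLoss, currentLoss, convergence) holds, so the termination data set stays non-empty, and the iteration continues, until the chosen criterion is met.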