[ https://issues.apache.org/jira/browse/FLINK-1992?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14553820#comment-14553820 ]
ASF GitHub Bot commented on FLINK-1992:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/692#discussion_r30780043

    --- Diff: flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/GradientDescent.scala ---
    @@ -76,86 +77,163 @@ class GradientDescent(runParameters: ParameterMap) extends IterativeSolver {
         }.withBroadcastSet(currentWeights, WEIGHTVECTOR_BROADCAST)
       }
    +
    +
      /** Provides a solution for the given optimization problem
        *
        * @param data A Dataset of LabeledVector (label, features) pairs
    -   * @param initWeights The initial weights that will be optimized
    +   * @param initialWeights The initial weights that will be optimized
        * @return The weights, optimized for the provided data.
        */
      override def optimize(
        data: DataSet[LabeledVector],
    -   initWeights: Option[DataSet[WeightVector]]): DataSet[WeightVector] = {
    -   // TODO: Faster way to do this?
    -   val dimensionsDS = data.map(_.vector.size).reduce((a, b) => b)
    -
    -   val numberOfIterations: Int = parameterMap(Iterations)
    +   initialWeights: Option[DataSet[WeightVector]]): DataSet[WeightVector] = {
    +   val numberOfIterations: Int = parameters(Iterations)
    +   // TODO(tvas): This looks out of place, why don't we get back an Option from
    +   // parameters(ConvergenceThreshold)?
    +   val convergenceThresholdOption = parameters.get(ConvergenceThreshold)

        // Initialize weights
    -   val initialWeightsDS: DataSet[WeightVector] = initWeights match {
    -     // Ensure provided weight vector is a DenseVector
    -     case Some(wvDS) => {
    -       wvDS.map{wv => {
    -         val denseWeights = wv.weights match {
    -           case dv: DenseVector => dv
    -           case sv: SparseVector => sv.toDenseVector
    +   val initialWeightsDS: DataSet[WeightVector] = createInitialWeightsDS(initialWeights, data)
    +
    +   // Perform the iterations
    +   val optimizedWeights = convergenceThresholdOption match {
    +     // No convergence criterion
    +     case None =>
    +       initialWeightsDS.iterate(numberOfIterations) {
    +         weightVectorDS => {
    +           SGDStep(data, weightVectorDS)
              }
    -         WeightVector(denseWeights, wv.intercept)
            }
    -
    +     case Some(convergence) =>
    +       /** Calculates the regularized loss, from the data and given weights **/
    --- End diff --

    One line comments should use `//`

> Add convergence criterion to SGD optimizer
> ------------------------------------------
>
>                 Key: FLINK-1992
>                 URL: https://issues.apache.org/jira/browse/FLINK-1992
>             Project: Flink
>          Issue Type: Improvement
>          Components: Machine Learning Library
>            Reporter: Till Rohrmann
>            Assignee: Theodore Vasiloudis
>            Priority: Minor
>              Labels: ML
>             Fix For: 0.9
>
>
> Currently, Flink's SGD optimizer runs for a fixed number of iterations. It
> would be good to support a dynamic convergence criterion, too.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
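
For readers following the ticket, below is a minimal, self-contained Scala sketch of the idea under discussion: run gradient descent for at most a fixed number of iterations, but stop early once the change in the loss between iterations falls below a threshold. This is not the Flink ML code from the pull request (which operates on distributed DataSets, uses Flink's iteration operators, and compares a regularized loss across iterations); the object name, toy data, learning rate, and threshold below are illustrative assumptions only.

```scala
// Minimal sketch (not the Flink ML implementation): gradient descent with a
// dynamic convergence criterion in addition to a maximum iteration count.
// All names and constants below are illustrative.
object ConvergenceCriterionSketch {

  // Squared-error loss of a linear model `w` averaged over (features, label) pairs.
  def loss(data: Seq[(Array[Double], Double)], w: Array[Double]): Double =
    data.map { case (x, y) =>
      val prediction = x.zip(w).map { case (xi, wi) => xi * wi }.sum
      0.5 * (prediction - y) * (prediction - y)
    }.sum / data.size

  def main(args: Array[String]): Unit = {
    // Toy data following y = 2 * x.
    val data = Seq((Array(1.0), 2.0), (Array(2.0), 4.0), (Array(3.0), 6.0))

    val maxIterations = 1000   // upper bound, as in the current optimizer
    val learningRate  = 0.05
    val threshold     = 1e-9   // hypothetical convergence threshold

    var weights      = Array(0.0)
    var previousLoss = loss(data, weights)
    var converged    = false
    var iteration    = 0

    while (iteration < maxIterations && !converged) {
      // Full-batch gradient step; a local stand-in for the distributed SGD step.
      val gradient = data
        .map { case (x, y) =>
          val prediction = x.zip(weights).map { case (xi, wi) => xi * wi }.sum
          x.map(_ * (prediction - y))
        }
        .reduce((a, b) => a.zip(b).map { case (ai, bi) => ai + bi })
        .map(_ / data.size)

      weights = weights.zip(gradient).map { case (wi, gi) => wi - learningRate * gi }

      // Dynamic convergence criterion: stop once the loss barely changes.
      val currentLoss = loss(data, weights)
      converged = math.abs(previousLoss - currentLoss) < threshold
      previousLoss = currentLoss
      iteration += 1
    }

    println(s"Stopped after $iteration iterations, " +
      s"weights = ${weights.mkString(", ")}, loss = $previousLoss")
  }
}
```

In the pull request itself the same idea is expressed with Flink's iteration operators over DataSets rather than a local loop, which is why the diff above branches on `convergenceThresholdOption`: with no threshold it uses a plain bounded iteration, and with a threshold it additionally evaluates the regularized loss to decide when to terminate.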