[ https://issues.apache.org/jira/browse/FLINK-1731?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14605753#comment-14605753 ]
ASF GitHub Bot commented on FLINK-1731:
---------------------------------------

Github user peedeeX21 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/700#discussion_r33475321

    --- Diff: flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/clustering/KMeans.scala ---
    @@ -0,0 +1,247 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.ml.clustering
    +
    +import org.apache.flink.api.common.functions.RichMapFunction
    +import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields
    +import org.apache.flink.api.scala.{DataSet, _}
    +import org.apache.flink.configuration.Configuration
    +import org.apache.flink.ml.common.{LabeledVector, _}
    +import org.apache.flink.ml.math.Breeze._
    +import org.apache.flink.ml.math.Vector
    +import org.apache.flink.ml.metrics.distances.EuclideanDistanceMetric
    +import org.apache.flink.ml.pipeline._
    +
    +import scala.collection.JavaConverters._
    +
    +
    +/**
    + * Implements the KMeans algorithm, which calculates cluster centroids based on a set of
    + * training data points and a set of k initial centroids.
    + *
    + * [[KMeans]] is a [[Predictor]] which needs to be trained on a set of data points and can
    + * then be used to assign new points to the learned cluster centroids.
    + *
    + * The KMeans algorithm works as described on Wikipedia
    + * (http://en.wikipedia.org/wiki/K-means_clustering):
    + *
    + * Given an initial set of k means m_1^{(1)}, ..., m_k^{(1)} (see below), the algorithm
    + * proceeds by alternating between two steps:
    + *
    + * ===Assignment step:===
    + *
    + * Assign each observation to the cluster whose mean yields the least within-cluster sum of
    + * squares (WCSS). Since the sum of squares is the squared Euclidean distance, this is
    + * intuitively the "nearest" mean. (Mathematically, this means partitioning the observations
    + * according to the Voronoi diagram generated by the means.)
    + *
    + * `S_i^{(t)} = { x_p : || x_p - m_i^{(t)} ||^2 ≤ || x_p - m_j^{(t)} ||^2 \forall j, 1 ≤ j ≤ k }`,
    + * where each `x_p` is assigned to exactly one `S_i^{(t)}`, even if it could be assigned to
    + * two or more of them.
    + *
    + * ===Update step:===
    + *
    + * Calculate the new means to be the centroids of the observations in the new clusters.
    + *
    + * `m_i^{(t+1)} = ( 1 / |S_i^{(t)}| ) \sum_{x_j \in S_i^{(t)}} x_j`
    + *
    + * Since the arithmetic mean is a least-squares estimator, this also minimizes the
    + * within-cluster sum of squares (WCSS) objective.
    + *
    + * @example
    + * {{{
    + *   val trainingDS: DataSet[Vector] = env.fromCollection(Clustering.trainingData)
    + *   val initialCentroids: DataSet[LabeledVector] = env.fromCollection(Clustering.initCentroids)
    + *
    + *   val kmeans = KMeans()
    + *     .setInitialCentroids(initialCentroids)
    + *     .setNumIterations(10)
    + *
    + *   kmeans.fit(trainingDS)
    + *
    + *   // get the computed centroids
    + *   val centroidsResult = kmeans.centroids.get.collect()
    + *
    + *   // get matching clusters for new points
    + *   val testDS: DataSet[Vector] = env.fromCollection(Clustering.testData)
    + *   val clusters: DataSet[LabeledVector] = kmeans.predict(testDS)
    + * }}}
    + *
    + * =Parameters=
    + *
    + * - [[org.apache.flink.ml.clustering.KMeans.NumIterations]]:
    + *   Defines the number of iterations in which the centroids of the clusters are recalculated
    + *   and the data points are reassigned. Since KMeans is a heuristic algorithm, there is no
    + *   guarantee that it converges to the global optimum; the iteration simply stops after the
    + *   given number of rounds.
    + *   (Default value: '''10''')
    + *
    + * - [[org.apache.flink.ml.clustering.KMeans.InitialCentroids]]:
    + *   Defines the initial k centroids of the k clusters. They are used as the starting point
    + *   of the algorithm for clustering the data set. The centroids are recalculated as often as
    + *   set in [[org.apache.flink.ml.clustering.KMeans.NumIterations]]. The choice of the initial
    + *   centroids strongly affects the outcome of the algorithm.
    + */
    +class KMeans extends Predictor[KMeans] {
    +
    +  import KMeans._
    +
    +  /** Stores the learned clusters after the fit operation */
    +  var centroids: Option[DataSet[LabeledVector]] = None
    +
    +  /**
    +   * Sets the number of iterations.
    +   *
    +   * @param numIterations The number of times the centroids are recalculated.
    +   * @return itself
    +   */
    +  def setNumIterations(numIterations: Int): KMeans = {
    +    parameters.add(NumIterations, numIterations)
    +    this
    +  }
    +
    +  /**
    +   * Sets the initial centroids from which the algorithm starts computing.
    +   * These points should be chosen based on the data, since they significantly influence the
    +   * resulting centroids.
    +   *
    +   * @param initialCentroids A sequence of labeled vectors.
    +   * @return itself
    +   */
    +  def setInitialCentroids(initialCentroids: DataSet[LabeledVector]): KMeans = {
    +    parameters.add(InitialCentroids, initialCentroids)
    +    this
    +  }
    +
    +}
    +
    +/**
    + * Companion object of KMeans. Contains convenience functions, the parameter type definitions
    + * of the algorithm and the [[FitOperation]] & [[PredictOperation]].
    + */
    +object KMeans {
    +  val CENTROIDS = "centroids"
    +
    +  case object NumIterations extends Parameter[Int] {
    +    val defaultValue = Some(10)
    +  }
    +
    +  case object InitialCentroids extends Parameter[DataSet[LabeledVector]] {
    +    val defaultValue = None
    +  }
    +
    +  // ========================================== Factory methods ====================================
    +
    +  def apply(): KMeans = {
    +    new KMeans()
    +  }
    +
    +  // ========================================== Operations =========================================
    +
    +  /**
    +   * [[PredictOperation]] for vector types. The result type is a [[LabeledVector]].
    +   */
    +  implicit def predictValues = {
    +    new PredictOperation[KMeans, Vector, LabeledVector] {
    +      override def predict(
    +          instance: KMeans,
    +          predictParameters: ParameterMap,
    +          input: DataSet[Vector])
    +        : DataSet[LabeledVector] = {
    +
    +        instance.centroids match {
    +          case Some(centroids) => {
    +            input.map(new SelectNearestCenterMapper).withBroadcastSet(centroids, CENTROIDS)
    +          }
    +
    +          case None => {
    +            throw new RuntimeException("The KMeans model has not been trained. Call fit first " +
    +              "before calling the predict operation.")
    +          }
    +        }
    +      }
    +    }
    +  }
    +
    +  /**
    +   * [[FitOperation]] which iteratively computes centroids that match the given input DataSet
    +   * by adjusting the given initial centroids.
    +   */
    +  implicit def fitKMeans = {
    +    new FitOperation[KMeans, Vector] {
    +      override def fit(
    +          instance: KMeans,
    +          fitParameters: ParameterMap,
    +          input: DataSet[Vector])
    +        : Unit = {
    +        val resultingParameters = instance.parameters ++ fitParameters
    +
    +        val centroids: DataSet[LabeledVector] = resultingParameters.get(InitialCentroids).get
    +        val numIterations: Int = resultingParameters.get(NumIterations).get
    +
    +        val finalCentroids = centroids.iterate(numIterations) { currentCentroids =>
    +          val newCentroids: DataSet[LabeledVector] = input
    +            .map(new SelectNearestCenterMapper).withBroadcastSet(currentCentroids, CENTROIDS)

--- End diff --

done

> Add kMeans clustering algorithm to machine learning library
> -----------------------------------------------------------
>
>                 Key: FLINK-1731
>                 URL: https://issues.apache.org/jira/browse/FLINK-1731
>             Project: Flink
>          Issue Type: New Feature
>          Components: Machine Learning Library
>            Reporter: Till Rohrmann
>            Assignee: Peter Schrott
>              Labels: ML
>
> The Flink repository already contains a kMeans implementation, but it has not
> yet been ported to the machine learning library. I assume that only the used
> data types have to be adapted, and then it can be more or less directly moved
> to flink-ml.
> The kMeans++ [1] and the kMeans|| [2] algorithms constitute a better
> implementation because they improve the initial seeding phase to achieve
> near-optimal clustering. It might be worthwhile to implement kMeans||.
> Resources:
> [1] http://ilpubs.stanford.edu:8090/778/1/2006-13.pdf
> [2] http://theory.stanford.edu/~sergei/papers/vldb12-kmpar.pdf



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
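The ticket proposes implementing the kMeans++ [1] seeding as a follow-up. As a rough sketch of that idea in plain Scala (standalone, not using the Flink DataSet API; the object and method names here are illustrative, not part of the PR): the first seed is drawn uniformly at random, and each further seed is drawn with probability proportional to its squared distance to the nearest seed chosen so far.

```scala
import scala.util.Random

object KMeansPlusPlusSketch {

  // Squared Euclidean distance between two points.
  private def sqDist(a: Array[Double], b: Array[Double]): Double =
    a.zip(b).map { case (x, y) => (x - y) * (x - y) }.sum

  /** Picks k seed centroids from `points` following the k-means++ rule:
    * the first seed uniformly at random, each further seed with probability
    * proportional to its squared distance to the nearest already-chosen seed. */
  def seeds(points: IndexedSeq[Array[Double]], k: Int, rnd: Random): IndexedSeq[Array[Double]] = {
    require(points.nonEmpty && k > 0 && k <= points.length)
    val first = points(rnd.nextInt(points.length))
    (1 until k).foldLeft(Vector(first)) { (chosen, _) =>
      // squared distance of every point to its closest already-chosen seed
      val d2 = points.map(p => chosen.map(c => sqDist(p, c)).min)
      // sample an index with probability proportional to d2
      var r = rnd.nextDouble() * d2.sum
      var idx = 0
      while (idx < d2.length - 1 && r > d2(idx)) { r -= d2(idx); idx += 1 }
      chosen :+ points(idx)
    }
  }
}
```

Because already-chosen points have distance zero to themselves, they are (almost surely) never drawn again; a distributed variant would replace the `points.map(...)` pass with a Flink map over the data set, broadcasting the seeds chosen so far, which is essentially the oversampling trick that kMeans|| [2] parallelizes.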