I agree with what Ankur said. The KMeans seeding step (the `takeSample` method) runs in parallel, so each partition draws its sample points from its local data, which is why the result is not partition-agnostic. The seeding method is based on the k-means|| algorithm of Bahmani et al., which gives approximation guarantees on the k-means cost.

You could also set the initial seeding points yourself, which avoids the issue altogether.
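For example, something along these lines (a minimal sketch against the RDD-based spark.mllib API, which exposes `setInitialModel`; as far as I can tell the DataFrame-based spark.ml KMeans in your script does not expose initial centers, and the centers below are arbitrary placeholders):

```
import org.apache.spark.mllib.clustering.{KMeans, KMeansModel}
import org.apache.spark.mllib.linalg.Vectors

// Fixed, hand-picked initial centers. When an initial model is supplied,
// the random seeding step (and its partition-dependent sampling) is
// skipped entirely.
val initialCenters = Array(
  Vectors.dense(0.25, 0.25),
  Vectors.dense(0.50, 0.50),
  Vectors.dense(0.75, 0.75)
)

val kmeans = new KMeans()
  .setK(initialCenters.length)
  .setInitialModel(new KMeansModel(initialCenters))

// `points` would be an RDD[Vector] built from the (a, b) feature columns:
// val model = kmeans.run(points)
```

Note that the Lloyd iterations still aggregate with floating-point arithmetic, so costs could differ in the last decimal places across partitionings, but the large seeding differences should disappear.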
Regards,
Yu Zhang

On Wed, May 24, 2017 at 1:49 PM, Ankur Srivastava <ankur.srivast...@gmail.com> wrote:

> Hi Christoph,
>
> I am not an expert in ML and have not used Spark KMeans, but your problem
> seems to be an issue of local minima vs. the global minimum. You should
> run k-means multiple times with random starting points, and also try
> multiple values of K (unless you are already sure of it).
>
> Hope this helps.
>
> Thanks
> Ankur
>
> On Wed, May 24, 2017 at 2:15 AM, Christoph Bruecke <carabo...@gmail.com> wrote:
>
>> Hi Anastasios,
>>
>> thanks for the reply, but caching doesn't seem to change anything.
>>
>> After further investigation it really seems that the RDD#takeSample
>> method is the cause of the non-reproducibility.
>>
>> Is this considered a bug, and should I open an issue for it?
>>
>> BTW: my example script contains a little typo in line 3: it is `feature`,
>> not `features` (mind the `s`).
>>
>> Best,
>> Christoph
>>
>> The script with caching:
>>
>> ```
>> import org.apache.spark.sql.DataFrame
>> import org.apache.spark.ml.clustering.KMeans
>> import org.apache.spark.ml.feature.VectorAssembler
>> import org.apache.spark.storage.StorageLevel
>>
>> // generate random data for clustering
>> val randomData = spark.range(1, 1000).withColumn("a", rand(123)).withColumn("b", rand(321))
>>
>> val vecAssembler = new VectorAssembler().setInputCols(Array("a", "b")).setOutputCol("features")
>>
>> val data = vecAssembler.transform(randomData)
>>
>> // instantiate KMeans with fixed seed
>> val kmeans = new KMeans().setK(10).setSeed(9876L)
>>
>> // train the model with different partitioning
>> val dataWith1Partition = data.repartition(1)
>> // cache the data
>> dataWith1Partition.persist(StorageLevel.MEMORY_AND_DISK)
>> println("1 Partition: " + kmeans.fit(dataWith1Partition).computeCost(dataWith1Partition))
>>
>> val dataWith4Partition = data.repartition(4)
>> // cache the data
>> dataWith4Partition.persist(StorageLevel.MEMORY_AND_DISK)
>> println("4 Partition: " + kmeans.fit(dataWith4Partition).computeCost(dataWith4Partition))
>> ```
>>
>> Output:
>>
>> ```
>> 1 Partition: 16.028212597888057
>> 4 Partition: 16.14758460544976
>> ```
>>
>> > On 22 May 2017, at 16:33, Anastasios Zouzias <zouz...@gmail.com> wrote:
>> >
>> > Hi Christoph,
>> >
>> > Take a look at this, you might end up having a similar case:
>> >
>> > http://www.spark.tc/using-sparks-cache-for-correctness-not-just-performance/
>> >
>> > If this is not the case, then I agree with you that KMeans should be
>> > partitioning-agnostic (although I haven't checked the code yet).
>> >
>> > Best,
>> > Anastasios
>> >
>> > On Mon, May 22, 2017 at 3:42 PM, Christoph Bruecke <carabo...@gmail.com> wrote:
>> >
>> > Hi,
>> >
>> > I'm trying to figure out how to use KMeans in order to achieve
>> > reproducible results. I have found that running the same KMeans
>> > instance on the same data with different partitionings will produce
>> > different clusterings.
>> >
>> > A simple KMeans run with a fixed seed returns different results on the
>> > same training data if the training data is partitioned differently.
>> >
>> > Consider the following example. The same KMeans clustering set-up is
>> > run on identical data. The only difference is the partitioning of the
>> > training data (one partition vs. four partitions).
>> >
>> > ```
>> > import org.apache.spark.sql.DataFrame
>> > import org.apache.spark.ml.clustering.KMeans
>> > import org.apache.spark.ml.features.VectorAssembler
>> >
>> > // generate random data for clustering
>> > val randomData = spark.range(1, 1000).withColumn("a", rand(123)).withColumn("b", rand(321))
>> >
>> > val vecAssembler = new VectorAssembler().setInputCols(Array("a", "b")).setOutputCol("features")
>> >
>> > val data = vecAssembler.transform(randomData)
>> >
>> > // instantiate KMeans with fixed seed
>> > val kmeans = new KMeans().setK(10).setSeed(9876L)
>> >
>> > // train the model with different partitioning
>> > val dataWith1Partition = data.repartition(1)
>> > println("1 Partition: " + kmeans.fit(dataWith1Partition).computeCost(dataWith1Partition))
>> >
>> > val dataWith4Partition = data.repartition(4)
>> > println("4 Partition: " + kmeans.fit(dataWith4Partition).computeCost(dataWith4Partition))
>> > ```
>> >
>> > I get the following costs:
>> >
>> > ```
>> > 1 Partition: 16.028212597888057
>> > 4 Partition: 16.14758460544976
>> > ```
>> >
>> > What I want to achieve is that repeated runs of the KMeans clustering
>> > yield identical results on identical training data, regardless of the
>> > partitioning.
>> >
>> > Looking through the Spark source code, I guess the cause is the
>> > initialization method of KMeans, which in turn uses the `takeSample`
>> > method, which does not seem to be partition-agnostic.
>> >
>> > Is this behaviour expected? Is there anything I could do to achieve
>> > reproducible results?
>> >
>> > Best,
>> > Christoph
>> >
>> > --
>> > -- Anastasios Zouzias
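For anyone who wants to see the cause in isolation, here is a minimal sketch of the `takeSample` behaviour discussed in this thread (assuming a spark-shell session, where `sc` is the SparkContext; the concrete samples will vary by Spark version):

```
// Same data and the same fixed seed; only the number of partitions differs.
val data = 1 to 1000

val sampleFrom1 = sc.parallelize(data, 1).takeSample(false, 5, 42L)
val sampleFrom4 = sc.parallelize(data, 4).takeSample(false, 5, 42L)

// Each call is reproducible on its own, but the two samples generally
// contain different elements, because sampling is performed per
// partition with partition-dependent seeds.
println("1 partition:  " + sampleFrom1.mkString(", "))
println("4 partitions: " + sampleFrom4.mkString(", "))
```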