Hi, I have a couple of questions about the OnlineKMeans algorithm. I am running OnlineKMeans on a small dataset (36k records) with k=2 and testing different decay rates. The features are consumed in Flink from a Kafka topic. Sample feature: [0.3333, 5.0, 1.0, 0.0, 0.0, 0.0, 0.0]
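For reference when comparing decay rates: Spark's StreamingKMeans documents a decayed "mini-batch" update, and the Flink ML OnlineKMeans docs describe the same idea. The new centroid is a weighted average of the old centroid (weighted by its point count multiplied by the decay factor) and the mean of the batch points assigned to it (weighted by how many there are). The sketch below is a minimal, hypothetical Java version of that rule for a single centroid, not code from either library. The class and method names are made up for illustration, and carrying the weight forward as `weight * decayFactor + batchCount` follows Spark's implementation, which is worth checking against the Flink ML source.

```java
import java.util.Arrays;

/**
 * Hypothetical sketch of a decayed mini-batch k-means update for one centroid.
 * Illustrative only; not Flink ML or Spark library code.
 */
public class DecayedCentroidUpdate {

    /**
     * Merges one batch into a centroid.
     *
     * @param centroid    current centroid c_t
     * @param weight      current (already decayed) weight n_t behind the centroid
     * @param batchMean   mean x_t of the batch points assigned to this centroid
     * @param batchCount  number m_t of batch points assigned to this centroid
     * @param decayFactor alpha in [0, 1]; 1 = never forget, 0 = use only the latest batch
     * @return the new centroid (c_t * n_t * alpha + x_t * m_t) / (n_t * alpha + m_t)
     */
    static double[] update(double[] centroid, double weight, double[] batchMean,
                           long batchCount, double decayFactor) {
        double oldWeight = weight * decayFactor;  // discounted history
        double total = oldWeight + batchCount;
        if (total == 0) {
            return centroid.clone();              // nothing to merge
        }
        double[] next = new double[centroid.length];
        for (int i = 0; i < centroid.length; i++) {
            next[i] = (centroid[i] * oldWeight + batchMean[i] * batchCount) / total;
        }
        return next;
    }

    /** Weight carried into the next batch: decayed history plus the new points. */
    static double nextWeight(double weight, long batchCount, double decayFactor) {
        return weight * decayFactor + batchCount;
    }

    public static void main(String[] args) {
        // Centroid backed by 10 points receives one new point, decay factor 0.3.
        double[] centroid = {1.0, 7.0};
        double[] next = update(centroid, 10.0, new double[] {1.0, 15.0}, 1, 0.3);
        System.out.println(Arrays.toString(next));
        System.out.println(nextWeight(10.0, 1, 0.3));
    }
}
```

With the numbers in `main`, the second coordinate moves from 7.0 to 9.0 after a single point at 15.0, because a decay of 0.3 shrinks the 10 points of history to a weight of 3 before the new point is averaged in.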
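My questions:

1. Is the Flink ML implementation similar to Spark's StreamingKMeans? Should I expect similar results when running the same dataset through both? I am getting very different results: Spark learns much better.
2. Does the rate at which features are ingested affect the results?
3. Is it supposed to run `fit` on every point?
4. Is `fit` slower than the `transform` operation?

Thanks for your help!

Here is the sample code I am running:

```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.ml.clustering.kmeans.KMeans;
import org.apache.flink.ml.clustering.kmeans.KMeansModel;
import org.apache.flink.ml.clustering.kmeans.OnlineKMeans;
import org.apache.flink.ml.clustering.kmeans.OnlineKMeansModel;
import org.apache.flink.ml.linalg.DenseVector;
import org.apache.flink.ml.linalg.Vectors;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

// Two seed points used to build the initial model (k = 2)
final DenseVector[] trainData = new DenseVector[] {
    Vectors.dense(1.0, 7.0, 1.0, 0.0, 0.0, 0.0, 0.0),
    Vectors.dense(1.0, 15.0, 1.0, 0.0, 0.0, 0.0, 0.0)
};
String featuresCol = "features";
String predictionCol = "prediction";

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.enableCheckpointing(100);
env.setRestartStrategy(RestartStrategies.noRestart());
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

// Offline KMeans on the seed points produces the initial model data
KMeans kMeans = new KMeans().setFeaturesCol(featuresCol).setPredictionCol(predictionCol);
Table offlineTrainTable = tEnv.fromDataStream(env.fromElements(trainData)).as(featuresCol);
KMeansModel model = kMeans.fit(offlineTrainTable);

// Streaming features read from Kafka
KafkaSource<FeatureWithLabelJson> inputSource = KafkaSource.<FeatureWithLabelJson>builder()
    .setBootstrapServers(bootstrapServer)
    .setTopics("sample_topic")
    .setGroupId("sample_group")
    .setStartingOffsets(OffsetsInitializer.latest())
    .setValueOnlyDeserializer(new JsonFeatureSchema())
    .build();
DataStream<FeatureWithLabelJson> messageStream =
    env.fromSource(inputSource, WatermarkStrategy.noWatermarks(), "Kafka Source");
Table input = tEnv.fromDataStream(messageStream).as(featuresCol);

// Online KMeans: one point per global batch, decay factor 0.3
OnlineKMeans onlineKMeans = new OnlineKMeans()
    .setGlobalBatchSize(1)
    .setFeaturesCol(featuresCol)
    .setPredictionCol(predictionCol)
    .setDecayFactor(0.3)
    .setInitialModelData(model.getModelData()[0]);
OnlineKMeansModel onlineModel = onlineKMeans.fit(input);
Table outputTable = onlineModel.transform(input)[0];
```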
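On the ingestion-rate and fit-per-point questions, one thing worth checking: in Spark's StreamingKMeans the decay is applied once per micro-batch by default (`timeUnit` "batches"), so the number of records that land in each micro-batch changes how much history survives. Assuming Flink ML applies the decay once per global batch, `setGlobalBatchSize(1)` with `setDecayFactor(0.3)` discounts the history by 0.3 after every single point. The hypothetical helper below (names made up for illustration, not library code) computes the steady-state weight behind a centroid for a given decay factor and batch size, assuming every point in each batch is assigned to that centroid. It also includes Spark's half-life conversion, `decay = exp(ln 0.5 / halfLife)`.

```java
/**
 * Hypothetical sketch: how much history a decayed k-means centroid keeps when
 * the decay factor is applied once per batch. Illustrative only; not library code.
 */
public class DecayMemory {

    /**
     * Steady-state weight n* solving n* = n* * decay + batchSize, i.e. the
     * effective number of points behind a centroid that receives every point.
     */
    static double steadyStateWeight(double decay, int batchSize) {
        if (decay >= 1.0) {
            return Double.POSITIVE_INFINITY; // no forgetting: weight grows without bound
        }
        return batchSize / (1.0 - decay);
    }

    /** Decay factor whose half-life is the given number of batches (Spark-style). */
    static double decayFromHalfLife(double halfLife) {
        return Math.exp(Math.log(0.5) / halfLife);
    }

    public static void main(String[] args) {
        for (int batchSize : new int[] {1, 100, 1000}) {
            System.out.printf("decay=0.3, batch=%d -> ~%.1f points of history%n",
                    batchSize, steadyStateWeight(0.3, batchSize));
        }
        System.out.printf("half-life of 1000 batches -> decay=%.6f%n", decayFromHalfLife(1000));
    }
}
```

Under these assumptions, a decay of 0.3 with one point per batch leaves only about 1.4 points of weight behind each centroid, so the centroid mostly tracks the latest point, while the same decay with 1000-point batches keeps about 1429 points. A larger `globalBatchSize` or a decay factor closer to 1 would be the levers to try for a longer memory.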