Hi,

I have a couple of questions about the OnlineKMeans algorithm. I am running
OnlineKMeans on a small dataset (36k records) with k=2 and testing varying
decay rates. The features are consumed in Flink from a Kafka topic.
Sample feature: [0.3333, 5.0, 1.0, 0.0, 0.0, 0.0, 0.0]
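K-means assigns each point to its nearest centroid (Flink ML's `KMeans` uses Euclidean distance by default). Below is a minimal plain-Java sketch, not the Flink ML code. It uses the sample feature above and the two initial centroids from `trainData` in the sample code. Those two centroids differ only in the second coordinate, so that feature alone decides the initial assignment. `NearestCentroid` and its methods are hypothetical helpers for illustration, and squared distance is used because it gives the same nearest centroid as plain distance.

```java
public class NearestCentroid {
    // Squared Euclidean distance between two equal-length vectors.
    static double squaredDistance(double[] a, double[] b) {
        double sum = 0.0;
        for (int i = 0; i < a.length; i++) {
            double d = a[i] - b[i];
            sum += d * d;
        }
        return sum;
    }

    // Index of the closest centroid; ties go to the lower index.
    static int assign(double[] point, double[][] centroids) {
        int best = 0;
        double bestDist = squaredDistance(point, centroids[0]);
        for (int j = 1; j < centroids.length; j++) {
            double d = squaredDistance(point, centroids[j]);
            if (d < bestDist) {
                bestDist = d;
                best = j;
            }
        }
        return best;
    }

    public static void main(String[] args) {
        // Initial centroids copied from trainData in the sample code.
        double[][] centroids = {
            {1.0, 7.0, 1.0, 0.0, 0.0, 0.0, 0.0},
            {1.0, 15.0, 1.0, 0.0, 0.0, 0.0, 0.0}
        };
        double[] sample = {0.3333, 5.0, 1.0, 0.0, 0.0, 0.0, 0.0};
        System.out.println("distance^2 to centroid 0: " + squaredDistance(sample, centroids[0]));
        System.out.println("distance^2 to centroid 1: " + squaredDistance(sample, centroids[1]));
        System.out.println("assigned cluster: " + assign(sample, centroids));
    }
}
```

With these centroids, any point whose second feature is below 11 lands in cluster 0 (ties at exactly 11 also go to cluster 0 here), whatever its other features are.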

Is the Flink ML implementation similar to Spark's StreamingKMeans?
Should I expect similar results when running the same dataset through both?
I am getting very different results: Spark is learning much better.
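For reference, Spark's `StreamingKMeans` documentation gives its per-batch update for each cluster as c' = (c·n·a + x·m) / (n·a + m) and n' = n·a + m. Here c is the current centroid, n its accumulated weight, a the decay factor, and x the mean of the m new points assigned to that cluster. The Flink ML `OnlineKMeans` docs describe a similar decayed weighted average of the old centroids and the mini-batch centroids. Whether the two match in every detail (for example Spark's `timeUnit` option) should be checked against the sources. The sketch below is a hypothetical plain-Java version of that one-cluster step, not either library's code.

```java
public class DecayedCentroidUpdate {
    // New weight of a cluster: old weight discounted by the decay factor, plus the m new points.
    static double newWeight(double n, double a, long m) {
        return n * a + m;
    }

    // New centroid: weighted average of the discounted old centroid c and the batch mean x.
    static double[] newCentroid(double[] c, double n, double a, double[] x, long m) {
        double discounted = n * a;
        double total = discounted + m;
        if (total == 0.0) {
            // No remaining history and no new points: keep the centroid unchanged.
            return c.clone();
        }
        double[] next = new double[c.length];
        for (int i = 0; i < c.length; i++) {
            next[i] = (c[i] * discounted + x[i] * m) / total;
        }
        return next;
    }

    public static void main(String[] args) {
        double[] c = {7.0};
        double[] x = {5.0};
        // Strong decay, little history: the centroid jumps toward the new point.
        System.out.println(newCentroid(c, 1.0, 0.3, x, 1)[0]);
        // No decay, long history: the centroid barely moves.
        System.out.println(newCentroid(c, 100.0, 1.0, x, 1)[0]);
    }
}
```

With a = 0.3 and a prior weight of 1, a single point at 5.0 moves a centroid at 7.0 to about 5.46. With a = 1.0 and a prior weight of 100, the same point moves it only to about 6.98.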

Does the rate of feature ingestion affect the results? Is it supposed to run
`fit` on every point?
Is `fit` slower than `transform`?
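On ingestion rate and per-point `fit`: `setGlobalBatchSize(1)` reads as one model update per point, assuming the batch size counts points. Spark Streaming micro-batches, by contrast, are formed by time, so there the batch size (and therefore the result) depends on how fast records arrive. Under the update rule quoted above, a cluster that receives every point converges to a steady-state weight n* = m / (1 − a). Each new batch mean then gets a share m / (a·n* + m) = 1 − a of the centroid. With a = 0.3 and one point per batch, the centroid moves 70% of the way toward every new point. The hypothetical simulation below (one cluster, one dimension, a deterministic stream alternating 5 and 9) compares batch sizes 1 and 2 under that assumed rule. It is an illustration, not the Flink ML code.

```java
public class BatchSizeEffect {
    // Steady-state weight of a cluster that receives batchSize points per update: n* = m / (1 - a).
    static double steadyStateWeight(int batchSize, double decay) {
        return batchSize / (1.0 - decay);
    }

    // Simulates one cluster receiving every point of a 1-D stream, applying the decayed
    // update once per full batch; a trailing partial batch is ignored.
    static double simulate(double[] stream, int batchSize, double decay,
                           double initialCentroid, double initialWeight) {
        double c = initialCentroid;
        double n = initialWeight;
        for (int start = 0; start + batchSize <= stream.length; start += batchSize) {
            double sum = 0.0;
            for (int i = start; i < start + batchSize; i++) {
                sum += stream[i];
            }
            double mean = sum / batchSize;
            double discounted = n * decay;
            n = discounted + batchSize;
            c = (c * discounted + mean * batchSize) / n;
        }
        return c;
    }

    public static void main(String[] args) {
        // Hypothetical stream alternating 5.0 and 9.0 (mean 7.0), ending on 5.0.
        double[] stream = new double[999];
        for (int i = 0; i < stream.length; i++) {
            stream[i] = (i % 2 == 0) ? 5.0 : 9.0;
        }
        System.out.println("n* (batch 1, a=0.3) = " + steadyStateWeight(1, 0.3));
        System.out.println("batch size 1: " + simulate(stream, 1, 0.3, 15.0, 1.0));
        System.out.println("batch size 2: " + simulate(stream, 2, 0.3, 15.0, 1.0));
    }
}
```

With batch size 1, the centroid never settles. It swings between roughly 5.92 and 8.08 and ends near 5.92 after the last point (5.0). With batch size 2, every batch mean is 7.0 and the centroid settles at 7.0.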
Thanks for your help!

Sample code I am running:

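import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.ml.clustering.kmeans.KMeans;
import org.apache.flink.ml.clustering.kmeans.KMeansModel;
import org.apache.flink.ml.clustering.kmeans.OnlineKMeans;
import org.apache.flink.ml.clustering.kmeans.OnlineKMeansModel;
import org.apache.flink.ml.linalg.DenseVector;
import org.apache.flink.ml.linalg.Vectors;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
// FeatureWithLabelJson and JsonFeatureSchema are application-specific classes (not shown).

// Two points used to train the offline KMeans model that seeds OnlineKMeans.
final DenseVector[] trainData =
        new DenseVector[] {
            Vectors.dense(1.0, 7.0, 1.0, 0.0, 0.0, 0.0, 0.0),
            Vectors.dense(1.0, 15.0, 1.0, 0.0, 0.0, 0.0, 0.0)
        };

String featuresCol = "features";
String predictionCol = "prediction";

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.enableCheckpointing(100);
env.setRestartStrategy(RestartStrategies.noRestart());
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

// Offline KMeans; its model data becomes the initial OnlineKMeans model.
KMeans kMeans = new KMeans().setFeaturesCol(featuresCol).setPredictionCol(predictionCol);
Table offlineTrainTable = tEnv.fromDataStream(env.fromElements(trainData)).as(featuresCol);
KMeansModel model = kMeans.fit(offlineTrainTable);

// Unbounded feature stream from Kafka, starting at the latest offsets.
KafkaSource<FeatureWithLabelJson> inputSource =
        KafkaSource.<FeatureWithLabelJson>builder()
                .setBootstrapServers(bootstrapServer)
                .setTopics("sample_topic")
                .setGroupId("sample_group")
                .setStartingOffsets(OffsetsInitializer.latest())
                .setValueOnlyDeserializer(new JsonFeatureSchema())
                .build();

DataStream<FeatureWithLabelJson> messageStream =
        env.fromSource(inputSource, WatermarkStrategy.noWatermarks(), "Kafka Source");

Table input = tEnv.fromDataStream(messageStream).as(featuresCol);

// Global batch size 1: the model is updated after every single point.
OnlineKMeans onlineKMeans =
        new OnlineKMeans()
                .setGlobalBatchSize(1)
                .setFeaturesCol(featuresCol)
                .setPredictionCol(predictionCol)
                .setDecayFactor(0.3)
                .setInitialModelData(model.getModelData()[0]);

OnlineKMeansModel onlineModel = onlineKMeans.fit(input);
Table outputTable = onlineModel.transform(input)[0];

// Consume the predictions and start the job; nothing runs until execute() is called.
tEnv.toDataStream(outputTable).print();
env.execute();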
