Hello everyone,

Please forgive me if this is not the correct mailing list for the issue below.
Here is my use case: I have an event data stream that I need to key (keyBy) by a certain field (tenant id), and then, for each tenant's events, independently perform ML clustering using FlinkML's OnlineKMeans component. I am using Java.

I have tried several approaches, but none of them seems correct. Essentially, I try to keep an OnlineKMeansModel instance as per-key (thus per-tenant) state in a keyed process function on the event DataStream. When the function processes an event whose tenant id does not yet have an OnlineKMeansModel instance, I create one and store it as state for that tenant id, so it can be reused for later events. However, this does not seem to be the right way to do it in Flink, and I am running into several hurdles with this approach (a sketch of the single-tenant pipeline I am trying to replicate per key is at the end of this message):

- OnlineKMeans takes a table (as in the Table API) as input; that table is basically a view of the event data stream, filtered by a certain tenant id. How do I go about this?
- OnlineKMeansModel is given a table to output its predictions to. How do I go about this table?
- I get many "this class is not serializable" errors, which I take as a sign that I am not using the correct approach.

In essence, I feel that I am overlooking a fundamental aspect of how to perform FlinkML computations independently for each key of a keyed data stream. In the hope that my use case is clear, I am asking for your help on the correct approach for this scenario.

Thank you!

-- Catalin Stavaru
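P.S. For reference, below is roughly the single-tenant OnlineKMeans pipeline I am trying to replicate per key. It is only a minimal sketch modeled on the Flink ML online k-means example; the input vectors, column names and parameter values are placeholders, and in the real job the source would be the unbounded per-tenant event stream rather than fromElements().

import org.apache.flink.ml.clustering.kmeans.KMeansModelData;
import org.apache.flink.ml.clustering.kmeans.OnlineKMeans;
import org.apache.flink.ml.clustering.kmeans.OnlineKMeansModel;
import org.apache.flink.ml.linalg.DenseVector;
import org.apache.flink.ml.linalg.Vectors;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class SingleTenantOnlineKMeans {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // Feature vectors for ONE tenant; in my real job this would be the event
        // stream already reduced to a single tenant id.
        DataStream<DenseVector> featureStream = env.fromElements(
                Vectors.dense(0.0, 0.0),
                Vectors.dense(0.3, 0.1),
                Vectors.dense(10.0, 10.0),
                Vectors.dense(10.2, 9.8));

        // OnlineKMeans consumes Tables, so the stream is exposed as a table view.
        Table trainTable = tEnv.fromDataStream(featureStream).as("features");

        // Random initial centroids, as in the Flink ML online k-means example.
        Table initModelData = KMeansModelData.generateRandomModelData(tEnv, 2, 2, 0.0, 0);

        OnlineKMeans onlineKMeans = new OnlineKMeans()
                .setFeaturesCol("features")
                .setPredictionCol("prediction")
                .setGlobalBatchSize(2)
                .setInitialModelData(initModelData);

        // fit() wires the continuously-updating model to the training table.
        OnlineKMeansModel model = onlineKMeans.fit(trainTable);

        // transform() returns output table(s) with the prediction column appended.
        Table predictions = model.transform(trainTable)[0];

        tEnv.toDataStream(predictions).print();
        env.execute();
    }
}

What I cannot figure out is how to instantiate and drive this pipeline independently for each tenant id of the keyed stream.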