Hello everyone,

Here is my use case: I have an event data stream that I need to key by a certain field (the tenant id), and then, for each tenant's events independently, I need to perform ML clustering using FlinkML's OnlineKMeans component. I am using Java.
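For reference, below is roughly the plain, non-keyed OnlineKMeans pipeline that I am trying to replicate independently for each tenant. This is only a sketch adapted from the Flink ML OnlineKMeans example code: the feature vectors, column names, cluster count, batch size and the random initial model data are placeholder values, not my real ones.

import org.apache.flink.ml.clustering.kmeans.KMeansModelData;
import org.apache.flink.ml.clustering.kmeans.OnlineKMeans;
import org.apache.flink.ml.clustering.kmeans.OnlineKMeansModel;
import org.apache.flink.ml.linalg.DenseVector;
import org.apache.flink.ml.linalg.Vectors;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class SingleTenantOnlineKMeansSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // Placeholder feature vectors; in the real job these would be derived
        // from a single tenant's events.
        DataStream<DenseVector> featureStream =
                env.fromElements(
                        Vectors.dense(0.0, 0.1),
                        Vectors.dense(0.2, 0.0),
                        Vectors.dense(9.0, 9.1),
                        Vectors.dense(9.2, 9.0));
        Table featureTable = tEnv.fromDataStream(featureStream).as("features");

        // OnlineKMeans needs initial model data; here random centroids for
        // 2 clusters in 2 dimensions (placeholder values, taken from the example).
        OnlineKMeans onlineKMeans =
                new OnlineKMeans()
                        .setFeaturesCol("features")
                        .setPredictionCol("prediction")
                        .setGlobalBatchSize(2)
                        .setInitialModelData(
                                KMeansModelData.generateRandomModelData(tEnv, 2, 2, 0.0, 0));

        // fit() keeps updating the model as new feature rows arrive.
        OnlineKMeansModel model = onlineKMeans.fit(featureTable);

        // transform() returns a table with the prediction column appended.
        Table outputTable = model.transform(featureTable)[0];
        outputTable.execute().print();
    }
}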
I have tried different approaches, but none of them seems correct. Basically, I keep an OnlineKMeansModel instance as per-tenant state in a keyed process function on the input DataStream. In the process function, if I cannot retrieve an OnlineKMeansModel instance for the current event's tenant id, I create one and store it as state for that tenant id, to be reused for future events. However, this does not seem to be the right way to do it in Flink, and I am running into many hurdles with this approach:

- OnlineKMeans takes a table (as in the Table API) as input; that table is basically a view of the event data stream, filtered by a certain tenant id (roughly what the P.S. below sketches for a single hard-coded tenant). How do I go about this per key?
- The OnlineKMeansModel is provided a table to output its predictions to. How do I go about this table?
- I get many "this class is not serializable" errors, a sign that I am not using the correct approach; and so on.

In essence, I feel that I am overlooking a fundamental aspect of how to run FlinkML computations independently for each key of a keyed data stream. In the hope that my use case is clear, I am asking for your help with the correct approach for this scenario.

Thank you!

-- Catalin Stavaru
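P.S. In case it clarifies the first point above, this is roughly how I picture the per-tenant input table today, with a hard-coded tenant id. The Event class and its field names are simplified placeholders for my actual event type; I would like to obtain this kind of filtered table dynamically for each key of the keyed stream.

import static org.apache.flink.table.api.Expressions.$;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class PerTenantTableViewSketch {

    // Simplified placeholder for my actual event class (a Flink POJO).
    public static class Event {
        public String tenantId;
        public double x;
        public double y;

        public Event() {}

        public Event(String tenantId, double x, double y) {
            this.tenantId = tenantId;
            this.x = x;
            this.y = y;
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // Placeholder events; the real stream comes from an external source.
        DataStream<Event> events =
                env.fromElements(
                        new Event("tenant-a", 0.1, 0.2),
                        new Event("tenant-b", 5.0, 5.1),
                        new Event("tenant-a", 0.3, 0.1));

        // A Table view over the whole event stream (columns: tenantId, x, y)...
        Table allEvents = tEnv.fromDataStream(events);

        // ...and a view filtered down to a single, hard-coded tenant.
        Table tenantEvents = allEvents.filter($("tenantId").isEqual("tenant-a"));

        tenantEvents.execute().print();
    }
}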