Hi all, I am trying to implement an anomaly detection algorithm on Flink, which in practice means implementing a Map operator that does anomaly detection on time series. First, I want to read the configuration from MongoDB, for example which Kafka source host to read the data stream from and which sink address to write the results to. The configuration also contains the system metric I want to monitor.
What I did was read the configuration from MongoDB and set it as the Flink job configuration:

    StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
    Configuration conf = new Configuration();
    JSONObject jsonConfiguration = readConfiguration();
    conf.setInteger("period", jsonConfiguration.getInt("period"));
    conf.setDouble("percentage", jsonConfiguration.getDouble("percentage"));
    conf.setDouble("metric", jsonConfiguration.getDouble("metric"));
    see.getConfig().setGlobalJobParameters(conf);

The readConfiguration() method reads the configuration from MongoDB. As the code above shows, I set the globalJobParameters so that all my workers share these parameters, including the metric I want to monitor.

But at some point I may want to change the metric I monitor. I think one possible way is to dynamically (or periodically) re-read the configuration and reset the globalJobParameters, so that the running Flink program switches to the new metric. Is that possible?

Thanks,
Desheng Zhang
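
P.S. To make the "periodically re-read the configuration" idea concrete, here is a minimal sketch in plain Java, independent of Flink. The class name ReloadingConfig and the Supplier that stands in for my readConfiguration() call are made up for illustration. It is only a sketch of the reload mechanism, not a claim about how Flink itself handles globalJobParameters.

```java
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

/** Hypothetical helper: holds the latest configuration snapshot and re-reads it periodically. */
class ReloadingConfig implements AutoCloseable {
    // Stands in for readConfiguration(); assumed to return the current settings as strings.
    private final Supplier<Map<String, String>> loader;
    // Readers always see a complete, immutable snapshot.
    private final AtomicReference<Map<String, String>> current;
    private final ScheduledExecutorService scheduler;

    ReloadingConfig(Supplier<Map<String, String>> loader) {
        this.loader = loader;
        // Load once up front so get() works before the first scheduled reload.
        this.current = new AtomicReference<>(Map.copyOf(loader.get()));
        this.scheduler = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "config-reloader");
            t.setDaemon(true); // do not keep the JVM alive just for reloading
            return t;
        });
    }

    /** Starts re-reading the configuration at a fixed rate. */
    void start(long period, TimeUnit unit) {
        scheduler.scheduleAtFixedRate(this::reload, period, period, unit);
    }

    /** Re-reads the configuration once; keeps the previous snapshot if loading fails. */
    void reload() {
        try {
            current.set(Map.copyOf(loader.get()));
        } catch (RuntimeException e) {
            // Swallow the error so the scheduled task keeps running with the old values.
        }
    }

    /** Returns the value for the key from the latest snapshot, or null if absent. */
    String get(String key) {
        return current.get().get(key);
    }

    @Override
    public void close() {
        scheduler.shutdownNow();
    }
}
```

My assumption is that each parallel instance of the map function could create such a helper in open() and shut it down in close(), then call get("metric") for every record instead of reading the value once from the globalJobParameters. I am not sure whether this is the recommended approach, which is why I am asking.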