Hi all, 

I am trying to implement an anomaly detection algorithm on Flink, which in
practice means implementing a Map operator that detects anomalies in time series.
First I want to read the configuration (such as which Kafka source host to read
the data stream from and which sink address to write data to) from MongoDB. The
configuration also contains the system metric I want to monitor.

What I did was read the configuration from MongoDB and set it as the
configuration of the Flink job:

StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
Configuration conf = new Configuration();
// Read the settings from MongoDB once, at job submission time.
JSONObject jsonConfiguration = readConfiguration();
conf.setInteger("period", jsonConfiguration.getInt("period"));
conf.setDouble("percentage", jsonConfiguration.getDouble("percentage"));
conf.setDouble("metric", jsonConfiguration.getDouble("metric"));
// Make the settings available to all operators as global job parameters.
see.getConfig().setGlobalJobParameters(conf);
The readConfiguration() method reads the configuration from MongoDB.

As the code above shows, I set the globalJobParameters so that all my workers
share these parameters, including the metric I want to monitor. At some point,
however, I may want to change the metric being monitored. I think one possible
way is to read the configuration dynamically (or periodically) and reset the
globalJobParameters so that the Flink program switches to the new metric.
Is that possible?
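
To make the "read periodically" idea concrete, here is a rough sketch in plain
Java. It deliberately uses no Flink API and does not touch globalJobParameters;
it only shows the reload loop itself. ReloadingConfig, its loader argument and
the "period" key used below are hypothetical names for illustration, with the
loader standing in for a call like readConfiguration().

```java
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

/** Hypothetical helper: holds the latest configuration snapshot and refreshes it on a timer. */
class ReloadingConfig implements AutoCloseable {
    // Assumption: the loader wraps whatever reads the settings, e.g. a MongoDB query.
    private final Supplier<Map<String, String>> loader;
    private final AtomicReference<Map<String, String>> current;
    private final ScheduledExecutorService scheduler;

    ReloadingConfig(Supplier<Map<String, String>> loader) {
        this.loader = loader;
        // Initial load; a failure here propagates to the caller.
        this.current = new AtomicReference<>(Map.copyOf(loader.get()));
        // Daemon thread so the reloader never keeps the JVM alive on its own.
        this.scheduler = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "config-reloader");
            t.setDaemon(true);
            return t;
        });
    }

    /** Re-reads the configuration once; keeps the previous snapshot if the load fails. */
    void reload() {
        try {
            current.set(Map.copyOf(loader.get()));
        } catch (RuntimeException e) {
            // Keep serving the old snapshot until the next successful reload.
        }
    }

    /** Schedules reload() to run repeatedly with the given period. */
    void start(long period, TimeUnit unit) {
        scheduler.scheduleAtFixedRate(this::reload, period, period, unit);
    }

    /** Returns the value for key from the most recent snapshot, or null if absent. */
    String get(String key) {
        return current.get().get(key);
    }

    @Override
    public void close() {
        scheduler.shutdownNow();
    }
}
```

An operator could create one such object when it starts, call
start(30, TimeUnit.SECONDS), and look up get("period") for each record, so a
changed value is picked up after the next reload. Whether this is a good fit
inside a Flink operator is exactly what I am unsure about.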

Thanks
Desheng Zhang

