Hey Desheng,

Some options that come to mind:

- Caveman style: stop and restart the job with the new config.
- Poll scenario: build your own thread that periodically loads the config from the DB into a cache that is accessible per worker.
- Push scenario: have a config stream (based on some queue) which you connect to your data stream via the connect operator. In the CoFlatMapFunction that you have to provide, you update Flink state from the config flatMap, read that state in the data flatMap, and pass it along with the data. The specific operator that uses the config can then always get it from the tuple that arrives alongside the data, say in the invoke method of a sink.

Example here: <https://image.slidesharecdn.com/flinkstreambasics-160909223620/95/apache-flink-training-datastream-api-basics-34-638.jpg?cb=1497888680>
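
To make the poll scenario a bit more concrete, here is a minimal plain-Java sketch. PollingConfigCache and its methods are names I made up for illustration, they are not part of Flink, and the loader is assumed to wrap something like your readConfiguration() call against MongoDB. Treat it as one possible shape under those assumptions, not a tested recipe.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Hypothetical helper (not a Flink class): keeps the latest config in memory
// and reloads it on a background daemon thread at a fixed period.
final class PollingConfigCache implements AutoCloseable {
    private final Supplier<Map<String, Double>> loader; // assumed to wrap the DB read
    private final ScheduledExecutorService scheduler;
    private volatile Map<String, Double> current;       // volatile so readers see new snapshots

    PollingConfigCache(Supplier<Map<String, Double>> loader, long periodMillis) {
        this.loader = loader;
        // Load once up front so get() never returns null.
        this.current = snapshot(loader.get());
        this.scheduler = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "config-poller");
            t.setDaemon(true); // do not keep the JVM alive just for polling
            return t;
        });
        scheduler.scheduleAtFixedRate(this::refresh, periodMillis, periodMillis,
                TimeUnit.MILLISECONDS);
    }

    // Reloads the config; on failure the last good snapshot is kept.
    void refresh() {
        try {
            current = snapshot(loader.get());
        } catch (RuntimeException e) {
            // Swallowed on purpose: an uncaught exception would cancel the schedule.
        }
    }

    // Returns the most recently loaded, read-only snapshot.
    Map<String, Double> get() {
        return current;
    }

    @Override
    public void close() {
        scheduler.shutdownNow();
    }

    private static Map<String, Double> snapshot(Map<String, Double> source) {
        return Collections.unmodifiableMap(new HashMap<>(source));
    }
}
```

In a Flink job you would probably create one of these in the open() method of a rich function, call get() for each record, and shut it down in close(). Note that every parallel instance then polls the DB on its own, so pick the period with that in mind.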

Hope that gives u some ideas,
M

On Mon, Jul 24, 2017 at 12:16 PM, ZalaCheung <gzzhangdesh...@corp.netease.com> wrote:

> Hi all,
>
> I am trying to implement an anomaly detection algorithm on Flink, which is actually a Map operator that does anomaly detection on time series.
> At first I want to read the configuration (like which Kafka source host to read the data stream from and which sink address to write data to) from MongoDB. It also contains some system metric I want to monitor.
>
> What I did was read the configuration from MongoDB and set it as the configuration of Flink:
>
> StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
> Configuration conf = new Configuration();
>
> JSONObject jsonConfiguration = readConfiguration();
>
> conf.setInteger("period", jsonConfiguration.getInt("period"));
> conf.setDouble("percentage", jsonConfiguration.getDouble("percentage"));
> conf.setDouble("metric", jsonConfiguration.getDouble("metric"));
>
> see.getConfig().setGlobalJobParameters(conf);
>
> The "readConfiguration()" method reads the configuration from MongoDB.
>
> As in the code above, I set globalJobParameters so that all my workers share these parameters, including the metric I want to monitor. But at some point I may want to change the metric I monitor. I think one possible way is to dynamically (or periodically) read the configuration and reset the globalJobParameters so that the Flink program changes the metric it monitors. Is that possible?
>
> Thanks
> Desheng Zhang