Hi Martin,

Thanks for your advice, that's really helpful. I am using the push scenario. I am now having some trouble because of the state I want to maintain. For me, the simplest way would be to maintain ValueState in a CoFlatMapFunction (actually a RichCoFlatMapFunction). But that kind of state can only be used on a keyed stream, and for a connected stream, at least in my scenario, I should not use keyBy() (actually it seems keyBy() is not applicable to my connected stream).
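
To make it concrete, this is roughly the shape of what I tried first (a simplified sketch; the class, field, and threshold names are just placeholders I made up for this mail):

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
import org.apache.flink.util.Collector;

public class ConfigAwareDetector extends RichCoFlatMapFunction<String, String, String> {

    // keyed state holding the latest config value; this is what seems to require a keyed stream
    private transient ValueState<Double> threshold;

    @Override
    public void open(Configuration parameters) {
        ValueStateDescriptor<Double> descriptor =
            new ValueStateDescriptor<>("threshold", Double.class);
        threshold = getRuntimeContext().getState(descriptor);
    }

    // config stream: update the state
    @Override
    public void flatMap1(String configValue, Collector<String> out) throws Exception {
        threshold.update(Double.parseDouble(configValue));
    }

    // data stream: read the state and pass it along with the data
    // (value() can still be null before the first config element arrives)
    @Override
    public void flatMap2(String dataValue, Collector<String> out) throws Exception {
        Double t = threshold.value();
        out.collect(dataValue + "|" + t);
    }
}

As far as I understand, the getState() call here only works when the connected streams are keyed, which is exactly where I am stuck.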
Thus, instead of using a rich function with keyed managed state, I tried to use CheckpointedFunction for my non-keyed state. However, with CheckpointedFunction I can only use ListState, which only offers add() and an iterator (via get()). I am not sure whether I can simply replace the element in the ListState. What exactly makes me stuck is that I cannot initialize my ListState with a ListStateDescriptor: it says there is no constructor that takes an initial value. I actually saw that in the official documentation:
https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/state.html

@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
    ListStateDescriptor<Tuple2<String, Integer>> descriptor =
        new ListStateDescriptor<>(
            "buffered-elements",
            TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}),
            Tuple2.of(0L, 0L));

    checkpointedState = context.getOperatorStateStore().getListState(descriptor);

    if (context.isRestored()) {
        for (Tuple2<String, Integer> element : checkpointedState.get()) {
            bufferedElements.add(element);
        }
    }
}

But in my code (Flink 1.3.1) it says there is no constructor for three arguments (the third argument in the example above is the default value). I am really confused. How can I maintain my state for the CoFlatMap function? (I have pasted a trimmed version of what I am trying at the very end of this mail, below the quoted messages.)

Thanks,
Desheng Zhang

> On Jul 24, 2017, at 19:44, Martin Eden <martineden...@gmail.com> wrote:
>
> Hey Desheng,
>
> Some options that come to mind:
> - Cave man style: Stop and restart the job with the new config.
> - Poll scenario: You could build your own thread that periodically loads from the db into a per-worker accessible cache.
> - Push scenario: Have a config stream (based off of some queue) which you connect to your data stream via the connect operator. In the CoFlatMapFunction that you have to provide, you basically update Flink state from the config flatMap and read the Flink state from the data flatMap and pass it along with the data. Then the specific operator that uses the config can always get it from the tuple that comes alongside the data, say in an invoke method call of a sink. Example here <https://image.slidesharecdn.com/flinkstreambasics-160909223620/95/apache-flink-training-datastream-api-basics-34-638.jpg?cb=1497888680>.
>
> Hope that gives u some ideas,
> M
>
>
> On Mon, Jul 24, 2017 at 12:16 PM, ZalaCheung <gzzhangdesh...@corp.netease.com> wrote:
> Hi all,
>
> I am now trying to implement an anomaly detection algorithm on Flink, which actually means implementing a Map operator that does anomaly detection based on time series.
> At first I want to read the configuration (like which Kafka source host to read the datastream from and which sink address to write data to) from MongoDB. It contains the system metrics I want to monitor.
>
> What I did was read the configuration from MongoDB and set it as the configuration of Flink:
>
> StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
> Configuration conf = new Configuration();
> JSONObject jsonConfiguration = readConfiguration();
> conf.setInteger("period", jsonConfiguration.getInt("period"));
> conf.setDouble("percentage", jsonConfiguration.getDouble("percentage"));
> conf.setDouble("metric", jsonConfiguration.getDouble("metric"));
> see.getConfig().setGlobalJobParameters(conf);
>
> The readConfiguration() method reads the configuration from MongoDB.
>
> Just like in the code I showed above,
> I set globalJobParameters to let all my workers share these parameters, including the metric I want to monitor. But maybe at some point I want to change the metric I want to monitor. I think one possible way is to dynamically (or periodically) read the configuration and reset the globalJobParameters to make the Flink program change the metric it monitors. Is that possible?
>
> Thanks,
> Desheng Zhang
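
P.S. For completeness, here is a trimmed-down version of the operator-state variant I am experimenting with (Flink 1.3.1). The class and field names are just placeholders for this mail, and I am not sure the clear()-then-add() pattern in snapshotState() is the intended way to "replace" the single element; it is simply what I am trying, using the two-argument ListStateDescriptor constructor that my version does have:

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
import org.apache.flink.util.Collector;

public class ConfigCoFlatMap
        implements CoFlatMapFunction<String, String, String>, CheckpointedFunction {

    // the "current" config lives in a plain field; the ListState below is only
    // used to checkpoint and restore it
    private Double threshold = 0.0;

    private transient ListState<Double> checkpointedThreshold;

    @Override
    public void flatMap1(String configValue, Collector<String> out) throws Exception {
        // config stream: update the field
        threshold = Double.parseDouble(configValue);
    }

    @Override
    public void flatMap2(String dataValue, Collector<String> out) throws Exception {
        // data stream: read the field and pass it along with the data
        out.collect(dataValue + "|" + threshold);
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // "replace" the single element by clearing and re-adding;
        // not sure this is the intended way, but it is what I am trying
        checkpointedThreshold.clear();
        checkpointedThreshold.add(threshold);
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        // only the two-argument constructor compiles for me on 1.3.1
        ListStateDescriptor<Double> descriptor = new ListStateDescriptor<>(
                "threshold", TypeInformation.of(new TypeHint<Double>() {}));

        checkpointedThreshold = context.getOperatorStateStore().getListState(descriptor);

        if (context.isRestored()) {
            for (Double value : checkpointedThreshold.get()) {
                threshold = value;
            }
        }
    }
}

If there is a better way to keep a single, updatable config value in a non-keyed CoFlatMapFunction, I would love to know.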