Hi all,

Does anyone have an idea about the non-keyed managed state problem below? I think all the functions in the testFunc class should share the ListState "metrics". But after I add an element to the ListState in the flatMap2 function, I cannot retrieve that element from flatMap1.
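For reference, a likely cause of the behavior described below (my own diagnosis, not stated in the thread): non-keyed operator state in Flink is scoped to a single parallel subtask, so with parallelism greater than 1 the control element and the data element may be handled by different copies of testFunc, each with its own ListState. The plain-Java sketch below mimics that situation: two independent operator instances, each with a private list standing in for its per-subtask ListState.

```java
import java.util.ArrayList;
import java.util.List;

public class PerSubtaskStateDemo {
    // Stands in for one parallel subtask of the CoFlatMapFunction; the
    // 'metrics' list plays the role of that subtask's private ListState.
    static class Subtask {
        private final List<String> metrics = new ArrayList<>();

        void flatMap2(String s) {          // control path: store the element
            metrics.clear();
            metrics.add(s);
        }

        String flatMap1() {                // data path: read the element back
            return metrics.isEmpty() ? "Not initialized" : metrics.get(0);
        }
    }

    public static void main(String[] args) {
        Subtask subtask0 = new Subtask();  // the subtask that got the control message
        Subtask subtask1 = new Subtask();  // the subtask that got the data message

        subtask0.flatMap2("heyjude");

        // Same subtask: the state is visible.
        System.out.println(subtask0.flatMap1()); // prints "heyjude"

        // Different subtask: its own list is still empty.
        System.out.println(subtask1.flatMap1()); // prints "Not initialized"
    }
}
```

One way to test this diagnosis on the real job is to run it with parallelism 1, or to broadcast the control stream (`control.broadcast()` before `connect`) so every subtask sees every control message.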
Desheng Zhang

> On Jul 24, 2017, at 22:06, ZalaCheung <gzzhangdesh...@corp.netease.com> wrote:
>
> Hi Chesnay,
>
> Thank you very much. I now ignore the default value of the ListState and
> try to use a CoFlatMap function with managed state. But what surprises me
> is that the state does not seem to be shared between the two streams.
>
> My test code is shown below.
>
> DataStream<String> result = stream
>         .connect(control)
>         .flatMap(new testFunc());
>
> public static class testFunc implements
>         CoFlatMapFunction<String, String, String>, CheckpointedFunction {
>
>     private ListState<String> metrics;
>
>     @Override
>     public void snapshotState(FunctionSnapshotContext functionSnapshotContext)
>             throws Exception {
>     }
>
>     @Override
>     public void initializeState(FunctionInitializationContext functionInitializationContext)
>             throws Exception {
>         ListStateDescriptor<String> metricsStateDescriptor =
>                 new ListStateDescriptor<>(
>                         "metrics",
>                         TypeInformation.of(new TypeHint<String>() {}));
>         metrics = functionInitializationContext
>                 .getOperatorStateStore()
>                 .getListState(metricsStateDescriptor);
>     }
>
>     @Override
>     public void flatMap1(String s, Collector<String> collector) throws Exception {
>         String myMetrics = null;
>         for (String element : metrics.get()) {
>             logger.info("element in metric: " + s);
>             myMetrics = element;
>         }
>         if (myMetrics == null) {
>             logger.info("Not initialized");
>         } else {
>             logger.info("initialized: " + myMetrics);
>         }
>     }
>
>     @Override
>     public void flatMap2(String s, Collector<String> collector) throws Exception {
>         metrics.clear();
>         metrics.add(s);
>
>         for (String element : metrics.get()) {
>             logger.info("element in metric: " + element);
>         }
>     }
> }
>
> I connected the two streams (stream and control) and used a CoFlatMapFunction
> on them. For the control stream, I send a string and the expected log is printed:
>
>     - element in metric: heyjude
>
> Then I send another string to the first stream, but the log prints:
>
>     - Not initialized
>
> I am confused.
> I successfully receive the message on the control stream and add the
> string to the ListState. But when I try to retrieve the ListState in
> flatMap1, I get nothing.
>
> Thanks.
> Desheng Zhang
>
>
>> On Jul 24, 2017, at 21:01, Chesnay Schepler <ches...@apache.org> wrote:
>>
>> Hello,
>>
>> That's an error in the documentation; only the ValueStateDescriptor has a
>> defaultValue constructor argument.
>>
>> Regards,
>> Chesnay
>>
>> On 24.07.2017 14:56, ZalaCheung wrote:
>>> Hi Martin,
>>>
>>> Thanks for your advice. That's really helpful. I am using the push
>>> scenario. I am now having some trouble because of the state I want to
>>> maintain. For me, the simplest way would be to keep two ValueStates in a
>>> CoFlatMapFunction (actually a RichCoFlatMapFunction). But a rich function
>>> can only use that kind of state on a keyed stream, and for a connected
>>> stream, at least in my scenario, I should not use the keyBy() method (in
>>> fact, it seems keyBy() cannot be applied to a connected stream).
>>>
>>> So instead of using a rich function with keyed managed state, I tried to
>>> use CheckpointedFunction for my non-keyed state. However, with
>>> CheckpointedFunction I can only use ListState, which offers little more
>>> than add() and an iterator. I am not sure whether I can simply replace an
>>> element in the ListState. What really got me stuck is that I cannot
>>> initialize my ListState with a default value via the ListStateDescriptor:
>>> it says there is no constructor taking an initial value, even though I saw
>>> one in the official documentation.
>>>
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/state.html
>>>
>>> @Override
>>> public void initializeState(FunctionInitializationContext context) throws Exception {
>>>     ListStateDescriptor<Tuple2<String, Integer>> descriptor =
>>>             new ListStateDescriptor<>(
>>>                     "buffered-elements",
>>>                     TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}),
>>>                     Tuple2.of(0L, 0L));
>>>
>>>     checkpointedState = context.getOperatorStateStore().getListState(descriptor);
>>>
>>>     if (context.isRestored()) {
>>>         for (Tuple2<String, Integer> element : checkpointedState.get()) {
>>>             bufferedElements.add(element);
>>>         }
>>>     }
>>> }
>>>
>>> But in my code (Flink 1.3.1), it says there is no constructor taking three
>>> arguments (the third argument in the example above is the default value).
>>> I am really confused.
>>>
>>> How can I maintain my state for the CoFlatMap function?
>>>
>>> Thanks,
>>> Desheng Zhang
>>>
>>>
>>>> On Jul 24, 2017, at 19:44, Martin Eden <martineden...@gmail.com> wrote:
>>>>
>>>> Hey Desheng,
>>>>
>>>> Some options that come to mind:
>>>>
>>>> - Cave man style: stop and restart the job with the new config.
>>>> - Poll scenario: you could build your own thread that periodically loads
>>>>   from the db into a per-worker accessible cache.
>>>> - Push scenario: have a config stream (based off of some queue) which you
>>>>   connect to your data stream via the connect operator. In the
>>>>   CoFlatMapFunction that you have to provide, you basically update Flink
>>>>   state from the config flatMap and read the Flink state from the data
>>>>   flatMap and pass it along with the data. Then the specific operator that
>>>>   uses the config can always get it from the tuple that comes alongside
>>>>   the data, say in the invoke method call of a sink.
>>>> Example here:
>>>> https://image.slidesharecdn.com/flinkstreambasics-160909223620/95/apache-flink-training-datastream-api-basics-34-638.jpg?cb=1497888680
>>>>
>>>> Hope that gives you some ideas,
>>>> M
>>>>
>>>>
>>>> On Mon, Jul 24, 2017 at 12:16 PM, ZalaCheung <gzzhangdesh...@corp.netease.com> wrote:
>>>> Hi all,
>>>>
>>>> I am now trying to implement an anomaly detection algorithm on Flink;
>>>> in essence, a Map operator that does anomaly detection on a time series.
>>>> First I want to read configuration (like which Kafka source host to read
>>>> the datastream from and which sink address to write data to) from MongoDB.
>>>> It contains the system metrics I want to monitor.
>>>>
>>>> What I did was read the configuration from MongoDB and set it as the
>>>> Flink configuration:
>>>>
>>>> StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
>>>> Configuration conf = new Configuration();
>>>> JSONObject jsonConfiguration = readConfiguration();
>>>> conf.setInteger("period", jsonConfiguration.getInt("period"));
>>>> conf.setDouble("percentage", jsonConfiguration.getDouble("percentage"));
>>>> conf.setDouble("metric", jsonConfiguration.getDouble("metric"));
>>>> see.getConfig().setGlobalJobParameters(conf);
>>>>
>>>> The readConfiguration() method reads the configuration from MongoDB.
>>>>
>>>> As the code above shows, I set globalJobParameters to let all my workers
>>>> share these parameters, including the metric I want to monitor. But maybe
>>>> at some point I want to change the metric being monitored. I think one
>>>> possible way is to dynamically (or periodically) read the configuration
>>>> and reset the globalJobParameters to make the Flink program change the
>>>> metric it monitors. Is that possible?
>>>>
>>>> Thanks,
>>>> Desheng Zhang
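To make the push scenario from this thread concrete, here is a minimal sketch (my own illustration, not code from the thread; the stream names and the String-typed events and config are assumptions). The config stream is broadcast so every parallel subtask sees every update; for brevity the latest config is kept in a plain per-subtask field, whereas a real job would keep it in checkpointed state so it survives restarts:

```java
// Assumed Flink 1.3-style imports:
// import org.apache.flink.api.java.tuple.Tuple2;
// import org.apache.flink.streaming.api.datastream.DataStream;
// import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
// import org.apache.flink.util.Collector;

DataStream<Tuple2<String, String>> configured = dataStream      // DataStream<String> of events (assumed)
        .connect(configStream.broadcast())                      // broadcast: every subtask gets every config update
        .flatMap(new CoFlatMapFunction<String, String, Tuple2<String, String>>() {

            // Latest config seen by THIS subtask; broadcast() above
            // ensures all subtasks receive all updates.
            private String currentConfig = "default-metric";

            @Override
            public void flatMap1(String event, Collector<Tuple2<String, String>> out) {
                // Data path: emit the event together with the current config.
                out.collect(Tuple2.of(event, currentConfig));
            }

            @Override
            public void flatMap2(String config, Collector<Tuple2<String, String>> out) {
                // Config path: just remember the newest value.
                currentConfig = config;
            }
        });
```

Downstream operators (for example, a sink's invoke method) can then read the config from the second field of each tuple instead of relying on globalJobParameters, which are fixed at job submission.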