Hi all,

Does anyone have an idea about the non-keyed managed state problem below?
I think all the functions in the testFunc class should share the ListState 
"metrics". But after I add an element to the ListState in flatMap2, I cannot 
retrieve that element in flatMap1.
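
In case it helps with the diagnosis: as far as I understand, operator state 
registered through CheckpointedFunction is scoped to a single parallel instance 
of the operator, so if the control element and the data element are processed by 
different parallel instances of testFunc, they will not see the same "metrics" 
state. A quick check I plan to add (just a sketch, not in the quoted code yet) is 
to log which instance handles each element:

// At the top of flatMap1 (rest of the body unchanged):
logger.info("flatMap1 on instance " + System.identityHashCode(this)
        + ", thread " + Thread.currentThread().getName());

// At the top of flatMap2 (rest of the body unchanged):
logger.info("flatMap2 on instance " + System.identityHashCode(this)
        + ", thread " + Thread.currentThread().getName());

If the two lines report different instances, each input is hitting a different 
copy of the operator state.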


Desheng Zhang


> On Jul 24, 2017, at 22:06, ZalaCheung <gzzhangdesh...@corp.netease.com> wrote:
> 
> Hi Chesnay,
> 
> Thank you very much. I have now dropped the default value for the ListState and 
> tried to use a CoFlatMapFunction with managed state. But to my surprise, the 
> state does not seem to be shared between the two streams.
> 
> My test code is shown below.
> 
> DataStream<String> result = stream
>         .connect(control)
>         .flatMap(new testFunc());
> 
> public static class testFunc
>         implements CoFlatMapFunction<String, String, String>, CheckpointedFunction {
> 
>     private ListState<String> metrics;
> 
>     @Override
>     public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
>         // Nothing to do here: the ListState handle registered in
>         // initializeState is snapshotted by Flink automatically.
>     }
> 
>     @Override
>     public void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception {
>         ListStateDescriptor<String> metricsStateDescriptor =
>                 new ListStateDescriptor<>(
>                         "metrics",
>                         TypeInformation.of(new TypeHint<String>() {}));
>         metrics = functionInitializationContext
>                 .getOperatorStateStore()
>                 .getListState(metricsStateDescriptor);
>     }
> 
>     @Override
>     public void flatMap1(String s, Collector<String> collector) throws Exception {
>         // Data stream side: read the latest value stored by flatMap2.
>         String myMetrics = null;
>         for (String element : metrics.get()) {
>             logger.info("element in metric: " + element);
>             myMetrics = element;
>         }
>         if (myMetrics == null) {
>             logger.info("Not initialized");
>         } else {
>             logger.info("initialized: " + myMetrics);
>         }
>     }
> 
>     @Override
>     public void flatMap2(String s, Collector<String> collector) throws Exception {
>         // Control stream side: keep only the most recent control message.
>         metrics.clear();
>         metrics.add(s);
>         for (String element : metrics.get()) {
>             logger.info("element in metric: " + element);
>         }
>     }
> }
> 
> I connected the two streams (stream and control) and applied the CoFlatMapFunction 
> to them. When I send a string to the control stream, the expected log is printed:
> - element in metric: heyjude
> Then I send another string to the first stream, but the log prints:
> - Not initialized
> 
> I am confused. The message from the control stream is received and its string is 
> added to the ListState, but when I try to read the ListState in flatMap1 I get 
> nothing.
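> 
> One thing I still need to rule out (just a guess on my side): if the job runs 
> with parallelism > 1, the control element and the data element may be handled by 
> different parallel instances of testFunc, each with its own operator state. A 
> sketch of what I would try in that case is broadcasting the control stream so 
> every instance sees the control messages:
> 
> DataStream<String> result = stream
>         .connect(control.broadcast())
>         .flatMap(new testFunc());
> 
> Alternatively, forcing a single instance with 
> .flatMap(new testFunc()).setParallelism(1) should make both inputs hit the same 
> state.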
> 
> Thanks.
> Desheng Zhang
> 
> 
> 
>> On Jul 24, 2017, at 21:01, Chesnay Schepler <ches...@apache.org> wrote:
>> 
>> Hello,
>> 
>> That's an error in the documentation; only the ValueStateDescriptor has a 
>> constructor argument for a default value.
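>> 
>> Roughly, the relevant constructors look like this (a sketch against the 1.3 
>> API; the state names are made up):
>> 
>> // ValueStateDescriptor accepts an optional default value:
>> ValueStateDescriptor<Double> valueDesc =
>>         new ValueStateDescriptor<>(
>>                 "threshold",
>>                 TypeInformation.of(new TypeHint<Double>() {}),
>>                 0.0);   // default value
>> 
>> // ListStateDescriptor only takes a name and a type, no default:
>> ListStateDescriptor<String> listDesc =
>>         new ListStateDescriptor<>(
>>                 "metrics",
>>                 TypeInformation.of(new TypeHint<String>() {}));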
>> 
>> Regards,
>> Chesnay
>> 
>> On 24.07.2017 14:56, ZalaCheung wrote:
>>> Hi Martin,
>>> 
>>> Thanks for your advice, that is really helpful. I am using the push scenario, 
>>> and I am now having some trouble with the state I want to maintain. The 
>>> simplest approach would be to keep a ValueState in a CoFlatMapFunction 
>>> (actually a RichCoFlatMapFunction). But keyed state in a rich function can 
>>> only be used on a keyed stream, and for my scenario I should not key the 
>>> connected stream (it also does not seem possible to call keyBy() on a 
>>> connected stream).
>>> 
>>> So instead of using a rich function with keyed managed state, I tried to use 
>>> CheckpointedFunction for my non-keyed state. However, with CheckpointedFunction 
>>> I can only use ListState, which only offers add() and an iterator via get(). 
>>> I am not sure whether I can simply replace the element in the ListState. What 
>>> really blocks me is that I cannot initialize my ListState with a default value 
>>> through the ListStateDescriptor: the compiler says there is no such constructor, 
>>> even though I saw one in the official documentation:
>>> 
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/state.html
>>> 
>>> @Override
>>> public void initializeState(FunctionInitializationContext context) throws Exception {
>>>     ListStateDescriptor<Tuple2<Long, Long>> descriptor =
>>>         new ListStateDescriptor<>(
>>>             "buffered-elements",
>>>             TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}),
>>>             Tuple2.of(0L, 0L));
>>> 
>>>     checkpointedState = context.getOperatorStateStore().getListState(descriptor);
>>> 
>>>     if (context.isRestored()) {
>>>         for (Tuple2<Long, Long> element : checkpointedState.get()) {
>>>             bufferedElements.add(element);
>>>         }
>>>     }
>>> }
>>> 
>>> 
>>> But in my code (Flink 1.3.1), the compiler says there is no three-argument 
>>> constructor (the third argument in the example above is the default value). 
>>> I am really confused.
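>>> 
>>> For now only the two-argument constructor compiles for me, and I am thinking 
>>> of emulating a single replaceable value on top of the ListState roughly like 
>>> this (just a sketch of the idea; the state name is made up):
>>> 
>>> ListStateDescriptor<String> descriptor =
>>>         new ListStateDescriptor<>(
>>>                 "config-value",
>>>                 TypeInformation.of(new TypeHint<String>() {}));
>>> ListState<String> state = context.getOperatorStateStore().getListState(descriptor);
>>> 
>>> // "Replace" the single element by clearing and re-adding:
>>> state.clear();
>>> state.add("new-value");
>>> 
>>> // Read it back; an empty iterable means "not set yet":
>>> String current = null;
>>> for (String s : state.get()) {
>>>     current = s;
>>> }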
>>> 
>>> How can I maintain my state for the CoFlatMap function?
>>> 
>>> 
>>> Thanks
>>>  Desheng Zhang
>>> 
>>> 
>>>> On Jul 24, 2017, at 19:44, Martin Eden <martineden...@gmail.com> wrote:
>>>> 
>>>> Hey Desheng,
>>>> 
>>>> Some options that come to mind:
>>>> - Cave man style: stop and restart the job with the new config.
>>>> - Poll scenario: build your own thread that periodically loads the config 
>>>> from the DB into a cache that is accessible per worker.
>>>> - Push scenario: have a config stream (based off of some queue) which you 
>>>> connect to your data stream via the connect operator. In the CoFlatMapFunction 
>>>> that you provide, you update Flink state from the config flatMap; in the data 
>>>> flatMap you read that state and pass it along with the data. Any downstream 
>>>> operator that needs the config can then take it from the tuple that arrives 
>>>> alongside the data, say in the invoke call of a sink. A sketch follows below; 
>>>> example here 
>>>> <https://image.slidesharecdn.com/flinkstreambasics-160909223620/95/apache-flink-training-datastream-api-basics-34-638.jpg?cb=1497888680>.
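>>>> 
>>>> A minimal sketch of the push scenario (types, names, and the Event class are 
>>>> made up; the config stream is broadcast so every parallel instance sees the 
>>>> updates, and for brevity the latest config is kept in a plain field rather 
>>>> than in checkpointed Flink state):
>>>> 
>>>> DataStream<String> config = ...;   // from some queue, e.g. Kafka
>>>> DataStream<Event> data = ...;
>>>> 
>>>> DataStream<Tuple2<Event, String>> enriched = data
>>>>         .connect(config.broadcast())
>>>>         .flatMap(new CoFlatMapFunction<Event, String, Tuple2<Event, String>>() {
>>>>             // Latest config value seen by this parallel instance.
>>>>             private String currentConfig = "default";
>>>> 
>>>>             @Override
>>>>             public void flatMap1(Event event, Collector<Tuple2<Event, String>> out) {
>>>>                 // Data side: forward the element together with the current config.
>>>>                 out.collect(Tuple2.of(event, currentConfig));
>>>>             }
>>>> 
>>>>             @Override
>>>>             public void flatMap2(String newConfig, Collector<Tuple2<Event, String>> out) {
>>>>                 // Config side: just remember the most recent value.
>>>>                 currentConfig = newConfig;
>>>>             }
>>>>         });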
>>>> 
>>>> Hope that gives u some ideas,
>>>> M
>>>> 
>>>> 
>>>> On Mon, Jul 24, 2017 at 12:16 PM, ZalaCheung <gzzhangdesh...@corp.netease.com> wrote:
>>>> Hi all, 
>>>> 
>>>> I am now trying to implement an anomaly detection algorithm on Flink, which 
>>>> essentially means implementing a Map operator that does anomaly detection on 
>>>> time series.
>>>> At first I want to read the configuration (such as which Kafka source host to 
>>>> read the datastream from and which sink address to write data to) from 
>>>> MongoDB. It also contains the system metric I want to monitor.
>>>> 
>>>> What I did was read the configuration from MongoDB and set it as the Flink 
>>>> configuration:
>>>> 
>>>> StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
>>>> Configuration conf = new Configuration();
>>>> JSONObject jsonConfiguration = readConfiguration();
>>>> conf.setInteger("period", jsonConfiguration.getInt("period"));
>>>> conf.setDouble("percentage", jsonConfiguration.getDouble("percentage"));
>>>> conf.setDouble("metric", jsonConfiguration.getDouble("metric"));
>>>> see.getConfig().setGlobalJobParameters(conf);
>>>> The readConfiguration() method reads the configuration from MongoDB.
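>>>> 
>>>> Roughly like this (just a sketch; the host, database, collection, and field 
>>>> names are placeholders):
>>>> 
>>>> private static JSONObject readConfiguration() {
>>>>     // Hypothetical MongoDB lookup; adjust names to the real deployment.
>>>>     MongoClient client = new MongoClient("localhost", 27017);
>>>>     try {
>>>>         Document doc = client.getDatabase("monitoring")
>>>>                 .getCollection("job_config")
>>>>                 .find()
>>>>                 .first();
>>>>         return new JSONObject(doc.toJson());
>>>>     } finally {
>>>>         client.close();
>>>>     }
>>>> }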
>>>> 
>>>> As the code above shows, I set the global job parameters so that all my 
>>>> workers share these values, including the metric I want to monitor. But at 
>>>> some point I may want to change that metric. One possible way would be to 
>>>> dynamically (or periodically) re-read the configuration and reset the global 
>>>> job parameters so that the Flink program switches to the new metric. Is that 
>>>> possible?
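>>>> 
>>>> For reference, a worker can read these shared parameters back roughly like 
>>>> this (a sketch, assuming a rich function, e.g. inside open()):
>>>> 
>>>> // The Configuration we registered via setGlobalJobParameters:
>>>> Configuration params = (Configuration) getRuntimeContext()
>>>>         .getExecutionConfig()
>>>>         .getGlobalJobParameters();
>>>> double metric = params.getDouble("metric", 0.0);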
>>>> 
>>>> Thanks
>>>> Desheng Zhang
>>>> 
>>>> 
>>>> 
>>> 
>> 
> 
