Hi Martin,

Thanks for your advice. That's really helpful. I am using the push scenario. I
am now having some trouble because of the state I want to maintain. For me, the
simplest way would be to maintain ValueState in a CoFlatMapFunction (actually a
RichCoFlatMapFunction). But that kind of keyed state can only be used on a
keyed stream, and for a connected stream, at least in my scenario, I should not
use the keyBy() method (actually it seems keyBy() is not even allowed on a
connected stream).
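
To make this concrete, what I had in mind is roughly the sketch below (the
class name and the String/Tuple2 types are just placeholders for my config and
metric records). As far as I understand, the getState() call in open() only
works when the connected stream is keyed, which is exactly what I cannot do
here:

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
import org.apache.flink.util.Collector;

// First input: config as a plain string, second input: (metricName, value) pairs.
public class ConfigAwareDetector
        extends RichCoFlatMapFunction<String, Tuple2<String, Double>, String> {

    // Keyed state: getRuntimeContext().getState(...) only works on a keyed stream.
    private transient ValueState<String> currentConfig;

    @Override
    public void open(Configuration parameters) throws Exception {
        currentConfig = getRuntimeContext().getState(
            new ValueStateDescriptor<>("current-config", String.class));
    }

    @Override
    public void flatMap1(String config, Collector<String> out) throws Exception {
        // Config stream: remember the latest configuration.
        currentConfig.update(config);
    }

    @Override
    public void flatMap2(Tuple2<String, Double> metric, Collector<String> out) throws Exception {
        // Data stream: read the configuration and use it for the detection logic.
        String config = currentConfig.value();
        if (config != null) {
            out.collect(metric.f0 + " checked against " + config);
        }
    }
}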

Thus, instead of using a rich function for keyed managed state, I tried to use
CheckpointedFunction for my non-keyed state. However, with CheckpointedFunction
I can only use ListState, which only has an add() method and an iterator. I am
not sure whether I can simply replace the element in the ListState. What
actually has me stuck is that I cannot initialize my ListState with a default
value through the ListStateDescriptor: the compiler says there is no such
constructor, even though I saw one used in the official documentation:

https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/state.html

@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
    ListStateDescriptor<Tuple2<Long, Long>> descriptor =
        new ListStateDescriptor<>(
            "buffered-elements",
            TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}),
            Tuple2.of(0L, 0L));

    checkpointedState = context.getOperatorStateStore().getListState(descriptor);

    if (context.isRestored()) {
        for (Tuple2<Long, Long> element : checkpointedState.get()) {
            bufferedElements.add(element);
        }
    }
}


But in my code (Flink 1.3.1), the compiler says there is no constructor that
takes three arguments (the third argument in the example above is supposed to
be the default value). I am really confused.
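
For reference, the only variant I can think of is something like the sketch
below (the class name and the Tuple2 types are again just placeholders, and
flatMap2 is where my real detection logic would go): keep the latest value in a
plain field, use the two-argument ListStateDescriptor constructor, and clear
and re-add the list in snapshotState. I am not sure whether that is the
intended way to keep a single, replaceable value in non-keyed state:

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
import org.apache.flink.util.Collector;

public class ConfigHoldingCoFlatMap
        implements CoFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>>,
                   CheckpointedFunction {

    // The "default value" lives here instead of in the descriptor.
    private Tuple2<Long, Long> current = Tuple2.of(0L, 0L);
    private transient ListState<Tuple2<Long, Long>> checkpointedState;

    @Override
    public void flatMap1(Tuple2<Long, Long> config, Collector<Tuple2<Long, Long>> out) {
        // Config stream: just remember the latest value.
        current = config;
    }

    @Override
    public void flatMap2(Tuple2<Long, Long> value, Collector<Tuple2<Long, Long>> out) {
        // Data stream: this is where "current" would drive the detection logic.
        out.collect(value);
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // "Replace" the single element by clearing the list and re-adding it.
        checkpointedState.clear();
        checkpointedState.add(current);
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        // Only a two-argument (name, type) constructor; no default-value variant.
        ListStateDescriptor<Tuple2<Long, Long>> descriptor =
            new ListStateDescriptor<>(
                "buffered-elements",
                TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}));

        checkpointedState = context.getOperatorStateStore().getListState(descriptor);

        if (context.isRestored()) {
            for (Tuple2<Long, Long> element : checkpointedState.get()) {
                current = element;
            }
        }
    }
}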

How can I maintain my state for the CoFlatMap function?


Thanks
 Desheng Zhang


> On Jul 24, 2017, at 19:44, Martin Eden <martineden...@gmail.com> wrote:
> 
> Hey Desheng,
> 
> Some options that come to mind:
> - Cave man style: Stop and restart job with new config.
> - Poll scenario: You could build your own thread that periodically loads from 
> the db into a per worker accessible cache.
> - Push scenario: have a config stream (based off of some queue) which you
> connect to your data stream via the connect operator. In the
> CoFlatMapFunction that you have to provide, you basically update Flink state
> from the config flatMap, read that state in the data flatMap, and pass it
> along with the data. Then the specific operator that uses the config can
> always get it from the tuple that arrives alongside the data, say in the
> invoke method of a sink. Example here
> <https://image.slidesharecdn.com/flinkstreambasics-160909223620/95/apache-flink-training-datastream-api-basics-34-638.jpg?cb=1497888680>.
> 
> Hope that gives you some ideas,
> M
> 
> 
> On Mon, Jul 24, 2017 at 12:16 PM, ZalaCheung <gzzhangdesh...@corp.netease.com> wrote:
> Hi all, 
> 
> I am now trying to implement an anomaly detection algorithm on Flink, which
> is actually a Map operator that does anomaly detection based on time series.
> At first I want to read the configuration (like which Kafka source host to
> read the datastream from and which sink address to write data to) from
> MongoDB. It contains some system metrics I want to monitor.
> 
> What I did was read the configuration from MongoDB and set it as the Flink
> configuration.
> 
> StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
> Configuration conf = new Configuration();
> JSONObject jsonConfiguration = readConfiguration();
> conf.setInteger("period", jsonConfiguration.getInt("period"));
> conf.setDouble("percentage", jsonConfiguration.getDouble("percentage"));
> conf.setDouble("metric", jsonConfiguration.getDouble("metric"));
> see.getConfig().setGlobalJobParameters(conf);
> The readConfiguration() method reads the configuration from MongoDB.
> 
> Just like in the code shown above, I set globalJobParameters to let all my
> workers share these parameters, including the metric I want to monitor. But
> maybe at some point I want to change the metric I monitor. I think one
> possible way is to dynamically (or periodically) read the configuration and
> reset the globalJobParameters so that the Flink program changes the metric it
> monitors. Is that possible?
> 
> Thanks
> Desheng Zhang
> 
> 
> 
