Hello,
That's an error in the documentation; only the ValueStateDescriptor has
a defaultValue constructor argument.
Regards,
Chesnay
On 24.07.2017 14:56, ZalaCheung wrote:
Hi Martin,
Thanks for your advice; that's really helpful. I am using the push
scenario, but I am now having some trouble with the state I want to
maintain. For me, the simplest way would be to maintain two ValueStates
in a CoFlatMapFunction (actually a RichCoFlatMapFunction). But a rich
function's keyed state can only be used on a keyed stream, and for a
connected stream, at least in my scenario, I should not use keyBy()
(in fact, keyBy() does not seem to be allowed on a connected stream).
So instead of using a rich function with keyed managed state, I tried
to use CheckpointedFunction for my non-keyed state. However, with
CheckpointedFunction I can only use ListState, which only offers add()
and an iterator, and I am not sure whether I can simply replace an
element in the ListState. What really blocks me is that I cannot
initialize my ListState through its ListStateDescriptor: the compiler
says there is no constructor taking an initial value, even though I saw
one in the official documentation:
https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/state.html
    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        ListStateDescriptor<Tuple2<Long, Long>> descriptor =
            new ListStateDescriptor<>(
                "buffered-elements",
                TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}),
                Tuple2.of(0L, 0L));

        checkpointedState = context.getOperatorStateStore().getListState(descriptor);

        if (context.isRestored()) {
            for (Tuple2<Long, Long> element : checkpointedState.get()) {
                bufferedElements.add(element);
            }
        }
    }
But in my code (Flink 1.3.1) the compiler says there is no constructor
for three arguments (the third argument in the example above is the
default value). I am really confused.
How can I maintain my state for the CoFlatMap function?
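For reference, the usual workaround with operator ListState is to keep
the live value in an ordinary field and only rewrite the list contents
at checkpoint time (clear() followed by add()). A minimal, Flink-free
sketch of that pattern (all names hypothetical; the List<Double> stands
in for Flink's ListState<Double>):

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the snapshot/restore pattern: normal processing mutates a
// plain field; the "ListState" is only touched when snapshotting or
// restoring, so its contents can be replaced wholesale via clear()+add().
public class SnapshotPattern {
    private double currentValue;                        // live, mutable state
    private final List<Double> checkpointedState = new ArrayList<>();

    // Normal processing path: just update the field.
    public void update(double v) { currentValue = v; }

    // Called at checkpoint time: replace the list contents wholesale.
    public void snapshotState() {
        checkpointedState.clear();
        checkpointedState.add(currentValue);
    }

    // Called on restore: read the single element back into the field.
    public void initializeState() {
        if (!checkpointedState.isEmpty()) {
            currentValue = checkpointedState.get(0);
        }
    }

    public double value() { return currentValue; }
}
```

With this shape, the element in the ListState is effectively replaced
on every checkpoint, even though the API only exposes add() and get().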
Thanks
Desheng Zhang
On Jul 24, 2017, at 19:44, Martin Eden <martineden...@gmail.com
<mailto:martineden...@gmail.com>> wrote:
Hey Desheng,
Some options that come to mind:
- Cave man style: Stop and restart job with new config.
- Poll scenario: You could build your own thread that periodically
loads from the db into a per worker accessible cache.
- Push scenario: have a config stream (based off of some queue) which
you connect to your data stream via the connect operator. In the
CoFlatMapFunction that you provide, you update Flink state from the
config flatMap, read that state in the data flatMap, and pass it along
with the data. Then any downstream operator that needs the config can
always take it from the tuple that arrives alongside the data, say in
the invoke method of a sink. Example here
<https://image.slidesharecdn.com/flinkstreambasics-160909223620/95/apache-flink-training-datastream-api-basics-34-638.jpg?cb=1497888680>.
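A minimal, Flink-free sketch of that CoFlatMapFunction pattern (class
and method names hypothetical; flatMap1/flatMap2 mirror the data and
config inputs of a connected stream):

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the push scenario: flatMap2 updates the stored config,
// flatMap1 tags every data element with the latest config before
// passing it downstream.
public class ConfigTaggingCoFlatMap {
    private double threshold = Double.NaN;           // "state" shared by both inputs
    private final List<String> out = new ArrayList<>();

    // Data stream side: read the current config and emit data alongside it.
    public void flatMap1(double value) {
        out.add(value + "@" + threshold);
    }

    // Config stream side: update the stored config.
    public void flatMap2(double newThreshold) {
        threshold = newThreshold;
    }

    public List<String> output() { return out; }

    public static void main(String[] args) {
        ConfigTaggingCoFlatMap fn = new ConfigTaggingCoFlatMap();
        fn.flatMap2(0.5);   // config arrives first
        fn.flatMap1(1.0);   // data tagged with 0.5
        fn.flatMap2(0.9);   // config update
        fn.flatMap1(2.0);   // data tagged with 0.9
        System.out.println(fn.output());  // prints [1.0@0.5, 2.0@0.9]
    }
}
```

In real Flink the field would be operator state so it survives
failures, but the data flow between the two flatMaps is the same.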
Hope that gives u some ideas,
M
On Mon, Jul 24, 2017 at 12:16 PM, ZalaCheung
<gzzhangdesh...@corp.netease.com
<mailto:gzzhangdesh...@corp.netease.com>> wrote:
Hi all,
I am now trying to implement an anomaly detection algorithm on
Flink; essentially I am implementing a Map operator that does anomaly
detection on time series.
At first I want to read configuration (such as which Kafka source
host to read the data stream from and which sink address to write data
to) from MongoDB. It contains some system metrics I want to
monitor.
What I did was read the configuration from MongoDB and set it as
Flink's configuration:
    StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
    Configuration conf = new Configuration();
    JSONObject jsonConfiguration = readConfiguration();
    conf.setInteger("period", jsonConfiguration.getInt("period"));
    conf.setDouble("percentage", jsonConfiguration.getDouble("percentage"));
    conf.setDouble("metric", jsonConfiguration.getDouble("metric"));
    see.getConfig().setGlobalJobParameters(conf);
The readConfiguration() method reads the configuration from MongoDB,
as in the code shown above. I set globalJobParameters so that all my
workers share these parameters, including the metric I want to
monitor. But at some point I may want to change the monitored metric.
I think one possible way is to dynamically (or periodically) re-read
the configuration and reset the globalJobParameters, so that the Flink
program changes the metric it monitors. Is that possible?
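As far as I know, globalJobParameters are fixed once the job is
submitted, so one alternative is the poll approach: each worker
periodically refreshes a shared config snapshot. A minimal, Flink-free
sketch (all names hypothetical; readConfiguration() stands in for the
MongoDB lookup):

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Sketch of a per-worker polling cache: a background thread refreshes
// an AtomicReference on a schedule, and the processing code always
// reads the latest snapshot from it.
public class PollingConfig {
    private final AtomicReference<String> metric = new AtomicReference<>("cpu");
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();

    // Re-read the configuration once; the scheduler calls this repeatedly.
    public void refresh() {
        metric.set(readConfiguration());
    }

    public void start(long periodMillis) {
        scheduler.scheduleAtFixedRate(this::refresh, 0, periodMillis,
                TimeUnit.MILLISECONDS);
    }

    // Processing code reads the current snapshot here.
    public String currentMetric() { return metric.get(); }

    public void stop() { scheduler.shutdownNow(); }

    // Stand-in for the MongoDB read; returns a fixed value in this sketch.
    private String readConfiguration() { return "memory"; }
}
```

The data path only ever touches the AtomicReference, so a slow MongoDB
read never blocks processing; it just delays the config update.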
Thanks
Desheng Zhang