Hi Yuri,

The state that you access with getRuntimeContext().getState(...) is
scoped to the key (so for every new key this state will be null).
What key do you use?

Regards,
Roman

On Fri, Mar 12, 2021 at 7:22 AM Maminspapin <un...@mail.ru> wrote:
>
> I have following piece of configuration in flink.yaml:
>
> Key                                                             Value
> high-availability                                               zookeeper
> high-availability.storageDir                            
> file:///home/flink/flink-ha-data
> high-availability.zookeeper.quorum              localhost:2181
> state.backend                                           rocksdb
> state.backend.incremental                               true
> state.checkpoints.dir                                   
> file:///home/flink/checkpoints
>
> And in my code (Main.class):
>
> StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.setParallelism(1);
> env.setStateBackend(new
> RocksDBStateBackend("file:///home/flink/checkpoint-data", true));
> env.enableCheckpointing(Duration.ofMinutes(5).toMillis());
>
> Also the next class should to save data in store, when event is received:
>
> public class StateManager extends KeyedProcessFunction<String, String,
> String> {
>
>     private ValueState<String> events;
>
>
>     @Override
>     public void processElement(String s, Context context, Collector<String>
> collector) throws Exception {
>
>         System.out.println("events: " + events.value()); // Check last value
> for this key
>
>         Model model = new Gson().fromJson(s, Model.class);
>         events.update(model.toString());
>     }
>
>     @Override
>     public void open(Configuration parameters) throws Exception {
>         ValueStateDescriptor<String> stateDescriptor = new
> ValueStateDescriptor<>("state", Types.STRING);
>         events = getRuntimeContext().getState(stateDescriptor);
>         System.out.println("In open");
>     }
> }
>
>
> But when I stop a job and start it again no saving data I see. I check it
> with printing data to sysout. There is null value after restarting job.
>
> But why do I get this behavior? Maybe my settings is not proper?
>
> Thanks,
> Yuri L.
>
>
>
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Reply via email to