Hi Puneet,

Can you describe how you validated that the state is not restored properly?
Specifically, how did you introduce faults to the cluster?
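
For example, one way to exercise recovery is to make the source fail after a
few records, so the job restarts from the latest completed checkpoint and you
can check whether the keyed state comes back. A rough sketch against your
BeaconSource (the counter and threshold below are just placeholders, not from
your code):

    @Override
    public void run(SourceContext<Tuple2<Long, String>> ctx) throws Exception {
        int emitted = 0;
        while (isRunning) {
            Thread.sleep(30000L);
            synchronized (ctx.getCheckpointLock()) {
                ctx.collect(new Tuple2<>(100000L, "AMQSource"));
            }
            // artificial fault: fail after a few records to force a restore
            if (++emitted == 5) {
                throw new RuntimeException("simulated failure for recovery test");
            }
        }
    }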

Best,
Gary

On Tue, Mar 3, 2020 at 11:08 AM Puneet Kinra <
puneet.ki...@customercentria.com> wrote:

> Sorry for the missing information.
>
> On recovery the state value comes back as false instead of true. state.backend
> has been configured in flink-conf.yaml, along with the paths for
> checkpoints and savepoints.
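>
> For reference, I believe the programmatic equivalent of that part of my
> flink-conf.yaml would look roughly like this (the directory below is only a
> placeholder, not my real path):
>
> import org.apache.flink.runtime.state.filesystem.FsStateBackend;
>
> // assumption: filesystem state backend with a placeholder checkpoint directory
> env.setStateBackend(new FsStateBackend("file:///tmp/flink-checkpoints"));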
>
> On Tue, Mar 3, 2020 at 3:34 PM Puneet Kinra <
> puneet.ki...@customercentria.com> wrote:
>
>> Hi
>>
>> I am stuck with a simple program involving checkpointing. The Flink version
>> I am using is 1.10.0.
>>
>> *Here I have created a dummy source (BeaconSource) for testing*
>>
>> *BeaconSource*
>> package com.nudge.stateful;
>>
>> import org.apache.flink.api.java.tuple.Tuple2;
>> import org.apache.flink.streaming.api.functions.source.SourceFunction;
>>
>> public class BeaconSource implements SourceFunction<Tuple2<Long, String>> {
>>
>>     private static final long serialVersionUID = 1L;
>>
>>     // volatile so that cancel(), called from another thread, is visible to run()
>>     private volatile boolean isRunning = true;
>>
>>     @Override
>>     public void run(SourceContext<Tuple2<Long, String>> ctx) throws Exception {
>>         while (isRunning) {
>>             Thread.sleep(30000L);
>>             // emit under the checkpoint lock, as recommended for SourceFunction
>>             synchronized (ctx.getCheckpointLock()) {
>>                 ctx.collect(new Tuple2<>(100000L, "AMQSource"));
>>             }
>>         }
>>     }
>>
>>     @Override
>>     public void cancel() {
>>         this.isRunning = false;
>>     }
>> }
>>
>>
>>
>> ---------------------------------------------------------------------------------------
>> *KeyedProcessFunction (registers the timer and updates the flag to true so
>> that the timer is registered only once per key)*
>>
>>
>> package com.nudge.stateful;
>>
>> import org.apache.flink.api.common.state.ValueState;
>> import org.apache.flink.api.common.state.ValueStateDescriptor;
>> import org.apache.flink.api.java.tuple.Tuple;
>> import org.apache.flink.api.java.tuple.Tuple2;
>> import org.apache.flink.configuration.Configuration;
>> import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
>> import org.apache.flink.util.Collector;
>>
>> public class TimeProcessTrigger extends
>>         KeyedProcessFunction<Tuple, Tuple2<Long, String>, String> {
>>
>>     private static final long serialVersionUID = 1L;
>>     private static final long ONE_MINUTE = 60000L;
>>
>>     // keyed state: set to true once a timer has been registered for the key
>>     private transient ValueState<Boolean> contactHistory;
>>
>>     @Override
>>     public void open(Configuration parameters) throws Exception {
>>         super.open(parameters);
>>         ValueStateDescriptor<Boolean> descriptor = new ValueStateDescriptor<>(
>>                 "contact-history", // the state name
>>                 Boolean.class);    // type information
>>         this.contactHistory = getRuntimeContext().getState(descriptor);
>>     }
>>
>>     @Override
>>     public void processElement(Tuple2<Long, String> input,
>>             KeyedProcessFunction<Tuple, Tuple2<Long, String>, String>.Context ctx,
>>             Collector<String> out) throws Exception {
>>         Boolean value = this.contactHistory.value();
>>         System.out.println(value);
>>         if (value == null) {
>>             long currentTime = ctx.timerService().currentProcessingTime();
>>             long regTimer = currentTime + ONE_MINUTE;
>>             System.out.println("Updating the flag and registering the timer @:" + regTimer);
>>             this.contactHistory.update(true);
>>             ctx.timerService().registerProcessingTimeTimer(regTimer);
>>         } else {
>>             System.out.println("Timer has already been registered for this key");
>>         }
>>     }
>>
>>     @Override
>>     public void onTimer(long timestamp,
>>             KeyedProcessFunction<Tuple, Tuple2<Long, String>, String>.OnTimerContext ctx,
>>             Collector<String> out) throws Exception {
>>         super.onTimer(timestamp, ctx, out);
>>         System.out.println("Timer has fired for the key " + ctx.getCurrentKey());
>>     }
>> }
>>
>>
>> -------------------------------------------------
>> *Main App*
>>
>> package com.nudge.stateful;
>>
>> import org.apache.flink.api.java.tuple.Tuple;
>> import org.apache.flink.api.java.tuple.Tuple2;
>> import org.apache.flink.streaming.api.CheckpointingMode;
>> import org.apache.flink.streaming.api.TimeCharacteristic;
>> import org.apache.flink.streaming.api.datastream.KeyedStream;
>> import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
>> import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup;
>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>
>> public class App {
>>     public static void main(String[] args) throws Exception {
>>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>         env.setParallelism(1);
>>         env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
>>
>>         // checkpoint every 30 seconds
>>         env.enableCheckpointing(30000);
>>         // set mode to exactly-once (this is the default)
>>         env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
>>         // make sure 10 s of progress happen between checkpoints
>>         env.getCheckpointConfig().setMinPauseBetweenCheckpoints(10000);
>>         // checkpoints have to complete within one minute, or are discarded
>>         env.getCheckpointConfig().setCheckpointTimeout(60000);
>>         // allow only one checkpoint to be in progress at the same time
>>         env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
>>         // enable externalized checkpoints which are retained after job cancellation
>>         env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
>>         // allow recovery to fall back to the checkpoint even when there is a more recent savepoint
>>         env.getCheckpointConfig().setPreferCheckpointForRecovery(true);
>>
>>         SingleOutputStreamOperator<Tuple2<Long, String>> amqSource =
>>                 env.addSource(new BeaconSource()).name("AMQSource");
>>         KeyedStream<Tuple2<Long, String>, Tuple> keyedValues = amqSource.keyBy(0);
>>         SingleOutputStreamOperator<String> processedStream =
>>                 keyedValues.process(new TimeProcessTrigger()).setParallelism(1);
>>         processedStream.print();
>>
>>         env.execute();
>>     }
>> }
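>>
>> Note: I have not configured an explicit restart strategy and am relying on the
>> default behaviour with checkpointing enabled. If it helps, I could also set one
>> explicitly, roughly like this (the values are arbitrary):
>>
>> import org.apache.flink.api.common.restartstrategy.RestartStrategies;
>>
>> // restart up to 3 times, waiting 10 s between attempts
>> env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 10000L));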
>> --
>> *Cheers *
>>
>> *Puneet Kinra*
>>
>> *Mobile:+918800167808 | Skype : puneet.ki...@customercentria.com
>> <puneet.ki...@customercentria.com>*
>>
>> *e-mail :puneet.ki...@customercentria.com
>> <puneet.ki...@customercentria.com>*
>>
>>
>>
>
> --
> *Cheers *
>
> *Puneet Kinra*
>
> *Mobile:+918800167808 | Skype : puneet.ki...@customercentria.com
> <puneet.ki...@customercentria.com>*
>
> *e-mail :puneet.ki...@customercentria.com
> <puneet.ki...@customercentria.com>*
>
>
>
