If you manually cancel and restart the job, state is only carried forward
if you use a savepoint.
Can you check if that is what you are doing?
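
For reference, here is a rough sketch of what I mean, not a definitive setup. With the CLI of the Flink releases current at the time of this thread, you would cancel with a savepoint using `bin/flink cancel -s <targetDirectory> <jobID>` and resume with `bin/flink run -s <savepointPath> <jarFile>`. For the restored state to be matched to the right operator, it also helps to give the stateful operator an explicit uid. In the sketch below, the job class name, the uid string, the job name, and the `fromElements` source are all placeholders I made up; only `StateProcessTest` comes from your message.

```java
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Hypothetical job class wiring up the StateProcessTest function from the quoted message.
public class StateProcessJob {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Placeholder source: a real job would read from an unbounded source such as Kafka.
        env.fromElements(Tuple2.of("a", 1L), Tuple2.of("b", 2L))
            // Keyed state is scoped to the key chosen here.
            .keyBy(value -> value.f0)
            .process(new StateProcessTest())
            // Explicit operator ID (placeholder value) so that state in a savepoint
            // can be mapped back to this operator when the job is resumed.
            .uid("state-process-test")
            .print();

        env.execute("state-process-job");
    }
}
```

If the job is simply cancelled and resubmitted without `-s`, it starts with empty state, so `state.value()` returns null for every key again.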

On Tue, Jun 25, 2019 at 2:21 PM Simon Su <barley...@163.com> wrote:

>
> Hi wanglei
>
> Can you post how you restart the job?
>
> Thanks,
> Simon
> On 06/25/2019 20:11, wangl...@geekplus.com.cn wrote:
>
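> import org.apache.flink.api.common.state.ValueState;
> import org.apache.flink.api.common.state.ValueStateDescriptor;
> import org.apache.flink.api.common.typeinfo.TypeHint;
> import org.apache.flink.api.common.typeinfo.TypeInformation;
> import org.apache.flink.api.java.tuple.Tuple2;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
> import org.apache.flink.util.Collector;
> import org.slf4j.Logger;
> import org.slf4j.LoggerFactory;
>
> public class StateProcessTest extends KeyedProcessFunction<String, Tuple2<String, Long>, String> {
>
>     // Assumption: the original message does not show where `log` comes from; SLF4J is used here.
>     private static final Logger log = LoggerFactory.getLogger(StateProcessTest.class);
>
>     private transient ValueState<Tuple2<Long, Long>> state;
>
>     @Override
>     public void processElement(Tuple2<String, Long> value, Context ctx, Collector<String> out) throws Exception {
>         // value() returns null when no state exists yet for the current key.
>         Tuple2<Long, Long> stateValue = state.value();
>
>         if (stateValue == null) {
>             log.info("##########  initialize");
>             stateValue = Tuple2.of(34L, 56L);
>         }
>         state.update(stateValue);
>     }
>
>     @Override
>     public void open(Configuration parameters) throws Exception {
>         // Register the keyed state under the name "avg" with explicit type information.
>         ValueStateDescriptor<Tuple2<Long, Long>> descriptor = new ValueStateDescriptor<>(
>                 "avg", TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}));
>         state = getRuntimeContext().getState(descriptor);
>     }
> }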
>
>
>
>
> Every time I restart the job, stateValue is still null.
>
>
> ------------------------------
> wangl...@geekplus.com.cn
>
>
>
>
