If you manually cancel and restart the job, state is only carried forward if you use a savepoint. Can you check if that is what you are doing?
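
In case it helps, here is roughly what a savepoint-based restart looks like from the command line. This is only a sketch: I am assuming you submit jobs with the standard `bin/flink` CLI, and the job ID, savepoint directory, savepoint path, and jar name below are placeholders I made up, not values from your setup.

```sh
# All values below are placeholders (assumptions), replace them with your own.
JOB_ID="your-job-id"                      # shown by `bin/flink list`
SAVEPOINT_DIR="hdfs:///flink/savepoints"  # any durable directory the cluster can write to
JOB_JAR="your-job.jar"

# 1. Cancel the job and take a savepoint in one step.
#    The command prints the path of the savepoint it wrote.
bin/flink cancel -s "$SAVEPOINT_DIR" "$JOB_ID"

# 2. Resubmit the job from that savepoint.
#    The path below is a placeholder: copy the real one printed by step 1.
SAVEPOINT_PATH="$SAVEPOINT_DIR/savepoint-xxxx"
bin/flink run -s "$SAVEPOINT_PATH" "$JOB_JAR"
```

If the job is resubmitted with a plain `bin/flink run` and no `-s`, it starts with empty state, so `state.value()` returns null for every key and the "initialize" log line shows up again.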
On Tue, Jun 25, 2019 at 2:21 PM Simon Su <barley...@163.com> wrote:
>
> Hi wanglei
>
> Can you post how you restart the job ?
>
> Thanks,
> Simon
>
> On 06/25/2019 20:11,wangl...@geekplus.com.cn<wangl...@geekplus.com.cn>
> <wangl...@geekplus.com.cn> wrote:
>
> public class StateProcessTest extends KeyedProcessFunction<String,
>         Tuple2<String, Long>, String> {
>
>     private transient ValueState<Tuple2<Long,Long>> state;
>
>     public void processElement(Tuple2<String, Long> value, Context ctx,
>             Collector<String> out) throws Exception {
>
>         Tuple2<Long, Long> stateValue = state.value();
>
>         if(stateValue == null){
>             log.info("########## initialize");
>             stateValue = new Tuple2(34l,56l);
>         }
>         state.update(stateValue);
>     }
>
>     @Override
>     public void open(Configuration parameters) throws Exception {
>         ValueStateDescriptor<Tuple2<Long,Long>> descriptor = new
>             ValueStateDescriptor<Tuple2<Long, Long>>("avg", TypeInformation.of(
>                 new TypeHint<Tuple2<Long, Long>>() {}));
>         state = getRuntimeContext().getState(descriptor);
>     }
> }
>
> Every time I restarted the job, The stateValue is still null.
>
> ------------------------------
> wangl...@geekplus.com.cn