/**
 * Keyed process function that keeps a single {@code Tuple2<Long, Long>} in keyed
 * {@link ValueState} registered under the name {@code "avg"}.
 *
 * <p>For every incoming element the stored tuple is read; when nothing is stored it is
 * seeded with {@code (34, 56)} and a log line is written. The tuple is then written back
 * to state. No records are emitted to the collector.
 *
 * <p>NOTE(review): {@code state.value()} yielding {@code null} after a restart presumably
 * means the job was resubmitted from scratch rather than resumed from a checkpoint or
 * savepoint, or that the element belongs to a key that has not been seen before. Confirm
 * how the job is restarted and that checkpointing is enabled.
 */
public class StateProcessTest extends KeyedProcessFunction<String, Tuple2<String, Long>, String> {

    /** Handle to the per-key state; assigned in {@link #open(Configuration)}. */
    private transient ValueState<Tuple2<Long, Long>> state;

    /**
     * Reads the stored tuple, initializes it to {@code (34, 56)} when absent, and writes
     * it back to state.
     *
     * @param value the incoming element (not inspected by this method)
     * @param ctx   the processing context (unused)
     * @param out   the output collector (nothing is emitted)
     * @throws Exception if reading or updating the state fails
     */
    @Override
    public void processElement(Tuple2<String, Long> value, Context ctx, Collector<String> out)
            throws Exception {
        Tuple2<Long, Long> stateValue = state.value();
        if (stateValue == null) {
            log.info("########## initialize");
            // Diamond operator instead of a raw Tuple2, and an uppercase L suffix so the
            // literals cannot be misread as 341 / 561.
            stateValue = new Tuple2<>(34L, 56L);
        }
        state.update(stateValue);
    }

    /**
     * Registers the {@code "avg"} value-state descriptor and obtains the state handle
     * from the runtime context.
     *
     * @param parameters the configuration passed to this function (unused)
     * @throws Exception if the state handle cannot be obtained
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
                new ValueStateDescriptor<Tuple2<Long, Long>>(
                        "avg",
                        TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}));
        state = getRuntimeContext().getState(descriptor);
    }
}
Every time I restarted the job, The stateValue is still null. wangl...@geekplus.com.cn