public class StateProcessTest extends KeyedProcessFunction<String, 
Tuple2<String, Long>, String> {
    
    private transient ValueState<Tuple2<Long,Long>> state;
    public void processElement(Tuple2<String, Long> value, Context ctx, 
Collector<String> out) throws Exception {


        Tuple2<Long, Long> stateValue = state.value();

        if(stateValue == null){
            log.info("##########  initialize");
            stateValue = new Tuple2(34l,56l);
        }
        state.update(stateValue);
  
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        ValueStateDescriptor<Tuple2<Long,Long>> descriptor = new 
ValueStateDescriptor<Tuple2<Long, Long>>("avg", TypeInformation.of(
                new TypeHint<Tuple2<Long, Long>>() {}));
        state = getRuntimeContext().getState(descriptor);
    }
}



Every time I restarted the job,   The stateValue is still null.




wangl...@geekplus.com.cn
 

Reply via email to