Hi wanglei


Can you post how you restart the job?


Thanks,
Simon
On 06/25/2019 20:11,wangl...@geekplus.com.cn<wangl...@geekplus.com.cn> wrote:
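import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class StateProcessTest
        extends KeyedProcessFunction<String, Tuple2<String, Long>, String> {

    // Assumed SLF4J logger; the snippet as posted did not show how `log` is declared.
    private static final Logger log = LoggerFactory.getLogger(StateProcessTest.class);

    // Keyed state handle, scoped to the key of the element being processed.
    private transient ValueState<Tuple2<Long, Long>> state;

    @Override
    public void processElement(Tuple2<String, Long> value, Context ctx,
                               Collector<String> out) throws Exception {
        Tuple2<Long, Long> stateValue = state.value();

        // state.value() returns null when nothing is stored for the current key.
        if (stateValue == null) {
            log.info("##########  initialize");
            stateValue = new Tuple2<>(34L, 56L);
        }
        state.update(stateValue);
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
                new ValueStateDescriptor<>("avg",
                        TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}));
        state = getRuntimeContext().getState(descriptor);
    }
}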






Every time I restart the job, stateValue is still null.




wangl...@geekplus.com.cn
 
