Hi Shengjk1,

You should call "mapState.put(key, objectList);" manually after calling "objectList.add(stringObjectTuple2.f1);" to write the updated list back to the state backend. objectList is just an ordinary Java list object, and updates to it are not synced to the state backend automatically.
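To illustrate why the explicit put-back matters, here is a minimal self-contained sketch in plain Java (no Flink dependency). CopyOnReadMap is a hypothetical stand-in for a state backend like RocksDB that serializes values on put and deserializes a fresh copy on every get, so mutating the returned list alone is lost:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class MapStateDemo {
    // Hypothetical stand-in for the RocksDB state backend: get() returns a
    // copy of the stored list (as deserialization would), so mutations to
    // the returned list are invisible until put() is called again.
    static class CopyOnReadMap {
        private final Map<String, List<String>> store = new HashMap<>();

        List<String> get(String key) {
            List<String> v = store.get(key);
            // Fresh copy on every read, like deserializing from bytes.
            return v == null ? null : new ArrayList<>(v);
        }

        void put(String key, List<String> value) {
            // Copy on write, like serializing to bytes.
            store.put(key, new ArrayList<>(value));
        }
    }

    public static void main(String[] args) {
        CopyOnReadMap mapState = new CopyOnReadMap();
        mapState.put("order2infos", new ArrayList<>());

        // Mutating only the returned copy: the update is lost.
        List<String> objectList = mapState.get("order2infos");
        objectList.add("info-1");
        System.out.println("without put-back: " + mapState.get("order2infos").size()); // prints 0

        // Writing the list back makes the update visible.
        mapState.put("order2infos", objectList);
        System.out.println("with put-back: " + mapState.get("order2infos").size()); // prints 1
    }
}
```

With a heap-based map (or the heap state backend), get() hands out a reference to the same object, so both reads would show size 1 even without the put-back; that is exactly the IDE-vs-YARN difference in your logs.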
I guess the result you see is because the heap state backend is used when running in the IDE and the RocksDB state backend is used when running in YARN mode: the heap backend hands out references to the on-heap objects, while RocksDB serializes values on write and deserializes a fresh copy on every read. You can check the configuration to confirm whether that is the case.

Regards,
Dian

> On Sep 19, 2019, at 7:42 PM, shengjk1 <jsjsjs1...@163.com> wrote:
> 
> Hi all,
> 
> As we know, a Java map passes its values by reference, such as:
> 
> HashMap<String, List<String>> stringListHashMap = new HashMap<>();
> for (int i = 0; i < 10; i++) {
>     List<String> a = stringListHashMap.get("a" + i % 2);
>     if (a == null) {
>         a = new ArrayList<>();
>         stringListHashMap.put("a" + i % 2, a);
>     }
>     a.add("a" + i);
> }
> stringListHashMap.keySet().forEach(x -> System.out.println("keys========= " + x));
> stringListHashMap.get("a0").forEach(x -> System.out.println("========x=== " + x));
> 
> Result:
> keys========= a1
> keys========= a0
> ========x=== a0
> ========x=== a2
> ========x=== a4
> ========x=== a6
> ========x=== a8
> 
> But with Flink on a cluster, MapState does not pass its values by reference, such as:
> 
> .process(new KeyedProcessFunction<String, Tuple2<String, Object>, String>() {
>     ...
>     @Override
>     public void open(Configuration parameters) throws Exception {
>         super.open(parameters);
> 
>         StateTtlConfig ttlConfig = StateTtlConfig
>                 .newBuilder(Time.days(7))
>                 .cleanupInBackground()
>                 .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite) // default
>                 .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
>                 .build();
> 
>         MapStateDescriptor noMatchDescriptor = new MapStateDescriptor<String, List<Object>>(
>                 "kuduError",
>                 BasicTypeInfo.STRING_TYPE_INFO,
>                 TypeInformation.of(new TypeHint<List<Object>>() {
>                 })
>         );
>         ...
>     }
>     ...
> 
>     @Override
>     public void processElement(Tuple2<String, Object> stringObjectTuple2, Context context, Collector<String> collector) throws Exception {
>         ...
>         List<Object> objectList = new ArrayList<>();
>         if (mapState.contains(key)) {
>             objectList = mapState.get(key);
>         } else {
>             mapState.put(key, objectList);
>         }
>         logger.info("key{} add before objectList.size {}", key, objectList.size());
>         objectList.add(stringObjectTuple2.f1);
>         logger.info("key{} add after objectList.size {}", key, objectList.size());
>         objectList = mapState.get(key);
>         logger.info("key{} mapState objectList.size {}", key, objectList.size());
>         ...
>     }
> 
> If I run this code in IDEA, the values are passed by reference; the log shows:
> 
> keyorder2infos add before objectList.size 0
> keyorder2infos add after objectList.size 1
> keyorder2infos mapState objectList.size 1
> 
> keyorder2infos add before objectList.size 1
> keyorder2infos add after objectList.size 2
> keyorder2infos mapState objectList.size 2
> …
> 
> But if I run it on a YARN cluster, they are not; the log shows:
> 
> keyorder2infos add before objectList.size 0
> keyorder2infos add after objectList.size 1
> keyorder2infos mapState objectList.size 0
> 
> keyorder2infos add before objectList.size 0
> keyorder2infos add after objectList.size 1
> keyorder2infos mapState objectList.size 0
> …
> 
> So, I want to know:
> 1. Is this a bug, or is something wrong with my code?
> 2. Why doesn't MapState on a YARN cluster pass its values by reference? I have looked through the Flink source code but could not find the answer.
> 
> It runs on:
> Flink 1.9.0
> Java 1.8
> Hadoop 2.6.0
> 
> Best,
> Shengjk1