Hi all,

As we know, a Java map hands back a reference to the stored value, so changing the returned list also changes what the map holds (I will call this "transfer value"). For example:

    HashMap<String, List<String>> stringListHashMap = new HashMap<>();
    for (int i = 0; i < 10; i++) {
        List<String> a = stringListHashMap.get("a" + i % 2);
        if (a == null) {
            a = new ArrayList<>();
            stringListHashMap.put("a" + i % 2, a);
        }
        a.add("a" + i);
    }
    stringListHashMap.keySet().forEach(x -> System.out.println("keys========= " + x));
    stringListHashMap.get("a0").forEach(x -> System.out.println("========x=== " + x));

Result:

    keys========= a1
    keys========= a0
    ========x=== a0
    ========x=== a2
    ========x=== a4
    ========x=== a6
    ========x=== a8

But with Flink on a cluster, MapState does not support transfer value. For example:

    .process(new KeyedProcessFunction<String, Tuple2<String, Object>, String>() {
        ...
        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            StateTtlConfig ttlConfig = StateTtlConfig
                    .newBuilder(Time.days(7))
                    .cleanupInBackground()
                    .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite) // default
                    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                    .build();
            MapStateDescriptor<String, List<Object>> noMatchDescriptor = new MapStateDescriptor<>(
                    "kuduError",
                    BasicTypeInfo.STRING_TYPE_INFO,
                    TypeInformation.of(new TypeHint<List<Object>>() { })
            );
            ...
        }
        ...
        @Override
        public void processElement(Tuple2<String, Object> stringObjectTuple2, Context context, Collector<String> collector) throws Exception {
            ...
            // Start from an empty list; reuse the stored list if the key already exists.
            List<Object> objectList = new ArrayList<>();
            if (mapState.contains(key)) {
                objectList = mapState.get(key);
            } else {
                mapState.put(key, objectList);
            }
            logger.info("key{} add before objectList.size {}", key, objectList.size());
            // The list is changed here, after the put above, and is not put again.
            objectList.add(stringObjectTuple2.f1);
            logger.info("key{} add after objectList.size {}", key, objectList.size());
            objectList = mapState.get(key);
            logger.info("key{} mapState objectList.size {}", key, objectList.size());
            ...
        }

If I run this code in IDEA, transfer value is supported. The log shows:

    keyorder2infos add before objectList.size 0
    keyorder2infos add after objectList.size 1
    keyorder2infos mapState objectList.size 1
    keyorder2infos add before objectList.size 1
    keyorder2infos add after objectList.size 2
    keyorder2infos mapState objectList.size 2
    …

But if I run it on yarn-cluster, transfer value is not supported. The log shows:

    keyorder2infos add before objectList.size 0
    keyorder2infos add after objectList.size 1
    keyorder2infos mapState objectList.size 0
    keyorder2infos add before objectList.size 0
    keyorder2infos add after objectList.size 1
    keyorder2infos mapState objectList.size 0
    …

So I want to know:

1. Is this a bug, or is something wrong in my code?
2. Why does MapState on yarn-cluster not support transfer value? I have read the Flink source code but could not find the reason.

It runs on:

Flink 1.9.0
Java 1.8
Hadoop 2.6.0

Best,
Shengjk1
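
P.S. The two logs can be reproduced outside Flink with a small sketch. It rests on an assumption that this post does not confirm: that the IDEA run keeps state objects on the JVM heap, while the yarn-cluster job uses a state backend that stores serialized bytes (RocksDB would be one such backend). `Store`, `HeapStore`, `SerializingStore`, `StateCopyDemo` and the key `"order2infos"` are hypothetical names for illustration only, not Flink classes.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a keyed map state; this is NOT a Flink interface.
interface Store {
    void put(String key, ArrayList<Object> value);
    ArrayList<Object> get(String key);
}

// Keeps the caller's object itself, like a plain HashMap.
class HeapStore implements Store {
    private final Map<String, ArrayList<Object>> objects = new HashMap<>();

    public void put(String key, ArrayList<Object> value) {
        objects.put(key, value);
    }

    public ArrayList<Object> get(String key) {
        return objects.get(key);
    }
}

// Keeps serialized bytes, so put() takes a snapshot and get() returns a fresh copy.
class SerializingStore implements Store {
    private final Map<String, byte[]> bytes = new HashMap<>();

    public void put(String key, ArrayList<Object> value) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        bytes.put(key, buffer.toByteArray());
    }

    @SuppressWarnings("unchecked")
    public ArrayList<Object> get(String key) {
        try (ObjectInputStream in =
                 new ObjectInputStream(new ByteArrayInputStream(bytes.get(key)))) {
            return (ArrayList<Object>) in.readObject();
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}

class StateCopyDemo {
    // Same steps as processElement for a new key: put an empty list,
    // add to the local list, optionally put again, then read back the size.
    static int sizeAfterAdd(Store store, boolean writeBack) {
        ArrayList<Object> list = new ArrayList<>();
        store.put("order2infos", list);
        list.add("element");
        if (writeBack) {
            store.put("order2infos", list);
        }
        return store.get("order2infos").size();
    }

    public static void main(String[] args) {
        System.out.println(sizeAfterAdd(new HeapStore(), false));        // prints 1
        System.out.println(sizeAfterAdd(new SerializingStore(), false)); // prints 0
        System.out.println(sizeAfterAdd(new SerializingStore(), true));  // prints 1
    }
}
```

If the assumption holds, calling `mapState.put(key, objectList)` again after `objectList.add(...)` should give the same result in both environments, because it no longer depends on the state returning the same object that was put.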