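Hi all,

As we know, a Java Map stores references to its values, so a change made to a list obtained from the map is visible through the map without another put. For example: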


    HashMap<String, List<String>> stringListHashMap = new HashMap<>();
    for (int i = 0; i < 10; i++) {
        List<String> a = stringListHashMap.get("a" + i % 2);
        if (a == null) {
            a = new ArrayList<>();
            stringListHashMap.put("a" + i % 2, a);
        }
        a.add("a" + i);
    }
    stringListHashMap.keySet().forEach(x -> System.out.println("keys========= " + x));
    stringListHashMap.get("a0").forEach(x -> System.out.println("========x=== " + x));


Result:

    keys========= a1
    keys========= a0
    ========x=== a0
    ========x=== a2
    ========x=== a4
    ========x=== a6
    ========x=== a8




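But when Flink runs on a cluster, MapState does not seem to behave this way: a change made to a value obtained from the state is not reflected in the state. For example: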


    .process(new KeyedProcessFunction<String, Tuple2<String, Object>, String>() {
        ...
        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            StateTtlConfig ttlConfig = StateTtlConfig
                    .newBuilder(Time.days(7))
                    .cleanupInBackground()
                    .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite) // default
                    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                    .build();
            MapStateDescriptor<String, List<Object>> noMatchDescriptor =
                    new MapStateDescriptor<String, List<Object>>(
                            "kuduError",
                            BasicTypeInfo.STRING_TYPE_INFO,
                            TypeInformation.of(new TypeHint<List<Object>>() {
                            })
                    );
            ...
        }
        ...
        @Override
        public void processElement(Tuple2<String, Object> stringObjectTuple2, Context context,
                Collector<String> collector) throws Exception {
            ...
            List<Object> objectList = new ArrayList<>();
            if (mapState.contains(key)) {
                objectList = mapState.get(key);
            } else {
                mapState.put(key, objectList);
            }
            logger.info("key{} add before objectList.size {}", key, objectList.size());
            // The list is modified in place; it is not put back into mapState.
            objectList.add(stringObjectTuple2.f1);
            logger.info("key{} add after objectList.size {}", key, objectList.size());
            objectList = mapState.get(key);
            logger.info("key{} mapState objectList.size {}", key, objectList.size());
            ...
        }


If I run this code in IDEA, the in-place change is reflected in the state. The log shows:
 
    keyorder2infos add before objectList.size 0
    keyorder2infos add after objectList.size 1
    keyorder2infos mapState objectList.size 1

    keyorder2infos add before objectList.size 1
    keyorder2infos add after objectList.size 2
    keyorder2infos mapState objectList.size 2
    …




But if I run it in yarn-cluster mode, the in-place change is not reflected in the state. The log shows:


    keyorder2infos add before objectList.size 0
    keyorder2infos add after objectList.size 1
    keyorder2infos mapState objectList.size 0

    keyorder2infos add before objectList.size 0
    keyorder2infos add after objectList.size 1
    keyorder2infos mapState objectList.size 0
    …




So I want to know:
1. Is this a bug, or is something wrong with my code?
2. Why does MapState in yarn-cluster mode not reflect in-place changes to a value? I have looked through the Flink source code but could not find the reason.
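
One possible explanation, offered only as a guess: the two runs may use different state backends. A heap-based backend keeps the value objects themselves, while a RocksDB-backed state serializes the value on put and deserializes a fresh copy on every get. The plain-Java sketch below reproduces both log patterns under that assumption. It does not use Flink at all; the class names ReferenceStore and SerializingStore and the key "a0" are hypothetical stand-ins for illustration, not Flink APIs.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class StateCopySemanticsDemo {

    /** Hypothetical stand-in for a state that keeps value objects on the heap. */
    static class ReferenceStore {
        private final Map<String, List<String>> data = new HashMap<>();

        void put(String key, List<String> value) {
            data.put(key, value); // stores the reference itself
        }

        List<String> get(String key) {
            return data.get(key); // returns the very same object
        }
    }

    /** Hypothetical stand-in for a state that stores serialized bytes. */
    static class SerializingStore {
        private final Map<String, byte[]> data = new HashMap<>();

        void put(String key, List<String> value) {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(new ArrayList<>(value)); // snapshot taken at put time
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
            data.put(key, bytes.toByteArray());
        }

        @SuppressWarnings("unchecked")
        List<String> get(String key) {
            byte[] raw = data.get(key);
            if (raw == null) {
                return null;
            }
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(raw))) {
                return (List<String>) in.readObject(); // a fresh copy on every get
            } catch (IOException | ClassNotFoundException e) {
                throw new IllegalStateException(e);
            }
        }
    }

    /** Put an empty list, add to it in place, then read the stored size. */
    public static int sizeSeenByReferenceStore() {
        ReferenceStore store = new ReferenceStore();
        List<String> list = new ArrayList<>();
        store.put("a0", list);
        list.add("x");
        return store.get("a0").size();
    }

    /** Same steps against the serializing store, optionally writing the list back. */
    public static int sizeSeenBySerializingStore(boolean putBack) {
        SerializingStore store = new SerializingStore();
        List<String> list = new ArrayList<>();
        store.put("a0", list);
        list.add("x");
        if (putBack) {
            store.put("a0", list); // explicit write-back after the change
        }
        return store.get("a0").size();
    }

    public static void main(String[] args) {
        System.out.println("reference store: " + sizeSeenByReferenceStore());                        // 1
        System.out.println("serializing store, no put-back: " + sizeSeenBySerializingStore(false)); // 0
        System.out.println("serializing store, put-back: " + sizeSeenBySerializingStore(true));     // 1
    }
}
```

If this assumption holds for the job above, calling mapState.put(key, objectList) again after objectList.add(...) should make the update visible in both environments. This has not been tested against the job above.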




It runs on:
    Flink 1.9.0
    Java 1.8 
    Hadoop 2.6.0




Best,
Shengjk1


