Hi, Dian
Thanks for the reminder. I looked at HeapMapState.java and TtlMapState.java, and
you are right.


But as objectList grows large, calling "mapState.put(key, objectList);" every
time hurts performance badly and can even lead to checkpoint timeouts. For now I
don't have a better way to improve performance. Maybe I need to redesign my
program.
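A rough model of why the cost grows. It assumes, as with a serializing backend such as RocksDB, that every put() re-serializes the whole value. Appending n elements one at a time and rewriting the whole list each time then writes 1 + 2 + ... + n = n(n+1)/2 elements. Storing each element as its own map entry writes only n. The class below counts element writes for the two layouts in plain Java. It does not use Flink, and the per-element layout is a hypothetical alternative that has not been tested in this job.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy cost model: counts how many list elements get (re)serialized. Not Flink code. */
class AppendCostModel {

    /** Layout from the question: one map entry per key holding the whole list. */
    static long wholeListRewrites(int n) {
        Map<String, List<Integer>> state = new HashMap<>();
        long elementsWritten = 0;
        for (int i = 0; i < n; i++) {
            List<Integer> list = state.computeIfAbsent("key", k -> new ArrayList<>());
            list.add(i);
            state.put("key", list);          // the put() a serializing backend requires...
            elementsWritten += list.size();  // ...which writes the entire list every time
        }
        return elementsWritten;
    }

    /** Hypothetical alternative: one map entry per element, keyed by a sequence number. */
    static long perElementEntries(int n) {
        Map<Long, Integer> state = new HashMap<>();
        long elementsWritten = 0;
        for (int i = 0; i < n; i++) {
            state.put((long) i, i);
            elementsWritten += 1;            // only the new element is written
        }
        return elementsWritten;
    }

    public static void main(String[] args) {
        int n = 10_000;
        System.out.println("whole-list rewrites: " + wholeListRewrites(n));
        System.out.println("per-element entries: " + perElementEntries(n));
    }
}
```

With n = 10,000 this prints 50005000 for the whole-list layout and 10000 for the per-element layout. In Flink terms the per-element layout might correspond to something like a MapState<Long, Object> with a per-key counter, or to ListState#add, which the RocksDB backend implements as a RocksDB merge instead of rewriting the list. Either one is a direction to evaluate, not a verified fix.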


Best,
Shengjk1




On 09/19/2019 20:08, Dian Fu <dian0511...@gmail.com> wrote:


Hi Shengjk1,


You should call "mapState.put(key, objectList);" manually after calling
"objectList.add(stringObjectTuple2.f1);" to write the change to the state
backend. objectList is just an ordinary Java list object, and it is not synced
to the state backend automatically when it is modified.
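A minimal sketch of the read-modify-write pattern described above. Flink is not used here. `CopyingMapState` is a hypothetical stand-in that behaves like a serializing state backend: it stores each value as bytes and returns a fresh deserialized copy from every get(). A change to the returned list therefore reaches the store only through an explicit put().

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for a serializing MapState: values are kept only as bytes. */
class CopyingMapState<K, V extends Serializable> {
    private final Map<K, byte[]> bytes = new HashMap<>();

    void put(K key, V value) {
        try {
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            try (ObjectOutputStream oos = new ObjectOutputStream(out)) {
                oos.writeObject(value);
            }
            bytes.put(key, out.toByteArray());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    @SuppressWarnings("unchecked")
    V get(K key) {
        byte[] b = bytes.get(key);
        if (b == null) {
            return null;
        }
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(b))) {
            return (V) ois.readObject(); // a fresh copy on every call
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}

class ReadModifyWrite {
    /** get -> modify -> put back; returns the list size now seen through the state. */
    static int append(CopyingMapState<String, ArrayList<String>> state, String key, String value) {
        ArrayList<String> list = state.get(key);
        if (list == null) {
            list = new ArrayList<>();
        }
        list.add(value);
        state.put(key, list); // without this put() the new element would be lost
        return state.get(key).size();
    }
}
```

In the job from the question, this corresponds to calling mapState.put(key, objectList) after objectList.add(...) inside processElement.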


I guess the result you see is because the heap state backend is used when
running in the IDE and the RocksDB state backend is used when running in YARN
mode. You can check the configuration to see if that's the case.
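To see why the same code can behave differently in the IDE and on YARN, the sketch below runs the original add-without-put logic against two hypothetical read paths. One returns the stored object itself, which is roughly what a heap backend does. The other returns a deserialized copy, which is roughly what RocksDB does. These are simplified models, not Flink's HeapMapState or RocksDBMapState.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.function.UnaryOperator;

class BackendComparison {

    /** Deep copy through Java serialization, standing in for a serializing backend. */
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T roundTrip(T value) {
        try {
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            try (ObjectOutputStream oos = new ObjectOutputStream(out)) {
                oos.writeObject(value);
            }
            try (ObjectInputStream ois =
                     new ObjectInputStream(new ByteArrayInputStream(out.toByteArray()))) {
                return (T) ois.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    /**
     * Mirrors the original processElement: add to the list but never put it back.
     * readView decides what a get() hands out (the stored object or a copy of it).
     */
    static int sizeAfterAddWithoutPut(UnaryOperator<ArrayList<Object>> readView) {
        Map<String, ArrayList<Object>> state = new HashMap<>();
        state.put("order2infos", new ArrayList<>());
        ArrayList<Object> objectList = readView.apply(state.get("order2infos"));
        objectList.add("element");
        return readView.apply(state.get("order2infos")).size();
    }

    public static void main(String[] args) {
        // Heap-like: get() returns the same object, so the mutation is visible.
        System.out.println("heap-like: " + sizeAfterAddWithoutPut(list -> list));
        // Serializing: get() returns a copy, so the mutation is lost.
        System.out.println("copying:   " + sizeAfterAddWithoutPut(BackendComparison::roundTrip));
    }
}
```

The heap-like path reports size 1 and the copying path reports size 0, matching the "mapState objectList.size" lines in the IDE and YARN logs respectively.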


Regards,
Dian


On Sep 19, 2019, at 7:42 PM, shengjk1 <jsjsjs1...@163.com> wrote:


Hi all,


As we know, a Java Map stores a reference to each value, so changes made to a
value after get() are visible through the map. For example:


    HashMap<String, List<String>> stringListHashMap = new HashMap<>();
    for (int i = 0; i < 10; i++) {
        List<String> a = stringListHashMap.get("a" + i % 2);
        if (a == null) {
            a = new ArrayList<>();
            stringListHashMap.put("a" + i % 2, a);
        }
        // mutates the list object the map already references; no second put() needed
        a.add("a" + i);
    }
    stringListHashMap.keySet().forEach(x -> System.out.println("keys========= " + x));
    stringListHashMap.get("a0").forEach(x -> System.out.println("========x=== " + x));


Result:
    keys========= a1
    keys========= a0
    ========x=== a0
    ========x=== a2
    ========x=== a4
    ========x=== a6
    ========x=== a8
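The behaviour above comes from the map holding a reference to the list, not a copy. The same grouping can be written with Map.computeIfAbsent (Java 8+). It returns the list stored in the map, creating it if needed, so appending to that list updates the map's value directly. The class name below is illustrative only.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class GroupByReference {
    static Map<String, List<String>> group(int n) {
        Map<String, List<String>> map = new HashMap<>();
        for (int i = 0; i < n; i++) {
            // computeIfAbsent returns the list held by the map (creating it if absent),
            // so add() mutates the value the map already references.
            map.computeIfAbsent("a" + i % 2, k -> new ArrayList<>()).add("a" + i);
        }
        return map;
    }

    public static void main(String[] args) {
        Map<String, List<String>> map = group(10);
        map.keySet().forEach(x -> System.out.println("keys========= " + x));
        map.get("a0").forEach(x -> System.out.println("========x=== " + x));
    }
}
```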




But when Flink runs on a cluster, MapState does not seem to behave this way. For
example:


    .process(new KeyedProcessFunction<String, Tuple2<String, Object>, String>() {
        ...
        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            StateTtlConfig ttlConfig = StateTtlConfig
                    .newBuilder(Time.days(7))
                    .cleanupInBackground()
                    .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite) // default
                    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                    .build();
            MapStateDescriptor<String, List<Object>> noMatchDescriptor = new MapStateDescriptor<>(
                    "kuduError",
                    BasicTypeInfo.STRING_TYPE_INFO,
                    TypeInformation.of(new TypeHint<List<Object>>() {
                    })
            );
            ...
        }
        ...
        @Override
        public void processElement(Tuple2<String, Object> stringObjectTuple2, Context context,
                                   Collector<String> collector) throws Exception {
            ...
            List<Object> objectList = new ArrayList<>();
            if (mapState.contains(key)) {
                objectList = mapState.get(key);
            } else {
                mapState.put(key, objectList);
            }
            logger.info("key{} add before objectList.size {}", key, objectList.size());
            objectList.add(stringObjectTuple2.f1);
            logger.info("key{} add after objectList.size {}", key, objectList.size());
            objectList = mapState.get(key);
            logger.info("key{} mapState objectList.size {}", key, objectList.size());
            ...
        }


If I run this code in IntelliJ IDEA, the change to objectList is visible through
mapState, as this log shows:
 
    keyorder2infos add before objectList.size 0
    keyorder2infos add after objectList.size 1
    keyorder2infos mapState objectList.size 1

    keyorder2infos add before objectList.size 1
    keyorder2infos add after objectList.size 2
    keyorder2infos mapState objectList.size 2
    …




But if I run it on a YARN cluster, the change to objectList is not visible through
mapState, as this log shows:


    keyorder2infos add before objectList.size 0
    keyorder2infos add after objectList.size 1
    keyorder2infos mapState objectList.size 0

    keyorder2infos add before objectList.size 0
    keyorder2infos add after objectList.size 1
    keyorder2infos mapState objectList.size 0
    …




So, I want to know:
1. Is this a bug, or is something wrong with my code?
2. Why doesn't MapState on a YARN cluster behave this way? I have looked through
the Flink source code but could not find the reason.




It runs on:
    Flink 1.9.0
    Java 1.8 
    Hadoop 2.6.0




Best,
Shengjk1






