Hi Shengjk1,

You need to call "mapState.put(key,objectList);" explicitly after calling 
"objectList.add(stringObjectTuple2.f1);" to write the updated list back to the 
state backend. This is because objectList is just an ordinary Java List object, 
so changes to it are not synced to the state backend automatically.
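
To illustrate, here is a minimal, Flink-free sketch. It assumes a backend that hands out copies of the stored value (as a serializing backend such as RocksDB effectively does). CopyingMapState, addWithoutPut and addWithPut are hypothetical names made up for this example and are not part of Flink's API; only the get/add/put pattern carries over to your processElement.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class PutBackSketch {

    /**
     * Hypothetical stand-in for MapState<String, List<Object>>; NOT Flink's API.
     * It copies values on get/put, mimicking a backend that serializes state.
     */
    static class CopyingMapState {
        private final Map<String, List<Object>> store = new HashMap<>();

        boolean contains(String key) {
            return store.containsKey(key);
        }

        List<Object> get(String key) {
            List<Object> stored = store.get(key);
            return stored == null ? null : new ArrayList<>(stored);
        }

        void put(String key, List<Object> value) {
            store.put(key, new ArrayList<>(value));
        }
    }

    /** Mirrors the quoted code: the list is mutated but never written back. */
    static int addWithoutPut(CopyingMapState mapState, String key, Object value) {
        List<Object> objectList = new ArrayList<>();
        if (mapState.contains(key)) {
            objectList = mapState.get(key);
        } else {
            mapState.put(key, objectList);
        }
        objectList.add(value); // only changes the local copy
        return mapState.get(key).size();
    }

    /** Suggested pattern: read, modify, then put the list back into the state. */
    static int addWithPut(CopyingMapState mapState, String key, Object value) {
        List<Object> objectList =
                mapState.contains(key) ? mapState.get(key) : new ArrayList<>();
        objectList.add(value);
        mapState.put(key, objectList); // explicit write-back
        return mapState.get(key).size();
    }

    public static void main(String[] args) {
        CopyingMapState lost = new CopyingMapState();
        System.out.println(addWithoutPut(lost, "k", "a")); // prints 0
        System.out.println(addWithoutPut(lost, "k", "b")); // prints 0

        CopyingMapState kept = new CopyingMapState();
        System.out.println(addWithPut(kept, "k", "a")); // prints 1
        System.out.println(addWithPut(kept, "k", "b")); // prints 2
    }
}
```

Without the write-back the stored list stays empty; with it the list grows on every element.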

Regards,
Dian

> On Sep 19, 2019, at 6:59 PM, shengjk1 <jsjsjs1...@163.com> wrote:
> 
> Hi all,
> 
> This is my code:
> 
> .process(new KeyedProcessFunction<String, Tuple2<String, Object>, String>() {
>     ...
>     @Override
>     public void open(Configuration parameters) throws Exception {
>         super.open(parameters);
> 
>         StateTtlConfig ttlConfig = StateTtlConfig
>                 .newBuilder(Time.days(7))
>                 .cleanupInBackground()
>                 .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite) // default
>                 .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
>                 .build();
> 
>         MapStateDescriptor noMatchDescriptor = new MapStateDescriptor<String, List<Object>>(
>                 "kuduError",
>                 BasicTypeInfo.STRING_TYPE_INFO,
>                 TypeInformation.of(new TypeHint<List<Object>>() {
>                 })
>         );
>         ...
>     }
>     ...
> 
>     @Override
>     public void processElement(Tuple2<String, Object> stringObjectTuple2,
>             Context context, Collector<String> collector) throws Exception {
>         ...
>         List<Object> objectList = new ArrayList<>();
>         if (mapState.contains(key)) {
>             objectList = mapState.get(key);
>         } else {
>             mapState.put(key, objectList);
>         }
>         logger.info("key{} add before objectList.size{}", key, objectList.size());
>         objectList.add(stringObjectTuple2.f1);
>         logger.info("key{} add after objectList.size{}", key, objectList.size());
>         objectList = mapState.get(key);
>         logger.info("key{} mapState objectList.size{}", key, objectList.size());
>         ...
>     }
> 
> 
> If I run this in IDEA, the log output is:
> 
> 
> Best,
> Shengjk1
> 
> 
> 
