Hi Shengjk1,

You should call "mapState.put(key, objectList);" manually after calling "objectList.add(stringObjectTuple2.f1);" to write the updated list to the state backend. objectList is just an ordinary Java list object, so changes to it are not synced to the state backend automatically.
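To illustrate, here is a rough sketch in plain Java with no Flink dependency. CopyingMapState is a made-up stand-in, not a Flink class: it assumes a backend that hands out a copy of the stored value on every get() and stores a copy on every put(), which is roughly what a serializing backend does. The method names addWithoutPut and addWithPut are also only for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class MapStateWriteBackSketch {

    /** Hypothetical stand-in for a serializing state backend: get() and put() both copy the list. */
    static class CopyingMapState<K, V> {
        private final Map<K, List<V>> store = new HashMap<>();

        boolean contains(K key) {
            return store.containsKey(key);
        }

        List<V> get(K key) {
            List<V> stored = store.get(key);
            // Return a copy so that callers never hold a reference into the store.
            return stored == null ? null : new ArrayList<>(stored);
        }

        void put(K key, List<V> value) {
            // Store a copy, as if the value had been serialized.
            store.put(key, new ArrayList<>(value));
        }
    }

    /** Same shape as the code in the question: the list is changed but never written back. */
    static int addWithoutPut(CopyingMapState<String, Object> mapState, String key, Object element) {
        List<Object> objectList = new ArrayList<>();
        if (mapState.contains(key)) {
            objectList = mapState.get(key);
        } else {
            mapState.put(key, objectList);
        }
        objectList.add(element);          // only the local copy changes
        return mapState.get(key).size();  // size as seen by the state
    }

    /** The suggested fix: put the list back after changing it. */
    static int addWithPut(CopyingMapState<String, Object> mapState, String key, Object element) {
        List<Object> objectList = new ArrayList<>();
        if (mapState.contains(key)) {
            objectList = mapState.get(key);
        }
        objectList.add(element);
        mapState.put(key, objectList);    // explicit write-back
        return mapState.get(key).size();
    }

    public static void main(String[] args) {
        CopyingMapState<String, Object> lost = new CopyingMapState<>();
        addWithoutPut(lost, "k", "a");
        System.out.println("without put: " + addWithoutPut(lost, "k", "b")); // without put: 0

        CopyingMapState<String, Object> kept = new CopyingMapState<>();
        addWithPut(kept, "k", "a");
        System.out.println("with put: " + addWithPut(kept, "k", "b"));       // with put: 2
    }
}
```

In the sketch, the list read back from the state stays empty unless put() is called after add(). Whether an in-place change happens to be visible in a real job depends on the state backend you use, so the explicit put is the safe way to do it.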
Regards,
Dian

> On Sep 19, 2019, at 6:59 PM, shengjk1 <jsjsjs1...@163.com> wrote:
>
> Hi, all
>
> This is my code:
>
> .process(new KeyedProcessFunction<String, Tuple2<String, Object>, String>() {
>     ...
>     @Override
>     public void open(Configuration parameters) throws Exception {
>         super.open(parameters);
>
>         StateTtlConfig ttlConfig = StateTtlConfig
>                 .newBuilder(Time.days(7))
>                 .cleanupInBackground()
>                 .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite) // default
>                 .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
>                 .build();
>
>         MapStateDescriptor noMatchDescriptor = new MapStateDescriptor<String, List<Object>>(
>                 "kuduError",
>                 BasicTypeInfo.STRING_TYPE_INFO,
>                 TypeInformation.of(new TypeHint<List<Object>>() {
>                 })
>         );
>         ...
>     }
>     ...
>
>     @Override
>     public void processElement(Tuple2<String, Object> stringObjectTuple2, Context context, Collector<String> collector) throws Exception {
>         ...
>         List<Object> objectList = new ArrayList<>();
>         if (mapState.contains(key)) {
>             objectList = mapState.get(key);
>         } else {
>             mapState.put(key, objectList);
>         }
>         logger.info("key{} add before objectList.size{}", key, objectList.size());
>         objectList.add(stringObjectTuple2.f1);
>         logger.info("key{} add after objectList.size{}", key, objectList.size());
>         objectList = mapState.get(key);
>         logger.info("key{} mapState objectList.size{}", key, objectList.size());
>         ...
>     }
>
> If I run this in IDEA the log detail:
>
> Best,
> Shengjk1