Hi Alexis, I think your setup is fine, but Java type erasure is probably causing Flink to treat the two serializers as different. Could you try creating the MapStateDescriptor with explicitly provided serializers (constructed manually) instead of TypeInformation?
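
As a rough sketch of what "explicitly providing serializers" could look like, the descriptor below uses the MapStateDescriptor constructor that takes TypeSerializer instances directly. The class name ExplicitSerializerDescriptor is hypothetical, and the choice of LongSerializer, ListSerializer and StringSerializer is an assumption: the serializers have to match whatever the job used when it wrote the state, which may be different (for example, a Kryo-based serializer for the List value).

```java
import java.util.List;

import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeutils.base.ListSerializer;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;

// Hypothetical helper class, used only for illustration.
public class ExplicitSerializerDescriptor {

    // Builds the descriptor from concrete serializers instead of TypeInformation,
    // so no generic type extraction is involved.
    public static MapStateDescriptor<Long, List<String>> create() {
        return new MapStateDescriptor<>(
                "descriptorName",
                // Assumption: the keys were written with Flink's built-in LongSerializer.
                LongSerializer.INSTANCE,
                // Assumption: the values were written with ListSerializer over StringSerializer.
                new ListSerializer<>(StringSerializer.INSTANCE));
    }
}
```

The same descriptor would then be passed to context.globalState().getMapState(...) in readWindow. If the exception persists, that would suggest the state was originally written with a different value serializer than the one assumed here.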
Regards,
Roman

On Fri, Apr 8, 2022 at 10:01 AM Alexis Sarda-Espinosa <alexis.sarda-espin...@microfocus.com> wrote:
> Hi everyone,
>
> I have a ProcessWindowFunction that uses Global window state. It uses MapState with a descriptor defined like this:
>
> MapStateDescriptor<Long, List<String>> msd = new MapStateDescriptor<>(
>     "descriptorName",
>     TypeInformation.of(Long.class),
>     TypeInformation.of(new TypeHint<List<String>>() {})
> );
>
> Now I’m trying to access a checkpoint’s state data to read that (created with RocksDB as backend in Flink 1.14.4). I have a WindowReaderFunction<MyPojo, Integer, String, TimeWindow> that defines the same descriptor and calls this in readWindow:
>
> MapState<Long, List<String>> mapState = context.globalState().getMapState(msd);
>
> After loading the savepoint with EmbeddedRocksDBStateBackend(true), I try to configure the reader function like this:
>
> savepoint
>     .window(SlidingEventTimeWindows.of(Time.minutes(11L), Time.minutes(1L)))
>     .process(
>         "my-uid",
>         new StateReaderFunction(),
>         Types.STRING,
>         TypeInformation.of(MyPojo.class),
>         Types.INT
>     )
>     .print();
>
> But I am getting this exception:
>
> Caused by: org.apache.flink.util.StateMigrationException: The new state serializer (org.apache.flink.api.common.typeutils.base.MapSerializer@a07c9263) must not be incompatible with the old state serializer (org.apache.flink.api.common.typeutils.base.MapSerializer@706b3103).
>
> Does someone know what I’m doing wrong in my setup?
>
> Regards,
> Alexis.