Hi Roman,

Thanks for the quick response. It wasn't that, but your comment about erasure made me realize I should have debugged the code and looked at the types. Apparently setting TTL changes the serializer, so I also had to add TTL in the WindowReaderFunction.
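
In case it helps someone who finds this thread later, here is a minimal sketch of the kind of change I mean, assuming Flink 1.14 on the classpath. The class name and the one-hour TTL are placeholders for illustration, not the values from my job. As far as I understand, what has to match is that TTL is enabled on both sides, since a TTL-enabled descriptor wraps the value serializer.

```java
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;

import java.util.List;

// Hypothetical helper class; in practice the descriptor would live in the
// WindowReaderFunction, mirroring the one in the ProcessWindowFunction.
public class TtlDescriptorSketch {

    // Placeholder TTL duration (assumption): use the same settings as the streaming job.
    static final StateTtlConfig TTL_CONFIG =
            StateTtlConfig.newBuilder(Time.hours(1)).build();

    static MapStateDescriptor<Long, List<String>> descriptor() {
        MapStateDescriptor<Long, List<String>> msd = new MapStateDescriptor<>(
                "descriptorName",
                TypeInformation.of(Long.class),
                TypeInformation.of(new TypeHint<List<String>>() {}));
        // Enabling TTL here is the part that was missing on the reader side.
        msd.enableTimeToLive(TTL_CONFIG);
        return msd;
    }

    public static void main(String[] args) {
        // Prints whether TTL is enabled on the descriptor built above.
        System.out.println(descriptor().getTtlConfig().isEnabled());
    }
}
```

The reader function can then pass this descriptor to context.globalState().getMapState(...) exactly as before.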

Regards,
Alexis.

-----Original Message-----
From: Roman Khachatryan <ro...@apache.org>
Sent: Friday, 8 April 2022 11:48
To: Alexis Sarda-Espinosa <alexis.sarda-espin...@microfocus.com>
Cc: user@flink.apache.org
Subject: Re: Using state processor API to read state defined with a TypeHint

Hi Alexis,

I think your setup is fine, but probably Java type erasure makes Flink consider the two serializers as different. Could you try creating a MapStateDescriptor by explicitly providing serializers (constructed manually)?

Regards,
Roman

On Fri, Apr 8, 2022 at 10:01 AM Alexis Sarda-Espinosa <alexis.sarda-espin...@microfocus.com> wrote:
>
> Hi everyone,
>
> I have a ProcessWindowFunction that uses Global window state. It uses
> MapState with a descriptor defined like this:
>
> MapStateDescriptor<Long, List<String>> msd = new MapStateDescriptor<>(
>     "descriptorName",
>     TypeInformation.of(Long.class),
>     TypeInformation.of(new TypeHint<List<String>>() {})
> );
>
> Now I’m trying to access a checkpoint’s state data to read that (created with
> RocksDB as backend in Flink 1.14.4). I have a
> WindowReaderFunction<MyPojo, Integer, String, TimeWindow> that defines the
> same descriptor and calls this in readWindow:
>
> MapState<Long, List<String>> mapState = context.globalState().getMapState(msd);
>
> After loading the savepoint with EmbeddedRocksDBStateBackend(true), I try to
> configure the reader function like this:
>
> savepoint
>     .window(SlidingEventTimeWindows.of(Time.minutes(11L), Time.minutes(1L)))
>     .process(
>         "my-uid",
>         new StateReaderFunction(),
>         Types.STRING,
>         TypeInformation.of(MyPojo.class),
>         Types.INT
>     )
>     .print();
>
> But I am getting this exception:
>
> Caused by: org.apache.flink.util.StateMigrationException: The new state
> serializer (org.apache.flink.api.common.typeutils.base.MapSerializer@a07c9263)
> must not be incompatible with the old state serializer
> (org.apache.flink.api.common.typeutils.base.MapSerializer@706b3103).
>
> Does someone know what I’m doing wrong in my setup?
>
> Regards,
> Alexis.