Hi everyone,

I have a ProcessWindowFunction that uses global window state. It keeps a
MapState whose descriptor is defined like this:

MapStateDescriptor<Long, List<String>> msd = new MapStateDescriptor<>(
        "descriptorName",
        TypeInformation.of(Long.class),
        TypeInformation.of(new TypeHint<List<String>>() {})
);

Now I'm trying to read that state from a checkpoint (created with RocksDB as
the state backend in Flink 1.14.4). I have a WindowReaderFunction<MyPojo,
Integer, String, TimeWindow> that defines the same descriptor and calls this in
readWindow:

MapState<Long, List<String>> mapState = context.globalState().getMapState(msd);

After loading the savepoint with EmbeddedRocksDBStateBackend(true), I try to 
configure the reader function like this:

savepoint
        .window(SlidingEventTimeWindows.of(Time.minutes(11L), Time.minutes(1L)))
        .process(
                "my-uid",
                new StateReaderFunction(),
                Types.STRING,
                TypeInformation.of(MyPojo.class),
                Types.INT
        )
        .print();

But I am getting this exception:

Caused by: org.apache.flink.util.StateMigrationException: The new state 
serializer (org.apache.flink.api.common.typeutils.base.MapSerializer@a07c9263) 
must not be incompatible with the old state serializer 
(org.apache.flink.api.common.typeutils.base.MapSerializer@706b3103).

Does anyone know what I'm doing wrong in my setup?

Regards,
Alexis.
