Hi, I added a ttl to my state *old version :* private lazy val stateDescriptor = new ValueStateDescriptor("foo", Types.CASE_CLASS[DomainState])
*vs the new version * @transient private lazy val storeTtl = StateTtlConfig.newBuilder(90) .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite) .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired) .cleanupInRocksdbCompactFilter() .build() private lazy val stateDescriptor = { val des = new ValueStateDescriptor("foo", Types.CASE_CLASS[DomainState]) des.enableTimeToLive(storeTtl) des } *BUT when trying to restore from savepoint I am getting this error:* java.lang.RuntimeException: Error while getting state at org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:62) at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:144) ... Caused by: org.apache.flink.util.StateMigrationException: The new state serializer cannot be incompatible. at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:527) at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:475) at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:613) at org.apache.flink.runtime.state.ttl.TtlStateFactory.createTtlStateContext(TtlStateFactory.java:197) at org.apache.flink.runtime.state.ttl.TtlStateFactory.createValueState(TtlStateFactory.java:137) at org.apache.flink.runtime.state.ttl.TtlStateFactory.createState(TtlStateFactory.java:126) at org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:71) at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:286) at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:335) at org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:124) at org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:60) ... 11 more Do you have any idea how can I resolve it ? Best wishes