Hi,
I added a TTL to my state.
*Old version:*
private lazy val stateDescriptor =
  new ValueStateDescriptor("foo", Types.CASE_CLASS[DomainState])

*New version:*

@transient
private lazy val storeTtl = StateTtlConfig.newBuilder(90)
  .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
  .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
  .cleanupInRocksdbCompactFilter()
  .build()

private lazy val stateDescriptor = {
  val des = new ValueStateDescriptor("foo", Types.CASE_CLASS[DomainState])
  des.enableTimeToLive(storeTtl)
  des
}

*But when I try to restore from a savepoint, I get this error:*

java.lang.RuntimeException: Error while getting state
        at org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:62)
        at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:144)
        ...


Caused by: org.apache.flink.util.StateMigrationException: The new state serializer cannot be incompatible.
        at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:527)
        at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:475)
        at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:613)
        at org.apache.flink.runtime.state.ttl.TtlStateFactory.createTtlStateContext(TtlStateFactory.java:197)
        at org.apache.flink.runtime.state.ttl.TtlStateFactory.createValueState(TtlStateFactory.java:137)
        at org.apache.flink.runtime.state.ttl.TtlStateFactory.createState(TtlStateFactory.java:126)
        at org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:71)
        at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:286)
        at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:335)
        at org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:124)
        at org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:60)
        ... 11 more


Do you have any idea how I can resolve it?


Best wishes
