Hello,

I'm using Flink 1.17.1 and have state TTL enabled in one of my Flink jobs, where I'm using RocksDB for checkpointing. I have a value state of a POJO class (generated from an Avro schema). I added a new field to the schema, with a default value to keep it backwards compatible, but when I redeployed the job I got a StateMigrationException. I have a similar setup in other Flink jobs, where adding a field doesn't cause any trouble.

This is my stateTTL config:

StateTtlConfig
  .newBuilder(Time.days(7))
  .cleanupInRocksdbCompactFilter(1000)
  .build

This is how I enable it:

val myStateDescriptor: ValueStateDescriptor[MyPojoClass] = new ValueStateDescriptor[MyPojoClass](
  "test-name", classOf[MyPojoClass])
myStateDescriptor.enableTimeToLive(initStateTTLConfig())

This is the exception I end up with:

Caused by: org.apache.flink.util.StateMigrationException: The new state serializer (org.apache.flink.runtime.state.ttl.TtlStateFactory$TtlSerializer@a13bf51) must not be incompatible with the old state serializer (org.apache.flink.runtime.state.ttl.TtlStateFactory$TtlSerializer@a13bf51).
    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:755)
    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:667)
    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createOrUpdateInternalState(RocksDBKeyedStateBackend.java:883)
    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createOrUpdateInternalState(RocksDBKeyedStateBackend.java:870)
    at org.apache.flink.runtime.state.ttl.TtlStateFactory.createTtlStateContext(TtlStateFactory.java:222)
    at org.apache.flink.runtime.state.ttl.TtlStateFactory.createValueState(TtlStateFactory.java:145)
    at org.apache.flink.runtime.state.ttl.TtlStateFactory.createState(TtlStateFactory.java:129)
    at org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:69)
    at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:362)
    at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:413)
    at org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:115)
    at org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:60)
    ... 25 more

Does anyone know what is causing the issue?

Cheers,
Irakli