Hi David,

Thanks for the response. I'm actually specifying --allowNonRestoredState when I submit the job to the YARN session, but it still fails with the same error: StateMigrationException: The new state serializer cannot be incompatible.
Maybe we cannot resume from incremental checkpoint with state schema changes?

BTW, I'm running it on Flink 1.10. I forgot to update it in the original thread.

Thanks,
Sivaprasanna

On Thu, Jul 23, 2020 at 7:52 PM David Anderson <da...@alpinegizmo.com> wrote:

> I believe this should work, with a couple of caveats:
>
> - You can't do this with unaligned checkpoints
> - If you have dropped some state, you must specify --allowNonRestoredState
>   when you restart the job
>
> David
>
> On Wed, Jul 22, 2020 at 4:06 PM Sivaprasanna <sivaprasanna...@gmail.com>
> wrote:
>
>> Hi,
>>
>> We are trying out state schema migration for one of our stateful
>> pipelines. We use few Avro type states. Changes made to the job:
>> 1. Updated the schema for one of the states (added a new 'boolean'
>>    field with default value).
>> 2. Modified the code by removing a couple of ValueStates.
>>
>> To push these changes, I stopped the live job and resubmitted the new jar
>> with the latest *checkpoint* path. However, the job failed with the
>> following error:
>>
>> java.lang.RuntimeException: Error while getting state
>>     at org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:62)
>>     at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:144)
>>     ...
>>     ...
>> Caused by: org.apache.flink.util.StateMigrationException: The new state serializer cannot be incompatible.
>>     at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:543)
>>     at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:491)
>>     at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:652)
>>
>> I was going through the state schema evolution doc. The document mentions
>> that we need to take a *savepoint* and restart the job with the savepoint
>> path. We are using RocksDB backend with incremental checkpoint enabled. Can
>> we not use the latest checkpoint available when we are dealing with state
>> schema changes?
>>
>> Complete stacktrace is attached with this mail.
>>
>> -
>> Sivaprasanna
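
For reference, the savepoint route that the schema evolution doc describes could look roughly like the sketch below. It only prints the two Flink CLI commands (a dry run) so they can be reviewed before being run by hand. The helper function names and every argument value (job ID, YARN application ID, paths, jar name) are hypothetical placeholders, not values from this thread. Nothing in this thread confirms that going through a savepoint resolves the StateMigrationException.

```shell
#!/bin/sh
# Dry-run sketch: prints the commands instead of executing them.
# All argument values are placeholders supplied by the caller (assumptions).

# Step 1: trigger a savepoint for a job running in a YARN session.
# Args: <jobId> <savepointTargetDir> <yarnAppId>
print_savepoint_cmd() {
  printf 'flink savepoint %s %s -yid %s\n' "$1" "$2" "$3"
}

# Step 2: submit the new jar, restoring from the savepoint path that
# step 1 reports. --allowNonRestoredState lets the restore skip state
# that belongs to operators or states removed from the new job.
# Args: <savepointPath> <yarnAppId> <jobJar>
print_resume_cmd() {
  printf 'flink run -s %s --allowNonRestoredState -yid %s %s\n' "$1" "$2" "$3"
}
```

Calling, for example, `print_savepoint_cmd "$JOB_ID" hdfs:///flink/savepoints "$YARN_APP_ID"` and then `print_resume_cmd` with the reported savepoint path prints the two commands in the order they would be run.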