Hi Sivaprasanna,

I think state schema evolution can work when restoring from an incremental checkpoint. I tried it with a simple POJO schema and it also works. From the exception stack, the schema before and after the change are incompatible, so maybe you need to check the schema change itself.
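For illustration, the kind of POJO change I tested looks roughly like the sketch below (class and field names are placeholders, not taken from your job). The PojoSerializer fills a newly added field with its Java default when restoring, as long as the class still follows the POJO rules (public no-arg constructor, public or getter/setter fields):

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class PojoEvolutionSketch {

    /** State type. The old job had only `count`; the new job adds `lastSeen`,
     *  which is initialized to 0L when restoring from the old checkpoint. */
    public static class CounterState {
        public long count;
        public long lastSeen; // field added after the checkpoint was taken
        public CounterState() {}
    }

    public static class CountFn extends KeyedProcessFunction<String, String, Long> {
        private transient ValueState<CounterState> state;

        @Override
        public void open(Configuration parameters) {
            state = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("counter", CounterState.class));
        }

        @Override
        public void processElement(String value, Context ctx, Collector<Long> out) throws Exception {
            CounterState s = state.value();
            if (s == null) {
                s = new CounterState();
            }
            s.count++;
            if (ctx.timestamp() != null) {
                s.lastSeen = ctx.timestamp();
            }
            state.update(s);
            out.collect(s.count);
        }
    }
}

A change like this restored fine for me from an incremental checkpoint, so I would first rule out the Avro schema change itself.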
Best,
Congxian

Sivaprasanna <sivaprasanna...@gmail.com> wrote on Fri, Jul 24, 2020 at 12:06 AM:

> Hi David,
>
> Thanks for the response. I'm actually specifying --allowNonRestoredState
> while I submit the job to the YARN session, but it still fails with the
> same error:
> StateMigrationException: The new state serializer cannot be incompatible.
>
> Maybe we cannot resume from an incremental checkpoint with state schema
> changes?
> BTW, I'm running it on Flink 1.10. I forgot to mention it in the original
> thread.
>
> Thanks,
> Sivaprasanna
>
>
> On Thu, Jul 23, 2020 at 7:52 PM David Anderson <da...@alpinegizmo.com> wrote:
>
>> I believe this should work, with a couple of caveats:
>>
>> - You can't do this with unaligned checkpoints
>> - If you have dropped some state, you must specify --allowNonRestoredState
>>   when you restart the job
>>
>> David
>>
>> On Wed, Jul 22, 2020 at 4:06 PM Sivaprasanna <sivaprasanna...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> We are trying out state schema migration for one of our stateful
>>> pipelines. We use a few Avro-type states. Changes made to the job:
>>> 1. Updated the schema for one of the states (added a new 'boolean'
>>>    field with a default value).
>>> 2. Modified the code by removing a couple of ValueStates.
>>>
>>> To push these changes, I stopped the live job and resubmitted the new
>>> jar with the latest *checkpoint* path. However, the job failed with the
>>> following error:
>>>
>>> java.lang.RuntimeException: Error while getting state
>>>     at org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:62)
>>>     at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:144)
>>>     ...
>>>     ...
>>> Caused by: org.apache.flink.util.StateMigrationException: The new state
>>> serializer cannot be incompatible.
>>>     at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:543)
>>>     at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:491)
>>>     at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:652)
>>>
>>> I was going through the state schema evolution doc. The document
>>> mentions that we need to take a *savepoint* and restart the job with the
>>> savepoint path. We are using the RocksDB backend with incremental
>>> checkpoints enabled. Can we not use the latest available checkpoint when
>>> we are dealing with state schema changes?
>>>
>>> The complete stack trace is attached to this mail.
>>>
>>> -
>>> Sivaprasanna
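One more thing you could do is check the Avro change itself offline, outside Flink. Below is a rough sketch using Avro's own compatibility check (the schema strings are placeholders, replace them with your real old and new schemas); the new (reader) schema must be able to resolve data written with the old (writer) schema, which is a prerequisite for the new serializer being accepted on restore:

import org.apache.avro.Schema;
import org.apache.avro.SchemaCompatibility;
import org.apache.avro.SchemaCompatibility.SchemaPairCompatibility;

public class CheckAvroCompatibility {
    public static void main(String[] args) {
        // Placeholder schemas: replace with the actual old/new .avsc contents from your job.
        Schema oldSchema = new Schema.Parser().parse(
                "{\"type\":\"record\",\"name\":\"Event\",\"fields\":["
                        + "{\"name\":\"id\",\"type\":\"string\"}]}");
        Schema newSchema = new Schema.Parser().parse(
                "{\"type\":\"record\",\"name\":\"Event\",\"fields\":["
                        + "{\"name\":\"id\",\"type\":\"string\"},"
                        + "{\"name\":\"flag\",\"type\":\"boolean\",\"default\":false}]}");

        // Can the new schema read data that was written with the old schema?
        SchemaPairCompatibility result =
                SchemaCompatibility.checkReaderWriterCompatibility(newSchema, oldSchema);
        System.out.println(result.getType());        // expect COMPATIBLE
        System.out.println(result.getDescription()); // explains the mismatch if not
    }
}

If this already reports INCOMPATIBLE, fixing the schema change (for example, making sure every newly added field has a proper default) is the first step, before worrying about the checkpoint vs. savepoint question.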