Hi David,

Thanks for the response. I'm actually specifying --allowNonRestoredState
while I submit the job to the yarn session but it still fails with the same
error:
StateMigrationException: The new state serializer cannot be incompatible.

Maybe we cannot resume from incremental checkpoint with state schema
changes?
BTW, I'm running it on Flink 1.10. I forgot to update it in the original
thread.

Thanks,
Sivaprasanna


On Thu, Jul 23, 2020 at 7:52 PM David Anderson <da...@alpinegizmo.com>
wrote:

> I believe this should work, with a couple of caveats:
>
> - You can't do this with unaligned checkpoints
> - If you have dropped some state, you must specify --allowNonRestoredState
> when you restart the job
>
> David
>
> On Wed, Jul 22, 2020 at 4:06 PM Sivaprasanna <sivaprasanna...@gmail.com>
> wrote:
>
>> Hi,
>>
>> We are trying out state schema migration for one of our stateful
>> pipelines. We use few Avro type states. Changes made to the job:
>>     1. Updated the schema for one of the states (added a new 'boolean'
>> field with default value).
>>     2. Modified the code by removing a couple of ValueStates.
>>
>> To push these changes, I stopped the live job and resubmitted the new jar
>> with the latest *checkpoint* path. However, the job failed with the
>> following error:
>>
>> java.lang.RuntimeException: Error while getting state
>>     at
>> org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:62)
>>     at
>> org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:144)
>>     ...
>>     ...
>> Caused by: org.apache.flink.util.StateMigrationException: The new state
>> serializer cannot be incompatible.
>>     at
>> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:543)
>>
>>     at
>> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:491)
>>
>>     at
>> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:652)
>>
>> I was going through the state schema evolution doc. The document mentions
>> that we need to take a *savepoint* and restart the job with the savepoint
>> path. We are using RocksDB backend with incremental checkpoint enabled. Can
>> we not use the latest checkpoint available when we are dealing with state
>> schema changes?
>>
>> Complete stacktrace is attached with this mail.
>>
>> -
>> Sivaprasanna
>>
>

Reply via email to