Thanks, Congxian & David. There was a mistake in the new schema we used.
After fixing that, we were able to migrate the state, and since we had touched
important code blocks and removed/refactored certain functionality, we
took a savepoint instead of a checkpoint. All good now. Thanks again : )
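
For anyone landing on this thread later: the mistake in our case was in the Avro
schema itself. The working change, of the kind described further down the thread,
adds the new boolean field *with a default* — roughly like this (record and field
names here are made up for illustration):

```json
{
  "type": "record",
  "name": "PipelineState",
  "fields": [
    {"name": "id", "type": "string"},
    {"name": "enabled", "type": "boolean", "default": false}
  ]
}
```

Without the default, Avro cannot resolve records written with the old schema
against the new one, and Flink then reports the new state serializer as
incompatible.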

Sivaprasanna
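
P.S. The savepoint-based redeploy mentioned above went roughly as below. The job
id, paths, and jar name are placeholders, and the flags are from memory, so
double-check them against your Flink version's CLI:

```shell
# Trigger a savepoint and cancel the running job (Flink 1.10 CLI);
# <job-id> and the HDFS paths are placeholders.
flink cancel -s hdfs:///flink/savepoints <job-id>

# Resume the rebuilt jar from that savepoint. --allowNonRestoredState
# lets the restore proceed even though some ValueStates were removed.
flink run -s hdfs:///flink/savepoints/savepoint-xxxx \
    --allowNonRestoredState -d my-pipeline.jar
```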

On Fri, Jul 24, 2020 at 9:12 AM Congxian Qiu <qcx978132...@gmail.com> wrote:

> Hi Sivaprasanna
>    I think state schema evolution can work with incremental checkpoints, and
> I tried it with a simple POJO schema — it also works. Maybe you need to check
> the schema: from the exception stack, the schemas before and after are
> incompatible.
>
> Best,
> Congxian
>
>
> Sivaprasanna <sivaprasanna...@gmail.com> wrote on Friday, July 24, 2020 at 12:06 AM:
>
>> Hi David,
>>
>> Thanks for the response. I'm actually specifying --allowNonRestoredState
>> when I submit the job to the YARN session, but it still fails with the same
>> error:
>> StateMigrationException: The new state serializer cannot be incompatible.
>>
>> Maybe we cannot resume from an incremental checkpoint when there are state
>> schema changes?
>> BTW, I'm running it on Flink 1.10. I forgot to update it in the original
>> thread.
>>
>> Thanks,
>> Sivaprasanna
>>
>>
>> On Thu, Jul 23, 2020 at 7:52 PM David Anderson <da...@alpinegizmo.com>
>> wrote:
>>
>>> I believe this should work, with a couple of caveats:
>>>
>>> - You can't do this with unaligned checkpoints
>>> - If you have dropped some state, you must specify
>>> --allowNonRestoredState when you restart the job
>>>
>>> David
>>>
>>> On Wed, Jul 22, 2020 at 4:06 PM Sivaprasanna <sivaprasanna...@gmail.com>
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> We are trying out state schema migration for one of our stateful
>>>> pipelines. We use a few Avro-type states. Changes made to the job:
>>>>     1. Updated the schema for one of the states (added a new 'boolean'
>>>> field with default value).
>>>>     2. Modified the code by removing a couple of ValueStates.
>>>>
>>>> To push these changes, I stopped the live job and resubmitted the new
>>>> jar with the latest *checkpoint* path. However, the job failed with the
>>>> following error:
>>>>
>>>> java.lang.RuntimeException: Error while getting state
>>>>     at org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:62)
>>>>     at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:144)
>>>>     ...
>>>>     ...
>>>> Caused by: org.apache.flink.util.StateMigrationException: The new state
>>>> serializer cannot be incompatible.
>>>>     at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:543)
>>>>     at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:491)
>>>>     at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:652)
>>>>
>>>> I was going through the state schema evolution doc. The document
>>>> mentions that we need to take a *savepoint* and restart the job with the
>>>> savepoint path. We are using the RocksDB backend with incremental
>>>> checkpoints enabled. Can we not use the latest checkpoint available when we
>>>> are dealing with state schema changes?
>>>>
>>>> Complete stacktrace is attached with this mail.
>>>>
>>>> -
>>>> Sivaprasanna
>>>>
>>>
