Hi Sivaprasanna,
   I think state schema evolution can work with incremental checkpoints. I
tried it with a simple POJO schema, and it also worked. Maybe you need to
check the schema: judging from the exception stack, the schemas before and
after the change are incompatible.
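
One rough way to sanity-check the Avro side of this before restarting is sketched below. It only checks that every field added to the new record schema declares a default, which Avro requires in order to read old records with the new schema. It is a simplified illustration, not Avro's full schema resolution and not the check Flink's serializers perform; the record name `Event` and the fields `id` and `flagged` are hypothetical.

```python
import json


def added_fields_without_default(old_schema_json, new_schema_json):
    """Return names of fields that exist only in the new record schema
    and do not declare a default value."""
    old = json.loads(old_schema_json)
    new = json.loads(new_schema_json)
    old_names = {field["name"] for field in old["fields"]}
    return [
        field["name"]
        for field in new["fields"]
        if field["name"] not in old_names and "default" not in field
    ]


# Hypothetical schemas, for illustration only.
OLD = '{"type": "record", "name": "Event", "fields": [{"name": "id", "type": "string"}]}'
NEW_WITH_DEFAULT = (
    '{"type": "record", "name": "Event", "fields": ['
    '{"name": "id", "type": "string"},'
    '{"name": "flagged", "type": "boolean", "default": false}]}'
)
NEW_WITHOUT_DEFAULT = (
    '{"type": "record", "name": "Event", "fields": ['
    '{"name": "id", "type": "string"},'
    '{"name": "flagged", "type": "boolean"}]}'
)

if __name__ == "__main__":
    print(added_fields_without_default(OLD, NEW_WITH_DEFAULT))
    print(added_fields_without_default(OLD, NEW_WITHOUT_DEFAULT))
```

An empty result here does not guarantee that the restore will succeed; it only rules out one common cause of incompatibility.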

Best,
Congxian


On Fri, Jul 24, 2020 at 12:06 AM Sivaprasanna <sivaprasanna...@gmail.com> wrote:

> Hi David,
>
> Thanks for the response. I'm actually specifying --allowNonRestoredState
> while I submit the job to the yarn session but it still fails with the same
> error:
> StateMigrationException: The new state serializer cannot be incompatible.
>
> Maybe we cannot resume from an incremental checkpoint when there are state
> schema changes?
> BTW, I'm running it on Flink 1.10. I forgot to update it in the original
> thread.
>
> Thanks,
> Sivaprasanna
>
>
> On Thu, Jul 23, 2020 at 7:52 PM David Anderson <da...@alpinegizmo.com>
> wrote:
>
>> I believe this should work, with a couple of caveats:
>>
>> - You can't do this with unaligned checkpoints
>> - If you have dropped some state, you must specify
>> --allowNonRestoredState when you restart the job
>>
>> David
>>
>> On Wed, Jul 22, 2020 at 4:06 PM Sivaprasanna <sivaprasanna...@gmail.com>
>> wrote:
>>
>>> Hi,
>>>
>>> We are trying out state schema migration for one of our stateful
>>> pipelines. We use a few Avro-typed states. Changes made to the job:
>>>     1. Updated the schema for one of the states (added a new 'boolean'
>>> field with default value).
>>>     2. Modified the code by removing a couple of ValueStates.
>>>
>>> To push these changes, I stopped the live job and resubmitted the new
>>> jar with the latest *checkpoint* path. However, the job failed with the
>>> following error:
>>>
>>> java.lang.RuntimeException: Error while getting state
>>>     at org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:62)
>>>     at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:144)
>>>     ...
>>>     ...
>>> Caused by: org.apache.flink.util.StateMigrationException: The new state serializer cannot be incompatible.
>>>     at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:543)
>>>     at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:491)
>>>     at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:652)
>>>
>>> I was going through the state schema evolution doc. The document
>>> mentions that we need to take a *savepoint* and restart the job with the
>>> savepoint path. We are using RocksDB backend with incremental checkpoint
>>> enabled. Can we not use the latest checkpoint available when we are dealing
>>> with state schema changes?
>>>
>>> Complete stacktrace is attached with this mail.
>>>
>>> -
>>> Sivaprasanna
>>>
>>
