Hi,

We are trying out state schema migration for one of our stateful pipelines.
We use a few Avro-typed states. Changes made to the job:
    1. Updated the schema of one of the states (added a new 'boolean'
field with a default value).
    2. Removed a couple of ValueStates from the code.
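
For reference, the schema change is roughly of this shape (record and
field names here are illustrative, not our actual schema):

```json
{
  "type": "record",
  "name": "MyState",
  "fields": [
    {"name": "existingField", "type": "string"},
    {"name": "newFlag", "type": "boolean", "default": false}
  ]
}
```

The new field carries a default, so the change should be
backward-compatible per Avro's evolution rules.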

To deploy these changes, I stopped the running job and resubmitted the new
jar with the latest *checkpoint* path. However, the job failed with the
following error:

java.lang.RuntimeException: Error while getting state
    at org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:62)
    at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:144)
    ...
Caused by: org.apache.flink.util.StateMigrationException: The new state serializer cannot be incompatible.
    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:543)
    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:491)
    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:652)

I was going through the state schema evolution docs. They mention that we
need to take a *savepoint* and restart the job from the savepoint path. We
are using the RocksDB backend with incremental checkpointing enabled. Can
we not use the latest available checkpoint when dealing with state schema
changes?
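
For context, the savepoint-based redeploy the docs describe would look
roughly like the following (job ID, paths, and jar name are illustrative;
assuming a recent Flink CLI where `stop` drains into a savepoint):

```shell
# 1. Stop the running job, writing a savepoint first:
flink stop --savepointPath hdfs:///flink/savepoints <jobId>

# 2. Resubmit the new jar, restoring from that savepoint:
flink run -s hdfs:///flink/savepoints/savepoint-<xxxx> -d new-job.jar
```

We were hoping to skip this and restore from the latest incremental
checkpoint directly.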

The complete stacktrace is attached to this mail.

-
Sivaprasanna
