Are you sure? I just restarted the job with the new version, but not from a 
savepoint, then took a new savepoint, and it seemed to work from there. It just 
seemed like it couldn’t upgrade the schema during restore.
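
To show why a fresh start can work while a restore fails, here is a minimal sketch. It is not Flink's actual serializer, only a simplified positional encoding that I am assuming behaves similarly. RecordV1, RecordV2, writeV1 and readV2 are hypothetical names made up for this example. Bytes written with the old one-field layout cannot be read back with the new two-field layout, because nothing in the bytes says which fields are present.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}
import scala.util.Try

object SchemaSketch {
  // Hypothetical stand-ins for the two versions of the case class in this thread.
  case class RecordV1(orgId: Int)
  case class RecordV2(orgId: Int, operationId: Option[String] = None)

  // Writes fields by position, with no field names and no version tag.
  def writeV1(r: RecordV1): Array[Byte] = {
    val buf = new ByteArrayOutputStream()
    val out = new DataOutputStream(buf)
    out.writeInt(r.orgId)
    out.flush()
    buf.toByteArray
  }

  // Reads the V2 layout: an Int, a presence flag, then an optional string.
  def readV2(bytes: Array[Byte]): RecordV2 = {
    val in = new DataInputStream(new ByteArrayInputStream(bytes))
    val orgId = in.readInt()
    val present = in.readBoolean() // fails with EOFException on V1 bytes
    val operationId = if (present) Some(in.readUTF()) else None
    RecordV2(orgId, operationId)
  }

  def main(args: Array[String]): Unit = {
    val oldBytes = writeV1(RecordV1(42))
    // The old bytes end after the Int, so reading the new layout fails.
    println(Try(readV2(oldBytes)).isFailure)
  }
}
```

The default value on operationId only helps when constructing new objects in code. It does not help a reader that expects extra bytes in state that was already written.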

> On Oct 8, 2019, at 1:22 PM, Aleksandar Mastilovic 
> <amastilo...@sightmachine.com> wrote:
> 
> The Option class is not serializable. If you put something serializable into 
> that case class, you wouldn’t have problems.
> 
>> On Oct 8, 2019, at 8:17 AM, Steven Nelson <snel...@sourceallies.com> wrote:
>> 
>> Hello! We are working with a Scala based pipeline.
>> 
>> We changed 
>> 
>> case class Record(orgId: Int)
>> 
>> To
>> 
>> case class Record(orgId: Int, operationId: Option[String] = None)
>> 
>> And now our savepoints fail with this exception:
>> org.apache.flink.util.StateMigrationException: The new state serializer cannot be incompatible.
>>     at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:534)
>>     at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:482)
>>     at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:643)
>>     at org.apache.flink.runtime.state.KeyedStateFactory.createInternalState(KeyedStateFactory.java:47)
>>     at org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:72)
>>     at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:279)
>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.getOrCreateKeyedState(AbstractStreamOperator.java:577)
>>     at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.open(WindowOperator.java:240)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:529)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:393)
>>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>>     at java.lang.Thread.run(Thread.java:748)
>> 
>> I was under the impression we could add items to case classes and still be 
>> able to use existing state to start the job.
>> 
>> -Steve
>> 
> 
