No problem :) I wasn’t able to find documentation on what can and cannot be upgraded for case classes. I had assumed the same rules that applied to POJO scheme upgrading applied to case classes. Has someone put together rules for case classes? I also should have mentioned we are running 1.9 Flink.
Sent from my iPhone > On Oct 8, 2019, at 3:03 PM, Aleksandar Mastilovic > <amastilo...@sightmachine.com> wrote: > > I’m pretty sure java.util.Optional is not serializable: > https://stackoverflow.com/questions/24547673/why-java-util-optional-is-not-serializable-how-to-serialize-the-object-with-suc > > However on a second look I can now see you’re using Scala’s Option, which IS > serializable :) My apologies for that. > > So your problem was that you had previous version in your save point which of > course cannot be deserialized into the new version without custom code that > would handle a missing Option. > >> On Oct 8, 2019, at 11:38 AM, Steven Nelson <snel...@sourceallies.com> wrote: >> >> Are you sure? I just restarted the job with new new version, but not from a >> savepoint and took a new savepoint and it seemed to work from there. It just >> seemed like it couldn’t upgrade the schema during restore. >> >> Sent from my iPhone >> >>> On Oct 8, 2019, at 1:22 PM, Aleksandar Mastilovic >>> <amastilo...@sightmachine.com> wrote: >>> >>> The Option class is not serializable, if you put something serializable >>> into that case class you wouldn’t have problems. >>> >>>> On Oct 8, 2019, at 8:17 AM, Steven Nelson <snel...@sourceallies.com> wrote: >>>> >>>> Hello! We are working with a Scala based pipeline. >>>> >>>> We changed >>>> >>>> case class Record(orgId: Int) >>>> >>>> To >>>> >>>> case class Record(orgId: Int, operationId:Option[String] = None) >>>> >>>> And now our savepoints fail with this exception: >>>> org.apache.flink.util.StateMigrationException: The new state serializer >>>> cannot be incompatible. >>>> at >>>> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:534) >>>> at >>>> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:482) >>>> at >>>> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:643) >>>> at >>>> org.apache.flink.runtime.state.KeyedStateFactory.createInternalState(KeyedStateFactory.java:47) >>>> at >>>> org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:72) >>>> at >>>> org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:279) >>>> at >>>> org.apache.flink.streaming.api.operators.AbstractStreamOperator.getOrCreateKeyedState(AbstractStreamOperator.java:577) >>>> at >>>> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.open(WindowOperator.java:240) >>>> at >>>> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:529) >>>> at >>>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:393) >>>> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705) >>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530) >>>> at java.lang.Thread.run(Thread.java:748) >>>> >>>> I was under the impression we could add items to case classes and still be >>>> able to use existing state to start the job. >>>> >>>> -Steve >>>> >>> >