No problem :)

I wasn’t able to find documentation on what can and cannot be upgraded for case 
classes. I had assumed the same rules that apply to POJO schema upgrades also 
applied to case classes. Has someone put together the rules for case classes? I 
should also have mentioned that we are running Flink 1.9.
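
For reference, this is the kind of fallback we’ve been sketching in case the 
POJO rules are the only ones that allow adding fields; it’s untested and the 
names are illustrative only:

import scala.beans.BeanProperty

// Plain class instead of a case class, shaped to satisfy Flink's POJO rules
// (public class, public no-arg constructor, getter/setter for every field).
class RecordPojo(@BeanProperty var orgId: Int,
                 @BeanProperty var operationId: String) {
  def this() = this(0, null) // no-arg constructor required by the POJO rules
}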



Sent from my iPhone

> On Oct 8, 2019, at 3:03 PM, Aleksandar Mastilovic 
> <amastilo...@sightmachine.com> wrote:
> 
> I’m pretty sure java.util.Optional is not serializable: 
> https://stackoverflow.com/questions/24547673/why-java-util-optional-is-not-serializable-how-to-serialize-the-object-with-suc
> 
> However, on a second look I can now see you’re using Scala’s Option, which IS 
> serializable :) My apologies for that.
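> 
> A quick way to see the difference, purely as an illustration:
> 
> val o: java.io.Serializable = Option("x") // compiles: scala.Option extends Serializable
> // val j: java.io.Serializable = java.util.Optional.of("x") // would not compile: Optional isn’t Serializable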
> 
> So your problem is that your savepoint contains the previous version of the 
> case class, which of course cannot be deserialized into the new version 
> without custom code that handles the missing Option.
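> 
> To make that concrete, the kind of custom handling I mean is just a mapping 
> from the old shape to the new one (class names below are illustrative only):
> 
> case class RecordV1(orgId: Int)
> case class RecordV2(orgId: Int, operationId: Option[String] = None)
> 
> def migrate(old: RecordV1): RecordV2 =
>   RecordV2(orgId = old.orgId, operationId = None) // the missing field just defaults to None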
> 
>> On Oct 8, 2019, at 11:38 AM, Steven Nelson <snel...@sourceallies.com> wrote:
>> 
>> Are you sure? I just restarted the job with the new version (not from a 
>> savepoint), took a new savepoint, and it seemed to work from there. It just 
>> seemed like it couldn’t upgrade the schema during restore.
>> 
>> Sent from my iPhone
>> 
>>> On Oct 8, 2019, at 1:22 PM, Aleksandar Mastilovic 
>>> <amastilo...@sightmachine.com> wrote:
>>> 
>>> The Option class is not serializable; if you put something serializable 
>>> into that case class you wouldn’t have this problem.
>>> 
>>>> On Oct 8, 2019, at 8:17 AM, Steven Nelson <snel...@sourceallies.com> wrote:
>>>> 
>>>> Hello! We are working with a Scala-based pipeline.
>>>> 
>>>> We changed 
>>>> 
>>>> case class Record(orgId: Int)
>>>> 
>>>> To
>>>> 
>>>> case class Record(orgId: Int, operationId: Option[String] = None)
>>>> 
>>>> And now our savepoints fail with this exception:
>>>> org.apache.flink.util.StateMigrationException: The new state serializer 
>>>> cannot be incompatible.
>>>>     at 
>>>> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:534)
>>>>     at 
>>>> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:482)
>>>>     at 
>>>> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:643)
>>>>     at 
>>>> org.apache.flink.runtime.state.KeyedStateFactory.createInternalState(KeyedStateFactory.java:47)
>>>>     at 
>>>> org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:72)
>>>>     at 
>>>> org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:279)
>>>>     at 
>>>> org.apache.flink.streaming.api.operators.AbstractStreamOperator.getOrCreateKeyedState(AbstractStreamOperator.java:577)
>>>>     at 
>>>> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.open(WindowOperator.java:240)
>>>>     at 
>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:529)
>>>>     at 
>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:393)
>>>>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
>>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>>>>     at java.lang.Thread.run(Thread.java:748)
>>>> 
>>>> I was under the impression that we could add fields to case classes and still 
>>>> be able to use existing state to start the job.
>>>> 
>>>> -Steve
>>>> 
>>> 
> 
