Hi,

Adding a new field to a case class currently breaks the serialisation format of the state stored in the savepoint. Restoring such a savepoint requires state migration, which Flink does not currently support implicitly.
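To illustrate why the format breaks, here is a minimal sketch. It assumes a purely positional encoding in which fields are written in declaration order, with no field names and no field count. It is a simplified model built on plain java.io streams, not Flink's actual CaseClassSerializer, and BudgetSpendingV1, BudgetSpendingV2 and PositionalFormatDemo are hypothetical names used only for this example.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}
import scala.util.Try

// Hypothetical stand-ins for the accumulator before and after the change.
case class BudgetSpendingV1(impressions: Int)
case class BudgetSpendingV2(impressions: Int, spent: Double)

object PositionalFormatDemo {

  // Writes the single field of the old class: 4 bytes, no names, no field count.
  def writeV1(value: BudgetSpendingV1): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out = new DataOutputStream(bytes)
    out.writeInt(value.impressions)
    out.flush()
    bytes.toByteArray
  }

  // Reads the fields the new class expects, in declaration order: 4 + 8 bytes.
  def readV2(data: Array[Byte]): BudgetSpendingV2 = {
    val in = new DataInputStream(new ByteArrayInputStream(data))
    BudgetSpendingV2(in.readInt(), in.readDouble())
  }

  def main(args: Array[String]): Unit = {
    val oldBytes = writeV1(BudgetSpendingV1(42))
    println(s"old record size: ${oldBytes.length} bytes")
    // The reader runs out of input while reading `spent`,
    // so this is a Failure wrapping java.io.EOFException.
    println(Try(readV2(oldBytes)))
  }
}
```

Nothing in the old bytes tells the reader that only one field was written, so it fails partway through the record. A real migration would have to read the state with the old layout and rewrite it with the new one.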
However, I would expect the failure to happen earlier, during the compatibility check upon restore. According to the source code, the compatibility check in CaseClassSerializer is inherited from TupleSerializerBase, which does not check for a change in the number of fields, I think because such a change is not expected for a given tuple class.

I will cc Gordon and Aljoscha, maybe they can add more details.

Best,
Andrey

> On 14 Sep 2018, at 09:48, tisonet <tis...@seznam.cz> wrote:
>
> Hi,
>
> I run a simple streaming job where I compute hourly impressions for campaigns:
>
>   .keyBy(imp => imp.campaign_id)
>   .window(TumblingEventTimeWindows.of(...))
>   .aggregate(new BudgetSpendingByImpsAggregateFunction(),
>              new BudgetSpendingByImpsWindowFunction())
>
> Where the aggregate function just sums impressions:
>
>   class BudgetSpendingByImpsAggregateFunction extends
>       AggregateFunction[ImpressionEvent, BudgetSpending, BudgetSpending] {
>     override def add(value: ImpressionEvent, accumulator: BudgetSpending): BudgetSpending = {
>       accumulator + value
>     }
>     ...
>   }
>
> BudgetSpending is just a simple Scala case class accumulator:
>
>   case class BudgetSpending(var impressions: Int = 0) {
>     def +(imp: ImpressionEvent): BudgetSpending =_
>   }
>
> I am trying to add a new counter to the BudgetSpending accumulator class:
>
>   @SerialVersionUID(-7854299638715463891L)
>   case class BudgetSpending(var impressions: Int = 0, var spent: Double = 0)
>
> But when I run the job with a savepoint from the previous version I get an error:
>
> java.lang.Exception: Exception while creating StreamOperatorStateContext.
>     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:191)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:227)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:730)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:295)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
>     at java.lang.Thread.run(Thread.java:745)
> Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state backend for WindowOperator_74af6afa20f38ce575bfc2d1386aa434_(1/2) from any of the 1 provided restore options.
>     at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:137)
>     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:276)
>     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:132)
>     ... 5 more
> Caused by: java.lang.ArrayIndexOutOfBoundsException: 2
>     at ga.p2502.streams.flink.budget.spending.HourlyBudgetSpendingJob$$anon$18$$anon$8.createInstance(HourlyBudgetSpendingJob.scala:60)
>     at ga.p2502.streams.flink.budget.spending.HourlyBudgetSpendingJob$$anon$18$$anon$8.createInstance(HourlyBudgetSpendingJob.scala:60)
>     at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:133)
>     at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:32)
>     at org.apache.flink.runtime.state.heap.StateTableByKeyGroupReaders$StateTableByKeyGroupReaderV2V3.readMappingsInKeyGroup(StateTableByKeyGroupReaders.java:133)
>     at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restorePartitionedState(HeapKeyedStateBackend.java:430)
>     at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restore(HeapKeyedStateBackend.java:315)
>     at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restore(HeapKeyedStateBackend.java:95)
>     at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:151)
>     at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:123)
>     ... 7 more
>
> How can I solve this problem? Thanks a lot.
>
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/