Hi, I run a simple streaming job that computes hourly impressions per campaign:

    .keyBy(imp => imp.campaign_id)
    .window(TumblingEventTimeWindows.of(...))
    .aggregate(new BudgetSpendingByImpsAggregateFunction(),
               new BudgetSpendingByImpsWindowFunction())

where the aggregate function just sums impressions:

    class BudgetSpendingByImpsAggregateFunction
        extends AggregateFunction[ImpressionEvent, BudgetSpending, BudgetSpending] {

      override def add(value: ImpressionEvent, accumulator: BudgetSpending): BudgetSpending = {
        accumulator + value
      }
      ...
    }

BudgetSpending is just a simple Scala case class used as the accumulator:

    case class BudgetSpending(var impressions: Int = 0) {
      def +(imp: ImpressionEvent): BudgetSpending = ...
    }

I am trying to add a new counter to the BudgetSpending accumulator class:

    @SerialVersionUID(-7854299638715463891L)
    case class BudgetSpending(var impressions: Int = 0, var spent: Double = 0)

But when I run the job from a savepoint taken with the previous version, I get this error:

    java.lang.Exception: Exception while creating StreamOperatorStateContext.
        at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:191)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:227)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:730)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:295)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
        at java.lang.Thread.run(Thread.java:745)
    Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state backend for WindowOperator_74af6afa20f38ce575bfc2d1386aa434_(1/2) from any of the 1 provided restore options.
        at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:137)
        at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:276)
        at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:132)
        ... 5 more
    Caused by: java.lang.ArrayIndexOutOfBoundsException: 2
        at ga.p2502.streams.flink.budget.spending.HourlyBudgetSpendingJob$$anon$18$$anon$8.createInstance(HourlyBudgetSpendingJob.scala:60)
        at ga.p2502.streams.flink.budget.spending.HourlyBudgetSpendingJob$$anon$18$$anon$8.createInstance(HourlyBudgetSpendingJob.scala:60)
        at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:133)
        at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:32)
        at org.apache.flink.runtime.state.heap.StateTableByKeyGroupReaders$StateTableByKeyGroupReaderV2V3.readMappingsInKeyGroup(StateTableByKeyGroupReaders.java:133)
        at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restorePartitionedState(HeapKeyedStateBackend.java:430)
        at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restore(HeapKeyedStateBackend.java:315)
        at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restore(HeapKeyedStateBackend.java:95)
        at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:151)
        at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:123)
        ... 7 more

How can I solve this problem?

Thanks a lot.

--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/