Hi,

I run a simple streaming job that computes hourly impressions per campaign:

.keyBy(imp => imp.campaign_id)
.window(TumblingEventTimeWindows.of(...))
.aggregate(new BudgetSpendingByImpsAggregateFunction(), new BudgetSpendingByImpsWindowFunction())

The aggregate function just sums impressions:

class BudgetSpendingByImpsAggregateFunction extends AggregateFunction[ImpressionEvent, BudgetSpending, BudgetSpending] {
    override def add(value: ImpressionEvent, accumulator: BudgetSpending): BudgetSpending = {
        accumulator + value
    }
...
}

BudgetSpending is just a simple Scala case class used as the accumulator:

case class BudgetSpending(var impressions: Int = 0) {
    def +(imp: ImpressionEvent): BudgetSpending = ...
}
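
To illustrate what the accumulator does for one key and one window, here is a minimal standalone sketch without Flink. The body of `+`, the String type of campaign_id, and the AccumulatorSketch object are assumptions made for illustration only; the real ones are not shown above.

```scala
// Hypothetical event type: only campaign_id appears in the job above;
// its String type is an assumption.
case class ImpressionEvent(campaign_id: String)

// Accumulator as described above. The body of `+` is an assumption:
// here every event counts as exactly one impression.
case class BudgetSpending(var impressions: Int = 0) {
  def +(imp: ImpressionEvent): BudgetSpending = {
    impressions += 1
    this
  }
}

object AccumulatorSketch {
  // Mimics what the aggregate function does for one key and one window:
  // start from an empty accumulator and add each event in turn.
  def sum(events: Seq[ImpressionEvent]): BudgetSpending =
    events.foldLeft(BudgetSpending())((acc, imp) => acc + imp)

  def main(args: Array[String]): Unit = {
    val events = Seq.fill(3)(ImpressionEvent("campaign-1"))
    println(sum(events)) // BudgetSpending(3)
  }
}
```

In the real job Flink drives these calls through the AggregateFunction and keeps the accumulator in keyed window state, which is why the accumulator's layout ends up in the savepoint.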


I am trying to add a new counter to the BudgetSpending accumulator class:

@SerialVersionUID(-7854299638715463891L)
case class BudgetSpending(var impressions: Int = 0, var spent: Double = 0)

But when I start the job from a savepoint taken with the previous version, I get this error:

java.lang.Exception: Exception while creating StreamOperatorStateContext.
        at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:191)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:227)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:730)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:295)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
        at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state backend for WindowOperator_74af6afa20f38ce575bfc2d1386aa434_(1/2) from any of the 1 provided restore options.
        at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:137)
        at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:276)
        at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:132)
        ... 5 more
Caused by: java.lang.ArrayIndexOutOfBoundsException: 2
        at ga.p2502.streams.flink.budget.spending.HourlyBudgetSpendingJob$$anon$18$$anon$8.createInstance(HourlyBudgetSpendingJob.scala:60)
        at ga.p2502.streams.flink.budget.spending.HourlyBudgetSpendingJob$$anon$18$$anon$8.createInstance(HourlyBudgetSpendingJob.scala:60)
        at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:133)
        at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:32)
        at org.apache.flink.runtime.state.heap.StateTableByKeyGroupReaders$StateTableByKeyGroupReaderV2V3.readMappingsInKeyGroup(StateTableByKeyGroupReaders.java:133)
        at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restorePartitionedState(HeapKeyedStateBackend.java:430)
        at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restore(HeapKeyedStateBackend.java:315)
        at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restore(HeapKeyedStateBackend.java:95)
        at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:151)
        at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:123)
        ... 7 more


How can I solve this problem? Thanks a lot.
