Hello everyone:

    I defined a UDAF function when I am using the  FLINK TABLE API to
achieve the aggregation operation. There is no problem with the task
running from beginning in cluster. But it throws an exception when it is
restart task from checkpoint,How can I resolve it ?

java.lang.RuntimeException: Error while getting state
at
org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:62)
at
org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:144)
at
org.apache.flink.table.runtime.aggregate.GroupAggProcessFunction.open(GroupAggProcessFunction.scala:74)
at
org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
at
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
at
org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator.open(LegacyKeyedProcessOperator.java:60)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.StateMigrationException: For heap
backends, the new state serializer must not be incompatible.
at
org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.tryRegisterStateTable(HeapKeyedStateBackend.java:227)
at
org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.createInternalState(HeapKeyedStateBackend.java:270)
at
org.apache.flink.runtime.state.KeyedStateFactory.createInternalState(KeyedStateFactory.java:47)
at
org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:72)
at
org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:286)
at
org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:335)
at
org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:124)
at
org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:60)
... 9 more
groupBy: (risk_id, dev_only_id), select: (risk_id, dev_only_id,
UpdateColumn(org_name, evaluation_time_millis) AS TMP_586,
UpdateColumn(rule_weight_sum, evaluation_time_millis) AS TMP_581,
UpdateColumn(area_code, evaluation_time_millis) AS TMP_583, SUM(rule_risk)
AS TMP_580, UpdateColumn(ip, evaluation_time_millis) AS TMP_587,
UpdateColumn(evaluation_time_millis, evaluation_time_millis) AS TMP_588,
UpdateColumn(area_name, evaluation_time_millis) AS TMP_584,
UpdateColumn(evaluation_time, evaluation_time_millis) AS TMP_582,
UpdateColumn(org_code, evaluation_time_millis) AS TMP_585) -> select:
(risk_id, _UTF-16LE'''' AS risk_name, /(TMP_580, CAST(TMP_581)) AS
risk_value, TMP_582 AS evaluation_time, TMP_583 AS area_code, TMP_584 AS
area_name, TMP_585 AS org_code, TMP_586 AS org_name, dev_only_id, TMP_587
AS ip, CAST(TMP_581) AS rule_weight_sum, CAST(TMP_588) AS
evaluation_time_millis) -> to: Tuple2 -> Filter -> Map -> from: (risk_id,
risk_name, risk_value, evaluation_time, area_code, area_name, org_code,
org_name, dev_only_id, ip, risk_weight_sum, evaluation_time_millis) (3/6)

-- 

orlando

Reply via email to