Vladislav Keda created FLINK-36139:
--------------------------------------
Summary: ClassCastException when checkpointing AggregateFunction
Key: FLINK-36139
URL: https://issues.apache.org/jira/browse/FLINK-36139
Project: Flink
Issue Type: Bug
Components: API / DataStream
Affects Versions: 1.18.1
Environment: Kubernetes Native Session Cluster
Reporter: Vladislav Keda
Consider a simple example of an AggregateFunction with a custom Accumulator:
{code:java}
static class BatchingFunction
        implements AggregateFunction<Row, BatchingFunction.Accumulator, Row>,
                ResultTypeQueryable<Row> {

    @Getter
    private final RowTypeInfo producedType;

    public BatchingFunction() {
        this.producedType = ...
    }

    @Override
    public Accumulator createAccumulator() {
        return new Accumulator();
    }

    @Override
    public Accumulator add(Row row, Accumulator acc) {
        acc.add(row);
        return acc;
    }

    @Override
    public Accumulator merge(Accumulator acc1, Accumulator acc2) {
        acc1.merge(acc2);
        return acc1;
    }

    @Override
    public Row getResult(Accumulator accumulator) {
        ...
    }

    private static class Accumulator implements Serializable {
        private final List<Row> rows = new ArrayList<>();

        List<Row> getAll() {
            return rows;
        }

        Accumulator merge(Accumulator acc2) {
            this.rows.addAll(acc2.rows);
            acc2.clear();
            return this;
        }

        void add(Row row) {
            rows.add(row);
        }

        void clear() {
            rows.clear();
        }
    }
}
{code}
When a job that uses this BatchingFunction is resubmitted to a Flink Kubernetes
session cluster with aligned checkpoints enabled, a ClassCastException appears
in the JobManager logs:
{code:java}
org.apache.flink.util.SerializedThrowable: org.apache.flink.runtime.checkpoint.CheckpointException: Asynchronous task checkpoint failed.
    at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.handleExecutionException(AsyncCheckpointRunnable.java:320) ~[flink-dist-1.18.1.jar:1.18.1]
    at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.run(AsyncCheckpointRunnable.java:155) ~[flink-dist-1.18.1.jar:1.18.1]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) [?:?]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) [?:?]
    at java.lang.Thread.run(Unknown Source) [?:?]
Caused by: org.apache.flink.util.SerializedThrowable: java.lang.Exception: Could not materialize checkpoint 1 for operator transform -> (Sink: metrics_transform, sink: Writer -> sink: Committer) (1/1)#0.
    at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.handleExecutionException(AsyncCheckpointRunnable.java:298) ~[flink-dist-1.18.1.jar:1.18.1]
    ... 4 more
Caused by: org.apache.flink.util.SerializedThrowable: java.util.concurrent.ExecutionException: java.lang.ClassCastException: class ru.glowbyte.streaming.core.operators.transformations.Batch$BatchingFunction$Accumulator cannot be cast to class org.apache.flink.types.Row (ru.glowbyte.streaming.core.operators.transformations.Batch$BatchingFunction$Accumulator is in unnamed module of loader org.apache.flink.util.ChildFirstClassLoader @3e54db3e; org.apache.flink.types.Row is in unnamed module of loader 'app')
    at java.util.concurrent.FutureTask.report(Unknown Source) ~[?:?]
    at java.util.concurrent.FutureTask.get(Unknown Source) ~[?:?]
    at org.apache.flink.util.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:511) ~[flink-dist-1.18.1.jar:1.18.1]
    at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:54) ~[flink-dist-1.18.1.jar:1.18.1]
    at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.finalizeNonFinishedSnapshots(AsyncCheckpointRunnable.java:191) ~[flink-dist-1.18.1.jar:1.18.1]
    at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.run(AsyncCheckpointRunnable.java:124) ~[flink-dist-1.18.1.jar:1.18.1]
    ... 3 more
Caused by: org.apache.flink.util.SerializedThrowable: java.lang.ClassCastException: class ru.glowbyte.streaming.core.operators.transformations.Batch$BatchingFunction$Accumulator cannot be cast to class org.apache.flink.types.Row (ru.glowbyte.streaming.core.operators.transformations.Batch$BatchingFunction$Accumulator is in unnamed module of loader org.apache.flink.util.ChildFirstClassLoader @3e54db3e; org.apache.flink.types.Row is in unnamed module of loader 'app')
    at org.apache.flink.api.java.typeutils.runtime.RowSerializer.serialize(RowSerializer.java:69) ~[flink-dist-1.18.1.jar:1.18.1]
    at org.apache.flink.runtime.state.heap.CopyOnWriteStateMapSnapshot.writeState(CopyOnWriteStateMapSnapshot.java:147) ~[flink-dist-1.18.1.jar:1.18.1]
    at org.apache.flink.runtime.state.heap.AbstractStateTableSnapshot.writeStateInKeyGroup(AbstractStateTableSnapshot.java:116) ~[flink-dist-1.18.1.jar:1.18.1]
    at org.apache.flink.runtime.state.heap.CopyOnWriteStateTableSnapshot.writeStateInKeyGroup(CopyOnWriteStateTableSnapshot.java:38) ~[flink-dist-1.18.1.jar:1.18.1]
    at org.apache.flink.runtime.state.heap.HeapSnapshotStrategy.lambda$asyncSnapshot$3(HeapSnapshotStrategy.java:172) ~[flink-dist-1.18.1.jar:1.18.1]
    at org.apache.flink.runtime.state.SnapshotStrategyRunner$1.callInternal(SnapshotStrategyRunner.java:91) ~[flink-dist-1.18.1.jar:1.18.1]
    at org.apache.flink.runtime.state.SnapshotStrategyRunner$1.callInternal(SnapshotStrategyRunner.java:88) ~[flink-dist-1.18.1.jar:1.18.1]
    at org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:78) ~[flink-dist-1.18.1.jar:1.18.1]
    at java.util.concurrent.FutureTask.run(Unknown Source) ~[?:?]
    at org.apache.flink.util.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:508) ~[flink-dist-1.18.1.jar:1.18.1]
    at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:54) ~[flink-dist-1.18.1.jar:1.18.1]
    at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.finalizeNonFinishedSnapshots(AsyncCheckpointRunnable.java:191) ~[flink-dist-1.18.1.jar:1.18.1]
    at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.run(AsyncCheckpointRunnable.java:124) ~[flink-dist-1.18.1.jar:1.18.1]
    ... 3 more
{code}
In this case Flink uses an incorrect serializer (RowSerializer, per the stack
trace) to write Accumulator objects to state. Note also that the behavior is
not deterministic: when I launch the job for the first time on a new cluster,
no such errors are observed during checkpoints.
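
To illustrate the failure mode only (this is not Flink code, and it says nothing about why Flink selects RowSerializer here), below is a minimal plain-Java sketch. All names in it (Serializer, Row, Accumulator, RowOnlySerializer, writeState) are hypothetical stand-ins. It assumes that the snapshot code path holds the serializer without a compile-time check on the state type, so the mismatch only surfaces as a ClassCastException when the value is actually written:

```java
import java.util.ArrayList;
import java.util.List;

public class SerializerMismatchSketch {

    /** Hypothetical stand-in for a typed serializer interface. */
    interface Serializer<T> {
        String serialize(T value);
    }

    /** Hypothetical stand-in for org.apache.flink.types.Row. */
    static final class Row {
        final Object[] fields;

        Row(Object... fields) {
            this.fields = fields;
        }
    }

    /** Hypothetical stand-in for the accumulator from the report. */
    static final class Accumulator {
        final List<Row> rows = new ArrayList<>();
    }

    /** Plays the role of RowSerializer: only valid for Row values. */
    static final class RowOnlySerializer implements Serializer<Row> {
        @Override
        public String serialize(Row value) {
            return "Row with " + value.fields.length + " field(s)";
        }
    }

    /**
     * Assumption: the writer sees the serializer as a raw type, so the
     * compiler cannot reject a serializer/state mismatch. The cast to Row
     * happens inside the serializer's generated bridge method at run time.
     */
    @SuppressWarnings({"rawtypes", "unchecked"})
    static String writeState(Serializer serializer, Object state) {
        return serializer.serialize(state);
    }

    /** Returns true if writing the given state with the Row serializer fails. */
    static boolean failsWithClassCast(Object state) {
        try {
            writeState(new RowOnlySerializer(), state);
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        // A Row is written without error.
        System.out.println(failsWithClassCast(new Row(1, "a")));
        // An Accumulator cannot be cast to Row, as in the reported trace.
        System.out.println(failsWithClassCast(new Accumulator()));
    }
}
```

The sketch only shows that a Row-typed serializer registered for accumulator state fails at write time rather than at registration time, which is consistent with the error appearing during the asynchronous checkpoint and not at job submission.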