Vladislav Keda created FLINK-36139:
--------------------------------------

             Summary: ClassCastException when checkpointing AggregateFunction
                 Key: FLINK-36139
                 URL: https://issues.apache.org/jira/browse/FLINK-36139
             Project: Flink
          Issue Type: Bug
          Components: API / DataStream
    Affects Versions: 1.18.1
         Environment: Kubernetes Native Session Cluster
            Reporter: Vladislav Keda


Consider a simple example of an AggregateFunction with a custom Accumulator:

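{code:java}
// Imports assumed by this snippet (@Getter comes from Lombok).
// BatchingFunction is nested in the job's Batch class (see the stack trace).
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

import lombok.Getter;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.types.Row;

    static class BatchingFunction
            implements AggregateFunction<Row, BatchingFunction.Accumulator, Row>,
                    ResultTypeQueryable<Row> {

        // Lombok generates getProducedType(), which implements ResultTypeQueryable<Row>.
        @Getter
        private final RowTypeInfo producedType;

        public BatchingFunction() {
            this.producedType = ...
        }

        @Override
        public Accumulator createAccumulator() {
            return new Accumulator();
        }

        @Override
        public Accumulator add(Row row, Accumulator acc) {
            acc.add(row);
            return acc;
        }

        @Override
        public Accumulator merge(Accumulator acc1, Accumulator acc2) {
            acc1.merge(acc2);
            return acc1;
        }

        @Override
        public Row getResult(Accumulator accumulator) {
            ...
        }

        // Buffers incoming rows until the window fires.
        private static class Accumulator implements Serializable {

            private final List<Row> rows = new ArrayList<>();

            List<Row> getAll() {
                return rows;
            }

            // Moves all rows from acc2 into this accumulator and empties acc2.
            Accumulator merge(Accumulator acc2) {
                this.rows.addAll(acc2.rows);
                acc2.clear();
                return this;
            }

            void add(Row row) {
                rows.add(row);
            }

            void clear() {
                rows.clear();
            }
        }
    }
{code}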

When a job that includes this BatchingFunction is resubmitted to a Flink
Kubernetes session cluster with aligned checkpoints, the following
ClassCastException appears in the JobManager logs:

{code:java}
org.apache.flink.util.SerializedThrowable: org.apache.flink.runtime.checkpoint.CheckpointException: Asynchronous task checkpoint failed.
	at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.handleExecutionException(AsyncCheckpointRunnable.java:320) ~[flink-dist-1.18.1.jar:1.18.1]
	at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.run(AsyncCheckpointRunnable.java:155) ~[flink-dist-1.18.1.jar:1.18.1]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) [?:?]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) [?:?]
	at java.lang.Thread.run(Unknown Source) [?:?]
Caused by: org.apache.flink.util.SerializedThrowable: java.lang.Exception: Could not materialize checkpoint 1 for operator transform -> (Sink: metrics_transform, sink: Writer -> sink: Committer) (1/1)#0.
	at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.handleExecutionException(AsyncCheckpointRunnable.java:298) ~[flink-dist-1.18.1.jar:1.18.1]
	... 4 more
Caused by: org.apache.flink.util.SerializedThrowable: java.util.concurrent.ExecutionException: java.lang.ClassCastException: class ru.glowbyte.streaming.core.operators.transformations.Batch$BatchingFunction$Accumulator cannot be cast to class org.apache.flink.types.Row (ru.glowbyte.streaming.core.operators.transformations.Batch$BatchingFunction$Accumulator is in unnamed module of loader org.apache.flink.util.ChildFirstClassLoader @3e54db3e; org.apache.flink.types.Row is in unnamed module of loader 'app')
	at java.util.concurrent.FutureTask.report(Unknown Source) ~[?:?]
	at java.util.concurrent.FutureTask.get(Unknown Source) ~[?:?]
	at org.apache.flink.util.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:511) ~[flink-dist-1.18.1.jar:1.18.1]
	at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:54) ~[flink-dist-1.18.1.jar:1.18.1]
	at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.finalizeNonFinishedSnapshots(AsyncCheckpointRunnable.java:191) ~[flink-dist-1.18.1.jar:1.18.1]
	at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.run(AsyncCheckpointRunnable.java:124) ~[flink-dist-1.18.1.jar:1.18.1]
	... 3 more
Caused by: org.apache.flink.util.SerializedThrowable: java.lang.ClassCastException: class ru.glowbyte.streaming.core.operators.transformations.Batch$BatchingFunction$Accumulator cannot be cast to class org.apache.flink.types.Row (ru.glowbyte.streaming.core.operators.transformations.Batch$BatchingFunction$Accumulator is in unnamed module of loader org.apache.flink.util.ChildFirstClassLoader @3e54db3e; org.apache.flink.types.Row is in unnamed module of loader 'app')
	at org.apache.flink.api.java.typeutils.runtime.RowSerializer.serialize(RowSerializer.java:69) ~[flink-dist-1.18.1.jar:1.18.1]
	at org.apache.flink.runtime.state.heap.CopyOnWriteStateMapSnapshot.writeState(CopyOnWriteStateMapSnapshot.java:147) ~[flink-dist-1.18.1.jar:1.18.1]
	at org.apache.flink.runtime.state.heap.AbstractStateTableSnapshot.writeStateInKeyGroup(AbstractStateTableSnapshot.java:116) ~[flink-dist-1.18.1.jar:1.18.1]
	at org.apache.flink.runtime.state.heap.CopyOnWriteStateTableSnapshot.writeStateInKeyGroup(CopyOnWriteStateTableSnapshot.java:38) ~[flink-dist-1.18.1.jar:1.18.1]
	at org.apache.flink.runtime.state.heap.HeapSnapshotStrategy.lambda$asyncSnapshot$3(HeapSnapshotStrategy.java:172) ~[flink-dist-1.18.1.jar:1.18.1]
	at org.apache.flink.runtime.state.SnapshotStrategyRunner$1.callInternal(SnapshotStrategyRunner.java:91) ~[flink-dist-1.18.1.jar:1.18.1]
	at org.apache.flink.runtime.state.SnapshotStrategyRunner$1.callInternal(SnapshotStrategyRunner.java:88) ~[flink-dist-1.18.1.jar:1.18.1]
	at org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:78) ~[flink-dist-1.18.1.jar:1.18.1]
	at java.util.concurrent.FutureTask.run(Unknown Source) ~[?:?]
	at org.apache.flink.util.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:508) ~[flink-dist-1.18.1.jar:1.18.1]
	at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:54) ~[flink-dist-1.18.1.jar:1.18.1]
	at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.finalizeNonFinishedSnapshots(AsyncCheckpointRunnable.java:191) ~[flink-dist-1.18.1.jar:1.18.1]
	at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.run(AsyncCheckpointRunnable.java:124) ~[flink-dist-1.18.1.jar:1.18.1]
	... 3 more
{code}

In this case, Flink uses the wrong serializer (RowSerializer, according to the
stack trace) to write Accumulator objects to state. I would also like to note
that this behavior is not deterministic: when I launch the job for the first
time on a new cluster, no such errors are observed during checkpoints.
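To make the failure mode concrete, here is a toy model in plain Java with no Flink dependency. Every name in it (ToySerializer, ToyRow, ToyAccumulator, ToyRowSerializer, ToyStateSnapshot) is a hypothetical stand-in, not a Flink class. The sketch only assumes, consistent with the RowSerializer.serialize frame in the trace, that the state snapshot writes each value with a serializer chosen from declared type information rather than from the value's runtime class. If that declared type is the Row result type instead of the Accumulator type, the cast inside the row serializer fails.

{code:java}
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a type-specific serializer (NOT Flink's TypeSerializer API).
interface ToySerializer<T> {
    void serialize(T value, DataOutputStream out) throws IOException;
}

// Hypothetical stand-in for org.apache.flink.types.Row.
final class ToyRow {
    final Object[] fields;

    ToyRow(Object... fields) {
        this.fields = fields;
    }
}

// Hypothetical stand-in for BatchingFunction.Accumulator.
final class ToyAccumulator {
    final List<ToyRow> rows = new ArrayList<>();
}

// Serializer chosen for the *result* type: it only knows how to write rows.
final class ToyRowSerializer implements ToySerializer<ToyRow> {
    @Override
    public void serialize(ToyRow row, DataOutputStream out) throws IOException {
        out.writeInt(row.fields.length);
        for (Object field : row.fields) {
            out.writeUTF(String.valueOf(field));
        }
    }
}

final class ToyStateSnapshot {
    // Writes a state value with whatever serializer was registered for the
    // state's declared type; the value's runtime class is never checked here.
    @SuppressWarnings("unchecked")
    static byte[] writeState(Object value, ToySerializer<?> serializer) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        // Generics are erased, so the mismatch is not caught at this call site;
        // it surfaces as a ClassCastException inside serialize().
        ((ToySerializer<Object>) serializer).serialize(value, out);
        out.flush();
        return bytes.toByteArray();
    }
}
{code}

Writing a ToyRow with ToyRowSerializer succeeds, while writing a ToyAccumulator with the same serializer throws a ClassCastException, analogous to the cast failure the trace reports from RowSerializer.serialize.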

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
