I'm attempting to upgrade a Beam application (intended to run on Flink 1.10 on EMR) from Beam 2.21 to 2.27. In the process I've encountered an issue with checkpointing/savepointing and stateful DoFns. This issue is not reproducible locally on the Flink MiniCluster (or at least, all of my tests worked locally). After backtesting through prior versions, 2.22 appears to be the earliest version in which the problem occurs.
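Here is the pipeline I'm running:

    import static org.apache.beam.sdk.values.TypeDescriptors.voids;

    import java.util.Optional;
    import org.apache.beam.runners.flink.FlinkPipelineOptions;
    import org.apache.beam.runners.flink.FlinkRunner;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.GenerateSequence;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.state.StateSpec;
    import org.apache.beam.sdk.state.StateSpecs;
    import org.apache.beam.sdk.state.ValueState;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.MapElements;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.beam.sdk.transforms.WithKeys;
    import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
    import org.apache.beam.sdk.transforms.windowing.Repeatedly;
    import org.apache.beam.sdk.transforms.windowing.Window;
    import org.apache.beam.sdk.values.KV;
    import org.joda.time.Duration;
    // Assumption: Joda Instant, to match the Joda Duration Beam requires;
    // java.time.Instant also provides now() and would work here.
    import org.joda.time.Instant;

    public class StatefulTestPipeline {

      public static void main(String[] args) {
        FlinkPipelineOptions options =
            PipelineOptionsFactory.fromArgs(args).as(FlinkPipelineOptions.class);
        options.setRunner(FlinkRunner.class);
        options.setCheckpointingInterval(30000L); // milliseconds

        Pipeline pipeline = Pipeline.create(options);
        pipeline
            // Emit one element every 5 seconds: 0, 1, 2, ...
            .apply("UnboundedSource",
                GenerateSequence.from(0).withRate(1, Duration.standardSeconds(5)))
            .apply(
                "ApplyTriggering",
                Window.<Long>configure()
                    .discardingFiredPanes()
                    .triggering(
                        Repeatedly.forever(
                            AfterProcessingTime.pastFirstElementInPane()
                                .plusDelayOf(Duration.standardSeconds(10)))))
            // Put every element under the same (null) key so state is shared.
            .apply("KeyByNull", WithKeys.<Void, Long>of((Void) null).withKeyType(voids()))
            .apply(
                "StatefulFn",
                ParDo.of(
                    new DoFn<KV<Void, Long>, KV<Long, Long>>() {
                      private static final long serialVersionUID = 1L;

                      @StateId("SUM")
                      private final StateSpec<ValueState<Long>> sumSpec = StateSpecs.value();

                      @StateId("COUNT")
                      private final StateSpec<ValueState<Long>> countSpec = StateSpecs.value();

                      @ProcessElement
                      public void processElement(
                          @Element KV<Void, Long> element,
                          @StateId("SUM") ValueState<Long> sumState,
                          @StateId("COUNT") ValueState<Long> countState,
                          OutputReceiver<KV<Long, Long>> out) {
                        // read() returns null until the state is first written.
                        long currentSum =
                            Optional.ofNullable(sumState.read()).orElse(0L) + element.getValue();
                        sumState.write(currentSum);
                        long currentCount =
                            Optional.ofNullable(countState.read()).orElse(0L) + 1;
                        countState.write(currentCount);
                        out.output(KV.of(currentCount, currentSum));
                      }
                    }))
            .apply(
                MapElements.into(voids())
                    .via(
                        kv -> {
                          System.out.println(
                              String.format(
                                  "Time: %s, Count: %d, Sum: %d",
                                  Instant.now(), kv.getKey(), kv.getValue()));
                          return null;
                        }));

        pipeline.run();
      }
    }

I run this pipeline from the EMR master node (with Flink 1.10.0, and RocksDB as the state backend) using this command:

    flink run -m yarn-cluster -yjm 1g -ytm 1g -ys 1 -c my.test.package.StatefulTestPipeline -p 1 -d pipeline.jar

At first this pipeline runs as expected (output is printed on stdout for the task manager):

    Time: 2021-02-13T17:25:08.820Z, Count: 1, Sum: 0

But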
when the first checkpoint (or savepoint) starts, an error appears in the logs and no more output appears:

    17:25:11.231 [StatefulFn/ParMultiDo(Anonymous) -> MapElements/Map/ParMultiDo(Anonymous) (1/1)] ERROR org.apache.flink.runtime.taskmanager.Task - FATAL - exception in resource cleanup of task StatefulFn/ParMultiDo(Anonymous) -> MapElements/Map/ParMultiDo(Anonymous) (1/1) (d60f456dd213162dbc957a705609f561).
    java.lang.IllegalStateException: null
        at org.apache.flink.util.Preconditions.checkState(Preconditions.java:179) ~[flink-dist_2.11-1.10.0.jar:1.10.0]
        at org.apache.flink.runtime.taskmanager.Task.notifyFinalState(Task.java:926) ~[falls-risk-pipeline-0.5.0-MLECOSYSTM-1579-SNAPSHOT.jar:0.5.0-MLECOSYSTM-1579-SNAPSHOT]
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:832) [falls-risk-pipeline-0.5.0-MLECOSYSTM-1579-SNAPSHOT.jar:0.5.0-MLECOSYSTM-1579-SNAPSHOT]
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:533) [falls-risk-pipeline-0.5.0-MLECOSYSTM-1579-SNAPSHOT.jar:0.5.0-MLECOSYSTM-1579-SNAPSHOT]
        at java.lang.Thread.run(Thread.java:748) [?:1.8.0_252]
    17:25:11.245 [StatefulFn/ParMultiDo(Anonymous) -> MapElements/Map/ParMultiDo(Anonymous) (1/1)] ERROR org.apache.flink.runtime.taskexecutor.TaskExecutor - FATAL - exception in resource cleanup of task StatefulFn/ParMultiDo(Anonymous) -> MapElements/Map/ParMultiDo(Anonymous) (1/1) (d60f456dd213162dbc957a705609f561).
    java.lang.IllegalStateException: null
        at org.apache.flink.util.Preconditions.checkState(Preconditions.java:179) ~[flink-dist_2.11-1.10.0.jar:1.10.0]
        at org.apache.flink.runtime.taskmanager.Task.notifyFinalState(Task.java:926) [falls-risk-pipeline-0.5.0-MLECOSYSTM-1579-SNAPSHOT.jar:0.5.0-MLECOSYSTM-1579-SNAPSHOT]
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:832) [falls-risk-pipeline-0.5.0-MLECOSYSTM-1579-SNAPSHOT.jar:0.5.0-MLECOSYSTM-1579-SNAPSHOT]
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:533) [falls-risk-pipeline-0.5.0-MLECOSYSTM-1579-SNAPSHOT.jar:0.5.0-MLECOSYSTM-1579-SNAPSHOT]
        at java.lang.Thread.run(Thread.java:748) [?:1.8.0_252]

This exception appears to indicate that this particular task is done; however, in the Flink dashboard none of the tasks read as finished/failed/canceled. They all show up as running.

I attempted to run this same pipeline on Flink 1.10.0 and Beam 2.27 with the same results. I also tried Flink 1.11.2 with Beam 2.27, but none of the subtasks started (they got stuck in the Created state), and I haven't had a chance to dig further to see whether a native Flink stream works there.

I skimmed through the Jira queue, and it's possible that this is related to https://issues.apache.org/jira/browse/BEAM-10927
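
For reference, here is a minimal plain-Java sketch (no Beam or Flink involved) of the output the StatefulFn in the pipeline above should keep producing if its state survived the checkpoint. It assumes the elements 0, 1, 2, ... reach the single null key in order. The class name RunningTotalsSketch and its two fields are illustrative stand-ins for the "SUM" and "COUNT" state cells and are not part of the actual pipeline.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

/** Plain-Java stand-in for the StatefulFn logic; not Beam code. */
public class RunningTotalsSketch {

    // Hypothetical stand-ins for the "SUM" and "COUNT" ValueState cells.
    // null models a cell that has never been written.
    private Long sum = null;
    private Long count = null;

    /** Mirrors processElement: updates both cells and returns {count, sum}. */
    public long[] process(long element) {
        long currentSum = Optional.ofNullable(sum).orElse(0L) + element;
        sum = currentSum;
        long currentCount = Optional.ofNullable(count).orElse(0L) + 1;
        count = currentCount;
        return new long[] {currentCount, currentSum};
    }

    /** Feeds 0, 1, ..., n-1 in order and formats each result like the pipeline does. */
    public static List<String> simulate(int n) {
        RunningTotalsSketch fn = new RunningTotalsSketch();
        List<String> lines = new ArrayList<>();
        for (long i = 0; i < n; i++) {
            long[] out = fn.process(i);
            lines.add(String.format("Count: %d, Sum: %d", out[0], out[1]));
        }
        return lines;
    }

    public static void main(String[] args) {
        // Prints:
        // Count: 1, Sum: 0
        // Count: 2, Sum: 1
        // Count: 3, Sum: 3
        // Count: 4, Sum: 6
        simulate(4).forEach(System.out::println);
    }
}
```

On the cluster only the first of these lines ("Count: 1, Sum: 0") is ever printed before the checkpoint error occurs.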