I'm attempting to update a Beam application (intended to run on Flink 1.10 on 
EMR) from 2.21 to 2.27. In the process I've encountered an issue with 
checkpointing/savepointing and stateful DoFns. The issue is not reproducible 
locally on the Flink MiniCluster (or at least, all of my tests passed locally). 
After testing the intermediate releases, 2.22 appears to be the earliest 
version in which the problem occurs. The following minimal pipeline 
reproduces it:

public static void main(String[] args) {
    FlinkPipelineOptions options =
        PipelineOptionsFactory.fromArgs(args).as(FlinkPipelineOptions.class);
    options.setRunner(FlinkRunner.class);
    options.setCheckpointingInterval(30000L);
    Pipeline pipeline = Pipeline.create(options);
    pipeline
        .apply(
            "UnboundedSource",
            GenerateSequence.from(0).withRate(1, Duration.standardSeconds(5)))
        .apply(
            "ApplyTriggering",
            Window.<Long>configure()
                .discardingFiredPanes()
                .triggering(
                    Repeatedly.forever(
                        AfterProcessingTime.pastFirstElementInPane()
                            .plusDelayOf(Duration.standardSeconds(10)))))
        .apply("KeyByNull", WithKeys.<Void, Long>of((Void) null).withKeyType(voids()))
        .apply(
            "StatefulFn",
            ParDo.of(
                new DoFn<KV<Void, Long>, KV<Long, Long>>() {

                  private static final long serialVersionUID = 1L;

                  @StateId("SUM")
                  private final StateSpec<ValueState<Long>> sumSpec = StateSpecs.value();

                  @StateId("COUNT")
                  private final StateSpec<ValueState<Long>> countSpec = StateSpecs.value();

                  @ProcessElement
                  public void processElement(
                      @Element KV<Void, Long> element,
                      @StateId("SUM") ValueState<Long> sumState,
                      @StateId("COUNT") ValueState<Long> countState,
                      OutputReceiver<KV<Long, Long>> out) {

                    long currentSum =
                        Optional.ofNullable(sumState.read()).orElse(0L) + element.getValue();
                    sumState.write(currentSum);

                    long currentCount =
                        Optional.ofNullable(countState.read()).orElse(0L) + 1;
                    countState.write(currentCount);
                    out.output(KV.of(currentCount, currentSum));
                  }
                }))
        .apply(
            MapElements.into(voids())
                .via(
                    kv -> {
                      System.out.println(
                          String.format(
                              "Time: %s, Count: %d, Sum: %d",
                              Instant.now(), kv.getKey(), kv.getValue()));
                      return null;
                    }));
    pipeline.run();
}
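
For reference, here is a minimal plain-Java sketch (no Beam or Flink involved) of the output I would expect the stateful DoFn to keep producing if checkpointing did not interrupt it. The class name RunningSumSketch and the simulate helper are made up for illustration; the two Long fields merely stand in for the SUM and COUNT ValueState cells, and the sketch assumes the elements 0, 1, 2, ... arrive in order on the single null key.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

public class RunningSumSketch {
    // Stand-ins for the two ValueState<Long> cells; null means "never written".
    private Long sum;
    private Long count;

    /** Mirrors the body of processElement: updates both cells, returns {count, sum}. */
    long[] process(long value) {
        long currentSum = Optional.ofNullable(sum).orElse(0L) + value;
        sum = currentSum;
        long currentCount = Optional.ofNullable(count).orElse(0L) + 1;
        count = currentCount;
        return new long[] {currentCount, currentSum};
    }

    /** Feeds 0, 1, ..., n-1 through the state update (assumes in-order arrival). */
    static List<String> simulate(int n) {
        RunningSumSketch state = new RunningSumSketch();
        List<String> lines = new ArrayList<>();
        for (long v = 0; v < n; v++) {
            long[] out = state.process(v);
            lines.add(String.format("Count: %d, Sum: %d", out[0], out[1]));
        }
        return lines;
    }

    public static void main(String[] args) {
        simulate(4).forEach(System.out::println);
    }
}
```

With four elements this prints Count/Sum pairs of 1/0, 2/1, 3/3 and 4/6, so on the cluster I would expect a new line roughly every five seconds rather than output stopping after the first one.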


I run this pipeline from the EMR master node (Flink 1.10.0, with RocksDB as 
the state backend) using this command:

flink run -m yarn-cluster -yjm 1g -ytm 1g -ys 1 -c my.test.package.StatefulTestPipeline -p 1 -d pipeline.jar


At first this pipeline runs as expected (output is printed to the task 
manager's stdout):

Time: 2021-02-13T17:25:08.820Z, Count: 1, Sum: 0

But when the first checkpoint (or savepoint) starts, an error appears in the 
logs and no further output is produced:

17:25:11.231 [StatefulFn/ParMultiDo(Anonymous) -> MapElements/Map/ParMultiDo(Anonymous) (1/1)] ERROR org.apache.flink.runtime.taskmanager.Task - FATAL - exception in resource cleanup of task StatefulFn/ParMultiDo(Anonymous) -> MapElements/Map/ParMultiDo(Anonymous) (1/1) (d60f456dd213162dbc957a705609f561).
java.lang.IllegalStateException: null
    at org.apache.flink.util.Preconditions.checkState(Preconditions.java:179) ~[flink-dist_2.11-1.10.0.jar:1.10.0]
    at org.apache.flink.runtime.taskmanager.Task.notifyFinalState(Task.java:926) ~[falls-risk-pipeline-0.5.0-MLECOSYSTM-1579-SNAPSHOT.jar:0.5.0-MLECOSYSTM-1579-SNAPSHOT]
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:832) [falls-risk-pipeline-0.5.0-MLECOSYSTM-1579-SNAPSHOT.jar:0.5.0-MLECOSYSTM-1579-SNAPSHOT]
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:533) [falls-risk-pipeline-0.5.0-MLECOSYSTM-1579-SNAPSHOT.jar:0.5.0-MLECOSYSTM-1579-SNAPSHOT]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_252]

17:25:11.245 [StatefulFn/ParMultiDo(Anonymous) -> MapElements/Map/ParMultiDo(Anonymous) (1/1)] ERROR org.apache.flink.runtime.taskexecutor.TaskExecutor - FATAL - exception in resource cleanup of task StatefulFn/ParMultiDo(Anonymous) -> MapElements/Map/ParMultiDo(Anonymous) (1/1) (d60f456dd213162dbc957a705609f561).
java.lang.IllegalStateException: null
    at org.apache.flink.util.Preconditions.checkState(Preconditions.java:179) ~[flink-dist_2.11-1.10.0.jar:1.10.0]
    at org.apache.flink.runtime.taskmanager.Task.notifyFinalState(Task.java:926) [falls-risk-pipeline-0.5.0-MLECOSYSTM-1579-SNAPSHOT.jar:0.5.0-MLECOSYSTM-1579-SNAPSHOT]
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:832) [falls-risk-pipeline-0.5.0-MLECOSYSTM-1579-SNAPSHOT.jar:0.5.0-MLECOSYSTM-1579-SNAPSHOT]
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:533) [falls-risk-pipeline-0.5.0-MLECOSYSTM-1579-SNAPSHOT.jar:0.5.0-MLECOSYSTM-1579-SNAPSHOT]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_252]

This exception appears to indicate that this particular task is done; however, 
in the Flink dashboard none of the tasks show as finished, failed, or canceled. 
They all show up as running.

I attempted to run this same pipeline on Flink 1.10.0 and Beam 2.27 with the 
same results.
I also tried Flink 1.11.2 with Beam 2.27; however, none of the subtasks 
started (they got stuck in the Created state), and I haven't had a chance to 
dig further to see whether a native Flink streaming job works.

I skimmed through the Jira queue, and it’s possible that this is related to 
https://issues.apache.org/jira/browse/BEAM-10927



