Hello Beam Dev, I am having a hard time to get checkpoint work with FlinkRunner. I setup two simple pipelines, one read from unboundedsource Kinesis, and other read from text file (and test with/without `--streaming=true` config). But both are failed to save a checkpoint. The checkpoint are configured to save to file system. I am wonder if I am missing something?. Below are my pipelines and stack track for your references.
Appreciate if you can give me some pointers! Pipeline #1: pipeline.apply("ReadingFromKinesis123", KinesisIO.read() .withStreamName("test-stream") .withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON) .withAWSClientsProvider("foo", "bar", Regions.US_WEST_2, "http://localhost:30002")); Pipeline#2: Pipeline p = Pipeline.create(options); p.apply("ReadLines", TextIO.read().from(options.getInputFile())); p.run().waitUntilFinish(); One of my commands: bin/flink run -c org.apache.beam.examples.WordCount ../../word-count-beam/target/word-count-beam-bundled-0.1.jar --runner=FlinkRunner --checkpointingInterval=5000 --externalizedCheckpointsEnabled=true --streamName=test-stream --retainExternalizedCheckpointsOnCancellation=true --awsRegion=us-west-2 flink-conf.yaml: state.backend: filesystem state.checkpoints.dir: file:///tmp/flink-checkpoint/flink_app/ state.savepoints.dir: file:///tmp/flink-checkpoint/flink_app/savepoints/ Stacktrack: AsynchronousException{java.lang.Exception: Could not materialize checkpoint 7 for operator Source: ReadLines/Read -> DropInputs/ParMultiDo(NoOp) (1/1).} at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointExceptionHandler.tryHandleCheckpointException(StreamTask.java:1153) at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:947) at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:884) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Caused by: java.lang.Exception: Could not materialize checkpoint 7 for operator Source: ReadLines/Read -> DropInputs/ParMultiDo(NoOp) (1/1). at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:942) ... 6 more Caused by: java.util.concurrent.ExecutionException: java.lang.AbstractMethodError: org.apache.flink.api.common.typeutils.TypeSerializer.snapshotConfiguration()Lorg/apache/flink/api/common/typeutils/TypeSerializerSnapshot; at java.util.concurrent.FutureTask.report(FutureTask.java:122) at java.util.concurrent.FutureTask.get(FutureTask.java:192) at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:394) at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:53) at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:853) ... 5 more Caused by: java.lang.AbstractMethodError: org.apache.flink.api.common.typeutils.TypeSerializer.snapshotConfiguration()Lorg/apache/flink/api/common/typeutils/TypeSerializerSnapshot; at org.apache.flink.runtime.state.RegisteredOperatorStateBackendMetaInfo.computeSnapshot(RegisteredOperatorStateBackendMetaInfo.java:170) at org.apache.flink.runtime.state.RegisteredOperatorStateBackendMetaInfo.snapshot(RegisteredOperatorStateBackendMetaInfo.java:103) at org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackendSnapshotStrategy.java:123) at org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackendSnapshotStrategy.java:108) at org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:75) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:391)