[ https://issues.apache.org/jira/browse/FLINK-12381?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16836192#comment-16836192 ]
Till Rohrmann commented on FLINK-12381:
---------------------------------------

If you are using the {{StandaloneJobClusterEntrypoint}}, then a K8s deployment effectively defines the lifetime of a job. Put differently, restarts of the entrypoint pod won't be considered a "new job submission" but a "job recovery". Admittedly, this only makes sense if Flink can also retrieve checkpoints from the previous run, which is currently only possible when HA is enabled.

I guess the desired behavior would be: if HA is disabled, then the {{StandaloneJobClusterEntrypoint}} should assign a random {{JobID}} when it starts. If HA is enabled, then the {{StandaloneJobClusterEntrypoint}} should generate the {{JobID}} in a predictable way (either specified by the user, derived from some env variable, or defaulting to 0 if nothing has been specified).

> W/o HA, upon a full restart, checkpointing crashes
> --------------------------------------------------
>
>                 Key: FLINK-12381
>                 URL: https://issues.apache.org/jira/browse/FLINK-12381
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Checkpointing, Runtime / Coordination
>    Affects Versions: 1.8.0
>         Environment: Same as FLINK-\{12379, 12377, 12376}
>            Reporter: Henrik
>            Priority: Major
>
> {code:java}
> Caused by: org.apache.hadoop.fs.FileAlreadyExistsException: 'gs://example_bucket/flink/checkpoints/00000000000000000000000000000000/chk-16/_metadata' already exists
>     at com.google.cloud.hadoop.fs.gcs.GoogleHadoopOutputStream.createChannel(GoogleHadoopOutputStream.java:85)
>     at com.google.cloud.hadoop.fs.gcs.GoogleHadoopOutputStream.<init>(GoogleHadoopOutputStream.java:74)
>     at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.create(GoogleHadoopFileSystemBase.java:797)
>     at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:929)
>     at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:910)
>     at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:807)
>     at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.create(HadoopFileSystem.java:141)
>     at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.create(HadoopFileSystem.java:37)
>     at org.apache.flink.runtime.state.filesystem.FsCheckpointMetadataOutputStream.<init>(FsCheckpointMetadataOutputStream.java:65)
>     at org.apache.flink.runtime.state.filesystem.FsCheckpointStorageLocation.createMetadataOutputStream(FsCheckpointStorageLocation.java:104)
>     at org.apache.flink.runtime.checkpoint.PendingCheckpoint.finalizeCheckpoint(PendingCheckpoint.java:259)
>     at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:829)
>     ... 8 more
> {code}
> Instead, it should either just overwrite the checkpoint or fail to start the job completely. Partial and undefined failure is not what should happen.
>
> Repro:
> # Set up a single purpose job cluster (which could use much better docs btw!)
> # Let it run with GCS checkpointing for a while with rocksdb/gs://example
> # Kill it
> # Start it
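
The {{JobID}} selection rule proposed in the comment could be sketched roughly as follows. This is only an illustration under stated assumptions, not Flink code: {{JobIdPolicy}}, {{chooseJobId}} and its parameters are hypothetical names, job IDs are modeled as plain 32-character hex strings rather than Flink's {{JobID}} class, and the precedence "user-specified value first, then env variable, then 0" is one possible reading of the comment.

```java
import java.util.Optional;
import java.util.UUID;

/** Hypothetical helper (not part of Flink). Job IDs are modeled as 32-character hex strings. */
final class JobIdPolicy {

    /** The all-zero ID that appears in the checkpoint path of the reported stack trace. */
    public static final String ZERO_JOB_ID = "00000000000000000000000000000000";

    private JobIdPolicy() {}

    /**
     * Chooses a job ID as proposed in the comment.
     * Assumption: a user-specified ID takes precedence over one derived from the environment.
     */
    public static String chooseJobId(
            boolean haEnabled, Optional<String> userSpecified, Optional<String> fromEnv) {
        if (!haEnabled) {
            // Without HA no earlier checkpoint can be recovered, so every start is
            // treated as a new submission and gets a fresh random ID.
            return randomJobId();
        }
        // With HA a restart is a recovery, so the ID must be stable across restarts.
        Optional<String> configured = userSpecified.isPresent() ? userSpecified : fromEnv;
        return configured.map(JobIdPolicy::validate).orElse(ZERO_JOB_ID);
    }

    /** Returns 32 random hex characters (a UUID with the dashes removed). */
    public static String randomJobId() {
        return UUID.randomUUID().toString().replace("-", "");
    }

    /** Rejects anything that is not exactly 32 hex characters; normalizes to lower case. */
    public static String validate(String hex) {
        if (!hex.matches("[0-9a-fA-F]{32}")) {
            throw new IllegalArgumentException("Job ID must be 32 hex characters: " + hex);
        }
        return hex.toLowerCase();
    }

    public static void main(String[] args) {
        // HA disabled: a different ID on every start.
        System.out.println(chooseJobId(false, Optional.empty(), Optional.empty()));
        // HA enabled, nothing specified: falls back to the all-zero ID.
        System.out.println(chooseJobId(true, Optional.empty(), Optional.empty()));
    }
}
```

With a rule like this, a non-HA restart would write its checkpoints under a new directory instead of colliding with the existing {{chk-16/_metadata}} of the previous run, while an HA restart would keep using the same ID and recover.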