Hi Sudharsan,

How did you cancel this single job? According to the High Availability document:
"In order to recover submitted jobs, Flink persists metadata and the job artifacts. The HA data will be kept until the respective job either succeeds, is cancelled or fails terminally. Once this happens, all the HA data, including the metadata stored in the HA services, will be deleted."

So I think the job data should be deleted if you use the action "cancel" (instead of "stop") to cancel the job.

I have also pasted the HA and savepoint doc links below; I hope they help.

HA: https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/ha/overview/
Savepoint: https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/savepoints/

Best,
Yuan

> On 19 Jun 2022, at 12:51 AM, Sudharsan R <[email protected]> wrote:
>
> Hello,
> We are running a single job in a Flink 1.11.1 cluster on a k8s cluster. We
> use ZooKeeper HA mode.
>
> To upgrade our application code, we do a Flink CLI job cancel with savepoint.
> We then bring down the whole Flink cluster. We bring it back up and submit
> the new app code with this savepoint.
>
> Here's a specific scenario:
> 1. A checkpoint was initiated by the Flink infra.
> 2. We triggered a cancel with savepoint while the checkpoint was in progress.
> 3. Based on logs, the checkpoint completes and immediately after this the
> savepoint also seems to complete. At this point, my expectation is that
> ZooKeeper would have no state for this job on this cluster.
> 4. The new cluster comes up. We submit a job from our savepoint. However, the
> old job also seems to have been recovered! The UI shows this job. The logs
> also seem to indicate this.
>
> Please see a list of interesting events (newest first):
> 21:09:28 Starting job 2ddc7c290891ec2d169068d1992586d4 from savepoint …….
> Jun 17, 2022 @ 21:09:25.036 Submitting Job with JobId=2ddc7c290891ec2d169068d1992586d4.
> 21:08:27 Recovered JobGraph(jobId: 28e0ef806b40c27111614081e18d72f9)
> 21:08:27 Successfully recovered 1 persisted job graphs.
> 21:07:27 Starting standalonesession daemon on ….
> 21:07:25 New jobmanager pod comes up
>
> 21:07:14 Last message seen from old manager job
> 21:07:00 Cancelling tasks to cancelled messages
> 21:06:42 savepoint stored in ….
> 21:05:16 Last message of type Received last message for now expired checkpoint attempt 101289
> 21:04:52 Received late message for now expired checkpoint attempt 101289 ….
> 21:04:49 Triggering checkpoint 101290 (type=SAVEPOINT)
> 21:04:48: ERROR org.apache.flink.contrib.streaming.state.snapshot.RocksIncrementalSnapshotStrategy: Could not properly discard states.
> 21:04:48 ERROR org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory: Could not delete the checkpoint stream file
> 21:04:47 Submitting Job with JobId=2ddc7c290891ec2d169068d1992586d4.
> 21:04:37 Triggering checkpoint 101289 (type=CHECKPOINT)
>
> I don't see any ZooKeeper errors around this time (server or Flink logs). The
> ERROR events (21:04:48) are interesting, although they occur well before the
> savepoint completion (21:06:42).
>
> What, if anything, could I possibly be doing wrong? We could try to clean out
> the ZooKeeper state prior to job submission as a safety measure. But I would
> have expected this to work nevertheless.
>
> Thanks
> Sudharsan
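
For reference, the "cancel" and "stop" actions discussed above, and the later restore from the savepoint, map to three Flink CLI invocations. The following is only a minimal sketch that builds those command lines so they can be compared side by side. The function names, the `FLINK_BIN` value, the savepoint directory and the jar path are hypothetical placeholders, not taken from the thread; the `-s` and `-p` options are the savepoint flags of `flink cancel`, `flink stop` and `flink run` as I understand the Flink CLI, so please check them against `flink --help` for your version.

```python
import shlex
from typing import List

# Assumption: the `flink` launcher script is on PATH; adjust as needed.
FLINK_BIN = "flink"


def cancel_with_savepoint_cmd(job_id: str, savepoint_dir: str) -> List[str]:
    """The "cancel" action with a savepoint: flink cancel -s <dir> <jobId>.

    The Flink CLI help marks this form as deprecated in favour of `stop`.
    """
    return [FLINK_BIN, "cancel", "-s", savepoint_dir, job_id]


def stop_with_savepoint_cmd(job_id: str, savepoint_dir: str) -> List[str]:
    """The "stop" action with a savepoint: flink stop -p <dir> <jobId>."""
    return [FLINK_BIN, "stop", "-p", savepoint_dir, job_id]


def run_from_savepoint_cmd(savepoint_path: str, jar_path: str) -> List[str]:
    """Resubmit the job from a savepoint: flink run -s <savepoint> <jar>."""
    return [FLINK_BIN, "run", "-s", savepoint_path, jar_path]


if __name__ == "__main__":
    # Job ID taken from the log excerpt; the paths are made-up placeholders.
    job_id = "2ddc7c290891ec2d169068d1992586d4"
    savepoint_dir = "s3://example-bucket/savepoints"
    for cmd in (
        cancel_with_savepoint_cmd(job_id, savepoint_dir),
        stop_with_savepoint_cmd(job_id, savepoint_dir),
        run_from_savepoint_cmd(savepoint_dir + "/savepoint-example", "app.jar"),
    ):
        print(shlex.join(cmd))
```

The commands are returned as argument lists rather than strings so they can be passed directly to `subprocess.run` without shell quoting issues.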
