[ https://issues.apache.org/jira/browse/FLINK-16770?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17073388#comment-17073388 ]
Biao Liu edited comment on FLINK-16770 at 4/2/20, 5:52 AM:
-----------------------------------------------------------

Hi [~yunta], thanks for the response.

If I understand correctly, the {{CompletedCheckpointStore}} can end up in an inconsistent state when a checkpoint is stopped while it is doing asynchronous finalization. There are two strategies here:

1. The checkpoint that is being finalized can be aborted when the {{CheckpointCoordinator}} is being shut down or the periodic scheduler is being stopped. This is what the current implementation does. However, we don't handle the {{CompletedCheckpointStore}} well. For example, it might be better to revert the state of the {{CompletedCheckpointStore}} when the {{PendingCheckpoint}} finds, after asynchronous finalization, that it has been discarded. But I don't think that is easy to do, because there might be a subsuming operation during {{CompletedCheckpointStore#addCheckpoint}}.

2. The checkpoint that is being finalized can NOT be aborted when the {{CheckpointCoordinator}} is being shut down or the periodic scheduler is being stopped. I personally prefer this solution, because it simplifies the concurrent conflict scenario and is much easier to implement.

I think introducing an atomic boolean might not be enough. It's better to rethink the relationship between {{PendingCheckpoint#abort}} and {{PendingCheckpoint#finalizeCheckpoint}}. We also need to rewrite part of the error handling of the finalization.

BTW, [~yunta], could you share the unit test case that reproduces the scenario locally? I want to verify my assumption and solution. The original e2e test case is not stable.

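The rule behind strategy 2 above (once finalization has started, an abort request is refused) can be illustrated with a small state machine. This is only a sketch under stated assumptions: {{PendingCheckpointSketch}}, its {{State}} enum, and every method name below are hypothetical and are not Flink's actual {{PendingCheckpoint}} API. It also leaves out the error handling of a failed finalization, which the comment says would need to be reworked separately.

```java
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical illustration only; not Flink's PendingCheckpoint. */
final class PendingCheckpointSketch {

    /** Hypothetical lifecycle states for the sketch. */
    enum State { PENDING, FINALIZING, COMPLETED, ABORTED }

    private final AtomicReference<State> state =
            new AtomicReference<>(State.PENDING);

    /**
     * Tries to move PENDING -> FINALIZING.
     * Returns true only for the single caller that wins the transition.
     */
    boolean tryStartFinalization() {
        return state.compareAndSet(State.PENDING, State.FINALIZING);
    }

    /**
     * Tries to move PENDING -> ABORTED.
     * Returns false if finalization has already started or finished,
     * so a shutdown cannot discard a checkpoint that is being finalized.
     */
    boolean tryAbort() {
        return state.compareAndSet(State.PENDING, State.ABORTED);
    }

    /** Marks the end of a successful finalization (FINALIZING -> COMPLETED). */
    void markCompleted() {
        if (!state.compareAndSet(State.FINALIZING, State.COMPLETED)) {
            throw new IllegalStateException("Not finalizing, state is " + state.get());
        }
    }

    State state() {
        return state.get();
    }
}
```

With this shape, a shutdown path would call {{tryAbort()}} and skip discarding the checkpoint data when it returns false, while the finalization path would only add to the completed checkpoint store after {{tryStartFinalization()}} returns true. Because both transitions start from the same {{PENDING}} state, exactly one of them can succeed for a given checkpoint.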
> Resuming Externalized Checkpoint (rocks, incremental, scale up) end-to-end test fails with no such file
> -------------------------------------------------------------------------------------------------------
>
> Key: FLINK-16770
> URL: https://issues.apache.org/jira/browse/FLINK-16770
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Checkpointing, Tests
> Affects Versions: 1.11.0
> Reporter: Zhijiang
> Assignee: Yun Tang
> Priority: Blocker
> Labels: test-stability
> Fix For: 1.11.0
>
> Attachments: e2e-output.log, flink-vsts-standalonesession-0-fv-az53.log
>
>
> The log :
> [https://dev.azure.com/rmetzger/Flink/_build/results?buildId=6603&view=logs&j=c88eea3b-64a0-564d-0031-9fdcd7b8abee&t=1e2bbe5b-4657-50be-1f07-d84bfce5b1f5]
>
> There was also the similar problem in https://issues.apache.org/jira/browse/FLINK-16561, but for the case of no parallelism change. And this case is for scaling up. Not quite sure whether the root cause is the same one.
> {code:java}
> 2020-03-25T06:50:31.3894841Z Running 'Resuming Externalized Checkpoint (rocks, incremental, scale up) end-to-end test'
> 2020-03-25T06:50:31.3895308Z ==============================================================================
> 2020-03-25T06:50:31.3907274Z TEST_DATA_DIR: /home/vsts/work/1/s/flink-end-to-end-tests/test-scripts/temp-test-directory-31390197304
> 2020-03-25T06:50:31.5500274Z Flink dist directory: /home/vsts/work/1/s/flink-dist/target/flink-1.11-SNAPSHOT-bin/flink-1.11-SNAPSHOT
> 2020-03-25T06:50:31.6354639Z Starting cluster.
> 2020-03-25T06:50:31.8871932Z Starting standalonesession daemon on host fv-az655.
> 2020-03-25T06:50:33.5021784Z Starting taskexecutor daemon on host fv-az655.
> 2020-03-25T06:50:33.5152274Z Waiting for Dispatcher REST endpoint to come up...
> 2020-03-25T06:50:34.5498116Z Waiting for Dispatcher REST endpoint to come up...
> 2020-03-25T06:50:35.6031346Z Waiting for Dispatcher REST endpoint to come up...
> 2020-03-25T06:50:36.9848425Z Waiting for Dispatcher REST endpoint to come up...
> 2020-03-25T06:50:38.0283377Z Dispatcher REST endpoint is up.
> 2020-03-25T06:50:38.0285490Z Running externalized checkpoints test, with ORIGINAL_DOP=2 NEW_DOP=4 and STATE_BACKEND_TYPE=rocks STATE_BACKEND_FILE_ASYNC=true STATE_BACKEND_ROCKSDB_INCREMENTAL=true SIMULATE_FAILURE=false ...
> 2020-03-25T06:50:46.1754645Z Job (b8cb04e4b1e730585bc616aa352866d0) is running.
> 2020-03-25T06:50:46.1758132Z Waiting for job (b8cb04e4b1e730585bc616aa352866d0) to have at least 1 completed checkpoints ...
> 2020-03-25T06:50:46.3478276Z Waiting for job to process up to 200 records, current progress: 173 records ...
> 2020-03-25T06:50:49.6332988Z Cancelling job b8cb04e4b1e730585bc616aa352866d0.
> 2020-03-25T06:50:50.4875673Z Cancelled job b8cb04e4b1e730585bc616aa352866d0.
> 2020-03-25T06:50:50.5468230Z ls: cannot access '/home/vsts/work/1/s/flink-end-to-end-tests/test-scripts/temp-test-directory-31390197304/externalized-chckpt-e2e-backend-dir/b8cb04e4b1e730585bc616aa352866d0/chk-[1-9]*/_metadata': No such file or directory
> 2020-03-25T06:50:50.5606260Z Restoring job with externalized checkpoint at . ...
> 2020-03-25T06:50:58.4728245Z
> 2020-03-25T06:50:58.4732663Z ------------------------------------------------------------
> 2020-03-25T06:50:58.4735785Z The program finished with the following exception:
> 2020-03-25T06:50:58.4737759Z
> 2020-03-25T06:50:58.4742666Z org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: java.util.concurrent.ExecutionException: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit JobGraph.
> 2020-03-25T06:50:58.4746274Z at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
> 2020-03-25T06:50:58.4749954Z at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
> 2020-03-25T06:50:58.4752753Z at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:142)
> 2020-03-25T06:50:58.4755400Z at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:659)
> 2020-03-25T06:50:58.4757862Z at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:210)
> 2020-03-25T06:50:58.4760282Z at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:890)
> 2020-03-25T06:50:58.4763591Z at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:963)
> 2020-03-25T06:50:58.4764274Z at java.security.AccessController.doPrivileged(Native Method)
> 2020-03-25T06:50:58.4764809Z at javax.security.auth.Subject.doAs(Subject.java:422)
> 2020-03-25T06:50:58.4765434Z at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
> 2020-03-25T06:50:58.4766180Z at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> 2020-03-25T06:50:58.4773549Z at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:963)
> 2020-03-25T06:50:58.4774502Z Caused by: java.lang.RuntimeException: java.util.concurrent.ExecutionException: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit JobGraph.
> 2020-03-25T06:50:58.4775382Z at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:276)
> 2020-03-25T06:50:58.4776163Z at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1741)
> 2020-03-25T06:50:58.4777706Z at org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:90)
> 2020-03-25T06:50:58.4778334Z at org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:58)
> 2020-03-25T06:50:58.4779007Z at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1620)
> 2020-03-25T06:50:58.4779654Z at org.apache.flink.streaming.tests.DataStreamAllroundTestProgram.main(DataStreamAllroundTestProgram.java:215)
> 2020-03-25T06:50:58.4780371Z at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> 2020-03-25T06:50:58.4784367Z at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> 2020-03-25T06:50:58.4785063Z at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 2020-03-25T06:50:58.4785557Z at java.lang.reflect.Method.invoke(Method.java:498)
> 2020-03-25T06:50:58.4786204Z at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321)
> 2020-03-25T06:50:58.4786547Z ... 11 more
> 2020-03-25T06:50:58.4787007Z Caused by: java.util.concurrent.ExecutionException: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit JobGraph.
> 2020-03-25T06:50:58.4787717Z at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
> 2020-03-25T06:50:58.4788203Z at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
> 2020-03-25T06:50:58.4788835Z at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1736)
> 2020-03-25T06:50:58.4789362Z ... 20 more
> 2020-03-25T06:50:58.4789720Z Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit JobGraph.
> 2020-03-25T06:50:58.4790467Z at org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$7(RestClusterClient.java:359)
> 2020-03-25T06:50:58.4791087Z at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:884)
> 2020-03-25T06:50:58.4791650Z at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:866)
> 2020-03-25T06:50:58.4792560Z at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
> 2020-03-25T06:50:58.4793617Z at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990)
> 2020-03-25T06:50:58.4794496Z at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$8(FutureUtils.java:274)
> 2020-03-25T06:50:58.4795255Z at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
> 2020-03-25T06:50:58.4796264Z at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
> 2020-03-25T06:50:58.4796867Z at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
> 2020-03-25T06:50:58.4797439Z at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:575)
> 2020-03-25T06:50:58.4798000Z at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:943)
> 2020-03-25T06:50:58.4798589Z at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)
> 2020-03-25T06:50:58.4799162Z at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> 2020-03-25T06:50:58.4799727Z at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> 2020-03-25T06:50:58.4800210Z at java.lang.Thread.run(Thread.java:748)
> 2020-03-25T06:50:58.4800767Z Caused by: org.apache.flink.runtime.rest.util.RestClientException: [Internal server error., <Exception on server side:
> 2020-03-25T06:50:58.4801351Z org.apache.flink.runtime.client.JobSubmissionException: Failed to submit job.
> 2020-03-25T06:50:58.4801938Z at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$internalSubmitJob$3(Dispatcher.java:336)
> 2020-03-25T06:50:58.4803660Z at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:836)
> 2020-03-25T06:50:58.4804555Z at java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:811)
> 2020-03-25T06:50:58.4805235Z at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)
> 2020-03-25T06:50:58.4805839Z at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
> 2020-03-25T06:50:58.4806515Z at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
> 2020-03-25T06:50:58.4807184Z at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> 2020-03-25T06:50:58.4807807Z at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> 2020-03-25T06:50:58.4808417Z at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> 2020-03-25T06:50:58.4809055Z at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> 2020-03-25T06:50:58.4809783Z Caused by: java.lang.RuntimeException: org.apache.flink.runtime.client.JobExecutionException: Could not set up JobManager
> 2020-03-25T06:50:58.4810756Z at org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:36)
> 2020-03-25T06:50:58.4811444Z at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
> 2020-03-25T06:50:58.4811937Z ... 6 more
> 2020-03-25T06:50:58.4812414Z Caused by: org.apache.flink.runtime.client.JobExecutionException: Could not set up JobManager
> 2020-03-25T06:50:58.4813330Z at org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl.<init>(JobManagerRunnerImpl.java:152)
> 2020-03-25T06:50:58.4814154Z at org.apache.flink.runtime.dispatcher.DefaultJobManagerRunnerFactory.createJobManagerRunner(DefaultJobManagerRunnerFactory.java:84)
> 2020-03-25T06:50:58.4814846Z at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$6(Dispatcher.java:379)
> 2020-03-25T06:50:58.4815622Z at org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:34)
> 2020-03-25T06:50:58.4816074Z ... 7 more
> 2020-03-25T06:50:58.4816924Z Caused by: java.io.IOException: Cannot access file system for checkpoint/savepoint path 'file://.'.
> 2020-03-25T06:50:58.4817673Z at org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorage.resolveCheckpointPointer(AbstractFsCheckpointStorage.java:233)
> 2020-03-25T06:50:58.4818450Z at org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorage.resolveCheckpoint(AbstractFsCheckpointStorage.java:110)
> 2020-03-25T06:50:58.4819276Z at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreSavepoint(CheckpointCoordinator.java:1312)
> 2020-03-25T06:50:58.4819943Z at org.apache.flink.runtime.scheduler.SchedulerBase.tryRestoreExecutionGraphFromSavepoint(SchedulerBase.java:314)
> 2020-03-25T06:50:58.4820633Z at org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:247)
> 2020-03-25T06:50:58.4821258Z at org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:223)
> 2020-03-25T06:50:58.4821862Z at org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:118)
> 2020-03-25T06:50:58.4822505Z at org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:103)
> 2020-03-25T06:50:58.4823115Z at org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:282)
> 2020-03-25T06:50:58.4823665Z at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:270)
> 2020-03-25T06:50:58.4824485Z at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:98)
> 2020-03-25T06:50:58.4825597Z at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:40)
> 2020-03-25T06:50:58.4826400Z at org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl.<init>(JobManagerRunnerImpl.java:146)
> 2020-03-25T06:50:58.4826919Z ... 10 more
> 2020-03-25T06:50:58.4829018Z Caused by: java.io.IOException: Found local file path with authority '.' in path 'file://.'. Hint: Did you forget a slash? (correct path would be 'file:///.')
> 2020-03-25T06:50:58.4829875Z at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:441)
> 2020-03-25T06:50:58.4830364Z at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:389)
> 2020-03-25T06:50:58.4830807Z at org.apache.flink.core.fs.Path.getFileSystem(Path.java:292)
> 2020-03-25T06:50:58.4831408Z at org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorage.resolveCheckpointPointer(AbstractFsCheckpointStorage.java:230)
> 2020-03-25T06:50:58.4832021Z ... 22 more
> 2020-03-25T06:50:58.4832151Z
> 2020-03-25T06:50:58.4832356Z End of exception on server side>]
> 2020-03-25T06:50:58.4832720Z at org.apache.flink.runtime.rest.RestClient.parseResponse(RestClient.java:390)
> 2020-03-25T06:50:58.4833238Z at org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$3(RestClient.java:374)
> 2020-03-25T06:50:58.4833884Z at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:966)
> 2020-03-25T06:50:58.4834376Z at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:940)
> 2020-03-25T06:50:58.4834724Z ... 4 more
> 2020-03-25T06:50:58.5042321Z Resuming from externalized checkpoint job could not be started.
> 2020-03-25T06:50:58.5044210Z [FAIL] Test script contains errors.
> 2020-03-25T06:50:58.5052826Z Checking of logs skipped.
> 2020-03-25T06:50:58.5053164Z
> 2020-03-25T06:50:58.5054116Z [FAIL] 'Resuming Externalized Checkpoint (rocks, incremental, scale up) end-to-end test' failed after 0 minutes and 27 seconds! Test exited with exit code 1
> 2020-03-25T06:50:58.5054639Z
> 2020-03-25T06:50:58.8067813Z Stopping taskexecutor daemon (pid: 86888) on host fv-az655.
> 2020-03-25T06:50:59.0257270Z Stopping standalonesession daemon (pid: 86603) on host fv-az655.
> 2020-03-25T06:50:59.4920994Z
> 2020-03-25T06:50:59.5000014Z ##[error]Bash exited with code '1'.
> 2020-03-25T06:50:59.5015374Z ##[section]Finishing: Run e2e tests
> {code}

--
This message was sent by Atlassian Jira
(v8.3.4#803005)