[ https://issues.apache.org/jira/browse/FLINK-30938?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17691244#comment-17691244 ]
Yufan Sheng edited comment on FLINK-30938 at 2/20/23 5:38 PM:
--------------------------------------------------------------
Thanks for assigning this test task to me. I have finished the verification by following the instructions from [~Weijie Guo]. The {{AdaptiveBatchScheduler}} and {{SpeculativeExecution}} work as expected. Below are the detailed test process and report from my side.

h2. Building Flink on the {{release-1.17}} branch

I built Flink locally on the {{release-1.17}} branch. The latest commit in my local repository is {{6b4745}}. To speed up the build, I skipped all code-quality checks and tests with the command {{mvn clean install -DskipTests -Pfast}}.

!flink-1.17-branch-log.png|width=500!

The running Flink dashboard also confirms that this is a build from commit {{6b4745}}.

!flink-dashborad-version.png|width=550!

To keep the test simple, we started a local standalone Flink cluster. The TaskManager is configured with 4 slots.

!taskmanager-slots.png|width=700!

h2. Test hybrid shuffle with {{AdaptiveBatchScheduler}}

The default value of {{execution.batch-shuffle-mode}} is {{ALL_EXCHANGES_BLOCKING}}, i.e. a blocking shuffle. To enable hybrid shuffle, we changed this value to {{ALL_EXCHANGES_HYBRID_SELECTIVE}}, which means that data can be consumed as soon as the downstream task is available, without first having to be persisted to disk.

The AdaptiveBatchScheduler is enabled by default, and with this scheduler the default value of {{jobmanager.partition.hybrid.partition-data-consume-constraint}} is {{UNFINISHED_PRODUCERS}}. This means that produced data can be consumed immediately, without waiting for the producer to finish.

The last step to make the hybrid shuffle behaviour observable is to give the TaskManager enough slots. By default, a standalone Flink cluster has only one JobManager and one TaskManager. The test code for this verification is attached: [^testAdaptiveBatchJob].
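The scheduling difference described above can be illustrated with a small stand-alone Java model. This is a simplified sketch, not Flink's scheduler code: the class, enums and method ({{ConsumeConstraintModel}}, {{Rule}}, {{ProducerState}}, {{canScheduleConsumer}}) are hypothetical, and {{UNFINISHED_PRODUCERS}} is approximated as "at least one producer subtask has started". {{ONLY_FINISHED_PRODUCERS}}, the default used in the speculative-execution test further down, is included for comparison.

{code:java}
import java.util.List;

/**
 * Hypothetical, simplified model of when a downstream (consumer) task may be
 * scheduled. It only mirrors the behaviour described in this report and is
 * NOT taken from Flink's scheduler.
 */
class ConsumeConstraintModel {

    /** Simplified producer-subtask states (hypothetical, not Flink's ExecutionState). */
    enum ProducerState { CREATED, RUNNING, FINISHED }

    /** Rules compared in this report; the names mirror the configuration values. */
    enum Rule {
        /** Blocking shuffle: every producer subtask must have finished. */
        BLOCKING,
        /** Hybrid shuffle with ONLY_FINISHED_PRODUCERS: one finished producer is enough. */
        ONLY_FINISHED_PRODUCERS,
        /** Hybrid shuffle with UNFINISHED_PRODUCERS: one started producer is enough (approximation). */
        UNFINISHED_PRODUCERS
    }

    /** Returns true if, under the given rule, a consumer of these producers may be scheduled. */
    static boolean canScheduleConsumer(Rule rule, List<ProducerState> producers) {
        switch (rule) {
            case BLOCKING:
                return !producers.isEmpty()
                        && producers.stream().allMatch(s -> s == ProducerState.FINISHED);
            case ONLY_FINISHED_PRODUCERS:
                return producers.stream().anyMatch(s -> s == ProducerState.FINISHED);
            case UNFINISHED_PRODUCERS:
                return producers.stream().anyMatch(s -> s != ProducerState.CREATED);
            default:
                throw new IllegalArgumentException("Unknown rule: " + rule);
        }
    }

    public static void main(String[] args) {
        // Map (1/2) still running, Map (2/2) finished: the state in which the sink
        // was scheduled during the speculative-execution test.
        List<ProducerState> maps = List.of(ProducerState.RUNNING, ProducerState.FINISHED);
        for (Rule rule : Rule.values()) {
            System.out.println(rule + " -> " + canScheduleConsumer(rule, maps));
        }
    }
}
{code}

For the mixed state used in {{main}} (one map subtask running, one finished), only the blocking rule refuses to schedule the consumer; with both producers still running, only the {{UNFINISHED_PRODUCERS}} approximation allows it.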
The [^testAdaptiveBatchJob] job starts one source, two parallel map subtasks and one sink, so we set the slot count to 4 with {{taskmanager.numberOfTaskSlots: 4}} in {{flink-conf.yaml}}. Finally, we submitted the [^testAdaptiveBatchJob] job to the Flink cluster.

h3. Hybrid shuffle in {{AdaptiveBatchScheduler}} report

Every map subtask sleeps for 5 seconds. The first thing we can see in the job graph is that the map and sink operators run simultaneously. With the default blocking shuffle, the sink would only start after all map subtasks had finished.

!AdaptiveBatchScheduler-job-graph.png|width=600!

The execution timeline also shows that the source, map and sink run almost at the same time.

!AdaptiveBatchScheduler-timeline.png|width=600!

Finally, the TaskManager log confirms this: all four subtasks switched to RUNNING within 6 ms of each other.

{code}
2023-02-20 23:59:16,558 INFO org.apache.flink.runtime.taskmanager.Task [] - Source: Sequence Source (1/1)#0 (fb0eccc08c3516b1b8ed16977616b147_bc764cd8ddf7a0cff126f51c16239658_0_0) switched from INITIALIZING to RUNNING.
2023-02-20 23:59:16,563 INFO org.apache.flink.runtime.taskmanager.Task [] - Sink: Unnamed (1/1)#0 (fb0eccc08c3516b1b8ed16977616b147_ea632d67b7d595e5b851708ae9ad79d6_0_0) switched from INITIALIZING to RUNNING.
2023-02-20 23:59:16,564 INFO org.apache.flink.runtime.taskmanager.Task [] - Map (2/2)#0 (fb0eccc08c3516b1b8ed16977616b147_0a448493b4782967b150582570326227_1_0) switched from INITIALIZING to RUNNING.
2023-02-20 23:59:16,564 INFO org.apache.flink.runtime.taskmanager.Task [] - Map (1/2)#0 (fb0eccc08c3516b1b8ed16977616b147_0a448493b4782967b150582570326227_0_0) switched from INITIALIZING to RUNNING.
{code}

h2. Test hybrid shuffle with Speculative Execution

To enable *Speculative Execution*, we added {{execution.batch.speculative.enabled: true}} to {{flink-conf.yaml}}. We also changed {{execution.batch-shuffle-mode}} to {{ALL_EXCHANGES_HYBRID_FULL}} so that all produced data is persisted.
The test code [^testSpeculativeExecution] makes a map subtask sleep forever when the sum of its subtask index and attempt number is even. To let speculative execution detect the slow subtask faster, we also lowered the slow-task detection baseline, and set {{execution.batch.speculative.block-slow-node-duration}} to 0 so that our single local node is not blocked once a slow task is detected on it:

{code}
slow-task-detector.execution-time.baseline-ratio: 0.2
slow-task-detector.execution-time.baseline-lower-bound: 0
slow-task-detector.execution-time.baseline-multiplier: 1
execution.batch.speculative.block-slow-node-duration: 0
{code}

Finally, we restarted the Flink cluster with the new configuration and submitted the [^testSpeculativeExecution] job.

h3. Hybrid shuffle in Speculative Execution report

With speculative execution, the default data consumption constraint is {{ONLY_FINISHED_PRODUCERS}}. This means that as soon as an upstream producer subtask has finished, a downstream consumer can be started to consume the data produced by that subtask. The execution timeline confirms this.

!Speculative-Execution-timeline.png|width=600!

Once the only source subtask had finished, the two map subtasks were started simultaneously. Map (2/2) finished almost immediately, while Map (1/2) hung. The downstream sink was therefore scheduled while the map vertex was still running, right after the first map subtask had finished.

{code:java}
2023-02-21 00:11:37,993 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: Sequence Source (1/1) (ee83d53171224378cb74bdd5681a1399_bc764cd8ddf7a0cff126f51c16239658_0_0) switched from RUNNING to FINISHED.
2023-02-21 00:11:37,996 INFO org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager [] - Clearing resource requirements of job c9562d7c3c921ac7d2f0383c7e250827
2023-02-21 00:11:38,000 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Map (1/2) (ee83d53171224378cb74bdd5681a1399_0a448493b4782967b150582570326227_0_0) switched from CREATED to SCHEDULED.
2023-02-21 00:11:38,001 INFO org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager [] - Received resource requirements from job c9562d7c3c921ac7d2f0383c7e250827: [ResourceRequirement{resourceProfile=ResourceProfile{UNKNOWN}, numberOfRequiredSlots=1}]
2023-02-21 00:11:38,001 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Map (1/2) (ee83d53171224378cb74bdd5681a1399_0a448493b4782967b150582570326227_0_0) switched from SCHEDULED to DEPLOYING.
2023-02-21 00:11:38,001 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Deploying Map (1/2) (attempt #0) with attempt id ee83d53171224378cb74bdd5681a1399_0a448493b4782967b150582570326227_0_0 and vertex id 0a448493b4782967b150582570326227_0 to localhost:41185-563559 @ localhost (dataPort=38645) with allocation id 772ba5994150e188cf2ed4743eafb537
2023-02-21 00:11:38,005 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Map (2/2) (ee83d53171224378cb74bdd5681a1399_0a448493b4782967b150582570326227_1_0) switched from CREATED to SCHEDULED.
2023-02-21 00:11:38,006 INFO org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager [] - Received resource requirements from job c9562d7c3c921ac7d2f0383c7e250827: [ResourceRequirement{resourceProfile=ResourceProfile{UNKNOWN}, numberOfRequiredSlots=2}]
2023-02-21 00:11:38,068 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Map (1/2) (ee83d53171224378cb74bdd5681a1399_0a448493b4782967b150582570326227_0_0) switched from DEPLOYING to INITIALIZING.
2023-02-21 00:11:38,074 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Map (2/2) (ee83d53171224378cb74bdd5681a1399_0a448493b4782967b150582570326227_1_0) switched from SCHEDULED to DEPLOYING.
2023-02-21 00:11:38,074 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Deploying Map (2/2) (attempt #0) with attempt id ee83d53171224378cb74bdd5681a1399_0a448493b4782967b150582570326227_1_0 and vertex id 0a448493b4782967b150582570326227_1 to localhost:41185-563559 @ localhost (dataPort=38645) with allocation id c3cc3ffeb221d7cd327722feaa5d5557
2023-02-21 00:11:38,104 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Map (2/2) (ee83d53171224378cb74bdd5681a1399_0a448493b4782967b150582570326227_1_0) switched from DEPLOYING to INITIALIZING.
2023-02-21 00:11:38,139 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Map (1/2) (ee83d53171224378cb74bdd5681a1399_0a448493b4782967b150582570326227_0_0) switched from INITIALIZING to RUNNING.
2023-02-21 00:11:38,140 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Map (2/2) (ee83d53171224378cb74bdd5681a1399_0a448493b4782967b150582570326227_1_0) switched from INITIALIZING to RUNNING.
2023-02-21 00:11:38,167 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Map (2/2) (ee83d53171224378cb74bdd5681a1399_0a448493b4782967b150582570326227_1_0) switched from RUNNING to FINISHED.
2023-02-21 00:11:38,168 INFO org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager [] - Received resource requirements from job c9562d7c3c921ac7d2f0383c7e250827: [ResourceRequirement{resourceProfile=ResourceProfile{UNKNOWN}, numberOfRequiredSlots=1}]
2023-02-21 00:11:38,168 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Sink: Unnamed (1/1) (ee83d53171224378cb74bdd5681a1399_ea632d67b7d595e5b851708ae9ad79d6_0_0) switched from CREATED to SCHEDULED.
2023-02-21 00:11:38,169 INFO org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager [] - Received resource requirements from job c9562d7c3c921ac7d2f0383c7e250827: [ResourceRequirement{resourceProfile=ResourceProfile{UNKNOWN}, numberOfRequiredSlots=2}]
2023-02-21 00:11:38,169 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Sink: Unnamed (1/1) (ee83d53171224378cb74bdd5681a1399_ea632d67b7d595e5b851708ae9ad79d6_0_0) switched from SCHEDULED to DEPLOYING.
2023-02-21 00:11:38,169 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Deploying Sink: Unnamed (1/1) (attempt #0) with attempt id ee83d53171224378cb74bdd5681a1399_ea632d67b7d595e5b851708ae9ad79d6_0_0 and vertex id ea632d67b7d595e5b851708ae9ad79d6_0 to localhost:41185-563559 @ localhost (dataPort=38645) with allocation id c3cc3ffeb221d7cd327722feaa5d5557
2023-02-21 00:11:38,187 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Sink: Unnamed (1/1) (ee83d53171224378cb74bdd5681a1399_ea632d67b7d595e5b851708ae9ad79d6_0_0) switched from DEPLOYING to INITIALIZING.
2023-02-21 00:11:38,200 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Sink: Unnamed (1/1) (ee83d53171224378cb74bdd5681a1399_ea632d67b7d595e5b851708ae9ad79d6_0_0) switched from INITIALIZING to RUNNING.
2023-02-21 00:11:38,335 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Map (1/2) (0a448493b4782967b150582570326227_0) is detected as a slow vertex, create and deploy 1 new speculative executions for it.
2023-02-21 00:11:38,339 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Map (1/2) (ee83d53171224378cb74bdd5681a1399_0a448493b4782967b150582570326227_0_1) switched from CREATED to SCHEDULED.
2023-02-21 00:11:38,340 INFO org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager [] - Received resource requirements from job c9562d7c3c921ac7d2f0383c7e250827: [ResourceRequirement{resourceProfile=ResourceProfile{UNKNOWN}, numberOfRequiredSlots=3}]
2023-02-21 00:11:38,415 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Map (1/2) (ee83d53171224378cb74bdd5681a1399_0a448493b4782967b150582570326227_0_1) switched from SCHEDULED to DEPLOYING.
2023-02-21 00:11:38,415 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Deploying Map (1/2) (attempt #1) with attempt id ee83d53171224378cb74bdd5681a1399_0a448493b4782967b150582570326227_0_1 and vertex id 0a448493b4782967b150582570326227_0 to localhost:41185-563559 @ localhost (dataPort=38645) with allocation id c053b04b2b8a67a771a10bb4339cbc2b
2023-02-21 00:11:38,435 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Map (1/2) (ee83d53171224378cb74bdd5681a1399_0a448493b4782967b150582570326227_0_1) switched from DEPLOYING to INITIALIZING.
2023-02-21 00:11:38,443 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Map (1/2) (ee83d53171224378cb74bdd5681a1399_0a448493b4782967b150582570326227_0_1) switched from INITIALIZING to RUNNING.
2023-02-21 00:11:38,463 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Map (1/2) (ee83d53171224378cb74bdd5681a1399_0a448493b4782967b150582570326227_0_1) switched from RUNNING to FINISHED.
2023-02-21 00:11:38,466 INFO org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager [] - Received resource requirements from job c9562d7c3c921ac7d2f0383c7e250827: [ResourceRequirement{resourceProfile=ResourceProfile{UNKNOWN}, numberOfRequiredSlots=2}]
2023-02-21 00:11:38,474 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Canceling 1 un-finished executions of 0a448493b4782967b150582570326227_0 because one of its executions has finished.
2023-02-21 00:11:38,475 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Map (1/2) (ee83d53171224378cb74bdd5681a1399_0a448493b4782967b150582570326227_0_0) switched from RUNNING to CANCELING.
2023-02-21 00:11:38,492 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Sink: Unnamed (1/1) (ee83d53171224378cb74bdd5681a1399_ea632d67b7d595e5b851708ae9ad79d6_0_0) switched from RUNNING to FINISHED.
2023-02-21 00:11:38,493 INFO org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager [] - Received resource requirements from job c9562d7c3c921ac7d2f0383c7e250827: [ResourceRequirement{resourceProfile=ResourceProfile{UNKNOWN}, numberOfRequiredSlots=1}]
2023-02-21 00:11:38,499 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Map (1/2) (ee83d53171224378cb74bdd5681a1399_0a448493b4782967b150582570326227_0_0) switched from CANCELING to CANCELED.
2023-02-21 00:11:38,500 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Job Test Speculative Execution Job (c9562d7c3c921ac7d2f0383c7e250827) switched from state RUNNING to FINISHED.
{code}

The execution log gives a detailed view of the speculative execution. {{Deploying Map (1/2)}} appears twice. Its subtask index is 0, so its first attempt (attempt #0) sleeps forever, because 0 + 0 is even. The speculative attempt deployed after Map (1/2) was detected as a slow vertex has attempt number 1, so the sum of subtask index and attempt number is 1, which is odd. That attempt therefore bypasses the sleep logic and finishes as expected, after which the hanging attempt #0 is canceled. Because we enabled the full spilling strategy ({{ALL_EXCHANGES_HYBRID_FULL}}), only the slow map subtask had to be executed again.
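The parity argument above can be checked mechanically. The minimal Java sketch below reproduces the hang rule and applies it to the three Map attempt ids from the log. The class and method names are hypothetical. Treating the last two {{_}}-separated fields of an attempt id as subtask index and attempt number is an assumption that matches the {{Deploying ... (attempt #N)}} lines; in a real Flink 1.17 job the two numbers would instead come from the task's {{RuntimeContext}} ({{getIndexOfThisSubtask()}} and {{getAttemptNumber()}}).

{code:java}
/**
 * Hypothetical sketch of the "hang on purpose" rule used to create a slow
 * subtask: an attempt hangs when (subtaskIndex + attemptNumber) is even.
 */
class HangRuleSketch {

    static boolean shouldHang(int subtaskIndex, int attemptNumber) {
        return (subtaskIndex + attemptNumber) % 2 == 0;
    }

    /**
     * Splits an execution attempt id as printed in the log into
     * {subtaskIndex, attemptNumber}. Assumption: these are the last two
     * '_'-separated fields of the id.
     */
    static int[] subtaskAndAttempt(String attemptId) {
        String[] parts = attemptId.split("_");
        if (parts.length < 2) {
            throw new IllegalArgumentException("Unexpected attempt id: " + attemptId);
        }
        int subtaskIndex = Integer.parseInt(parts[parts.length - 2]);
        int attemptNumber = Integer.parseInt(parts[parts.length - 1]);
        return new int[] {subtaskIndex, attemptNumber};
    }

    public static void main(String[] args) {
        String[] attempts = {
            "ee83d53171224378cb74bdd5681a1399_0a448493b4782967b150582570326227_0_0", // Map (1/2) #0
            "ee83d53171224378cb74bdd5681a1399_0a448493b4782967b150582570326227_1_0", // Map (2/2) #0
            "ee83d53171224378cb74bdd5681a1399_0a448493b4782967b150582570326227_0_1", // Map (1/2) #1
        };
        for (String id : attempts) {
            int[] sa = subtaskAndAttempt(id);
            System.out.printf("subtask %d, attempt %d -> %s%n",
                    sa[0], sa[1], shouldHang(sa[0], sa[1]) ? "hangs" : "finishes");
        }
    }
}
{code}

Running {{main}} reports that only Map (1/2) attempt #0 hangs, which matches the CANCELED attempt in the log.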
> Release Testing: Verify FLINK-29766 Adaptive Batch Scheduler should also work
> with hybrid shuffle mode
> ------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-30938
>                 URL: https://issues.apache.org/jira/browse/FLINK-30938
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Runtime / Coordination
>    Affects Versions: 1.17.0
>            Reporter: Weijie Guo
>            Assignee: Yufan Sheng
>            Priority: Blocker
>              Labels: release-testing
>         Attachments: AdaptiveBatchScheduler-job-graph.png,
> AdaptiveBatchScheduler-timeline.png, Speculative-Execution-timeline.png,
> flink-1.17-branch-log.png, flink-dashborad-version.png,
> taskmanager-slots.png, testAdaptiveBatchJob, testSpeculativeExecution
>
>
> This ticket aims to verify FLINK-29766: Adaptive Batch Scheduler should also work with hybrid shuffle mode.
> More details about this feature and how to use it can be found in this [documentation|https://nightlies.apache.org/flink/flink-docs-master/docs/ops/batch/batch_shuffle/#hybrid-shuffle].
> The verification is divided into two parts:
> Part I: Verify hybrid shuffle can work with AdaptiveBatchScheduler
> Write a simple Flink batch job using hybrid shuffle mode and submit this job. Note that in flink-1.17, AdaptiveBatchScheduler is the default scheduler for batch jobs, so you do not need any other configuration.
> Suppose your job's topology is source -> map -> sink. If your cluster has enough slots, you should find that source and map are running at the same time.
> Part II: Verify hybrid shuffle can work with Speculative Execution
> Write a Flink batch job using hybrid shuffle mode which has a subtask running much slower than others (e.g. sleep indefinitely if it runs on a certain host, where the hostname can be retrieved via InetAddress.getLocalHost().getHostName(), or if its (subtaskIndex + attemptNumber) % 2 == 0).
> Modify the Flink configuration file to enable speculative execution and tune the configuration as you like.
> Submit the job. Check the web UI, logs, metrics and produced result.
> You should find in the log that once one subtask of a producer task has finished, all of its consumer tasks can be scheduled.
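The comment above tunes the {{slow-task-detector.execution-time.*}} options so that the hanging map subtask is flagged quickly. The rough worked example below shows why, under stated assumptions. It follows our reading of the option descriptions: for parallelism N and ratio R, take the median execution time T of the first N*R finished subtasks, multiply it by M, and never go below the lower bound. It is NOT Flink's implementation: rounding N*R up, using the lower median, measuring execution time from RUNNING, and ignoring any input-size weighting Flink may apply are all assumptions, and the class and method names are hypothetical. The execution times are read off the speculative-execution log timestamps.

{code:java}
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.OptionalLong;

/**
 * Hypothetical, simplified sketch of an execution-time-based slow-task
 * baseline. Not Flink's code; see the assumptions in the comments.
 */
class SlowTaskBaselineSketch {

    /**
     * @param finishedInOrder execution times (ms) of finished subtasks, in finish order
     * @param parallelism     N, the parallelism of the vertex
     * @param ratio           R, baseline-ratio
     * @param multiplier      M, baseline-multiplier
     * @param lowerBoundMs    baseline-lower-bound in milliseconds
     * @return the baseline, or empty if too few subtasks have finished yet
     */
    static OptionalLong baseline(List<Long> finishedInOrder, int parallelism,
                                 double ratio, double multiplier, long lowerBoundMs) {
        // Assumption: ceil(N * R) finished subtasks (and at least one) are needed.
        int needed = Math.max(1, (int) Math.ceil(parallelism * ratio));
        if (finishedInOrder.size() < needed) {
            return OptionalLong.empty();
        }
        List<Long> firstFinished = new ArrayList<>(finishedInOrder.subList(0, needed));
        Collections.sort(firstFinished);
        long median = firstFinished.get((needed - 1) / 2); // lower median for even sizes
        long scaled = (long) Math.ceil(median * multiplier);
        return OptionalLong.of(Math.max(scaled, lowerBoundMs));
    }

    /** A running subtask is treated as slow once it has run longer than the baseline. */
    static boolean isSlow(long runningForMs, long baselineMs) {
        return runningForMs > baselineMs;
    }

    public static void main(String[] args) {
        // From the log: Map (2/2) ran 38,140 -> 38,167 (27 ms); Map (1/2) #0 had been
        // running 38,139 -> 38,335 (196 ms) when it was reported as a slow vertex.
        OptionalLong b = baseline(List.of(27L), 2, 0.2, 1.0, 0L);
        System.out.println("baseline = " + b.getAsLong() + " ms");
        System.out.println("Map (1/2) slow? " + isSlow(196L, b.getAsLong()));
    }
}
{code}

With the tested settings (N = 2, R = 0.2, M = 1, lower bound 0), one finished subtask (Map (2/2), 27 ms) is enough to set a 27 ms baseline, so Map (1/2), which had been running for 196 ms when it was reported, counts as slow. A non-zero lower bound would push the baseline up and delay detection.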