[ https://issues.apache.org/jira/browse/FLINK-30938?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17691244#comment-17691244 ]

Yufan Sheng edited comment on FLINK-30938 at 2/20/23 5:38 PM:
--------------------------------------------------------------

Thanks for assigning this test task to me. I have finished the verification by 
following the instructions from [~Weijie Guo]. The {{AdaptiveBatchScheduler}} 
and {{SpeculativeExecution}} are working as expected with hybrid shuffle. Here 
is the detailed test process and report from my side.

h2. Building Flink on the {{release-1.17}} branch

I built Flink locally on the {{release-1.17}} branch. The latest commit in my 
local repository is {{6b4745}}. To speed up the build, I skipped all code 
quality checks and tests with {{mvn clean install -DskipTests -Pfast}}.

 !flink-1.17-branch-log.png|width=500! 

The dashboard of the running Flink cluster also confirms that this is a build 
from commit {{6b4745}}.

 !flink-dashborad-version.png|width=550! 

To keep the test simple, we start a local standalone Flink cluster. The 
TaskManager is configured with 4 slots.

 !taskmanager-slots.png|width=700! 

h2. Test hybrid shuffle with {{AdaptiveBatchScheduler}}

The default value of {{execution.batch-shuffle-mode}} is 
{{ALL_EXCHANGES_BLOCKING}}, i.e. a blocking shuffle. To enable hybrid shuffle, 
we change this value to {{ALL_EXCHANGES_HYBRID_SELECTIVE}}, which means the 
data can be consumed as soon as the downstream task is available, and it is 
only spilled to disk selectively when necessary instead of being fully 
persisted.

Since the {{AdaptiveBatchScheduler}} is the default scheduler for batch jobs, no 
extra scheduler configuration is needed. Its default 
{{jobmanager.partition.hybrid.partition-data-consume-constraint}} is 
{{UNFINISHED_PRODUCERS}}, which means the produced data can be consumed 
immediately, without waiting for the producer to finish.
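To make the difference between the constraint values concrete, here is a deliberately simplified model. It is a hypothetical sketch, not Flink's scheduler code: the enum only mirrors the names of the documented values of {{jobmanager.partition.hybrid.partition-data-consume-constraint}} ({{ALL_PRODUCERS_FINISHED}}, {{ONLY_FINISHED_PRODUCERS}}, {{UNFINISHED_PRODUCERS}}), and the illustrative helper {{canScheduleConsumer}} only looks at whether each upstream producer subtask is running or finished. The real scheduler also considers slots, parallelism decisions and partition state.

{code:java}
import java.util.List;

/** Hypothetical, simplified model of the hybrid partition data consume constraint. */
class ConsumeConstraintModel {

    /** Mirrors the documented option values; this is not Flink's own enum class. */
    enum Constraint { ALL_PRODUCERS_FINISHED, ONLY_FINISHED_PRODUCERS, UNFINISHED_PRODUCERS }

    /** Toy producer state: the model only distinguishes running from finished. */
    enum ProducerState { RUNNING, FINISHED }

    /** Returns true if, in this toy model, a downstream consumer may be scheduled. */
    static boolean canScheduleConsumer(Constraint constraint, List<ProducerState> producers) {
        switch (constraint) {
            case ALL_PRODUCERS_FINISHED:
                // Behaves like a blocking exchange: every producer must be finished.
                return producers.stream().allMatch(s -> s == ProducerState.FINISHED);
            case ONLY_FINISHED_PRODUCERS:
                // At least one producer must be finished; only finished data is consumed.
                return producers.stream().anyMatch(s -> s == ProducerState.FINISHED);
            case UNFINISHED_PRODUCERS:
                // Producers in this model are already running, so data can flow right away.
                return !producers.isEmpty();
            default:
                throw new IllegalArgumentException("Unknown constraint: " + constraint);
        }
    }

    public static void main(String[] args) {
        // One producer subtask finished, the other still running.
        List<ProducerState> producers = List.of(ProducerState.RUNNING, ProducerState.FINISHED);
        for (Constraint c : Constraint.values()) {
            System.out.println(c + " -> " + canScheduleConsumer(c, producers));
        }
    }
}
{code}

With one finished and one running producer, this model lets the consumer start under {{ONLY_FINISHED_PRODUCERS}} and {{UNFINISHED_PRODUCERS}}, but not under {{ALL_PRODUCERS_FINISHED}}, which behaves like a blocking exchange.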

To make the hybrid shuffle behavior observable, the last step is to give the 
TaskManager enough slots. By default, the standalone Flink cluster has only one 
JobManager and one TaskManager. The test code for this verification is 
attached as [^testAdaptiveBatchJob]. It runs one source subtask, two map 
subtasks and one sink subtask, so we set the number of slots to 4 with 
{{taskmanager.numberOfTaskSlots: 4}} in {{flink-conf.yaml}}.
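The slot count is plain arithmetic. Assuming every subtask occupies its own slot (no slot sharing between these operators, which is consistent with the slot requirements that the {{DeclarativeSlotManager}} logs in the speculative execution test), running everything at once needs as many slots as the sum of the operators' parallelisms. The helper below is only an illustration; the parallelisms are read off the job's log ({{(1/1)}} source, {{(x/2)}} map, {{(1/1)}} sink).

{code:java}
import java.util.LinkedHashMap;
import java.util.Map;

/** Back-of-the-envelope slot count for running every subtask concurrently. */
class SlotEstimate {

    /** Assumes one slot per subtask, i.e. no slot sharing between operators. */
    static int requiredSlots(Map<String, Integer> parallelismByOperator) {
        int total = 0;
        for (int parallelism : parallelismByOperator.values()) {
            total += parallelism;
        }
        return total;
    }

    public static void main(String[] args) {
        // Operator names and parallelisms as they appear in the job's log.
        Map<String, Integer> job = new LinkedHashMap<>();
        job.put("Source: Sequence Source", 1);
        job.put("Map", 2);
        job.put("Sink: Unnamed", 1);
        System.out.println("Slots needed: " + requiredSlots(job)); // prints "Slots needed: 4"
    }
}
{code}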

Finally, we submit the [^testAdaptiveBatchJob] job to the Flink cluster. 

h3. Hybrid shuffle in {{AdaptiveBatchScheduler}} report

Since every map subtask sleeps for 5 seconds, the first thing we can see in the 
job graph is that the map and sink operators are running simultaneously. With 
the default blocking shuffle, the sink would only start after all map subtasks 
had finished.

!AdaptiveBatchScheduler-job-graph.png|width=600!

The screenshot of the execution timeline also shows that the source, map and 
sink are running at almost the same time.

!AdaptiveBatchScheduler-timeline.png|width=600!

Finally, the TaskManager log confirms this: all four subtasks switch from 
{{INITIALIZING}} to {{RUNNING}} within 6 ms of each other.

{code}
2023-02-20 23:59:16,558 INFO  org.apache.flink.runtime.taskmanager.Task                    [] - Source: Sequence Source (1/1)#0 (fb0eccc08c3516b1b8ed16977616b147_bc764cd8ddf7a0cff126f51c16239658_0_0) switched from INITIALIZING to RUNNING.
2023-02-20 23:59:16,563 INFO  org.apache.flink.runtime.taskmanager.Task                    [] - Sink: Unnamed (1/1)#0 (fb0eccc08c3516b1b8ed16977616b147_ea632d67b7d595e5b851708ae9ad79d6_0_0) switched from INITIALIZING to RUNNING.
2023-02-20 23:59:16,564 INFO  org.apache.flink.runtime.taskmanager.Task                    [] - Map (2/2)#0 (fb0eccc08c3516b1b8ed16977616b147_0a448493b4782967b150582570326227_1_0) switched from INITIALIZING to RUNNING.
2023-02-20 23:59:16,564 INFO  org.apache.flink.runtime.taskmanager.Task                    [] - Map (1/2)#0 (fb0eccc08c3516b1b8ed16977616b147_0a448493b4782967b150582570326227_0_0) switched from INITIALIZING to RUNNING.
{code}
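The "almost the same time" observation can be checked with simple timestamp arithmetic. This illustrative helper (the class and method names are ours, not part of Flink) takes the four {{RUNNING}} timestamps from the log above and returns the gap between the earliest and the latest one. Since each map subtask sleeps for 5 seconds, a blocking shuffle would have delayed the sink's start by at least 5 seconds relative to the map subtasks.

{code:java}
import java.time.Duration;
import java.time.LocalTime;
import java.util.List;

/** Computes the spread (earliest to latest) of a set of log timestamps. */
class RunningSpread {

    static long spreadMillis(List<LocalTime> timestamps) {
        LocalTime earliest = timestamps.get(0);
        LocalTime latest = timestamps.get(0);
        for (LocalTime t : timestamps) {
            if (t.isBefore(earliest)) {
                earliest = t;
            }
            if (t.isAfter(latest)) {
                latest = t;
            }
        }
        return Duration.between(earliest, latest).toMillis();
    }

    public static void main(String[] args) {
        // INITIALIZING -> RUNNING times of Source, Sink, Map (2/2) and Map (1/2),
        // copied from the log with the comma replaced by a dot for LocalTime.parse.
        List<LocalTime> running = List.of(
                LocalTime.parse("23:59:16.558"),
                LocalTime.parse("23:59:16.563"),
                LocalTime.parse("23:59:16.564"),
                LocalTime.parse("23:59:16.564"));
        System.out.println("Spread: " + spreadMillis(running) + " ms"); // prints "Spread: 6 ms"
    }
}
{code}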

h2. Test hybrid shuffle with Speculative Execution

To enable *Speculative Execution*, we add 
{{execution.batch.speculative.enabled: true}} to {{flink-conf.yaml}}. We also 
change {{execution.batch-shuffle-mode}} to {{ALL_EXCHANGES_HYBRID_FULL}}, so 
that all shuffle data is persisted to disk.

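The test code [^testSpeculativeExecution] makes a map subtask sleep forever when 
its subtask index plus its attempt number is even. To let speculative execution 
detect the slow subtask sooner, we lower the slow task detection baseline, and 
we set {{execution.batch.speculative.block-slow-node-duration}} to 0 so that 
the slow node (our only TaskManager) is not blocked and can still host the 
speculative attempt: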

{code}
slow-task-detector.execution-time.baseline-ratio: 0.2
slow-task-detector.execution-time.baseline-lower-bound: 0
slow-task-detector.execution-time.baseline-multiplier: 1
execution.batch.speculative.block-slow-node-duration: 0
{code}
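These settings tune Flink's execution-time based slow task detector. As a rough, simplified reading of how the Flink configuration documentation describes it: given parallelism N, ratio R and multiplier M, take the median execution time T of the first N*R finished subtasks; the baseline is T*M, but never below the lower bound, and a subtask that has run longer than the baseline is treated as slow. The sketch below is our own illustration, not Flink's code: it ignores the input-bytes weighting Flink applies, and rounding N*R up (to at least one subtask) and using the lower median are assumptions. Under these assumptions, a single finished subtask already defines the baseline here.

{code:java}
import java.util.Arrays;

/** Simplified model of an execution-time based slow task baseline (not Flink's code). */
class SlowTaskBaseline {

    /**
     * @param finishedTimesMs execution times (ms) of finished subtasks, in finishing order
     * @param parallelism     N, parallelism of the vertex
     * @param ratio           R, slow-task-detector.execution-time.baseline-ratio
     * @param lowerBoundMs    slow-task-detector.execution-time.baseline-lower-bound
     * @param multiplier      M, slow-task-detector.execution-time.baseline-multiplier
     * @return the baseline in ms, or -1 if not enough subtasks have finished yet
     */
    static double baselineMs(long[] finishedTimesMs, int parallelism, double ratio,
                             long lowerBoundMs, double multiplier) {
        // Assumption: N*R is rounded up and at least one finished subtask is required.
        int needed = Math.max(1, (int) Math.ceil(parallelism * ratio));
        if (finishedTimesMs.length < needed) {
            return -1;
        }
        long[] firstFinished = Arrays.copyOf(finishedTimesMs, needed);
        Arrays.sort(firstFinished);
        // Assumption: lower median when the number of samples is even.
        long median = firstFinished[(needed - 1) / 2];
        return Math.max(lowerBoundMs, median * multiplier);
    }

    /** A running subtask counts as slow once it has run longer than the baseline. */
    static boolean isSlow(long runningTimeMs, double baselineMs) {
        return baselineMs >= 0 && runningTimeMs > baselineMs;
    }

    public static void main(String[] args) {
        // ratio 0.2, lower bound 0, multiplier 1 as configured above; parallelism 2.
        double baseline = baselineMs(new long[] {27}, 2, 0.2, 0, 1.0);
        System.out.println("baseline = " + baseline + " ms"); // prints "baseline = 27.0 ms"
        System.out.println("200 ms subtask slow? " + isSlow(200, baseline));
    }
}
{code}

The 27 ms input is the {{RUNNING}} to {{FINISHED}} time of {{Map (2/2)}} in the execution log of the report (00:11:38,140 to 00:11:38,167). In this model the hanging {{Map (1/2)}} exceeds such a baseline almost immediately.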

Finally, we restart the Flink cluster with the new configuration and submit the 
[^testSpeculativeExecution] job to it. 

h3. Hybrid shuffle in Speculative Execution report

When speculative execution is enabled, the default data consumption constraint 
is {{ONLY_FINISHED_PRODUCERS}}. This means that as soon as an upstream producer 
subtask has finished, a downstream consumer can be started to consume the data 
produced by that finished producer. The screenshot of the execution timeline 
confirms this.

!Speculative-Execution-timeline.png|width=600!

Once the single source subtask has finished, the two map subtasks start 
simultaneously. {{Map (2/2)}} finishes quickly, while {{Map (1/2)}} hangs 
forever. The downstream sink therefore starts halfway through the map phase, as 
soon as {{Map (2/2)}} has finished.

{code:java}
2023-02-21 00:11:37,993 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Source: Sequence Source (1/1) (ee83d53171224378cb74bdd5681a1399_bc764cd8ddf7a0cff126f51c16239658_0_0) switched from RUNNING to FINISHED.
2023-02-21 00:11:37,996 INFO  org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager [] - Clearing resource requirements of job c9562d7c3c921ac7d2f0383c7e250827
2023-02-21 00:11:38,000 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Map (1/2) (ee83d53171224378cb74bdd5681a1399_0a448493b4782967b150582570326227_0_0) switched from CREATED to SCHEDULED.
2023-02-21 00:11:38,001 INFO  org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager [] - Received resource requirements from job c9562d7c3c921ac7d2f0383c7e250827: [ResourceRequirement{resourceProfile=ResourceProfile{UNKNOWN}, numberOfRequiredSlots=1}]
2023-02-21 00:11:38,001 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Map (1/2) (ee83d53171224378cb74bdd5681a1399_0a448493b4782967b150582570326227_0_0) switched from SCHEDULED to DEPLOYING.
2023-02-21 00:11:38,001 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Deploying Map (1/2) (attempt #0) with attempt id ee83d53171224378cb74bdd5681a1399_0a448493b4782967b150582570326227_0_0 and vertex id 0a448493b4782967b150582570326227_0 to localhost:41185-563559 @ localhost (dataPort=38645) with allocation id 772ba5994150e188cf2ed4743eafb537
2023-02-21 00:11:38,005 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Map (2/2) (ee83d53171224378cb74bdd5681a1399_0a448493b4782967b150582570326227_1_0) switched from CREATED to SCHEDULED.
2023-02-21 00:11:38,006 INFO  org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager [] - Received resource requirements from job c9562d7c3c921ac7d2f0383c7e250827: [ResourceRequirement{resourceProfile=ResourceProfile{UNKNOWN}, numberOfRequiredSlots=2}]
2023-02-21 00:11:38,068 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Map (1/2) (ee83d53171224378cb74bdd5681a1399_0a448493b4782967b150582570326227_0_0) switched from DEPLOYING to INITIALIZING.
2023-02-21 00:11:38,074 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Map (2/2) (ee83d53171224378cb74bdd5681a1399_0a448493b4782967b150582570326227_1_0) switched from SCHEDULED to DEPLOYING.
2023-02-21 00:11:38,074 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Deploying Map (2/2) (attempt #0) with attempt id ee83d53171224378cb74bdd5681a1399_0a448493b4782967b150582570326227_1_0 and vertex id 0a448493b4782967b150582570326227_1 to localhost:41185-563559 @ localhost (dataPort=38645) with allocation id c3cc3ffeb221d7cd327722feaa5d5557
2023-02-21 00:11:38,104 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Map (2/2) (ee83d53171224378cb74bdd5681a1399_0a448493b4782967b150582570326227_1_0) switched from DEPLOYING to INITIALIZING.
2023-02-21 00:11:38,139 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Map (1/2) (ee83d53171224378cb74bdd5681a1399_0a448493b4782967b150582570326227_0_0) switched from INITIALIZING to RUNNING.
2023-02-21 00:11:38,140 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Map (2/2) (ee83d53171224378cb74bdd5681a1399_0a448493b4782967b150582570326227_1_0) switched from INITIALIZING to RUNNING.
2023-02-21 00:11:38,167 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Map (2/2) (ee83d53171224378cb74bdd5681a1399_0a448493b4782967b150582570326227_1_0) switched from RUNNING to FINISHED.
2023-02-21 00:11:38,168 INFO  org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager [] - Received resource requirements from job c9562d7c3c921ac7d2f0383c7e250827: [ResourceRequirement{resourceProfile=ResourceProfile{UNKNOWN}, numberOfRequiredSlots=1}]
2023-02-21 00:11:38,168 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Sink: Unnamed (1/1) (ee83d53171224378cb74bdd5681a1399_ea632d67b7d595e5b851708ae9ad79d6_0_0) switched from CREATED to SCHEDULED.
2023-02-21 00:11:38,169 INFO  org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager [] - Received resource requirements from job c9562d7c3c921ac7d2f0383c7e250827: [ResourceRequirement{resourceProfile=ResourceProfile{UNKNOWN}, numberOfRequiredSlots=2}]
2023-02-21 00:11:38,169 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Sink: Unnamed (1/1) (ee83d53171224378cb74bdd5681a1399_ea632d67b7d595e5b851708ae9ad79d6_0_0) switched from SCHEDULED to DEPLOYING.
2023-02-21 00:11:38,169 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Deploying Sink: Unnamed (1/1) (attempt #0) with attempt id ee83d53171224378cb74bdd5681a1399_ea632d67b7d595e5b851708ae9ad79d6_0_0 and vertex id ea632d67b7d595e5b851708ae9ad79d6_0 to localhost:41185-563559 @ localhost (dataPort=38645) with allocation id c3cc3ffeb221d7cd327722feaa5d5557
2023-02-21 00:11:38,187 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Sink: Unnamed (1/1) (ee83d53171224378cb74bdd5681a1399_ea632d67b7d595e5b851708ae9ad79d6_0_0) switched from DEPLOYING to INITIALIZING.
2023-02-21 00:11:38,200 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Sink: Unnamed (1/1) (ee83d53171224378cb74bdd5681a1399_ea632d67b7d595e5b851708ae9ad79d6_0_0) switched from INITIALIZING to RUNNING.
2023-02-21 00:11:38,335 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - Map (1/2) (0a448493b4782967b150582570326227_0) is detected as a slow vertex, create and deploy 1 new speculative executions for it.
2023-02-21 00:11:38,339 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Map (1/2) (ee83d53171224378cb74bdd5681a1399_0a448493b4782967b150582570326227_0_1) switched from CREATED to SCHEDULED.
2023-02-21 00:11:38,340 INFO  org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager [] - Received resource requirements from job c9562d7c3c921ac7d2f0383c7e250827: [ResourceRequirement{resourceProfile=ResourceProfile{UNKNOWN}, numberOfRequiredSlots=3}]
2023-02-21 00:11:38,415 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Map (1/2) (ee83d53171224378cb74bdd5681a1399_0a448493b4782967b150582570326227_0_1) switched from SCHEDULED to DEPLOYING.
2023-02-21 00:11:38,415 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Deploying Map (1/2) (attempt #1) with attempt id ee83d53171224378cb74bdd5681a1399_0a448493b4782967b150582570326227_0_1 and vertex id 0a448493b4782967b150582570326227_0 to localhost:41185-563559 @ localhost (dataPort=38645) with allocation id c053b04b2b8a67a771a10bb4339cbc2b
2023-02-21 00:11:38,435 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Map (1/2) (ee83d53171224378cb74bdd5681a1399_0a448493b4782967b150582570326227_0_1) switched from DEPLOYING to INITIALIZING.
2023-02-21 00:11:38,443 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Map (1/2) (ee83d53171224378cb74bdd5681a1399_0a448493b4782967b150582570326227_0_1) switched from INITIALIZING to RUNNING.
2023-02-21 00:11:38,463 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Map (1/2) (ee83d53171224378cb74bdd5681a1399_0a448493b4782967b150582570326227_0_1) switched from RUNNING to FINISHED.
2023-02-21 00:11:38,466 INFO  org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager [] - Received resource requirements from job c9562d7c3c921ac7d2f0383c7e250827: [ResourceRequirement{resourceProfile=ResourceProfile{UNKNOWN}, numberOfRequiredSlots=2}]
2023-02-21 00:11:38,474 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - Canceling 1 un-finished executions of 0a448493b4782967b150582570326227_0 because one of its executions has finished.
2023-02-21 00:11:38,475 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Map (1/2) (ee83d53171224378cb74bdd5681a1399_0a448493b4782967b150582570326227_0_0) switched from RUNNING to CANCELING.
2023-02-21 00:11:38,492 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Sink: Unnamed (1/1) (ee83d53171224378cb74bdd5681a1399_ea632d67b7d595e5b851708ae9ad79d6_0_0) switched from RUNNING to FINISHED.
2023-02-21 00:11:38,493 INFO  org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager [] - Received resource requirements from job c9562d7c3c921ac7d2f0383c7e250827: [ResourceRequirement{resourceProfile=ResourceProfile{UNKNOWN}, numberOfRequiredSlots=1}]
2023-02-21 00:11:38,499 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Map (1/2) (ee83d53171224378cb74bdd5681a1399_0a448493b4782967b150582570326227_0_0) switched from CANCELING to CANCELED.
2023-02-21 00:11:38,500 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Job Test Speculative Execution Job (c9562d7c3c921ac7d2f0383c7e250827) switched from state RUNNING to FINISHED.
{code}
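The claim that the sink starts halfway through the map phase can be checked against three timestamps in this log: {{Map (2/2)}} finishing, the sink being scheduled, and the speculative {{Map (1/2)}} attempt #1 finishing. The class and constant names below are illustrative only.

{code:java}
import java.time.Duration;
import java.time.LocalTime;

/** Checks the event ordering reported in the speculative execution log. */
class LogOrderingCheck {

    // Timestamps copied from the log (comma replaced by a dot for LocalTime.parse).
    static final LocalTime MAP_2_FINISHED = LocalTime.parse("00:11:38.167");
    static final LocalTime SINK_SCHEDULED = LocalTime.parse("00:11:38.168");
    static final LocalTime MAP_1_ATTEMPT_1_FINISHED = LocalTime.parse("00:11:38.463");

    /** True if the sink was scheduled after one map subtask finished but before the other did. */
    static boolean sinkStartedMidway() {
        return !SINK_SCHEDULED.isBefore(MAP_2_FINISHED)
                && SINK_SCHEDULED.isBefore(MAP_1_ATTEMPT_1_FINISHED);
    }

    /** Milliseconds between the sink being scheduled and the last map subtask finishing. */
    static long headStartMillis() {
        return Duration.between(SINK_SCHEDULED, MAP_1_ATTEMPT_1_FINISHED).toMillis();
    }

    public static void main(String[] args) {
        System.out.println("Sink scheduled mid-way: " + sinkStartedMidway());
        System.out.println("Head start: " + headStartMillis() + " ms"); // prints "Head start: 295 ms"
    }
}
{code}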

The execution log gives a detailed view of the speculative execution. 
{{Map (1/2)}} is deployed twice because its subtask index is 0. In attempt #0, 
the sum of subtask index and attempt number is 0, which is even, so the subtask 
sleeps forever. After it is detected as a slow vertex, a speculative attempt 
(attempt #1) is deployed; the sum is now 1, which is odd, so this attempt 
bypasses the sleep logic and finishes as expected, after which the hanging 
attempt #0 is canceled. Because of the full spilling strategy 
({{ALL_EXCHANGES_HYBRID_FULL}}) enabled earlier, the source output is 
persisted, so only the slow map subtask needs a new attempt; the source is not 
re-executed.
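The hang condition described above fits in a one-line predicate. This is a hypothetical reconstruction from the description, not the attached job's source; {{shouldHang}} is an illustrative name. In a Flink 1.17 {{RichMapFunction}}, the two inputs would typically come from {{getRuntimeContext().getIndexOfThisSubtask()}} and {{getRuntimeContext().getAttemptNumber()}}.

{code:java}
/** Hypothetical reconstruction of the test job's "sleep forever" condition. */
class HangRule {

    /** A map subtask attempt hangs when subtaskIndex + attemptNumber is even. */
    static boolean shouldHang(int subtaskIndex, int attemptNumber) {
        return (subtaskIndex + attemptNumber) % 2 == 0;
    }

    public static void main(String[] args) {
        // The three map attempts that appear in the log: {subtaskIndex, attemptNumber}.
        int[][] attempts = { {0, 0}, {1, 0}, {0, 1} };
        String[] names = { "Map (1/2) attempt #0", "Map (2/2) attempt #0", "Map (1/2) attempt #1" };
        for (int i = 0; i < attempts.length; i++) {
            boolean hangs = shouldHang(attempts[i][0], attempts[i][1]);
            System.out.println(names[i] + (hangs ? " hangs" : " finishes"));
        }
    }
}
{code}

Applied to the three map attempts in the log, the rule reproduces the observed behavior: only {{Map (1/2)}} attempt #0 hangs.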



> Release Testing: Verify FLINK-29766 Adaptive Batch Scheduler should also work 
> with hybrid shuffle mode
> ------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-30938
>                 URL: https://issues.apache.org/jira/browse/FLINK-30938
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Runtime / Coordination
>    Affects Versions: 1.17.0
>            Reporter: Weijie Guo
>            Assignee: Yufan Sheng
>            Priority: Blocker
>              Labels: release-testing
>         Attachments: AdaptiveBatchScheduler-job-graph.png, 
> AdaptiveBatchScheduler-timeline.png, Speculative-Execution-timeline.png, 
> flink-1.17-branch-log.png, flink-dashborad-version.png, 
> taskmanager-slots.png, testAdaptiveBatchJob, testSpeculativeExecution
>
>
> This ticket aims to verify FLINK-29766: Adaptive Batch Scheduler should 
> also work with hybrid shuffle mode.
> More details about this feature and how to use it can be found in this 
> [documentation|https://nightlies.apache.org/flink/flink-docs-master/docs/ops/batch/batch_shuffle/#hybrid-shuffle].
> The verification is divided into two parts:
> Part I: Verify hybrid shuffle can work with AdaptiveBatchScheduler
> Write a simple Flink batch job using hybrid shuffle mode and submit this job. 
> Note that in flink-1.17, AdaptiveBatchScheduler is the default scheduler for 
> batch job, so you do not need other configuration.
> Suppose your job's topology is source -> map -> sink; if your cluster has 
> enough slots, you should find that source and map are running at the same 
> time.
> Part II: Verify hybrid shuffle can work with Speculative Execution
> Write a Flink batch job using hybrid shuffle mode which has a subtask running 
> much slower than others (e.g. sleep indefinitely if it runs on a certain 
> host, the hostname can be retrieved via 
> InetAddress.getLocalHost().getHostName(), or if its (subtaskIndex + 
> attemptNumber) % 2 == 0)
> Modify Flink configuration file to enable speculative execution and tune the 
> configuration as you like
> Submit the job. Check the web UI, logs, metrics and produced result.
> You should find in the log that once one subtask of a producer task has 
> finished, all its consumer tasks can be scheduled.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
