[ https://issues.apache.org/jira/browse/FLINK-14163?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Zhu Zhu updated FLINK-14163:
----------------------------
    Description:
Currently, {{Execution#producedPartitions}} is assigned only after the partitions have completed registration with the shuffle master in {{Execution#registerProducedPartitions(...)}}.

But the task deployment process (in {{Execution#deploy()}}) creates {{ResultPartitionDeploymentDescriptor}} instances directly from {{Execution#producedPartitions}} without checking whether it has been assigned.
This may lead to a task being deployed without its result partitions, which eventually causes the job to hang.

This is not a problem at the moment with Flink's default shuffle master, {{NettyShuffleMaster}}, since it returns a completed future on registration.
However, if that behavior changes, or if users plug in a customized {{ShuffleMaster}}, it may cause problems.

Besides that, {{Execution#producedPartitions}} is also used for
* generating downstream task input descriptors
* retrieving {{ResultPartitionID}} for partition releasing

To avoid such issues, we may need to change all usages of {{Execution#producedPartitions}} to a callback style, e.g. change {{Execution#producedPartitions}} from {{Map<IntermediateResultPartitionID, ResultPartitionDeploymentDescriptor>}} to {{CompletableFuture<Map<IntermediateResultPartitionID, ResultPartitionDeploymentDescriptor>>}} and adjust all its usages.

> A Task should be deployed only after all its partitions have completed the registration
> ---------------------------------------------------------------------------------------
>
>                 Key: FLINK-14163
>                 URL: https://issues.apache.org/jira/browse/FLINK-14163
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Coordination
>    Affects Versions: 1.10.0
>            Reporter: Zhu Zhu
>            Priority: Major
>             Fix For: 1.10.0

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
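
The callback-style change proposed in the description could look roughly like the sketch below. This is only an illustration under stated assumptions, not Flink's actual code: the class {{ProducedPartitionsSketch}} is a hypothetical stand-in for {{Execution}}, and plain strings stand in for {{IntermediateResultPartitionID}} and {{ResultPartitionDeploymentDescriptor}}.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;

/** Hypothetical stand-in for Execution; not Flink's actual class. */
public class ProducedPartitionsSketch {

    // Assumption: String keys and values replace IntermediateResultPartitionID
    // and ResultPartitionDeploymentDescriptor to keep the sketch self-contained.
    private final CompletableFuture<Map<String, String>> producedPartitions;
    private final List<String> deployedDescriptors = new CopyOnWriteArrayList<>();

    ProducedPartitionsSketch(CompletableFuture<Map<String, String>> registration) {
        this.producedPartitions = registration;
    }

    /** Deployment is chained on the future, so it never reads an unassigned map. */
    CompletableFuture<Void> deploy() {
        return producedPartitions.thenAccept(
                partitions -> deployedDescriptors.addAll(partitions.values()));
    }

    List<String> deployedDescriptors() {
        return deployedDescriptors;
    }

    public static void main(String[] args) {
        // Simulates a shuffle master that completes registration asynchronously.
        CompletableFuture<Map<String, String>> registration = new CompletableFuture<>();
        ProducedPartitionsSketch execution = new ProducedPartitionsSketch(registration);

        CompletableFuture<Void> deployment = execution.deploy();
        System.out.println("deployed before registration: " + deployment.isDone());

        registration.complete(Map.of("partition-0", "descriptor-0"));
        System.out.println("deployed after registration: " + deployment.isDone());
        System.out.println("descriptors: " + execution.deployedDescriptors());
    }
}
```

With a shuffle master that returns an already completed future, as {{NettyShuffleMaster}} is described as doing, the callback would run immediately, so the existing behavior should be unaffected.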