[ https://issues.apache.org/jira/browse/FLINK-22676?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Zhu Zhu closed FLINK-22676.
---------------------------
    Resolution: Done

Done via 62a342b647fc1eac7f87769be92fda798649d6d4

> The partition tracker should support remote shuffle properly
> ------------------------------------------------------------
>
>                 Key: FLINK-22676
>                 URL: https://issues.apache.org/jira/browse/FLINK-22676
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Runtime / Coordination, Runtime / Network
>    Affects Versions: 1.14.0
>            Reporter: Jin Xing
>            Assignee: Jin Xing
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.14.0
>
>
> In current Flink, a data partition is bound to the ResourceID of the TM in
> Execution#startTrackingPartitions, and the partition tracker stops tracking
> the corresponding partitions when a TM disconnects
> (JobMaster#disconnectTaskManager), i.e. the lifecycle of shuffle data is
> bound to the computing resource (TM). This works fine for the internal
> shuffle service, but not for a remote shuffle service. When shuffle data is
> stored remotely, the lifecycle of a completed partition can be decoupled
> from the TM: a TM can safely be released when no computing task is running
> on it, and further shuffle read requests can be directed to the remote
> shuffle cluster. In addition, when a TM is lost, its completed data
> partitions on the remote shuffle cluster do not need to be reproduced.
>
> The issue above arises because JobMasterPartitionTracker mixes up a
> partition's locationID (where the partition is located) and its tmID (which
> TM produced the partition). In TM internal shuffle, a partition's locationID
> is the same as its tmID, but in remote shuffle it is not.
> JobMasterPartitionTracker, as an independent component, should be able to
> differentiate the locationID and tmID of a partition in order to handle the
> lifecycle of the partition properly.
> We propose that JobMasterPartitionTracker manages and indexes partitions by
> both locationID and tmID.
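To make the proposed dual indexing concrete, here is a minimal sketch of what such an index could look like. It is only an illustration under stated assumptions: the class `DualIndexSketch` and its methods are hypothetical, IDs are plain strings instead of Flink's `ResourceID` and `ResultPartitionID`, and nothing here is taken from the actual commit.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for the proposed dual index (not a Flink class).
// Plain strings replace ResourceID / ResultPartitionID for brevity.
class DualIndexSketch {
    // tmID -> partitions produced by that TaskManager
    private final Map<String, Set<String>> byTmId = new HashMap<>();
    // locationID -> partitions whose data is stored at that location
    private final Map<String, Set<String>> byLocationId = new HashMap<>();

    // Index one partition under both its producing TM and its data location.
    void track(String partitionId, String tmId, String locationId) {
        byTmId.computeIfAbsent(tmId, k -> new HashSet<>()).add(partitionId);
        byLocationId.computeIfAbsent(locationId, k -> new HashSet<>()).add(partitionId);
    }

    // Partitions produced by the given TM, wherever their data lives.
    Set<String> producedBy(String tmId) {
        return byTmId.getOrDefault(tmId, Collections.emptySet());
    }

    // Partitions whose data lives at the given location.
    Set<String> locatedOn(String locationId) {
        return byLocationId.getOrDefault(locationId, Collections.emptySet());
    }
}
```

With an internal-shuffle partition tracked as `("p1", "tm-1", "tm-1")` and a remote-shuffle partition as `("p2", "tm-1", "remote-1")`, `producedBy("tm-1")` contains both, while `locatedOn("tm-1")` contains only `p1`. That difference is what lets the tracker keep `p2` when `tm-1` goes away.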
> The registration and unregistration process would be as follows:
>
> *A. Partition Registration*
> # Execution#registerProducedPartitions registers the partition with
> ShuffleMaster and gets a ShuffleDescriptor. Currently,
> ShuffleDescriptor#storesLocalResourcesOn only returns the location of the
> producing TM if the partition occupies local resources there.
> We propose to rename this method and have it always return the locationID
> of the partition. It might look like this:
> {code:java}
> ResourceID getLocationID(); {code}
> # Execution#registerProducedPartitions then registers the partition with
> JMPartitionTracker using the tmID (ResourceID of the TaskManager from
> TaskManagerLocation) and the locationID (acquired in step 1).
> JobMasterPartitionTracker will index the partition by both tmID and
> locationID.
>
> *B. Invocations from JM and ShuffleMaster*
> JobMasterPartitionTracker listens for invocations from both JM and
> ShuffleMaster.
> # When JMPartitionTracker hears from JobMaster#disconnectTaskManager that a
> TM has disconnected, it checks whether the disconnected tmID equals the
> locationID of any partition. If so, tracking of the corresponding
> partition is stopped.
> # When JobMasterPartitionTracker hears from ShuffleMaster that a data
> location is lost, it unregisters the corresponding partitions by locationID.
>
> *C. Partition Unregistration*
> When unregistering a partition, JobMasterPartitionTracker first removes the
> corresponding index entries for tmID and locationID, and then releases the
> partition according to the shuffle service type:
> # If the locationID equals the tmID, the partition is held by the TM
> internal shuffle service, and JMPartitionTracker invokes
> TaskExecutorGateway for the release.
> # If the locationID does not equal the tmID, the partition is held by an
> external shuffle service, and JMPartitionTracker invokes ShuffleMaster for
> the release.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
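The event handling in part B and the release routing in part C of the quoted description can be sketched roughly as below. This is a hedged model, not the merged implementation: `TrackerLifecycleSketch`, `ReleaseTarget`, and all method names are hypothetical, IDs are plain strings, and a single map with a linear scan is used for brevity instead of a real two-way index. The issue text does not say whether a release call is still sent when a TM or data location is lost, so this sketch assumes those handlers only stop tracking.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical model of parts B and C; none of these names are Flink APIs.
class TrackerLifecycleSketch {
    enum ReleaseTarget { TASK_EXECUTOR, SHUFFLE_MASTER }

    // The two IDs recorded for each tracked partition.
    private static final class Tracked {
        final String tmId;
        final String locationId;
        Tracked(String tmId, String locationId) {
            this.tmId = tmId;
            this.locationId = locationId;
        }
    }

    private final Map<String, Tracked> partitions = new HashMap<>();

    void register(String partitionId, String tmId, String locationId) {
        partitions.put(partitionId, new Tracked(tmId, locationId));
    }

    boolean isTracked(String partitionId) {
        return partitions.containsKey(partitionId);
    }

    // Part C: same IDs means TM internal shuffle, otherwise external shuffle.
    static ReleaseTarget releaseTargetFor(String tmId, String locationId) {
        return locationId.equals(tmId)
                ? ReleaseTarget.TASK_EXECUTOR
                : ReleaseTarget.SHUFFLE_MASTER;
    }

    // Part C: remove the entry first, then report who would perform the release.
    Optional<ReleaseTarget> unregister(String partitionId) {
        Tracked t = partitions.remove(partitionId);
        return t == null
                ? Optional.empty()
                : Optional.of(releaseTargetFor(t.tmId, t.locationId));
    }

    // Part B.1: a TM disconnect only affects partitions located on that TM.
    List<String> onTaskManagerDisconnected(String tmId) {
        return stopTrackingLocatedOn(tmId);
    }

    // Part B.2: a lost data location affects the partitions stored there.
    List<String> onDataLocationLost(String locationId) {
        return stopTrackingLocatedOn(locationId);
    }

    // Assumption: only stops tracking; no release call is modelled here.
    private List<String> stopTrackingLocatedOn(String locationId) {
        List<String> removed = new ArrayList<>();
        Iterator<Map.Entry<String, Tracked>> it = partitions.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Tracked> e = it.next();
            if (e.getValue().locationId.equals(locationId)) {
                removed.add(e.getKey());
                it.remove();
            }
        }
        return removed;
    }
}
```

In this model, disconnecting `tm-1` drops a partition registered as `("p1", "tm-1", "tm-1")` but leaves one registered as `("p2", "tm-1", "remote-1")` tracked, and a later `unregister("p2")` reports `SHUFFLE_MASTER` as the release target.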