[ https://issues.apache.org/jira/browse/FLINK-12406?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Ruidong Li updated FLINK-12406:
-------------------------------
    Description:
After each job finishes, new {{BLOCKING_PERSISTENT}} result partitions are generated. The locations of these result partitions should be reported back to the client via {{JobExecutionResult}}; they will later be used for Table {{cache()}} and {{invalidateCache()}}.

Description
 - The client uses {{ExecutionEnvironment}} to submit a batch job and waits for the {{JobResult}} from the {{JM}}
 - When a job finishes, the {{BLOCKING_PERSISTENT}} {{ResultPartition}} locations are collected from each {{ExecutionVertex}} in the {{ExecutionGraph}}
 - On the JM side, the {{BLOCKING_PERSISTENT}} {{ResultPartition}} locations flow along this path: {{ExecutionGraph}} -> {{ArchivedExecutionGraph}} -> {{JobResult}}
 - On the client side, a {{JobExecutionResult}} is created from the returned {{JobResult}}
 - The {{ExecutionEnvironment}} sees the {{JobExecutionResult}} containing the locations and stores them in itself

Failure Handling
 - If any error occurs while collecting the locations of a {{BLOCKING_PERSISTENT}} {{ResultPartition}}, we do not terminate the process but leave the locations of the affected {{IntermediateDataSet}} incomplete; we keep a record of these {{IntermediateDataSetID}}s and report them back to the client
 - The client can then use this information to decide what to do: generally, the data can be read if the locations are complete, or a delete request will be proposed (in later PRs) if the locations are incomplete

Brief change log
 - Add a new class {{ResultPartitionDescriptor}}, which stores the location of a {{BLOCKING_PERSISTENT}} {{ResultPartition}}; currently only a {{ResultPartition}} in a {{TaskManager}} is supported.
 - Add a new class {{BlockingPersistentResultPartitionMeta}}, which contains all mappings from an {{IntermediateDataSetID}} to its {{BLOCKING_PERSISTENT}} {{ResultPartition}} locations
 - Add a new method {{getBlockingPersistentResultPartitionMeta()}} in {{AccessExecutionGraph}}, which returns a {{BlockingPersistentResultPartitionMeta}}
 - Add an instance of {{BlockingPersistentResultPartitionMeta}} in {{JobExecutionResult}}, {{JobResult}}, {{ArchivedExecutionGraph}} and {{ExecutionEnvironment}}
 - When a job finishes, the locations flow along this path: {{ExecutionGraph}} -> {{ArchivedExecutionGraph}} -> {{JobResult}} -> {{JobExecutionResult}} -> {{ExecutionEnvironment}}

  was:
After each job finishes, new {{BLOCKING_PERSISTENT}} result partitions are generated. The locations of these result partitions should be reported back to the client via {{JobExecutionResult}}; they will later be used for Table {{cache()}} and {{invalidateCache()}}.

Brief Changes:
 - Add a new class {{IntermediateResultDescriptor}}, which stores the location of a {{BLOCKING_PERSISTENT}} {{ResultPartition}}; currently only a {{ResultPartition}} in a {{TaskManager}} is supported.
 - Add a new method {{getResultPartitionDescriptors()}} in {{AccessExecutionGraph}}
 - Add a new field in {{JobExecutionResult}}, {{JobResult}}, {{ArchivedExecutionGraph}} and {{ExecutionEnvironment}}, which keeps a mapping from an {{IntermediateDataSetID}} to its {{ResultPartition}} locations
 - When a job finishes, the metadata flows along this path: {{ExecutionGraph}} -> {{ArchivedExecutionGraph}} -> {{JobResult}} -> {{JobExecutionResult}} -> {{ExecutionEnvironment}}

> Report BLOCKING_PERSISTENT result partition meta back to client
> ---------------------------------------------------------------
>
>                 Key: FLINK-12406
>                 URL: https://issues.apache.org/jira/browse/FLINK-12406
>             Project: Flink
>          Issue Type: Sub-task
>          Components: API / DataSet, Runtime / Coordination
>            Reporter: Ruidong Li
>            Assignee: Ruidong Li
>            Priority: Major
>              Labels: pull-request-available
>          Time Spent: 10m
>  Remaining Estimate: 0h
>
> After each job finishes, new {{BLOCKING_PERSISTENT}} result partitions
> are generated. The locations of these result partitions should be reported
> back to the client via {{JobExecutionResult}}; they will later be used for
> Table {{cache()}} and {{invalidateCache()}}.
>
> Description
>  - The client uses {{ExecutionEnvironment}} to submit a batch job and waits
> for the {{JobResult}} from the {{JM}}
>  - When a job finishes, the {{BLOCKING_PERSISTENT}} {{ResultPartition}}
> locations are collected from each {{ExecutionVertex}} in the
> {{ExecutionGraph}}
>  - On the JM side, the {{BLOCKING_PERSISTENT}} {{ResultPartition}} locations
> flow along this path: {{ExecutionGraph}} -> {{ArchivedExecutionGraph}} ->
> {{JobResult}}
>  - On the client side, a {{JobExecutionResult}} is created from the returned
> {{JobResult}}
>  - The {{ExecutionEnvironment}} sees the {{JobExecutionResult}} containing
> the locations and stores them in itself
>
> Failure Handling
>  - If any error occurs while collecting the locations of a
> {{BLOCKING_PERSISTENT}} {{ResultPartition}}, we do not terminate the process
> but leave the locations of the affected {{IntermediateDataSet}} incomplete;
> we keep a record of these {{IntermediateDataSetID}}s and report them back to
> the client
>  - The client can then use this information to decide what to do: generally,
> the data can be read if the locations are complete, or a delete request will
> be proposed (in later PRs) if the locations are incomplete
>
> Brief change log
>  - Add a new class {{ResultPartitionDescriptor}}, which stores the location
> of a {{BLOCKING_PERSISTENT}} {{ResultPartition}}; currently only a
> {{ResultPartition}} in a {{TaskManager}} is supported.
>  - Add a new class {{BlockingPersistentResultPartitionMeta}}, which contains
> all mappings from an {{IntermediateDataSetID}} to its
> {{BLOCKING_PERSISTENT}} {{ResultPartition}} locations
>  - Add a new method {{getBlockingPersistentResultPartitionMeta()}} in
> {{AccessExecutionGraph}}, which returns a
> {{BlockingPersistentResultPartitionMeta}}
>  - Add an instance of {{BlockingPersistentResultPartitionMeta}} in
> {{JobExecutionResult}}, {{JobResult}}, {{ArchivedExecutionGraph}} and
> {{ExecutionEnvironment}}
>  - When a job finishes, the locations flow along this path:
> {{ExecutionGraph}} -> {{ArchivedExecutionGraph}} -> {{JobResult}} ->
> {{JobExecutionResult}} -> {{ExecutionEnvironment}}

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
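
For illustration only, the failure handling in the description could be sketched roughly as follows. This is not the code from the pull request: {{PartitionMetaSketch}}, {{LocationSource}}, {{collect()}} and {{source()}} are hypothetical names, and plain strings stand in for {{IntermediateDataSetID}} and {{ResultPartitionDescriptor}}. The sketch only shows the idea that a failed lookup marks the data set as incomplete instead of aborting the collection.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical stand-in for BlockingPersistentResultPartitionMeta; not Flink's actual class. */
final class PartitionMetaSketch {
    // Data set ID -> partition locations collected so far (strings stand in for descriptors).
    private final Map<String, List<String>> locations = new LinkedHashMap<>();
    // Data set IDs for which at least one partition location could not be collected.
    private final Set<String> incomplete = new LinkedHashSet<>();

    /** Hypothetical source of a single partition location; resolving it may fail. */
    interface LocationSource {
        String dataSetId();
        String resolveLocation() throws Exception;
    }

    /** Test helper (assumption): a null location simulates a failed lookup. */
    static LocationSource source(final String dataSetId, final String locationOrNull) {
        return new LocationSource() {
            public String dataSetId() { return dataSetId; }
            public String resolveLocation() throws Exception {
                if (locationOrNull == null) {
                    throw new Exception("location unavailable");
                }
                return locationOrNull;
            }
        };
    }

    /** Collects locations; a failure marks the data set incomplete but does not abort. */
    static PartitionMetaSketch collect(List<LocationSource> sources) {
        PartitionMetaSketch meta = new PartitionMetaSketch();
        for (LocationSource s : sources) {
            try {
                String location = s.resolveLocation();
                meta.locations.computeIfAbsent(s.dataSetId(), k -> new ArrayList<>()).add(location);
            } catch (Exception e) {
                meta.incomplete.add(s.dataSetId());
            }
        }
        return meta;
    }

    /** A data set is readable only if it has locations and none of its lookups failed. */
    boolean isComplete(String dataSetId) {
        return locations.containsKey(dataSetId) && !incomplete.contains(dataSetId);
    }

    List<String> locationsOf(String dataSetId) {
        return locations.getOrDefault(dataSetId, Collections.<String>emptyList());
    }

    Set<String> incompleteDataSets() {
        return Collections.unmodifiableSet(incomplete);
    }

    public static void main(String[] args) {
        PartitionMetaSketch meta = collect(Arrays.asList(
                source("ds-1", "tm-a:partition-0"),
                source("ds-1", "tm-b:partition-1"),
                source("ds-2", "tm-a:partition-2"),
                source("ds-2", null)));
        // Client-side decision: read complete data sets, flag incomplete ones for deletion.
        for (String id : new String[] {"ds-1", "ds-2"}) {
            System.out.println(meta.isComplete(id)
                    ? id + " readable at " + meta.locationsOf(id)
                    : id + " incomplete, candidate for a delete request");
        }
    }
}
```

Keeping the incomplete IDs next to the partial locations, rather than dropping the partial entries, is one possible design; it would let the client still know which partitions exist when it later issues a delete request.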