[ 
https://issues.apache.org/jira/browse/FLINK-12406?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ruidong Li updated FLINK-12406:
-------------------------------
    Description: 
After each job finishes, new {{BLOCKING_PERSISTENT}} result partitions are 
generated. The locations of these result partitions should be reported back to 
the client via {{JobExecutionResult}}; they will later be used for Table 
{{cache()}} and {{invalidateCache()}}.

 

Description
- The client uses {{ExecutionEnvironment}} to submit a batch job and waits for 
the {{JobResult}} from the {{JM}}
- When a job finishes, the {{BLOCKING_PERSISTENT}} {{ResultPartition}} locations 
are collected from each {{ExecutionVertex}} in the {{ExecutionGraph}}
- On the JM side, the {{BLOCKING_PERSISTENT}} {{ResultPartition}} locations flow 
along this path: {{ExecutionGraph}} -> {{ArchivedExecutionGraph}} -> {{JobResult}}
- On the client side, a {{JobExecutionResult}} is created from the returned 
{{JobResult}}
- The {{ExecutionEnvironment}} receives the {{JobExecutionResult}} containing the 
locations and stores them
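
The hand-off described in the list above can be sketched roughly as follows. 
This is only an illustration: every type below ({{FlowSketch}}, {{Meta}} and 
the {{*Stub}} classes) is a hypothetical stand-in and not Flink's real class, 
and dataset IDs and locations are plain strings for brevity.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-ins only; none of these are Flink's real classes.
final class FlowSketch {

    /** Stand-in for the metadata: IntermediateDataSetID -> partition locations. */
    static final class Meta {
        final Map<String, List<String>> locations;

        Meta(Map<String, List<String>> locations) {
            // Defensive copy so later stages cannot mutate what the JM collected.
            this.locations = Collections.unmodifiableMap(new HashMap<>(locations));
        }
    }

    /** JM side: the running graph that collected the locations. */
    static final class ExecutionGraphStub {
        final Meta meta;

        ExecutionGraphStub(Meta meta) { this.meta = meta; }

        ArchivedGraphStub archive() { return new ArchivedGraphStub(meta); }
    }

    /** JM side: the archived graph carries the same metadata. */
    static final class ArchivedGraphStub {
        final Meta meta;

        ArchivedGraphStub(Meta meta) { this.meta = meta; }

        JobResultStub toJobResult() { return new JobResultStub(meta); }
    }

    /** What the JM returns to the client. */
    static final class JobResultStub {
        final Meta meta;

        JobResultStub(Meta meta) { this.meta = meta; }

        JobExecutionResultStub toJobExecutionResult() {
            return new JobExecutionResultStub(meta);
        }
    }

    /** Client side: created from the returned job result. */
    static final class JobExecutionResultStub {
        final Meta meta;

        JobExecutionResultStub(Meta meta) { this.meta = meta; }
    }

    /** Client side: the environment stores the locations it receives. */
    static final class EnvironmentStub {
        private Meta lastMeta;

        void onJobFinished(JobExecutionResultStub result) { lastMeta = result.meta; }

        Meta getMeta() { return lastMeta; }
    }
}
```

In this sketch the metadata object is passed along unchanged at every step, so 
the environment ends up holding exactly what was collected on the JM side.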

Failure Handling
- If an error occurs while collecting the locations of {{BLOCKING_PERSISTENT}} 
{{ResultPartition}}s, we do not terminate the process. Instead, the locations 
of the affected {{IntermediateDataSet}}s are left incomplete; we record the 
corresponding {{IntermediateDataSetID}}s and report them back to the client
- The client can use this information to decide what to do: generally, the 
data can be read if the locations are complete, or a delete request can be 
issued (to be proposed in later PRs) if the locations are incomplete
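
The failure handling above could look roughly like the following sketch. It is 
not taken from the pull request: {{LocationCollectionSketch}}, {{collect}} and 
{{clientAction}} are hypothetical names, each partition lookup is modelled as a 
{{Supplier}} that may throw, and keeping the partially collected locations for 
an incomplete dataset is an assumption made here.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Supplier;

// Hypothetical sketch; names and types are illustrative, not Flink's API.
final class LocationCollectionSketch {

    /** IntermediateDataSetID -> locations that could be collected. */
    final Map<String, List<String>> locations = new LinkedHashMap<>();

    /** IDs of datasets for which at least one location lookup failed. */
    final Set<String> incompleteDataSetIds = new LinkedHashSet<>();

    /** Collects locations without aborting when a single lookup fails. */
    static LocationCollectionSketch collect(
            Map<String, List<Supplier<String>>> lookupsByDataSetId) {
        LocationCollectionSketch result = new LocationCollectionSketch();
        for (Map.Entry<String, List<Supplier<String>>> entry
                : lookupsByDataSetId.entrySet()) {
            List<String> found = new ArrayList<>();
            for (Supplier<String> lookup : entry.getValue()) {
                try {
                    found.add(lookup.get());
                } catch (RuntimeException e) {
                    // Do not terminate: remember the dataset and keep going.
                    result.incompleteDataSetIds.add(entry.getKey());
                }
            }
            result.locations.put(entry.getKey(), found);
        }
        return result;
    }

    /** Client-side decision: read complete datasets, delete incomplete ones. */
    String clientAction(String dataSetId) {
        return incompleteDataSetIds.contains(dataSetId) ? "DELETE" : "READ";
    }
}
```

A dataset whose lookups all succeed is reported as readable, while a single 
failed lookup marks the whole dataset as incomplete without affecting the 
others.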

Brief change log
 - Add a new class {{ResultPartitionDescriptor}}, which stores the location of 
a {{BLOCKING_PERSISTENT}} {{ResultPartition}}. Currently, only a 
{{ResultPartition}} located in a {{TaskManager}} is supported.
 - Add a new class {{BlockingPersistentResultPartitionMeta}}, which contains 
the mapping from each {{IntermediateDataSetID}} to its {{BLOCKING_PERSISTENT}} 
{{ResultPartition}} locations
 - Add a new method {{getBlockingPersistentResultPartitionMeta()}} in 
{{AccessExecutionGraph}}, which returns a 
{{BlockingPersistentResultPartitionMeta}}
 - Add an instance of {{BlockingPersistentResultPartitionMeta}} in 
{{JobExecutionResult}}, {{JobResult}}, {{ArchivedExecutionGraph}} and 
{{ExecutionEnvironment}}
 - When a job finishes, the locations will flow in this path: 
{{ExecutionGraph}} -> {{ArchivedExecutionGraph}} -> {{JobResult}} -> 
{{JobExecutionResult}} -> {{ExecutionEnvironment}}
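
As a rough illustration of the classes named in the change log, a minimal 
sketch might look like this. The fields and method names other than 
{{getBlockingPersistentResultPartitionMeta()}} are assumptions, IDs and 
addresses are plain strings, and the {{Sketch}} suffix marks every type as a 
stand-in rather than the class from the pull request.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical: where one BLOCKING_PERSISTENT ResultPartition lives. */
final class ResultPartitionDescriptorSketch {
    final String resultPartitionId;
    final String taskManagerAddress; // assumed representation of the location

    ResultPartitionDescriptorSketch(String resultPartitionId, String taskManagerAddress) {
        this.resultPartitionId = resultPartitionId;
        this.taskManagerAddress = taskManagerAddress;
    }
}

/** Hypothetical: mapping from IntermediateDataSetID to partition locations. */
final class BlockingPersistentResultPartitionMetaSketch {
    private final Map<String, List<ResultPartitionDescriptorSketch>> byDataSetId =
            new HashMap<>();

    void add(String intermediateDataSetId, ResultPartitionDescriptorSketch descriptor) {
        byDataSetId
                .computeIfAbsent(intermediateDataSetId, k -> new ArrayList<>())
                .add(descriptor);
    }

    /** Returns a read-only view; empty if the dataset is unknown. */
    List<ResultPartitionDescriptorSketch> locationsOf(String intermediateDataSetId) {
        return Collections.unmodifiableList(
                byDataSetId.getOrDefault(intermediateDataSetId, Collections.emptyList()));
    }
}

/** Hypothetical stand-in for the accessor added to AccessExecutionGraph. */
interface AccessExecutionGraphSketch {
    BlockingPersistentResultPartitionMetaSketch getBlockingPersistentResultPartitionMeta();
}
```

Keeping the descriptor immutable and exposing only read-only views is a design 
choice of this sketch, made because the same metadata instance is handed from 
the JM to the client.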

  was:
After each job finishes, new {{BLOCKING_PERSISTENT}} result partitions are 
generated. The locations of these result partitions should be reported back to 
the client via {{JobExecutionResult}}; they will later be used for Table 
{{cache()}} and {{invalidateCache()}}.

 

Brief Changes:
 - Add a new class {{IntermediateResultDescriptor}}, which stores the location 
of a {{BLOCKING_PERSISTENT}} {{ResultPartition}}. Currently, only a 
{{ResultPartition}} located in a {{TaskManager}} is supported.
 - Add a new method {{getResultPartitionDescriptors()}} in 
{{AccessExecutionGraph}}
 - Add a new field in {{JobExecutionResult}}, {{JobResult}}, 
{{ArchivedExecutionGraph}} and {{ExecutionEnvironment}}, which keeps a mapping 
from each {{IntermediateDataSetID}} to its {{ResultPartition}} locations
 - When a job finishes, the metadata will flow in this path: 
{{ExecutionGraph}} -> {{ArchivedExecutionGraph}} -> {{JobResult}} -> 
{{JobExecutionResult}} -> {{ExecutionEnvironment}}


> Report BLOCKING_PERSISTENT result partition meta back to client
> ---------------------------------------------------------------
>
>                 Key: FLINK-12406
>                 URL: https://issues.apache.org/jira/browse/FLINK-12406
>             Project: Flink
>          Issue Type: Sub-task
>          Components: API / DataSet, Runtime / Coordination
>            Reporter: Ruidong Li
>            Assignee: Ruidong Li
>            Priority: Major
>              Labels: pull-request-available
>          Time Spent: 10m
>  Remaining Estimate: 0h



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
