[ 
https://issues.apache.org/jira/browse/FLINK-7?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14975823#comment-14975823
 ] 

ASF GitHub Bot commented on FLINK-7:
------------------------------------

Github user ChengXiangLi commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1255#discussion_r43088529
  
    --- Diff: flink-java/src/main/java/org/apache/flink/api/java/DataSet.java ---
    @@ -1223,6 +1230,51 @@ public long count() throws Exception {
                final TypeInformation<K> keyType = TypeExtractor.getKeySelectorTypes(keyExtractor, getType());
                return new PartitionOperator<T>(this, PartitionMethod.HASH, new Keys.SelectorFunctionKeys<T, K>(clean(keyExtractor), this.getType(), keyType), Utils.getCallLocationName());
        }
    +
    +   /**
    +    * Range-partitions a DataSet using the specified KeySelector.
    +    * <p>
    +    * <b>Important:</b>This operation shuffles the whole DataSet over the network and can take significant amount of time.
    +    *
    +    * @param keySelector The KeySelector with which the DataSet is range-partitioned.
    +    * @return The partitioned DataSet.
    +    *
    +    * @see KeySelector
    +    */
    +   public <K extends Comparable<K>> DataSet<T> partitionByRange(KeySelector<T, K> keySelector) {
    +           final TypeInformation<K> keyType = TypeExtractor.getKeySelectorTypes(keySelector, getType());
    +           String callLocation = Utils.getCallLocationName();
    +
    +           // Extract key from input element by keySelector.
    +           KeyExtractorMapper<T, K> keyExtractorMapper = new KeyExtractorMapper<T, K>(keySelector);
    --- End diff --
    
    Yes, it's a very low-level job abstraction. I'm not sure whether I can get
    everything required, and I didn't find any precedent for this, but it deserves a try.
    Besides, everything required (ship strategy type and target parallelism) is
    available at the `OptimizedPlan` level, so I think it would be better to inject
    the sampling and partition ID assignment code by modifying the `OptimizedPlan`
    at the beginning of `JobGraphGenerator::compileJobGraph`, instead of at the
    previous injection point that the next comment mentions. The previous injection
    point is in the middle of building the `JobGraph` and requires rewriting the
    `JobGraph`, which is even lower level than the `OptimizedPlan`.
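
For illustration only, here is a minimal sketch of what the sampling and partition ID assignment steps could compute, written independently of Flink's `OptimizedPlan` and `JobGraph` APIs. The class and method names (`RangePartitionSketch`, `computeBoundaries`, `assignPartition`) are hypothetical and are not taken from the pull request; the sketch assumes a sample of keys has already been collected and that the target parallelism is known.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical sketch, not Flink code: derives range boundaries from a key
// sample and maps each key to a partition ID.
public class RangePartitionSketch {

    /** Picks (parallelism - 1) inclusive upper bounds from a sample of keys. */
    static <K extends Comparable<K>> List<K> computeBoundaries(List<K> sample, int parallelism) {
        if (sample.isEmpty() || parallelism < 1) {
            throw new IllegalArgumentException("need a non-empty sample and parallelism >= 1");
        }
        List<K> sorted = new ArrayList<>(sample);
        Collections.sort(sorted);
        List<K> boundaries = new ArrayList<>();
        for (int i = 1; i < parallelism; i++) {
            // Index of the last sampled key that belongs to range (i - 1).
            int idx = (int) ((long) i * sorted.size() / parallelism) - 1;
            boundaries.add(sorted.get(Math.max(idx, 0)));
        }
        return boundaries;
    }

    /** Returns the index of the first boundary that is >= key (binary search). */
    static <K extends Comparable<K>> int assignPartition(K key, List<K> boundaries) {
        int lo = 0;
        int hi = boundaries.size();
        while (lo < hi) {
            int mid = (lo + hi) >>> 1;
            if (key.compareTo(boundaries.get(mid)) <= 0) {
                hi = mid;
            } else {
                lo = mid + 1;
            }
        }
        // Keys larger than every boundary fall into the last partition.
        return lo;
    }

    public static void main(String[] args) {
        List<Integer> sample = Arrays.asList(5, 1, 9, 3, 7, 2, 8, 4, 6, 10);
        List<Integer> boundaries = computeBoundaries(sample, 4);
        System.out.println("boundaries = " + boundaries);
        System.out.println("partition of 6 = " + assignPartition(6, boundaries));
    }
}
```

In this sketch the boundaries depend only on the sample and the target parallelism, which is why having both available at one level of the plan matters for where such code gets injected.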


> [GitHub] Enable Range Partitioner
> ---------------------------------
>
>                 Key: FLINK-7
>                 URL: https://issues.apache.org/jira/browse/FLINK-7
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Distributed Runtime
>            Reporter: GitHub Import
>            Assignee: Chengxiang Li
>             Fix For: pre-apache
>
>
> The range partitioner is currently disabled. We need to implement the 
> following aspects:
> 1) Distribution information, if available, must be propagated back together 
> with the ordering property.
> 2) A generic bucket lookup structure (currently specific to PactRecord).
> Tests to re-enable after fixing this issue:
>  - TeraSortITCase
>  - GlobalSortingITCase
>  - GlobalSortingMixedOrderITCase
> ---------------- Imported from GitHub ----------------
> Url: https://github.com/stratosphere/stratosphere/issues/7
> Created by: [StephanEwen|https://github.com/StephanEwen]
> Labels: core, enhancement, optimizer, 
> Milestone: Release 0.4
> Assignee: [fhueske|https://github.com/fhueske]
> Created at: Fri Apr 26 13:48:24 CEST 2013
> State: open



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
