[ 
https://issues.apache.org/jira/browse/FLINK-5480?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15823913#comment-15823913
 ] 

ASF GitHub Bot commented on FLINK-5480:
---------------------------------------

Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3117#discussion_r96222455
  
    --- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/graph/StreamingJobGraphGeneratorNodeHashTest.java
 ---
    @@ -422,6 +425,43 @@ public void 
testManualHashAssignmentForStartNodeInInChain() throws Exception {
                env.getStreamGraph().getJobGraph();
        }
     
    +   @Test
    +   public void testUserProvidedHashing() {
    +           StreamExecutionEnvironment env = 
StreamExecutionEnvironment.createLocalEnvironment();
    +
    +           List<String> userHashes = 
Arrays.asList("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", 
"bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb");
    +
    +           env.addSource(new NoOpSourceFunction(), 
"src").provideAdditionalNodeHash(userHashes.get(0))
    +                           .map(new NoOpMapFunction())
    +                           .filter(new NoOpFilterFunction())
    +                           .keyBy(new NoOpKeySelector())
    +                           .reduce(new 
NoOpReduceFunction()).name("reduce").provideAdditionalNodeHash(userHashes.get(1));
    +
    +           StreamGraph streamGraph = env.getStreamGraph();
    +           int idx = 1;
    +           for (JobVertex jobVertex : 
streamGraph.getJobGraph().getVertices()) {
    +                   
Assert.assertEquals(jobVertex.getIdAlternatives().get(1).toString(), 
userHashes.get(idx));
    +                   --idx;
    +           }
    +   }
    +
    +   @Test
    +   public void testUserProvidedHashingOnChainNotSupported() {
    +           StreamExecutionEnvironment env = 
StreamExecutionEnvironment.createLocalEnvironment();
    +
    +           env.addSource(new NoOpSourceFunction(), 
"src").provideAdditionalNodeHash("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa")
    +                           .map(new 
NoOpMapFunction()).provideAdditionalNodeHash("bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb")
    --- End diff --
    
    the fact that this fails the job should probably be documented in the 
javadocs of ```provideAdditionalNodeHash```.


> User-provided hashes for operators
> ----------------------------------
>
>                 Key: FLINK-5480
>                 URL: https://issues.apache.org/jira/browse/FLINK-5480
>             Project: Flink
>          Issue Type: Improvement
>          Components: DataStream API
>    Affects Versions: 1.2.0
>            Reporter: Stefan Richter
>            Assignee: Stefan Richter
>
> We could allow users to provided (alternative) hashes for operators in a 
> StreamGraph. This can make migration between Flink versions easier, in case 
> the automatically produced hashes between versions are incompatible. For 
> example, users could just copy the old hashes from the web ui to their job.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to