[ 
https://issues.apache.org/jira/browse/FLINK-2590?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14723434#comment-14723434
 ] 

ASF GitHub Bot commented on FLINK-2590:
---------------------------------------

Github user s1ck commented on the pull request:

    https://github.com/apache/flink/pull/1075#issuecomment-136380943
  
    There is already a test case for zipWithUniqueId() in 
https://github.com/apache/flink/blob/master/flink-tests/src/test/java/org/apache/flink/test/util/DataSetUtilsITCase.java#L66
    However, this test assumes that only one task is running, which is why 
it did not fail in the first place.
    If there are multiple tasks, the resulting unique id is not deterministic 
for a single dataset element. I would implement a test that creates a dataset, 
applies the `zipWithUniqueId` method, calls `distinct(0)` on the created ids 
and checks the number of resulting elements (it must equal the number of 
elements in the input dataset). Would this be sufficient?
    Furthermore, the current test cases for `DataSetUtils` expect the resulting 
dataset as a string and compare it after each test run. My proposed test would 
not fit into that scheme. Should I create a new test case class for this method?
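
A rough plain-Java sketch of the proposed check, without any Flink classes: it simulates the parallel tasks, builds the ids and counts the distinct ones, which is what `distinct(0)` followed by a count would do on the real dataset. The class name, the `IdFunction` interface and the `getBitSize` body are assumptions made for illustration, and task ids are assumed to run from 0 to parallelism - 1.

```java
import java.util.HashSet;
import java.util.Set;

// Plain-Java stand-in for the proposed test; all names here are hypothetical.
final class DistinctIdCheck {

    // Maps (counter, shifter, taskId) to an id.
    interface IdFunction {
        long apply(long counter, int shifter, int taskId);
    }

    // Expression from the issue: '+' binds tighter than '<<'.
    static final IdFunction ORIGINAL =
            (counter, shifter, taskId) -> counter << shifter + taskId;

    // Expression suggested in the issue.
    static final IdFunction FIXED =
            (counter, shifter, taskId) -> (counter << shifter) + taskId;

    // Assumed behaviour of the existing log2 method: number of bits needed for value.
    static int getBitSize(long value) {
        return 64 - Long.numberOfLeadingZeros(value);
    }

    // Simulates `parallelism` tasks labelling `elementsPerTask` elements each
    // and returns the number of distinct ids over all tasks.
    static int countDistinctIds(int parallelism, int elementsPerTask, IdFunction f) {
        int shifter = getBitSize(parallelism);
        Set<Long> ids = new HashSet<>();
        for (int taskId = 0; taskId < parallelism; taskId++) {
            for (long counter = 0; counter < elementsPerTask; counter++) {
                ids.add(f.apply(counter, shifter, taskId));
            }
        }
        return ids.size();
    }

    public static void main(String[] args) {
        // One task: the original expression still yields distinct ids.
        System.out.println(countDistinctIds(1, 100, ORIGINAL) == 100);
        // Eight tasks: only the fixed expression yields one id per element.
        System.out.println(countDistinctIds(8, 100, ORIGINAL) == 800);
        System.out.println(countDistinctIds(8, 100, FIXED) == 800);
    }
}
```

In an actual Flink test the set would be replaced by the dataset returned from `zipWithUniqueId`, and the comparison would be against the size of the input dataset.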
    
    @StephanEwen I wanted to do this, but static doesn't work with anonymous 
classes. However, I can declare the UDF as a private inner class (didn't want 
to change much code).
    @HuangWHWHW the `log2` method already existed, and in the issue I proposed 
to rename it. Maybe `getBitSize(long value)`? As for the "proof": if each task 
id is smaller than the total number of parallel tasks t, its bit representation 
is no longer than the bit representation of t. Thus, when we shift the counter 
left by the number of bits of t, the lower bits hold only the task id, so 
different task ids cannot collide.
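
To make the argument concrete, here is a small sketch, assuming `getBitSize` returns the number of bits needed for its argument and that task ids run from 0 to t - 1. The helper names (`encode`, `taskIdOf`, `counterOf`, `roundTrips`) are made up for this example. If both parts can be read back from every id, two different (counter, task id) pairs cannot map to the same id.

```java
// Hypothetical helpers illustrating the bit layout; not part of DataSetUtils.
final class BitLayoutSketch {

    // Assumed behaviour of the renamed log2 method.
    static int getBitSize(long value) {
        return 64 - Long.numberOfLeadingZeros(value);
    }

    // The counter occupies the high bits, the task id the low getBitSize(t) bits.
    static long encode(long counter, int taskId, int parallelism) {
        return (counter << getBitSize(parallelism)) + taskId;
    }

    // Reads the task id back from the low bits.
    static int taskIdOf(long id, int parallelism) {
        long mask = (1L << getBitSize(parallelism)) - 1;
        return (int) (id & mask);
    }

    // Reads the counter back from the high bits.
    static long counterOf(long id, int parallelism) {
        return id >>> getBitSize(parallelism);
    }

    // Checks that every (counter, taskId) pair survives an encode/decode round trip.
    static boolean roundTrips(int maxParallelism, long maxCounter) {
        for (int t = 1; t <= maxParallelism; t++) {
            for (int taskId = 0; taskId < t; taskId++) {
                for (long counter = 0; counter <= maxCounter; counter++) {
                    long id = encode(counter, taskId, t);
                    if (taskIdOf(id, t) != taskId || counterOf(id, t) != counter) {
                        return false;
                    }
                }
            }
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println(roundTrips(64, 1000));
    }
}
```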


> DataSetUtils.zipWithUniqueID creates duplicate IDs
> --------------------------------------------------
>
>                 Key: FLINK-2590
>                 URL: https://issues.apache.org/jira/browse/FLINK-2590
>             Project: Flink
>          Issue Type: Bug
>          Components: Java API, Scala API
>    Affects Versions: 0.10, master
>            Reporter: Martin Junghanns
>            Assignee: Martin Junghanns
>            Priority: Minor
>
> The function creates IDs using the following code:
> {code:java}
> shifter = log2(numberOfParallelSubtasks)
> id = counter << shifter + taskId;
> {code}
> As the binary function + is executed before the bitshift <<, this results in 
> cases where different tasks create the same ID. It essentially calculates
> {code}
> counter*2^(shifter+taskId)
> {code}
> which is 0 for counter = 0 and all values of shifter and taskId.
> Consider the following example.
> numberOfParallelSubtasks = 8 
> shifter = log2(8) = 4 (the function returns the number of bits rather than the logarithm; maybe rename it?)
> produces:
> {code}
> start: 1, shifter: 4 taskId: 4 label: 256
> start: 2, shifter: 4 taskId: 3 label: 256
> start: 4, shifter: 4 taskId: 2 label: 256
> {code}
> I would suggest the following:
> {code}
> counter*2^(shifter)+taskId
> {code}
> which in code is equivalent to
> {code}
> shifter = log2(numberOfParallelSubtasks);
> id = (counter << shifter) + taskId;
> {code}
> and for our example produces:
> {code}
> start: 1, shifter: 4 taskId: 4 label: 20
> start: 2, shifter: 4 taskId: 3 label: 35
> start: 4, shifter: 4 taskId: 2 label: 66
> {code}
> So we move the counter to the left and add the task id. As there is space for 
> 2^shifter numbers, this prevents collisions.
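
The two examples from the issue description can be recomputed with a few lines of plain Java. This is only a sketch: the class and method names are invented here, and the shifter is passed in as 4, the value used in the issue.

```java
// Recomputes the labels from the issue description; names are hypothetical.
final class PrecedenceExample {

    // Original expression: parsed as counter << (shifter + taskId).
    static long original(long counter, int shifter, int taskId) {
        return counter << shifter + taskId;
    }

    // Suggested expression: shift first, then add the task id.
    static long fixed(long counter, int shifter, int taskId) {
        return (counter << shifter) + taskId;
    }

    public static void main(String[] args) {
        int shifter = 4;
        long[] starts = {1, 2, 4};
        int[] taskIds = {4, 3, 2};
        for (int i = 0; i < starts.length; i++) {
            System.out.println("start: " + starts[i]
                    + ", shifter: " + shifter
                    + " taskId: " + taskIds[i]
                    + " original: " + original(starts[i], shifter, taskIds[i])
                    + " fixed: " + fixed(starts[i], shifter, taskIds[i]));
        }
    }
}
```

The original expression gives 256 for all three rows, while the parenthesized one gives 20, 35 and 66, matching the values listed in the issue.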



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
