[ https://issues.apache.org/jira/browse/FLINK-2590?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14723434#comment-14723434 ]
ASF GitHub Bot commented on FLINK-2590:
---------------------------------------

Github user s1ck commented on the pull request:

    https://github.com/apache/flink/pull/1075#issuecomment-136380943

There is already a test case for `zipWithUniqueId()` in https://github.com/apache/flink/blob/master/flink-tests/src/test/java/org/apache/flink/test/util/DataSetUtilsITCase.java#L66

However, this test assumes that only one task is running, which is why it did not fail in the first place. With multiple tasks, the unique id assigned to a given dataset element is not deterministic, so the test cannot compare against fixed expected ids.

I would implement a test that creates a dataset, applies the `zipWithUniqueId` method, calls `distinct(0)` on the created ids, and checks the number of resulting elements (it must equal the size of the input dataset). Would this be sufficient?

Furthermore, the current test cases for `DataSetUtils` expect the result as a string and check it after each test run. My proposed test would not fit that scheme. Should I create a new test class for this method?

@StephanEwen I wanted to do this, but anonymous classes cannot be static. However, I can declare the UDF as a private inner class (I didn't want to change much code).

@HuangWHWHW the `log2` method already existed, and in the issue I proposed renaming it. Maybe `getBitSize(long value)`?

As for the "proof": let t be the total number of parallel tasks and shifter the number of bits of t. Each task id is smaller than t, so it needs no more bits than t does, i.e. taskId < 2^shifter. Thus, when we shift the counter left by the number of bits of t, the task id occupies only the low shifter bits and the counter only the bits above them, so there cannot be a collision between different task ids (nor between different counter values within one task).

> DataSetUtils.zipWithUniqueID creates duplicate IDs
> --------------------------------------------------
>
>                 Key: FLINK-2590
>                 URL: https://issues.apache.org/jira/browse/FLINK-2590
>             Project: Flink
>          Issue Type: Bug
>          Components: Java API, Scala API
>    Affects Versions: 0.10, master
>            Reporter: Martin Junghanns
>            Assignee: Martin Junghanns
>            Priority: Minor
>
> The function creates IDs using the following code:
> {code:java}
> shifter = log2(numberOfParallelSubtasks);
> id = counter << shifter + taskId; // parsed as counter << (shifter + taskId)
> {code}
> As the binary operator + has higher precedence than the bit shift <<, this results in
> cases where different tasks create the same ID. It essentially calculates
> {code}
> counter*2^(shifter+taskId)
> {code}
> which is 0 for counter = 0 and all values of shifter and taskId.
> Consider the following example.
> numberOfParallelSubtasks = 8
> shifter = log2(8) = 4 (maybe rename the function?)
> produces:
> {code}
> start: 1, shifter: 4, taskId: 4, label: 256
> start: 2, shifter: 4, taskId: 3, label: 256
> start: 4, shifter: 4, taskId: 2, label: 256
> {code}
> I would suggest the following:
> {code}
> counter*2^(shifter)+taskId
> {code}
> which in code is equivalent to
> {code}
> shifter = log2(numberOfParallelSubtasks);
> id = (counter << shifter) + taskId; // counter in the high bits, taskId in the low bits
> {code}
> and for our example produces:
> {code}
> start: 1, shifter: 4, taskId: 4, label: 20
> start: 2, shifter: 4, taskId: 3, label: 35
> start: 4, shifter: 4, taskId: 2, label: 66
> {code}
> So we shift the counter to the left and add the task id. As the low shifter bits leave
> room for 2^shifter values and every task id is smaller than 2^shifter, this prevents collisions.
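A minimal, self-contained Java sketch that reproduces the example from the issue outside of Flink. The class name `ZipIdPrecedence` and the helpers `buggyId`/`fixedId` are illustrative only and not part of `DataSetUtils`; `shifter = 4` is hard-coded to the value the issue reports for 8 parallel subtasks.

```java
// Illustrative sketch (not Flink code): how operator precedence changes
// the id computed from (counter, shifter, taskId).
public class ZipIdPrecedence {

    // Original expression. In Java, + binds tighter than <<, so this is
    // counter << (shifter + taskId), i.e. counter * 2^(shifter + taskId).
    static long buggyId(long counter, int shifter, int taskId) {
        return counter << shifter + taskId;
    }

    // Proposed fix: shift the counter first, then add the task id,
    // i.e. counter * 2^shifter + taskId.
    static long fixedId(long counter, int shifter, int taskId) {
        return (counter << shifter) + taskId;
    }

    public static void main(String[] args) {
        int shifter = 4; // value reported in the issue for 8 parallel subtasks
        long[][] rows = { {1, 4}, {2, 3}, {4, 2} }; // {start, taskId} from the issue
        for (long[] row : rows) {
            long start = row[0];
            int taskId = (int) row[1];
            System.out.printf("start: %d, taskId: %d, buggy: %d, fixed: %d%n",
                    start, taskId,
                    buggyId(start, shifter, taskId),
                    fixedId(start, shifter, taskId));
        }
    }
}
```

Running `main` prints 256 as the buggy id for all three rows and 20, 35 and 66 for the fixed ones, matching the labels in the issue.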
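The "proof" from the comment can also be checked mechanically. This sketch assumes a hypothetical `getBitSize(long)` (the name suggested as a rename of `log2`) implemented with `Long.numberOfLeadingZeros`; it is not necessarily how the existing `log2` in `DataSetUtils` is written, but it returns 4 for 8, as in the issue. For every parallelism t up to 1024 it checks that each task id fits into the low getBitSize(t) bits and that an id can be decoded back into (counter, taskId), which is what rules out collisions.

```java
// Illustrative sketch (not Flink code): checks that taskId < t implies
// taskId < 2^getBitSize(t), so (counter << shifter) + taskId is injective.
public class BitSizeCheck {

    // Hypothetical helper: number of bits needed to represent a
    // non-negative value, e.g. getBitSize(8) == 4, getBitSize(7) == 3.
    static int getBitSize(long value) {
        return 64 - Long.numberOfLeadingZeros(value);
    }

    // Fixed id scheme: counter in the high bits, task id in the low bits.
    static long makeId(long counter, int shifter, int taskId) {
        return (counter << shifter) + taskId;
    }

    // True if, for every task id below the given parallelism, the task id
    // fits into the low bits and (counter, taskId) can be recovered from the id.
    static boolean schemeIsCollisionFree(int parallelism, long counter) {
        int shifter = getBitSize(parallelism);
        long lowMask = (1L << shifter) - 1;
        for (int taskId = 0; taskId < parallelism; taskId++) {
            if (taskId > lowMask) {
                return false; // task id would spill into the counter's bits
            }
            long id = makeId(counter, shifter, taskId);
            if ((id >>> shifter) != counter || (id & lowMask) != taskId) {
                return false; // id cannot be decoded uniquely
            }
        }
        return true;
    }

    public static void main(String[] args) {
        boolean ok = true;
        for (int t = 1; t <= 1024; t++) {
            ok &= schemeIsCollisionFree(t, 0) && schemeIsCollisionFree(t, 123_456_789L);
        }
        System.out.println("getBitSize(8) = " + getBitSize(8) + ", collision-free: " + ok);
    }
}
```

Decoding is the key design point here: if every id splits uniquely into a counter part and a task id part, two different (counter, taskId) pairs can never map to the same id.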
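The test proposed in the comment (apply `zipWithUniqueId`, run `distinct(0)` on the ids, compare the count with the input size) can be mimicked without a Flink cluster. This plain-Java simulation only sketches that idea and is not a Flink test: it assumes each parallel task numbers its own elements with a local counter starting at 0, and the class `UniqueIdSimulation`, its helper `getBitSize` and the per-task element counts are made up for illustration. A `HashSet` plays the role of `distinct(0)`.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// Illustrative simulation (not Flink code) of the proposed distinct-count test.
public class UniqueIdSimulation {

    // Hypothetical bit-size helper, e.g. getBitSize(8) == 4.
    static int getBitSize(long value) {
        return 64 - Long.numberOfLeadingZeros(value);
    }

    // Simulates id assignment in elementsPerTask.length parallel tasks, where
    // task i holds elementsPerTask[i] elements and counts them from 0.
    // Returns the number of distinct ids (the role of distinct(0) + count).
    static int countDistinctIds(int[] elementsPerTask, boolean useFix) {
        int parallelism = elementsPerTask.length;
        int shifter = getBitSize(parallelism);
        Set<Long> ids = new HashSet<>();
        for (int taskId = 0; taskId < parallelism; taskId++) {
            for (long counter = 0; counter < elementsPerTask[taskId]; counter++) {
                long id = useFix
                        ? (counter << shifter) + taskId   // proposed fix
                        : counter << (shifter + taskId);  // how the original expression is parsed
                ids.add(id);
            }
        }
        return ids.size();
    }

    public static void main(String[] args) {
        int[] elementsPerTask = {5, 3, 7, 0, 4, 6, 2, 1}; // 8 simulated subtasks
        int inputSize = Arrays.stream(elementsPerTask).sum();
        System.out.println("input elements:      " + inputSize);
        System.out.println("distinct (original): " + countDistinctIds(elementsPerTask, false));
        System.out.println("distinct (fixed):    " + countDistinctIds(elementsPerTask, true));
    }
}
```

With the original expression, every non-empty task assigns id 0 to its first element, so the distinct count falls below the 28 input elements, while the fixed expression yields all 28. It also shows why a single-task test cannot catch the bug: with one task, taskId is always 0 and both expressions produce the same ids.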