[ https://issues.apache.org/jira/browse/FLINK-37833?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Jiangjie Qin resolved FLINK-37833.
----------------------------------
    Resolution: Fixed

Patch merged.
master: 0fcde771ba23eae7f1756d8961c385c2d7a16b88
release-1.20: de4a5b71dafad1e9ca0954746d43436263c46228
release-1.19: aa277c283795953b6e8717f0e0f4f30341c05d46

> Code generated for binary key in BatchExecExchange causes incorrect shuffle
> ---------------------------------------------------------------------------
>
>                 Key: FLINK-37833
>                 URL: https://issues.apache.org/jira/browse/FLINK-37833
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Network
>    Affects Versions: 2.0.0, 1.19.2, 1.20.1
>            Reporter: Venkata krishnan Sowrirajan
>            Assignee: Venkata krishnan Sowrirajan
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 2.1.0
>
>
> Context:
> * [CodeGenUtils.hashCodeForType|https://github.com/apache/flink/blob/master/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/CodeGenUtils.scala#L319] generates the hash function code for [BinaryHashPartitioner|https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/partitioner/BinaryHashPartitioner.java]. It evaluates [BYTE_ARRAY_BASE_OFFSET|https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/binary/BinaryRowDataUtil.java#L31] on the JobManager's (JM) side, while the generated code is executed on the TaskManager (TM) to compute the hash code.
> * [MurmurHashUtil.hashUnsafeBytes|https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/util/MurmurHashUtil.java#L52] uses Java's _Unsafe_ API to read the byte[] (16 bytes) as 4-byte ints, operates on them, and computes the hash code. Note: Spark uses the same technique for efficient hash computation.
> * The JVM's object layout depends on the JVM heap size, JVM flags (e.g. -XX:-UseCompressedOops), the processor architecture (64-bit vs. 32-bit), etc. In particular, the base offset of a byte[] (the offset of its first element from the start of the array object) varies as follows:
>
> ||JVM Config / Scenario||Likely ARRAY_BYTE_BASE_OFFSET (bytes)||
> |64-bit JVM with compressed OOPs (default)|16|
> |64-bit JVM without compressed OOPs|24|
> |32-bit JVM|Typically 12 or less|
> |Custom JVM / non-HotSpot|Depends|
>
> * In our environment, the JM heap size was > 32G while the TM heap size was < 32G. Because HotSpot enables compressed OOPs by default only for heaps below roughly 32G, the ARRAY_BYTE_BASE_OFFSET of byte[] was 24 on the JM but 16 on the TM.
> * Due to the above HashPartitioner codegen bug, the generated code read from offset 24 (BYTE_ARRAY_BASE_OFFSET computed on the JM and shipped to the TM), while the correct base offset on the TM was 16, given its heap size of < 32G. The issue here is {_}offset misalignment relative to base{_}.
> * This made the entire shuffle of the data by the byte[] key go completely wrong.
>
> The fix is to evaluate BYTE_ARRAY_BASE_OFFSET in [CodeGenUtils.hashCodeForType|https://github.com/apache/flink/blob/master/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/CodeGenUtils.scala#L319] only on the TM, not on the JM.
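To illustrate the difference between evaluating the offset on the JM and on the TM, here is a minimal, hypothetical Java sketch (not Flink code). It assumes the bug comes from the generator splicing the *value* of its own byte[] base offset into the generated source, and that the fix emits a *reference* to the constant instead. The class `OffsetCodegenSketch` and its methods `buggyHashExpr` and `fixedHashExpr` are invented for illustration; the emitted strings only mimic the shape of a `MurmurHashUtil.hashUnsafeBytes(...)` call and are not compiled here. The sketch also prints this JVM's `sun.misc.Unsafe.ARRAY_BYTE_BASE_OFFSET`.

```java
import sun.misc.Unsafe;

/**
 * Hypothetical sketch (not Flink code): contrasts two ways a code generator
 * running in one JVM (e.g. the JM) can emit a byte[] hash call that is later
 * compiled and executed in another JVM (e.g. a TM).
 */
class OffsetCodegenSketch {

    // Read at class-initialization time in *this* JVM. Its value depends on the
    // JVM's object layout (see the offset table in the issue description).
    static final int LOCAL_BYTE_ARRAY_BASE_OFFSET = Unsafe.ARRAY_BYTE_BASE_OFFSET;

    // Buggy pattern: the generator's own offset is baked into the generated
    // source as an integer literal, so every TM uses the generator's value.
    static String buggyHashExpr(String term) {
        return "MurmurHashUtil.hashUnsafeBytes(" + term + ", "
                + LOCAL_BYTE_ARRAY_BASE_OFFSET + ", " + term + ".length)";
    }

    // Fixed pattern: the generated source refers to the constant by name, so it
    // is resolved in whichever JVM compiles and runs the generated code.
    static String fixedHashExpr(String term) {
        return "MurmurHashUtil.hashUnsafeBytes(" + term
                + ", BinaryRowDataUtil.BYTE_ARRAY_BASE_OFFSET, " + term + ".length)";
    }

    public static void main(String[] args) {
        System.out.println("byte[] base offset in this JVM: " + LOCAL_BYTE_ARRAY_BASE_OFFSET);
        System.out.println("buggy: " + buggyHashExpr("key"));
        System.out.println("fixed: " + fixedHashExpr("key"));
    }
}
```

Running the class on the JM and TM hosts with their respective heap settings may print different base offsets, depending on the JDK version and flags. When the offsets differ, the literal emitted by the buggy pattern is wrong on one side, whereas the by-name reference is always resolved locally. Note that `sun.misc.Unsafe` is an unsupported internal API, so `javac` may emit warnings for it.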