[ https://issues.apache.org/jira/browse/FLINK-37833?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Venkata krishnan Sowrirajan updated FLINK-37833:
------------------------------------------------
Description:
Context:
# [CodeGenUtils.hashCodeForType|https://github.com/apache/flink/blob/master/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/CodeGenUtils.scala#L319] generates the hash function code for [BinaryHashPartitioner|https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/partitioner/BinaryHashPartitioner.java]. It evaluates [BYTE_ARRAY_BASE_OFFSET|https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/binary/BinaryRowDataUtil.java#L31] on the JobManager (JM) side, when the code is generated, but the generated code executes on the TaskManager (TM) to compute the hashcode.
# [MurmurHashUtil.hashUnsafeBytes|https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/util/MurmurHashUtil.java#L52] uses Java's _Unsafe_ API to read the byte[] (16 bytes) as 4-byte ints, operates on them, and computes the hashcode. Note: Spark does the same for efficient hash computation.
# The JVM chooses the object layout based on the heap size, JVM flags (e.g. -XX:-UseCompressedOops), processor architecture (64-bit vs. 32-bit), etc. For a byte[], this determines the base offset of the array data, as below:

||JVM Config / Scenario||Likely ARRAY_BYTE_BASE_OFFSET (bytes)||
|64-bit JVM with compressed OOPs (default)|16|
|64-bit JVM without compressed OOPs|24|
|32-bit JVM|Typically 12 or less|
|Custom JVM / non-HotSpot|Depends|

# In our environment, the JM heap size was > 32G while the TM heap size was < 32G. HotSpot disables compressed OOPs for heaps larger than about 32G, so the ARRAY_BYTE_BASE_OFFSET of a byte[] was 24 on the JM but 16 on the TM.
# Due to the HashPartitioner codegen bug described above, the generated code on the TM read from offset 24 (the ARRAY_BYTE_BASE_OFFSET computed on the JM and shipped to the TM), whereas the correct base offset on the TM was 16, since its heap was < 32G. The issue here is {_}offset misalignment relative to the base{_}: every read was shifted 8 bytes past the actual start of the key's data.
# This made the entire shuffle of the data by the byte[] key incorrect.

> Code generated for binary key in BatchExecExchange causes incorrect shuffle
> ---------------------------------------------------------------------------
>
>                 Key: FLINK-37833
>                 URL: https://issues.apache.org/jira/browse/FLINK-37833
>             Project: Flink
>          Issue Type: Bug
>            Reporter: Venkata krishnan Sowrirajan
>            Priority: Major
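To check which byte[] base offset a given JVM actually uses (the table above only lists likely values), a tiny program can print it. This is a sketch assuming a HotSpot-based JDK where sun.misc.Unsafe is reachable (javac accepts it with an internal-API warning); the class and method names are illustrative, not part of Flink.

```java
import sun.misc.Unsafe;

/**
 * Prints the byte[] base offset of the JVM this program runs in.
 * Run it once per JVM configuration (e.g. different -Xmx values, or with
 * and without -XX:-UseCompressedOops) and compare the results.
 */
public class ByteArrayBaseOffset {

    // sun.misc.Unsafe exposes the offset of element 0 of a byte[],
    // relative to the start of the array object, as a public constant.
    static int byteArrayBaseOffset() {
        return Unsafe.ARRAY_BYTE_BASE_OFFSET;
    }

    public static void main(String[] args) {
        System.out.println("ARRAY_BYTE_BASE_OFFSET = " + byteArrayBaseOffset());
    }
}
```

For example, compare `java ByteArrayBaseOffset` with `java -XX:-UseCompressedOops ByteArrayBaseOffset`, or run it with the JM's and the TM's heap settings. The value can also depend on the JDK version (for instance, on whether compressed class pointers stay enabled when compressed OOPs are off), which is one more reason not to assume that the JM and TM agree.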
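The root cause in item 1 is the difference between splicing a *value* into generated source and splicing a *reference* to it. The generator methods below are hypothetical (they are not Flink's CodeGenUtils code, and their names are made up); they only reuse the class names from the links above. The sketch assumes BYTE_ARRAY_BASE_OFFSET is initialized when BinaryRowDataUtil is loaded rather than being a compile-time constant, so a symbolic reference is resolved by the JVM that runs the generated code.

```java
/**
 * Hypothetical sketch of two ways a code generator could emit the hash
 * call for a byte[] key term. Not Flink's actual implementation.
 */
public class HashCodegenSketch {

    // Buggy pattern: the offset is evaluated in the generator's JVM (the JM)
    // and pasted into the source as a literal, so every TM reuses that value.
    static String generateWithLiteral(String term, int offsetOnGeneratorJvm) {
        return "org.apache.flink.table.runtime.util.MurmurHashUtil.hashUnsafeBytes("
                + term + ", " + offsetOnGeneratorJvm + ", " + term + ".length)";
    }

    // Alternative pattern: emit a reference to the constant, so its value is
    // taken from the JVM that loads and executes the generated class (the TM).
    static String generateWithReference(String term) {
        return "org.apache.flink.table.runtime.util.MurmurHashUtil.hashUnsafeBytes("
                + term
                + ", org.apache.flink.table.data.binary.BinaryRowDataUtil.BYTE_ARRAY_BASE_OFFSET, "
                + term + ".length)";
    }

    public static void main(String[] args) {
        // 24 stands in for the JM's offset in the scenario described above.
        System.out.println(generateWithLiteral("key", 24));
        System.out.println(generateWithReference("key"));
    }
}
```

With the literal form, a TM whose real base offset is 16 still reads from 24. With the reference form, each TM uses its own value.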
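The misalignment in items 6 and 7 can be reproduced without touching real memory. The simulation below rests on stated assumptions: a plain byte[] stands in for the TM's memory (a zeroed 16-byte array header, then a 16-byte key, then 8 bytes of whatever happens to follow the array), and a textbook Murmur3 (x86, 32-bit) word loop stands in for MurmurHashUtil.hashUnsafeBytes, whose exact details (seed, tail handling) may differ. The `partition` helper is hypothetical and is not BinaryHashPartitioner's actual channel formula.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.charset.StandardCharsets;

/** Simulates hashing a byte[] key from the correct vs. a foreign base offset. */
public class MisalignedHashSimulation {

    static final int TM_BASE_OFFSET = 16; // where the key data really starts on the TM
    static final int JM_BASE_OFFSET = 24; // offset baked in on the JM

    // Simulated memory: zeroed header, then the key, then the bytes that follow it.
    static byte[] simulatedMemory(byte[] key, byte[] following) {
        byte[] mem = new byte[TM_BASE_OFFSET + key.length + following.length];
        System.arraycopy(key, 0, mem, TM_BASE_OFFSET, key.length);
        System.arraycopy(following, 0, mem, TM_BASE_OFFSET + key.length, following.length);
        return mem;
    }

    // Murmur3 x86_32 over whole little-endian 4-byte words (length must be a multiple of 4).
    static int murmur3Words(byte[] mem, int offset, int lengthInBytes) {
        ByteBuffer buf = ByteBuffer.wrap(mem).order(ByteOrder.LITTLE_ENDIAN);
        int h1 = 42; // seed chosen arbitrarily for this sketch
        for (int i = 0; i < lengthInBytes; i += 4) {
            int k1 = buf.getInt(offset + i);
            k1 *= 0xcc9e2d51;
            k1 = Integer.rotateLeft(k1, 15);
            k1 *= 0x1b873593;
            h1 ^= k1;
            h1 = Integer.rotateLeft(h1, 13);
            h1 = h1 * 5 + 0xe6546b64;
        }
        // Finalization mix.
        h1 ^= lengthInBytes;
        h1 ^= h1 >>> 16;
        h1 *= 0x85ebca6b;
        h1 ^= h1 >>> 13;
        h1 *= 0xc2b2ae35;
        h1 ^= h1 >>> 16;
        return h1;
    }

    // Hypothetical channel selection, for illustration only.
    static int partition(int hash, int numChannels) {
        return Math.floorMod(hash, numChannels);
    }

    public static void main(String[] args) {
        byte[] key = "0123456789abcdef".getBytes(StandardCharsets.US_ASCII); // 16 bytes
        // Two copies of the same key, followed by different neighbouring bytes.
        byte[] memA = simulatedMemory(key, new byte[] {0, 0, 0, 0, 0, 0, 0, 0});
        byte[] memB = simulatedMemory(key, new byte[] {0, 0, 0, 0, 1, 2, 3, 4});
        int numChannels = 4;
        for (int offset : new int[] {TM_BASE_OFFSET, JM_BASE_OFFSET}) {
            int hA = murmur3Words(memA, offset, key.length);
            int hB = murmur3Words(memB, offset, key.length);
            System.out.printf("offset %d: hashes %s, partitions %d / %d%n",
                    offset, hA == hB ? "equal" : "differ",
                    partition(hA, numChannels), partition(hB, numChannels));
        }
    }
}
```

Because the two copies differ only in the bytes after the key, the hashes read from offset 16 are equal while those read from offset 24 are not. That kind of mismatch lets equal keys be routed to different partitions.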