[ 
https://issues.apache.org/jira/browse/FLINK-37833?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Venkata krishnan Sowrirajan updated FLINK-37833:
------------------------------------------------
    Description: 
Context:
 # [CodeGenUtils.hashCodeForType|https://github.com/apache/flink/blob/master/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/CodeGenUtils.scala#L319] generates the hash-function code for [BinaryHashPartitioner|https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/partitioner/BinaryHashPartitioner.java]. The generated code embeds the value of [BYTE_ARRAY_BASE_OFFSET|https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/binary/BinaryRowDataUtil.java#L31] evaluated on the JobManager (JM) side, but the code itself is executed on the TaskManager (TM) to compute the hash code.
 # [MurmurHashUtil.hashUnsafeBytes|https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/util/MurmurHashUtil.java#L52] uses Java's _Unsafe_ API to read the byte[] key (16 bytes) as 4-byte ints, mixes them, and computes the hash code. Note: Spark does the same for efficient hash computation.
 # The JVM chooses an object's memory layout based on heap size, JVM flags (e.g. -XX:-UseCompressedOops), processor architecture (64-bit vs. 32-bit), etc. In particular, the base offset of a byte[] varies as below:
|JVM Config / Scenario|Likely ARRAY_BYTE_BASE_OFFSET|
|64-bit JVM with compressed OOPs (default)|16|
|64-bit JVM without compressed OOPs|24|
|32-bit JVM|Typically 12 or less|
|Custom JVM / non-HotSpot|Depends|

 # In our environment, the JM heap size was > 32 GB while the TM heap size was < 32 GB (HotSpot disables compressed OOPs once the heap exceeds roughly 32 GB). This means the ARRAY_BYTE_BASE_OFFSET of byte[] was 24 on the JM but 16 on the TM.
 # Due to the HashPartitioner codegen bug above, the generated code read at offset 24 (the ARRAY_BYTE_BASE_OFFSET computed on the JM and shipped to the TM), while the correct base offset on the TM was 16 given its heap size < 32 GB. The issue here is {_}offset misalignment relative to the base{_}: every read starts 8 bytes past the actual start of the array data.
 # As a result, the shuffle of records keyed on the byte[] was completely incorrect: keys no longer hashed on their actual contents, so records were routed to the wrong partitions.
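The mismatch in steps 4–6 can be reproduced outside Flink with a minimal sketch (class and method names here are hypothetical, not Flink code). The correct pattern queries Unsafe.arrayBaseOffset in the JVM that executes the read; the buggy pattern uses a constant computed on a different JVM. Shifting the base by 8 bytes — as happens when a JM computes 24 but the TM's real base is 16 — makes the read return the wrong 4 bytes of the key:

```java
import java.lang.reflect.Field;
import sun.misc.Unsafe;

public class BaseOffsetDemo {
    private static final Unsafe UNSAFE = getUnsafe();

    // Correct: the base offset of the JVM that actually executes the read.
    private static final long RUNTIME_OFFSET = UNSAFE.arrayBaseOffset(byte[].class);

    // Simulates the bug: a JM without compressed OOPs bakes in 24 while the
    // TM's real base is 16 -- i.e. a constant shifted by 8 bytes.
    private static final long BAKED_IN_OFFSET = RUNTIME_OFFSET + 8;

    private static Unsafe getUnsafe() {
        try {
            Field f = Unsafe.class.getDeclaredField("theUnsafe");
            f.setAccessible(true);
            return (Unsafe) f.get(null);
        } catch (ReflectiveOperationException e) {
            throw new AssertionError(e);
        }
    }

    // Reads the first 4 bytes of the key, as the generated hash code should.
    public static int readWithRuntimeOffset(byte[] key) {
        return UNSAFE.getInt(key, RUNTIME_OFFSET);
    }

    // Reads with the stale codegen-time offset: 8 bytes into the array data,
    // so the hash is computed over the wrong bytes of the key.
    public static int readWithStaleOffset(byte[] key) {
        return UNSAFE.getInt(key, BAKED_IN_OFFSET);
    }

    public static void main(String[] args) {
        byte[] key = new byte[16];
        key[0] = 42; // only the first byte of the key is non-zero
        System.out.println("base offset   = " + RUNTIME_OFFSET);
        System.out.println("correct read  = " + readWithRuntimeOffset(key)); // sees key[0..3]
        System.out.println("stale read    = " + readWithStaleOffset(key));   // sees key[8..11]
    }
}
```

With a 16-byte key whose only non-zero byte is key[0], the stale read lands on key[8..11] and returns 0, so two different keys can produce the same (wrong) hash input.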
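For contrast, one offset-free way to read a byte[] as 4-byte ints — shown only as an illustrative alternative in the MurmurHash3 style, not as Flink's actual fix — is java.nio.ByteBuffer, which carries no dependence on the codegen JVM's memory layout (class name hypothetical; tail handling omitted for brevity):

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

public class PortableHash {
    // Reads the key as little-endian 4-byte ints, like
    // MurmurHashUtil.hashUnsafeBytes does via Unsafe, but with no base
    // offset, so generating and executing JVMs cannot disagree.
    public static int hashBytesByInt(byte[] key, int seed) {
        ByteBuffer buf = ByteBuffer.wrap(key).order(ByteOrder.LITTLE_ENDIAN);
        int h = seed;
        while (buf.remaining() >= 4) {
            int k = buf.getInt();
            // MurmurHash3 x86_32 block mixing
            k *= 0xcc9e2d51;
            k = Integer.rotateLeft(k, 15);
            k *= 0x1b873593;
            h ^= k;
            h = Integer.rotateLeft(h, 13);
            h = h * 5 + 0xe6546b64;
        }
        // finalization (tail bytes, if any, are ignored in this sketch)
        h ^= key.length;
        h ^= h >>> 16;
        h *= 0x85ebca6b;
        h ^= h >>> 13;
        h *= 0xc2b2ae35;
        h ^= h >>> 16;
        return h;
    }
}
```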

> Code generated for binary key in BatchExecExchange causes incorrect shuffle
> ---------------------------------------------------------------------------
>
>                 Key: FLINK-37833
>                 URL: https://issues.apache.org/jira/browse/FLINK-37833
>             Project: Flink
>          Issue Type: Bug
>            Reporter: Venkata krishnan Sowrirajan
>            Priority: Major



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
